Source code for convert_pt3

import sys
import os
import struct
import codecs

import numpy as np

# import progressbar
from PyQt5.QtWidgets import QDialog, QFileDialog, QMessageBox
from PyQt5 import uic
import h5py

import file_manager as fm
from my_logger import setup_logger

[docs] logger = setup_logger(__name__)
[docs] convert_pt3_dialog_file = fm.path(name="convert_pt3_dialog.ui", file_type=fm.Type.UI)
UI_Convert_Pt3_Dialog, _ = uic.loadUiType(convert_pt3_dialog_file)
[docs] class Pt3Reader: def __init__(self, pt3_file_path): self._file_path = pt3_file_path file_size = os.stat(pt3_file_path).st_size with open(self._file_path, "rb") as f: def fr(num_bytes: int = 1): read_bytes = f.read(num_bytes) if read_bytes != "": return read_bytes else: return False def fr_str(num_chars: int = 1): str_bytes = fr(num_chars) if str_bytes: converted_string = str() for char_byte in str_bytes: if char_byte != 0: converted_string += codecs.decode(char_byte.to_bytes(1, "little"), "ascii") return converted_string else: return False def fr_int32(): int_bytes = fr(4) if int_bytes: return int.from_bytes(int_bytes, byteorder="little", signed=True) else: return False def fr_uint32(): uint_bytes = fr(4) if uint_bytes: return int.from_bytes(uint_bytes, byteorder="little", signed=False) else: return False def fr_float(): float_bytes = fr(4) if float_bytes: return struct.unpack("<f", float_bytes)[0] else: return False self.ident = fr_str(16) self.format_version = fr_str(6) self.creator_name = fr_str(18) self.creator_version = fr_str(12) self.file_time = fr_str(18) self.crlf = fr_str(2) self.comment_field = fr_str(256) self.curves = fr_int32() self.bits_per_record = fr_int32() self.num_routing_channels = fr_int32() self.number_of_boards = fr_int32() self.active_curve = fr_int32() self.measurement_mode = fr_int32() self.sub_mode = fr_int32() self.range_no = fr_int32() self.offset = fr_int32() self.acquisition_time = fr_int32() self.stop_at = fr_int32() self.stop_on_overflow = fr_int32() self.restart = fr_int32() self.display_lin_log = fr_int32() self.display_time_from = fr_int32() self.display_time_to = fr_int32() self.display_count_from = fr_int32() self.display_cout_to = fr_int32() self.display_curve_map_to = [None] * 8 self.display_curve_show = [None] * 8 for i in range(8): self.display_curve_map_to[i] = fr_int32() self.display_curve_show[i] = fr_int32() self.parameter_start = [None] * 3 self.parameter_step = [None] * 3 self.parameter_end = [None] * 3 for i in range(3): self.parameter_start[i] = fr_float() self.parameter_step[i] = fr_float() self.parameter_end[i] = fr_float() self.repeat_mode = fr_int32() self.repeats_per_curve = fr_int32() self.repeat_time = fr_int32() self.repeat_wait = fr_int32() self.script_name = fr_str(20) self.hardware_ident = fr_str(16) self.hardware_version = fr_str(8) self.hardware_serial = fr_int32() self.sync_devider = fr_int32() self.cfd_zero_cross_0 = fr_int32() self.cfd_level_0 = fr_int32() self.cfd_zero_cross_1 = fr_int32() self.cfd_level_1 = fr_int32() self.resolution = fr_float() self.router_mode_code = fr_int32() self.router_enabled = fr_int32() class RouterChannel: input_type = None input_level = None input_edge = None cfd_present = None cfd_level = None cfd_zero_cross = None self.channels_info = [RouterChannel()] * self.num_routing_channels for channel in self.channels_info: channel.input_type = fr_int32() channel.input_level = fr_int32() channel.input_edge = fr_int32() channel.cfd_present = fr_int32() channel.cfd_level = fr_int32() channel.cfd_zero_cross = fr_int32() self.external_devices = fr_int32() self.reserved_1 = fr_int32() self.reserved_2 = fr_int32() self.count_rate_0 = fr_int32() self.count_rate_1 = fr_int32() self.stop_after = fr_int32() self.stop_reason = fr_int32() self.num_records = fr_uint32() self.image_header_size = fr_int32() self.image_header = [fr_int32() for _ in range(self.image_header_size)] self._overflow_time = 0 # self._counter_1 = 0 # self._counter_2 = 0 # self._counter_3 = 0 # self._counter_4 = 0 # self._counter_err = 0 self._wrap_around = 65536 self._sync_period = 1e9 / self.count_rate_0 # ns self._delay_times = np.zeros(self.num_records) class Records: def __init__(self, num_records: int): self.count = 0 self.micro_times = np.empty(num_records, dtype=np.uint16) * np.nan self.macro_times = np.empty(num_records, dtype=np.uint16) * np.nan def trim_empty(self): if self.count == 0: self.micro_times = None self.macro_times = None else: self.micro_times = np.delete(self.micro_times, np.s_[self.count :]) self.macro_times = np.delete(self.macro_times, np.s_[self.count :]) self.channel_records = [Records(self.num_records) for _ in range(self.num_routing_channels)] overflow = 0 bar = False if "--dev" in sys.argv: # bar = progressbar.ProgressBar(max_value=file_size) pass for i in range(self.num_records): # +---------------+ +---------------+ +---------------+ +---------------+ # |x|x|x|x|x|x|x|x| |x|x|x|x|x|x|x|x| |x|x|x|x|x|x|x|x| |x|x|x|x|x|x|x|x| # +---------------+ +---------------+ +---------------+ +---------------+ t3_record = fr_uint32() if t3_record is False: if bar: bar.update(f.tell()) break # print('{0:32b}'.format(t3_record)) # +---------------+ +---------------+ +---------------+ +---------------+ # |x|x|x|x| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | # +---------------+ +---------------+ +---------------+ +---------------+ channel = t3_record >> (32 - 4) # -1 # Not sure about the -1? # print('{0:32b}'.format(t3_record >> (32 - 4))) if channel != 15: # Valid record, not overflow c_r = self.channel_records[channel - 1] # +---------------+ +---------------+ +---------------+ +---------------+ # | | | | | | | | | | | | | | | | | | |x|x|x|x|x|x|x|x| |x|x|x|x|x|x|x|x| # +---------------+ +---------------+ +---------------+ +---------------+ nsync = t3_record & int("11111111" * 2, 2) # print('{0:32b}'.format(t3_record & int('11111111'*2, 2))) # +---------------+ +---------------+ +---------------+ +---------------+ # | | | | |x|x|x|x| |x|x|x|x|x|x|x|x| | | | | | | | | | | | | | | | | | | # +---------------+ +---------------+ +---------------+ +---------------+ dtime = t3_record >> 16 & int("00001111" + "11111111", 2) # print('{0:32b}'.format(t3_record >> 16 & int('00001111' + '11111111', 2))) c_r.micro_times[c_r.count] = dtime * self.resolution true_sync = overflow + nsync macro_time = (true_sync * self._sync_period) + c_r.micro_times[c_r.count] c_r.macro_times[c_r.count] = macro_time c_r.count += 1 else: # +---------------+ +---------------+ +---------------+ +---------------+ # | | | | | | | | | | | | | |x|x|x|x| | | | | | | | | | | | | | | | | | | # +---------------+ +---------------+ +---------------+ +---------------+ markers = t3_record >> 16 & int("00000000" + "00001111", 2) # print('{0:32b}'.format(t3_record >> 16 & int('00000000' + '00001111', 2))) if markers == int("0000", 2): # then this is a overflow record overflow += self._wrap_around if bar: bar.update(f.tell()) if bar: bar.finish() for channel in self.channel_records: channel.trim_empty()
[docs] def add_particle_to_h5(self, h5_file): pass
[docs] class ConvertPt3Dialog(QDialog, UI_Convert_Pt3_Dialog): def __init__(self, mainwindow): QDialog.__init__(self) UI_Convert_Pt3_Dialog.__init__(self) self.setupUi(self) self.mainwindow = mainwindow self.parent = mainwindow self.btnSourceFolder.clicked.connect(self.set_source_folder) self.btnExportFile.clicked.connect(self.set_export_file) self.btnConvert.clicked.connect(self.convert) self.cbxHasSpectra.stateChanged.connect(self.change_spectra_edt) self.edtFileNames_pt3.textChanged.connect(self.check_ready) self.edtFileNames_Spectra.textChanged.connect(self.check_ready) self._source_path = None self._export_path = None
[docs] def set_source_folder(self): f_dir = QFileDialog.getExistingDirectory(self) if f_dir != ("", ""): self._source_path = f_dir display_path = f_dir if len(f_dir) > 48: display_path = f_dir[:22] + "..." + f_dir[-22:] self.edtSourceFolder.setText(display_path) self.check_ready()
[docs] def set_export_file(self): f_dir, _ = QFileDialog.getSaveFileName(filter="HDF5 file (*.h5)") if f_dir != ("", ""): if not os.path.exists(f_dir): self._export_path = f_dir display_path = f_dir if len(f_dir) > 48: display_path = f_dir[:22] + "..." + f_dir[-22:] self.edtExportFile.setText(display_path) else: msg = QMessageBox( QMessageBox.Warning, "Convert pt3", "File exists!", buttons=QMessageBox.Ok, parent=self, ) msg.exec_() self.check_ready()
[docs] def change_spectra_edt(self): has_spectra = self.cbxHasSpectra.isChecked() self.edtFileNames_Spectra.setEnabled(has_spectra) self.spbExposure.setEnabled(has_spectra)
[docs] def check_ready(self): is_ready = False if self._export_path and self._source_path: pt3_names = self.edtFileNames_pt3.text() spectra_names = self.edtFileNames_Spectra.text() if pt3_names != "" and spectra_names != "": is_ready = True self.btnConvert.setEnabled(is_ready)
[docs] def convert(self): self.setEnabled(False) all_source_files = [ f for f in os.listdir(self._source_path) if os.path.isfile(os.path.join(self._source_path, f)) ] pt3_f_name = self.edtFileNames_pt3.text() pt3_fs = [file for file in all_source_files if file.startswith(pt3_f_name) and file.endswith(".pt3")] pt3_fs.sort() num_files = len(pt3_fs) all_okay = False if all([file[-4:] == ".pt3" for file in pt3_fs]): pt3_nums = [file[len(pt3_f_name) : file.find(".")] for file in pt3_fs] if all([num.isalnum() for num in pt3_nums]): pt3_nums = [int(num) for num in pt3_nums] pt3_nums.sort() all_okay = True spec_fs = None add_spec = self.cbxHasSpectra.isChecked() if add_spec: all_okay = False spec_file_name = self.edtFileNames_Spectra.text() spec_fs = [file for file in all_source_files if file.startswith(spec_file_name)] spec_fs.sort() if len(pt3_fs) == len(spec_fs): spec_file_ext = spec_fs[0].find(".") if spec_file_ext == -1: spec_nums = [file[len(spec_file_name) :] for file in spec_fs] else: spec_nums = [file[len(spec_file_name) : file.find(".")] for file in spec_fs] if all([num.isalnum() for num in spec_nums]): spec_nums = [int(num) for num in spec_nums] spec_nums.sort() if pt3_nums == spec_nums: all_okay = True if not all_okay: message = "pt3 files and spectra files dont match, please check" msg = QMessageBox( QMessageBox.Warning, "Convert pt3", message, buttons=QMessageBox.Ok, parent=self, ) msg.exec_() return with h5py.File(self._export_path, "w") as h5_f: h5_f.attrs.create(name="# Particles", data=num_files) channel = self.spbChannel.value() - 1 try: for num in range(num_files): pt3_reader = Pt3Reader(os.path.join(self._source_path, pt3_fs[num])) part_group = h5_f.create_group("Particle " + str(num + 1)) part_group.attrs.create("Date", pt3_reader.file_time) part_group.attrs.create("Discription", pt3_reader.comment_field) part_group.attrs.create("Intensity?", 1) part_group.attrs.create("RS Coord. (um)", [0, 0]) part_group.attrs.create("Spectra?", int(add_spec)) part_group.attrs.create("User", pt3_reader.creator_name) abs_times = pt3_reader.channel_records[channel].macro_times micro_times = pt3_reader.channel_records[channel].micro_times part_group.create_dataset("Absolute Times (ns)", dtype=np.uint64, data=abs_times) part_group.create_dataset("Micro Times (s)", dtype=np.float64, data=micro_times) if add_spec: spec_data = np.loadtxt(os.path.join(self._source_path, spec_fs[num])) wavelengths = spec_data[:, 0] spec_data = np.delete(spec_data, 0, axis=1).T exposure = self.spbExposure.value() spec_t_series = np.array([(n + 1) * exposure for n in range(spec_data.shape[0])]) spec_dataset = part_group.create_dataset( "Spectra (counts\s)", dtype=np.float64, data=spec_data ) spec_dataset.attrs.create("Exposure Times (s)", exposure) spec_dataset.attrs.create( "Spectra Abs. Times (s)", dtype=np.float64, data=spec_t_series, ) spec_dataset.attrs.create("Wavelengths", dtype=np.float64, data=wavelengths) except Exception as e: logger.error(e) self.setEnabled(True)
if __name__ == "__main__":
[docs] pt3_file = Pt3Reader("C:\\Google Drive\\Current_Projects\\Full_SMS\\test_data\\trace0.pt3")
# print('here')