from __future__ import annotations
import os
import numpy as np
import multiprocessing as mp
from processes import (
ProgressTracker,
ProcessProgressTask as PProgTask,
ProcessSigPassTask as PSigTask,
ProcessProgressCmd as PProgCmd,
ProcessProgress,
ProcessProgFeedback,
PassSigFeedback,
)
from signals import WorkerSigPassType as WSType # , PassSigFeedback
import multiprocessing as mp
import smsh5
from my_logger import setup_logger
from tree_model import DatasetTreeNode
# if TYPE_CHECKING:
[docs]
logger = setup_logger(__name__)
[docs]
class OpenFile:
"""A QRunnable class to create a worker thread for opening h5 file."""
def __init__(
self,
file_path: str,
is_irf: bool = False,
tmin=None,
progress_tracker: ProgressTracker = None,
):
"""
Initiate Open File Worker
Creates a QRunnable object (worker) to be run by a QThreadPool thread.
This worker is intended to call the given function to open a h5 file
and populate the tree in the mainwindow g
Parameters
----------
fname : str
The name of the file.
is_irf : bool
Whether the thread is loading an IRF or not.
"""
self._file_path = file_path
self._is_irf = is_irf
self._tmin = tmin
self.progress_tracker = progress_tracker
@property
[docs]
def file_path(self) -> str:
return self._file_path
@file_path.setter
def file_path(self, file_path: str):
assert type(file_path) is str, "file_path must be of type str"
assert os.path.exists(file_path) and os.path.isfile(file_path), "file_path is not a path to a valid file"
self._file_path = file_path
@property
[docs]
def irf(self) -> bool:
return self._is_irf
@irf.setter
def irf(self, is_irf: bool):
assert type(is_irf) is bool, "is_irf is not of type bool"
self._is_irf = is_irf
@property
[docs]
def tmin(self):
return self._tmin
@tmin.setter
def tmin(self, tmin):
# assert
self._tmin = tmin
[docs]
def open_h5(self, feedback_queue: mp.JoinableQueue) -> smsh5.H5dataset:
"""
Read the selected h5 file and populates the tree on the gui with the file and the particles.
Accepts a function that will be used to indicate the current progress.
As this function is designed to be called from a thread other than the main one, no GUI code
should be called here.
Parameters
----------
fname : str
Path name to h5 file.
feedback_queue : multiprocessing.JoinableQueue
Queue to send feedback to ProcessThread
"""
try:
logger.info("Running Open File Thread Task")
sig_fb = PassSigFeedback(feedback_queue=feedback_queue)
prog_fb = ProcessProgFeedback(feedback_queue=feedback_queue)
dataset = self.load_data(fname=self.file_path, sig_fb=sig_fb, prog_fb=prog_fb)
datasetnode = DatasetTreeNode(
self.file_path[0][self.file_path[0].rfind("/") + 1 : -3],
dataset,
"dataset",
)
# sig_fb.add_datasetnode(node=datasetnode)
prog_fb.set_status(status="Adding particles...")
# prog_fb.start(max_value=dataset.numpart)
all_nodes = [(datasetnode, -1)]
for i, particle in enumerate(dataset.particles):
if not particle.is_secondary_part: # "secondary" particles contain data from second TCSPC card
particlenode = DatasetTreeNode(particle.name, particle, "particle")
all_nodes.append((particlenode, i))
sig_fb.add_all_particlenodes(all_nodes=all_nodes)
sig_fb.reset_tree()
starttimes = []
tmins = []
for particle in dataset.particles:
if not particle.is_secondary_part:
# Find max, then search backward for first zero to find the best startpoint
decay = particle.histogram.decay
histmax_ind = np.argmax(decay)
reverse = decay[:histmax_ind][::-1]
zeros_rev = np.where(reverse == 0)[0]
if len(zeros_rev) != 0:
length = 0
start_ind_rev = zeros_rev[0]
for i, val in enumerate(zeros_rev[:-1]):
if zeros_rev[i + 1] - val > 1:
length = 0
continue
length += 1
if length >= 10:
start_ind_rev = val
break
start_ind = histmax_ind - start_ind_rev
# starttime = particle.histogram.t[start_ind]
starttime = start_ind
else:
starttime = 0
starttimes.append(starttime)
try:
tmin = np.min(particle.histogram.microtimes)
except ValueError:
tmin = 0
tmins.append(tmin)
av_start = np.average(starttimes)
sig_fb.set_start(start=av_start)
global_tmin = np.min(tmins)
for particle in dataset.particles:
particle.tmin = global_tmin
sig_fb.set_tmin(tmin=global_tmin)
sig_fb.data_loaded()
prog_fb.set_status(status="Done")
except Exception as err:
logger.error(err, exc_info=True)
raise err
[docs]
def open_irf(self, feedback_queue: mp.JoinableQueue) -> None:
"""
Read the selected h5 file and populates the tree on the gui with the file and the particles.
Accepts a function that will be used to indicate the current progress.
As this function is designed to be called from a thread other than the main one, no GUI code
should be called here.
Parameters
----------
fname : str
Path name to h5 file.
"""
try:
sig_fb = PassSigFeedback(feedback_queue=feedback_queue)
prog_fb = ProcessProgFeedback(feedback_queue=feedback_queue)
dataset = self.load_data(fname=self.file_path, sig_fb=sig_fb, prog_fb=prog_fb)
for particle in dataset.particles:
particle.tmin = self.tmin
# particle.tmin = np.min(particle.histogram.microtimes)
irfhist = dataset.particles[0].histogram
irfhist._particle = None
# irfhist.t -= irfhist.t.min()
sig_fb.add_irf(irfhist.decay, irfhist.t, dataset)
prog_fb.set_status(status="Done")
# start_progress_sig.emit(100)
# status_sig.emit("Done")
except Exception as err:
self.signals.error.emit(err)
[docs]
def load_data(self, fname: str, sig_fb: PassSigFeedback, prog_fb: ProcessProgFeedback):
dataset = smsh5.H5dataset(fname[0], sig_fb=sig_fb, prog_fb=prog_fb)
bin_all(
dataset=dataset,
bin_size=100,
for_irf=self.irf,
sig_fb=sig_fb,
prog_fb=prog_fb,
)
dataset.makehistograms()
return dataset
[docs]
class BinAll:
def __init__(
self,
dataset: smsh5.H5dataset,
bin_size: float,
progress_tracker: ProgressTracker = None,
):
self.dataset = dataset
self.bin_size = bin_size
self.bin_all_func = bin_all
self.progress_tracker = progress_tracker
[docs]
def run_bin_all(self, feedback_queue: mp.JoinableQueue):
sig_fb = PassSigFeedback(feedback_queue=feedback_queue)
prog_fb = ProcessProgFeedback(feedback_queue=feedback_queue)
self.bin_all_func(dataset=self.dataset, bin_size=self.bin_size, sig_fb=sig_fb, prog_fb=prog_fb)
[docs]
def bin_all(
dataset: smsh5.H5dataset,
bin_size: float,
for_irf: bool = False,
sig_fb: PassSigFeedback = None,
prog_fb: ProcessProgFeedback = None,
) -> None:
"""
Parameters
----------
bin_size
dataset
sig_fb
prog_fb
"""
if prog_fb:
prog_fb.start(max_value=0)
if not for_irf:
prog_fb.set_status(status="Binning traces...")
else:
prog_fb.set_status(status="Binning IRF trace...")
dataset.bin_all_ints(bin_size, sig_fb=sig_fb, prog_fb=prog_fb)
if sig_fb:
sig_fb.bin_size(bin_size=int(bin_size))