Source code for signals

from __future__ import annotations
import numpy as np
from PyQt5.QtCore import QObject, pyqtSignal
from typing import TYPE_CHECKING
from my_logger import setup_logger

from enum import IntEnum, auto

if TYPE_CHECKING:
    from tree_model import DatasetTreeNode
    from multiprocessing import JoinableQueue
    from processes import ProgressTracker, ProcessProgressTask

[docs] logger = setup_logger(__name__)
[docs] class WorkerSignals(QObject): """A QObject with attributes of pyqtSignal's that can be used to communicate between worker threads and the main thread."""
[docs] resolve_finished = pyqtSignal(str)
[docs] fitting_finished = pyqtSignal(str)
[docs] grouping_finished = pyqtSignal(str)
[docs] openfile_finished = pyqtSignal(bool)
[docs] save_file_version_outdated = pyqtSignal()
[docs] error = pyqtSignal(Exception)
[docs] result = pyqtSignal(object)
[docs] progress = pyqtSignal()
[docs] auto_progress = pyqtSignal(int, str)
[docs] start_progress = pyqtSignal(int)
[docs] end_progress = pyqtSignal()
[docs] status_message = pyqtSignal(str)
[docs] add_datasetindex = pyqtSignal(object)
[docs] add_particlenode = pyqtSignal(object, int)
[docs] add_all_particlenodes = pyqtSignal(list)
[docs] reset_tree = pyqtSignal()
[docs] data_loaded = pyqtSignal()
[docs] bin_size = pyqtSignal(int)
[docs] add_irf = pyqtSignal(np.ndarray, np.ndarray, object) # H5dataset)
[docs] level_resolved = pyqtSignal()
[docs] reset_gui = pyqtSignal()
[docs] set_start = pyqtSignal(float)
[docs] set_tmin = pyqtSignal(float)
[docs] plot_trace = pyqtSignal(object, bool)
[docs] plot_trace_lock = pyqtSignal(object, bool, bool)
[docs] plot_trace_export = pyqtSignal(object, bool, str)
[docs] plot_trace_export_lock = pyqtSignal(object, bool, str, bool)
[docs] plot_levels = pyqtSignal(object, bool)
[docs] plot_levels_lock = pyqtSignal(object, bool, bool)
[docs] plot_levels_export = pyqtSignal(object, bool, str, object)
[docs] plot_levels_export_lock = pyqtSignal(object, bool, str, bool)
# plot_group_bounds = pyqtSignal(object, bool)
[docs] plot_group_bounds_export = pyqtSignal(object, bool, str)
[docs] plot_group_bounds_export_lock = pyqtSignal(object, bool, str, bool)
# plot_grouping_bic = pyqtSignal(object, bool)
[docs] plot_grouping_bic_export = pyqtSignal(object, bool, str)
[docs] plot_grouping_bic_export_lock = pyqtSignal(object, bool, str, bool)
[docs] plot_decay = pyqtSignal(int, object, bool, bool)
[docs] plot_decay_lock = pyqtSignal(int, object, bool, bool, bool)
[docs] plot_decay_export = pyqtSignal(int, object, bool, bool, str)
[docs] plot_decay_export_lock = pyqtSignal(object, object, bool, bool, str, bool)
[docs] plot_convd = pyqtSignal(int, object, bool)
[docs] plot_convd_lock = pyqtSignal(int, object, bool, bool, bool)
[docs] plot_convd_export_lock = pyqtSignal(int, object, bool, bool, str, bool)
[docs] plot_decay_convd_export = pyqtSignal(object, str, bool, bool)
[docs] plot_decay_convd_export_lock = pyqtSignal(object, str, bool, bool, bool)
[docs] plot_decay_convd_residuals_export = pyqtSignal(object, str, bool, bool)
[docs] plot_decay_convd_residuals_export_lock = pyqtSignal(object, str, bool, bool, bool)
[docs] show_residual_widget = pyqtSignal(bool)
[docs] show_residual_widget_lock = pyqtSignal(bool, bool)
[docs] plot_residuals_export = pyqtSignal(int, object, bool, str)
[docs] plot_residuals_export_lock = pyqtSignal(int, object, bool, str, bool)
[docs] plot_spectra_export = pyqtSignal(object, bool, str)
[docs] plot_spectra_export_lock = pyqtSignal(object, bool, str, bool)
[docs] plot_raster_scan_export = pyqtSignal(object, object, bool, str)
[docs] plot_raster_scan_export_lock = pyqtSignal(object, object, bool, str, bool)
[docs] plot_corr = pyqtSignal(object, bool)
[docs] plot_corr_lock = pyqtSignal(object, bool, bool)
[docs] plot_corr_export = pyqtSignal(object, bool, str)
[docs] plot_corr_export_lock = pyqtSignal(object, bool, str, bool)
[docs] class WorkerSigPassType(IntEnum):
[docs] resolve_finished = auto()
[docs] fitting_finished = auto()
[docs] grouping_finished = auto()
[docs] openfile_finished = auto()
[docs] error = auto()
[docs] result = auto()
[docs] progress = auto()
[docs] auto_progress = auto()
[docs] start_progress = auto()
[docs] end_progress = auto()
[docs] progress_decrement = auto()
[docs] status_message = auto()
[docs] add_datasetindex = auto()
[docs] add_particlenode = auto()
[docs] add_all_particlenodes = auto()
[docs] reset_tree = auto()
[docs] data_loaded = auto()
[docs] bin_size = auto()
[docs] add_irf = auto()
[docs] level_resolved = auto()
[docs] reset_gui = auto()
[docs] set_start = auto()
[docs] set_tmin = auto()
[docs] def worker_sig_pass(signals: WorkerSignals, sig_type: WorkerSigPassType, args=None): assert type(signals) is WorkerSignals, "signals is incorrect type." assert type(sig_type) is WorkerSigPassType, "sig_type is incorrect type." if type(args) is not tuple: args = (args,) if sig_type is WorkerSigPassType.resolve_finished: signals.resolve_finished.emit(*args) elif sig_type is WorkerSigPassType.fitting_finished: signals.fitting_finished.emit(*args) elif sig_type is WorkerSigPassType.grouping_finished: signals.grouping_finished.emit(*args) elif sig_type is WorkerSigPassType.openfile_finished: signals.openfile_finished.emit(*args) elif sig_type is WorkerSigPassType.error: signals.error.emit(*args) elif sig_type is WorkerSigPassType.result: signals.result.emit(*args) elif sig_type is WorkerSigPassType.progress: signals.progress.emit(*args) elif sig_type is WorkerSigPassType.auto_progress: signals.auto_progress.emit(*args) elif sig_type is WorkerSigPassType.start_progress: signals.start_progress.emit(*args) elif sig_type is WorkerSigPassType.status_message: signals.status_message.emit(*args) elif sig_type is WorkerSigPassType.add_datasetindex: signals.add_datasetindex.emit(*args) elif sig_type is WorkerSigPassType.add_particlenode: signals.add_particlenode.emit(*args) elif sig_type is WorkerSigPassType.add_all_particlenodes: signals.add_all_particlenodes.emit(*args) elif sig_type is WorkerSigPassType.reset_tree: signals.reset_tree.emit() elif sig_type is WorkerSigPassType.data_loaded: signals.data_loaded.emit() elif sig_type is WorkerSigPassType.bin_size: signals.bin_size.emit(*args) elif sig_type is WorkerSigPassType.add_irf: signals.add_irf.emit(*args) elif sig_type is WorkerSigPassType.level_resolved: signals.level_resolved.emit(*args) elif sig_type is WorkerSigPassType.reset_gui: signals.reset_gui.emit(*args) elif sig_type is WorkerSigPassType.set_start: signals.set_start.emit(*args) elif sig_type is WorkerSigPassType.set_tmin: signals.set_tmin.emit(*args) else: logger.error(f"Feedback return not configured for: {sig_type}")
[docs] class ProcessThreadSignals(QObject): """ Defines the signals available from the running worker thread """
[docs] finished = pyqtSignal(object)
# single_result = pyqtSignal(object)
[docs] results = pyqtSignal(object)
[docs] start_progress = pyqtSignal(int)
[docs] set_progress = pyqtSignal(int)
[docs] step_progress = pyqtSignal(float)
[docs] step_one_progress = pyqtSignal()
[docs] add_progress = pyqtSignal(int)
[docs] end_progress = pyqtSignal()
[docs] status_update = pyqtSignal(str)
[docs] error = pyqtSignal(Exception)
[docs] passthrough = pyqtSignal(object)