from __future__ import annotations
import numpy as np
from PyQt5.QtCore import QObject, pyqtSignal
from typing import TYPE_CHECKING
from my_logger import setup_logger
from enum import IntEnum, auto
if TYPE_CHECKING:
from tree_model import DatasetTreeNode
from multiprocessing import JoinableQueue
from processes import ProgressTracker, ProcessProgressTask
[docs]
# Module-level logger, created by the project's setup_logger helper with this module's name.
logger = setup_logger(__name__)
[docs]
class WorkerSignals(QObject):
    """Bundle of Qt signals for talking between worker threads and the main thread.

    Every attribute is a class-level ``pyqtSignal``; the types passed to
    ``pyqtSignal(...)`` are the payload types that signal carries.  The section
    headings below only follow the signal names -- the emit sites and the
    connected slots are defined outside this class.
    """

    # ---- task completion / outcome ----
    resolve_finished = pyqtSignal(str)
    fitting_finished = pyqtSignal(str)
    grouping_finished = pyqtSignal(str)
    openfile_finished = pyqtSignal(bool)
    save_file_version_outdated = pyqtSignal()
    error = pyqtSignal(Exception)
    result = pyqtSignal(object)

    # ---- progress and status reporting ----
    progress = pyqtSignal()
    auto_progress = pyqtSignal(int, str)
    start_progress = pyqtSignal(int)
    end_progress = pyqtSignal()
    status_message = pyqtSignal(str)

    # ---- dataset tree / loaded data ----
    add_datasetindex = pyqtSignal(object)
    add_particlenode = pyqtSignal(object, int)
    add_all_particlenodes = pyqtSignal(list)
    reset_tree = pyqtSignal()
    data_loaded = pyqtSignal()
    bin_size = pyqtSignal(int)
    # Third payload is declared as a generic object; the original inline note
    # hints at an "H5dataset" -- TODO confirm against the emitter.
    add_irf = pyqtSignal(np.ndarray, np.ndarray, object)
    level_resolved = pyqtSignal()
    reset_gui = pyqtSignal()
    set_start = pyqtSignal(float)
    set_tmin = pyqtSignal(float)

    # ---- trace plots ----
    plot_trace = pyqtSignal(object, bool)
    plot_trace_lock = pyqtSignal(object, bool, bool)
    plot_trace_export = pyqtSignal(object, bool, str)
    plot_trace_export_lock = pyqtSignal(object, bool, str, bool)

    # ---- level plots ----
    plot_levels = pyqtSignal(object, bool)
    plot_levels_lock = pyqtSignal(object, bool, bool)
    # NOTE(review): fourth payload is ``object`` here but ``bool`` in the
    # ``_lock`` variant below -- confirm this difference is intended.
    plot_levels_export = pyqtSignal(object, bool, str, object)
    plot_levels_export_lock = pyqtSignal(object, bool, str, bool)

    # ---- group-bounds plots ----
    # NOTE: a plain ``plot_group_bounds(object, bool)`` signal is currently disabled.
    plot_group_bounds_export = pyqtSignal(object, bool, str)
    plot_group_bounds_export_lock = pyqtSignal(object, bool, str, bool)

    # ---- grouping-BIC plots ----
    # NOTE: a plain ``plot_grouping_bic(object, bool)`` signal is currently disabled.
    plot_grouping_bic_export = pyqtSignal(object, bool, str)
    plot_grouping_bic_export_lock = pyqtSignal(object, bool, str, bool)

    # ---- decay plots ----
    plot_decay = pyqtSignal(int, object, bool, bool)
    plot_decay_lock = pyqtSignal(int, object, bool, bool, bool)
    plot_decay_export = pyqtSignal(int, object, bool, bool, str)
    # NOTE(review): first payload is ``object`` here but ``int`` in the other
    # ``plot_decay*`` signals above -- confirm this is intended.
    plot_decay_export_lock = pyqtSignal(object, object, bool, bool, str, bool)

    # ---- convolved-decay plots ----
    plot_convd = pyqtSignal(int, object, bool)
    plot_convd_lock = pyqtSignal(int, object, bool, bool, bool)
    # Only the ``_lock`` form of the convd export signal is declared.
    plot_convd_export_lock = pyqtSignal(int, object, bool, bool, str, bool)

    # ---- combined decay + convd (and residuals) exports ----
    plot_decay_convd_export = pyqtSignal(object, str, bool, bool)
    plot_decay_convd_export_lock = pyqtSignal(object, str, bool, bool, bool)
    plot_decay_convd_residuals_export = pyqtSignal(object, str, bool, bool)
    plot_decay_convd_residuals_export_lock = pyqtSignal(object, str, bool, bool, bool)

    # ---- residuals exports ----
    plot_residuals_export = pyqtSignal(int, object, bool, str)
    plot_residuals_export_lock = pyqtSignal(int, object, bool, str, bool)

    # ---- spectra exports ----
    plot_spectra_export = pyqtSignal(object, bool, str)
    plot_spectra_export_lock = pyqtSignal(object, bool, str, bool)

    # ---- raster-scan exports ----
    plot_raster_scan_export = pyqtSignal(object, object, bool, str)
    plot_raster_scan_export_lock = pyqtSignal(object, object, bool, str, bool)

    # ---- correlation plots ----
    plot_corr = pyqtSignal(object, bool)
    plot_corr_lock = pyqtSignal(object, bool, bool)
    plot_corr_export = pyqtSignal(object, bool, str)
    plot_corr_export_lock = pyqtSignal(object, bool, str, bool)
[docs]
class WorkerSigPassType(IntEnum):
    """Identifiers naming which ``WorkerSignals`` signal should be emitted.

    ``worker_sig_pass`` compares its ``sig_type`` argument against members of
    this enum and emits the identically named signal.  Every member that
    ``worker_sig_pass`` compares against is defined here; a comparison against
    an undefined member would raise ``AttributeError`` at the attribute lookup.

    The first eleven members keep their original order (and therefore their
    original ``auto()`` values 1-11); the members the dispatcher needs were
    appended afterwards so existing integer values are unchanged.
    """

    # --- original members (values 1-11, order preserved) ---
    resolve_finished = auto()
    fitting_finished = auto()
    grouping_finished = auto()
    openfile_finished = auto()
    start_progress = auto()
    # No branch in worker_sig_pass handles this member; presumably it is
    # consumed elsewhere -- TODO confirm.
    progress_decrement = auto()
    status_message = auto()
    add_datasetindex = auto()
    add_particlenode = auto()
    add_all_particlenodes = auto()
    level_resolved = auto()

    # --- members referenced by worker_sig_pass but previously undefined ---
    error = auto()
    result = auto()
    progress = auto()
    auto_progress = auto()
    reset_tree = auto()
    data_loaded = auto()
    bin_size = auto()
    add_irf = auto()
    reset_gui = auto()
    set_start = auto()
    set_tmin = auto()
[docs]
# Names of the WorkerSignals attributes that worker_sig_pass is configured to
# relay.  Kept as an explicit whitelist so any other WorkerSigPassType member
# is reported as "not configured" instead of being emitted implicitly.
_RELAYED_SIGNAL_NAMES = frozenset(
    {
        "resolve_finished",
        "fitting_finished",
        "grouping_finished",
        "openfile_finished",
        "error",
        "result",
        "progress",
        "auto_progress",
        "start_progress",
        "status_message",
        "add_datasetindex",
        "add_particlenode",
        "add_all_particlenodes",
        "reset_tree",
        "data_loaded",
        "bin_size",
        "add_irf",
        "level_resolved",
        "reset_gui",
        "set_start",
        "set_tmin",
    }
)

# Relayed signals that WorkerSignals declares as ``pyqtSignal()`` (no payload).
# They are always emitted without arguments, whatever ``args`` contains.
_ARGLESS_SIGNAL_NAMES = frozenset(
    {"progress", "reset_tree", "data_loaded", "level_resolved", "reset_gui"}
)


def worker_sig_pass(signals: WorkerSignals, sig_type: WorkerSigPassType, args=None):
    """Emit the ``WorkerSignals`` signal selected by ``sig_type``.

    The signal is looked up on ``signals`` by the enum member's name, so no
    enum member other than the one passed in is ever referenced.  (A chain of
    ``sig_type is WorkerSigPassType.<name>`` comparisons raises
    ``AttributeError`` as soon as it evaluates a name the enum does not define.)

    Args:
        signals: The ``WorkerSignals`` instance whose signal is emitted.
        sig_type: Which signal to emit.
        args: Payload for the signal.  A tuple is unpacked into ``emit``; any
            other value (including the default ``None``) is passed as a single
            argument.  Ignored for signals declared without a payload.

    Raises:
        AssertionError: If ``signals`` or ``sig_type`` has the wrong type.

    If ``sig_type`` does not name a relayed signal, an error is logged and
    nothing is emitted.
    """
    assert isinstance(signals, WorkerSignals), "signals is incorrect type."
    assert isinstance(sig_type, WorkerSigPassType), "sig_type is incorrect type."

    signal_name = sig_type.name
    if signal_name not in _RELAYED_SIGNAL_NAMES:
        logger.error(f"Feedback return not configured for: {sig_type}")
        return

    signal = getattr(signals, signal_name)

    if signal_name in _ARGLESS_SIGNAL_NAMES:
        # These signals take no payload; forwarding the default ``args=None``
        # would call ``emit(None)`` on a zero-argument signal.
        signal.emit()
        return

    # Exact-type check on purpose: only a plain tuple is unpacked, anything
    # else (including tuple subclasses) is sent as one payload object.
    if type(args) is not tuple:
        args = (args,)
    signal.emit(*args)
[docs]
class ProcessThreadSignals(QObject):
    """Signals exposed by the running worker thread.

    Each attribute is a class-level ``pyqtSignal``; the types passed to
    ``pyqtSignal(...)`` are the payload types that signal carries.  Headings
    below only follow the signal names.
    """

    # ---- lifecycle / results ----
    finished = pyqtSignal(object)
    # NOTE: a ``single_result(object)`` signal is currently disabled.
    results = pyqtSignal(object)

    # ---- progress reporting ----
    start_progress = pyqtSignal(int)
    set_progress = pyqtSignal(int)
    step_progress = pyqtSignal(float)
    step_one_progress = pyqtSignal()
    add_progress = pyqtSignal(int)
    end_progress = pyqtSignal()

    # ---- status, errors and pass-through payloads ----
    status_update = pyqtSignal(str)
    error = pyqtSignal(Exception)
    passthrough = pyqtSignal(object)