# Source code for threads

# import copy
from __future__ import annotations
import time
from typing import Union, List, TYPE_CHECKING

from PyQt5.QtCore import QRunnable, pyqtSlot

# import processes as prcs
from my_logger import setup_logger
from signals import WorkerSignals, ProcessThreadSignals, worker_sig_pass
from smsh5 import H5dataset, Particle

# from thread_commands import StatusCmd, ProgressCmd
from processes import (
    ProcessProgressCmd as PPCmd,
    ProcessSigPassTask as PSCmd,
    ProcessProgressTask as PPTask,
    ProcessSigPassTask as PSTask,
    ProcessTask,
    create_queue,
    get_max_num_processes,
    get_empty_queue_exception,
    SingleProcess,
    ProcessTaskResult,
    prog_sig_pass,
    ProcessProgress,
    ProcessProgFeedback,
    apply_autoproxy_fix,
    create_manager,
)
from tempfile import TemporaryDirectory

if TYPE_CHECKING:
    from generate_sums import CPSums

# Module-level logger, configured by the project's ``setup_logger`` helper.
logger = setup_logger(__name__)
class ProcessThread(QRunnable):
    """
    Runnable that farms ``ProcessTask`` objects out to ``SingleProcess`` workers.

    ``run`` starts the worker processes, feeds them tasks through
    ``task_queue``, reads results from ``result_queue`` and feedback from
    ``feedback_queue``, and relays both through Qt signals.

    Attributes
    ----------
    task_queue, result_queue
        Queues created with ``create_queue()``; handed to every worker.
    feedback_queue
        Queue created by the manager returned from ``create_manager()``.
    force_stop : bool
        Set to ``True`` (by external code) to make ``run`` leave its main
        loop and discard queued tasks and results.
    is_running : bool
        ``True`` while ``run`` is executing.
    tasks : list
        Tasks still waiting to be put on ``task_queue``.
    """

    def __init__(
        self,
        num_processes: int = None,
        tasks: Union[ProcessTask, List[ProcessTask]] = None,
        signals: ProcessThreadSignals = None,
        worker_signals: WorkerSignals = None,
        task_buffer_size: int = None,
        status_message: str = None,
        temp_dir: TemporaryDirectory = None,
    ):
        """
        Create the queues and store the configuration for a later ``run``.

        Parameters
        ----------
        num_processes : int, optional
            Maximum number of worker processes. When falsy, defaults to
            ``get_max_num_processes() - 1`` (but never less than 1).
        tasks : ProcessTask or list of ProcessTask, optional
            Initial task(s); forwarded to ``add_tasks``.
        signals : ProcessThreadSignals, optional
            Signal object to emit on; a new one is created when omitted.
        worker_signals : WorkerSignals, optional
            Signal object used for forwarded worker signals; a new one is
            created when omitted.
        task_buffer_size : int, optional
            Number of extra tasks queued up front beyond one per process.
            When falsy, defaults to ``num_processes // 2``.
        status_message : str, optional
            Message emitted on ``signals.status_update`` when ``run`` starts.
        temp_dir : TemporaryDirectory, optional
            Passed through unchanged to every ``SingleProcess``.

        Raises
        ------
        TypeError
            If ``num_processes``, ``signals``, ``worker_signals`` or any
            task is of the wrong type.
        """
        super().__init__()
        self._processes = []
        self._manager = create_manager()
        self.task_queue = create_queue()
        self.result_queue = create_queue()
        self.feedback_queue = self._manager.Queue()
        self.force_stop = False
        self.is_running = False
        self._status_message = status_message
        self._temp_dir = temp_dir

        if num_processes:
            if type(num_processes) is not int:
                raise TypeError("Provided num_processes must be of type int")
            self.num_processes = num_processes
        else:
            # Clamp to 1: with zero workers nothing would ever consume the
            # task queue and ``run`` could never finish.
            self.num_processes = max(1, get_max_num_processes() - 1)

        if not task_buffer_size:
            task_buffer_size = self.num_processes // 2
        self.task_buffer_size = task_buffer_size

        if not signals:
            self.signals = ProcessThreadSignals()
        else:
            if type(signals) is not ProcessThreadSignals:
                raise TypeError("Provided signals must be of type ProcessThreadSignals")
            self.signals = signals

        if not worker_signals:
            self.worker_signals = WorkerSignals()
        else:
            if type(worker_signals) is not WorkerSignals:
                raise TypeError("Provided worker_signals must be of type WorkerSignals")
            self.worker_signals = worker_signals

        self.tasks = list()
        if tasks:
            # The method is named ``add_tasks``; the previous ``add_task``
            # call raised AttributeError whenever tasks were supplied.
            self.add_tasks(tasks)
        self.results = []

    @property
    def status_message(self):
        """Message emitted on ``signals.status_update`` when ``run`` starts."""
        return self._status_message

    @status_message.setter
    def status_message(self, message: str):
        assert type(message) is str, "status_message must be str"
        self._status_message = message

    def add_tasks(self, tasks: Union[ProcessTask, List[ProcessTask]]):
        """
        Append one task or a list of tasks to the pending task list.

        Raises
        ------
        TypeError
            If any provided task is not exactly of type ``ProcessTask``.
        """
        # ``typing.List`` is never the runtime type of a value, so the check
        # must be made against the builtin ``list``.
        if not isinstance(tasks, list):
            tasks = [tasks]
        if not all(type(task) is ProcessTask for task in tasks):
            raise TypeError("At least some of provided tasks are not of " "type ProcessTask")
        self.tasks.extend(tasks)

    def add_tasks_from_methods(self, objects: Union[object, List[object]], method_name: str, args=None):
        """
        Create one ``ProcessTask`` per object for the named method.

        Parameters
        ----------
        objects : object or list of object
            Object(s) that must all have an attribute named ``method_name``.
        method_name : str
            Name of the method each task refers to.
        args : optional
            Arguments stored on each task; a non-tuple value is wrapped in
            a one-element tuple, ``None`` is passed through as is.

        Raises
        ------
        TypeError
            If ``method_name`` is not a ``str`` or an object lacks the method.
        """
        if type(objects) is not list:
            objects = [objects]
        if type(method_name) is not str:
            raise TypeError("Provided method_name must be of type str")
        if not all(hasattr(obj, method_name) for obj in objects):
            raise TypeError("Some or all objects do not have " "the specified method")
        if args is not None and type(args) is not tuple:
            args = (args,)
        for obj in objects:
            self.tasks.append(ProcessTask(obj=obj, method_name=method_name, args=args))

    @pyqtSlot()
    def run(self, num_processes: int = None):
        """
        Start the workers, dispatch all pending tasks and relay the results.

        Each ``ProcessTaskResult`` whose ``dont_send`` is falsy is emitted on
        ``signals.results``. Any exception raised while dispatching
        (including an ``Exception`` instance received on the result queue)
        is emitted on ``signals.error``. ``signals.end_progress`` and
        ``signals.finished`` are emitted during cleanup.

        Parameters
        ----------
        num_processes : int, optional
            Unused; kept for interface compatibility.
        """
        logger.info("Running Process Thread")
        self.is_running = True
        # Counts workers that were actually started, so the cleanup below is
        # safe even when the ``try`` body fails before any worker exists.
        num_active_processes = 0
        try:
            self.results = [None] * len(self.tasks)
            num_init_tasks = len(self.tasks)
            if not num_init_tasks:
                raise TypeError("No tasks were provided")

            if num_init_tasks > 1:
                if self._status_message is None:
                    status_message = "Busy..."
                else:
                    status_message = self._status_message
                self.signals.status_update.emit(status_message)
                self.signals.start_progress.emit(num_init_tasks)
            elif self._status_message is not None:
                self.signals.status_update.emit(self._status_message)

            # Never start more workers than there are tasks.
            num_used_processes = min(self.num_processes, num_init_tasks)
            for _ in range(num_used_processes):
                process = SingleProcess(
                    task_queue=self.task_queue,
                    result_queue=self.result_queue,
                    feedback_queue=self.feedback_queue,
                    temp_dir=self._temp_dir,
                )
                self._processes.append(process)
                process.start()
                num_active_processes += 1

            num_task_left = len(self.tasks)
            single_task = num_task_left == 1

            # Queue one task per worker plus the buffer, capped at the
            # number of tasks available.
            init_num = min(num_used_processes + self.task_buffer_size, len(self.tasks))

            process_progress = None
            if not single_task:
                prog_fb = ProcessProgFeedback(feedback_queue=self.feedback_queue)
                process_progress = ProcessProgress(prog_fb=prog_fb, num_iterations=num_init_tasks)
                process_progress.start_progress()

            for _ in range(init_num):
                self.task_queue.put(self.tasks.pop(0))

            while num_task_left and not self.force_stop:
                try:
                    while not self.feedback_queue.empty():
                        self.check_fbk_queue()
                    result = self.result_queue.get(timeout=0.01)
                except get_empty_queue_exception():
                    # No result yet; poll again.
                    pass
                else:
                    # A result came back, so top the task queue up again.
                    if len(self.tasks):
                        self.task_queue.put(self.tasks.pop(0))
                    if type(result) is not ProcessTaskResult:
                        if isinstance(result, Exception):
                            raise result
                        else:
                            raise TypeError("Task result is not of type ProcessTaskResult")
                    elif not result.dont_send:
                        self.signals.results.emit(result)
                    del result
                    self.result_queue.task_done()
                    if not single_task:
                        process_progress.iterate()
                    num_task_left -= 1
        except Exception as exception:
            self.signals.error.emit(exception)
        finally:
            if self.force_stop:
                # Discard whatever is still queued.
                while not self.task_queue.empty():
                    self.task_queue.get()
                    self.task_queue.task_done()
                while not self.result_queue.empty():
                    self.result_queue.get()
                    self.result_queue.task_done()
            else:
                time.sleep(1)
            try:
                if not self.feedback_queue.empty():
                    for _ in range(self.feedback_queue.qsize()):
                        self.check_fbk_queue()
            except BrokenPipeError as exception:
                print("Warning: Broken Pipe")
                self.signals.error.emit(exception)

            # Send one ``None`` per started worker and wait for one reply
            # each; a reply of ``True`` is counted as a stopped worker.
            num_started_processes = num_active_processes
            if num_started_processes:
                for _ in range(num_started_processes):
                    self.task_queue.put(None)
                for _ in range(num_started_processes):
                    result = self.result_queue.get()
                    if result is True:
                        self.result_queue.task_done()
                        num_active_processes -= 1
            while any([p.is_alive() for p in self._processes]):
                time.sleep(1)

            self.signals.end_progress.emit()
            self.is_running = False
            self.signals.finished.emit(self)
            self._manager.shutdown()

    def check_fbk_queue(self):
        """
        Take one item from the feedback queue and forward it as a signal.

        ``ProcessProgressTask`` items go through ``prog_sig_pass`` on
        ``self.signals``; ``ProcessSigPassTask`` items go through
        ``worker_sig_pass`` on ``self.worker_signals``. Items of any other
        type are dropped. Blocks until an item is available.
        """
        fbk_return = self.feedback_queue.get()
        if type(fbk_return) is PPTask:
            prog_sig_pass(signals=self.signals, cmd=fbk_return.task_cmd, args=fbk_return.args)
        elif type(fbk_return) is PSTask:
            worker_sig_pass(
                signals=self.worker_signals,
                sig_type=fbk_return.sig_pass_type,
                args=fbk_return.sig_args,
            )
class WorkerFitLifetimes(QRunnable):
    """A QRunnable that calls a lifetime-fitting function and reports via ``WorkerSignals``."""

    def __init__(
        self,
        fit_lifetimes_func,
        data,
        currentparticle,
        fitparam,
        mode: str,
        resolve_selected=None,
    ) -> None:
        """
        Store the fitting callable and the arguments it will receive.

        Parameters
        ----------
        fit_lifetimes_func : callable
            Called by ``run`` with four signals (``start_progress``,
            ``progress``, ``status_message``, ``reset_gui``) followed by
            ``data``, ``currentparticle``, ``fitparam``, ``mode`` and
            ``resolve_selected``, all positionally.
        data
            Passed through unchanged to ``fit_lifetimes_func``.
        currentparticle
            Passed through unchanged to ``fit_lifetimes_func``.
        fitparam
            Passed through unchanged to ``fit_lifetimes_func``.
        mode : str
            Passed to ``fit_lifetimes_func`` and emitted on
            ``fitting_finished`` when ``run`` ends.
        resolve_selected : optional
            Passed through unchanged to ``fit_lifetimes_func``.
        """
        super().__init__()
        self.signals = WorkerSignals()
        self.fit_lifetimes_func = fit_lifetimes_func
        self.data = data
        self.currentparticle = currentparticle
        self.fitparam = fitparam
        self.mode = mode
        self.resolve_selected = resolve_selected

    @pyqtSlot()
    def run(self) -> None:
        """Call the fitting function; emit ``error`` on failure and always ``fitting_finished``."""
        try:
            fit = self.fit_lifetimes_func
            call_args = (
                self.signals.start_progress,
                self.signals.progress,
                self.signals.status_message,
                self.signals.reset_gui,
                self.data,
                self.currentparticle,
                self.fitparam,
                self.mode,
                self.resolve_selected,
            )
            fit(*call_args)
        except Exception as err:
            self.signals.error.emit(err)
        finally:
            self.signals.fitting_finished.emit(self.mode)
class WorkerGrouping(QRunnable):
    """A QRunnable that calls a grouping function and reports via ``WorkerSignals``."""

    def __init__(
        self,
        data: H5dataset,
        grouping_func,
        mode: str,
        currentparticle: Particle = None,
        group_selected=None,
    ) -> None:
        """
        Store the grouping callable and the arguments it will receive.

        Parameters
        ----------
        data : H5dataset
            Passed to ``grouping_func`` as ``data``.
        grouping_func : callable
            Called by ``run`` with the keyword arguments
            ``start_progress_sig``, ``progress_sig``, ``status_sig``,
            ``reset_gui_sig``, ``data``, ``mode``, ``currentparticle`` and
            ``group_selected``.
        mode : str
            Passed to ``grouping_func`` and emitted on ``grouping_finished``
            when ``run`` ends.
        currentparticle : Particle, optional
            Passed to ``grouping_func`` as ``currentparticle``.
        group_selected : optional
            Passed to ``grouping_func`` as ``group_selected``.
        """
        super().__init__()
        self.signals = WorkerSignals()
        self.grouping_func = grouping_func
        self.data = data
        self.mode = mode
        self.currentparticle = currentparticle
        self.group_selected = group_selected

    @pyqtSlot()
    def run(self) -> None:
        """Call the grouping function; emit ``error`` on failure and always ``grouping_finished``."""
        try:
            group = self.grouping_func
            call_kwargs = {
                "start_progress_sig": self.signals.start_progress,
                "progress_sig": self.signals.progress,
                "status_sig": self.signals.status_message,
                "reset_gui_sig": self.signals.reset_gui,
                "data": self.data,
                "mode": self.mode,
                "currentparticle": self.currentparticle,
                "group_selected": self.group_selected,
            }
            group(**call_kwargs)
        except Exception as err:
            self.signals.error.emit(err)
        finally:
            self.signals.grouping_finished.emit(self.mode)
class WorkerResolveLevels(QRunnable):
    """A QRunnable that calls a level-resolving function and reports via ``WorkerSignals``."""

    def __init__(
        self,
        resolve_levels_func,
        conf: Union[int, float],
        data: H5dataset,
        currentparticle: Particle,
        mode: str,
        resolve_selected=None,
        end_time_s=None,
    ) -> None:
        """
        Store the resolving callable and the arguments it will receive.

        Parameters
        ----------
        resolve_levels_func : callable
            Called by ``run`` with five signals (``start_progress``,
            ``progress``, ``status_message``, ``reset_gui``,
            ``level_resolved``) followed by ``conf``, ``data``,
            ``currentparticle``, ``mode``, ``resolve_selected`` and
            ``end_time_s``, all positionally.
        conf : int or float
            Passed through unchanged to ``resolve_levels_func``.
        data : H5dataset
            Passed through unchanged to ``resolve_levels_func``.
        currentparticle : Particle
            Passed through unchanged to ``resolve_levels_func``.
        mode : str
            Passed to ``resolve_levels_func`` and emitted on
            ``resolve_finished`` when ``run`` ends.
        resolve_selected : optional
            Passed through unchanged to ``resolve_levels_func``.
        end_time_s : optional
            Passed through unchanged to ``resolve_levels_func``.
        """
        super().__init__()
        self.signals = WorkerSignals()
        self.resolve_levels_func = resolve_levels_func
        self.conf = conf
        self.data = data
        self.currentparticle = currentparticle
        self.mode = mode
        self.resolve_selected = resolve_selected
        self.end_time_s = end_time_s

    @pyqtSlot()
    def run(self) -> None:
        """Call the resolving function; emit ``error`` on failure and always ``resolve_finished``."""
        try:
            resolve = self.resolve_levels_func
            call_args = (
                self.signals.start_progress,
                self.signals.progress,
                self.signals.status_message,
                self.signals.reset_gui,
                self.signals.level_resolved,
                self.conf,
                self.data,
                self.currentparticle,
                self.mode,
                self.resolve_selected,
                self.end_time_s,
            )
            resolve(*call_args)
        except Exception as err:
            self.signals.error.emit(err)
        finally:
            self.signals.resolve_finished.emit(self.mode)
class WorkerBinAll(QRunnable):
    """A QRunnable that calls a binning function on a dataset and reports errors via ``WorkerSignals``."""

    def __init__(self, dataset, binall_func, bin_size):
        """
        Store the binning callable and the arguments it will receive.

        Parameters
        ----------
        dataset
            First positional argument given to ``binall_func``.
        binall_func : callable
            Called by ``run`` as ``binall_func(dataset, bin_size)``.
        bin_size
            Second positional argument given to ``binall_func``.
        """
        super().__init__()
        self.signals = WorkerSignals()
        self.binall_func = binall_func
        self.dataset = dataset
        self.bin_size = bin_size

    @pyqtSlot()
    def run(self) -> None:
        """Call the binning function; emit ``error`` if it raises. No finished signal is emitted."""
        try:
            bin_all = self.binall_func
            bin_all(self.dataset, self.bin_size)
        except Exception as err:
            self.signals.error.emit(err)