Source code for grouping

"""Module for handling performing Agglomerative Hierarchical Clustering Algorithm.

Based on 'Detection of Intensity Change Points in Time-Resolved Single-Molecule Measurements'
from Watkins nad Yang, J. Phys. Chem. B 2005, 109, 617-628 (http://pubs.acs.org/doi/abs/10.1021/jp0467548)

Joshua Botha
University of Pretoria
2018
"""

from __future__ import annotations

# from math import lgamma

from typing import Union, List, Tuple, TYPE_CHECKING
import numpy as np
from scipy.stats import poisson

# from matplotlib import pyplot as plt
from change_point import Level
from my_logger import setup_logger
from processes import ProcessProgressTask, ProcessProgressCmd

if TYPE_CHECKING:
    from smsh5 import Particle, GlobalParticle
    from change_point import ParticleMicrotimesSubset
    from multiprocessing import JoinableQueue

import dbg

[docs] logger = setup_logger(__name__)
[docs] def max_jm(array: np.ndarray): """Finds the j and m index for the max value.""" max_n = np.argmax(array) (xD, yD) = array.shape if max_n >= xD: max_j = max_n // xD max_m = max_n % xD else: max_m = max_n max_j = 0 return max_j, max_m
[docs] class Group: def __init__(self, lvls_inds: List[int_p_s] = None, particle: Particle = None, group_ind: int = None): self.lvls_inds = lvls_inds self.group_ind = group_ind self.lvls = None self.histogram = None if self.lvls_inds is not None and particle is not None: if not particle.use_roi_for_grouping: self.lvls = [particle.cpts.levels[i] for i in self.lvls_inds] else: self.lvls = [particle.levels_roi[i] for i in self.lvls_inds] @property
[docs] def num_photons(self) -> int_p_s: return int(np.sum([level.num_photons for level in self.lvls]))
@property
[docs] def dwell_time_s(self) -> float: return float(np.sum([level.dwell_time_s for level in self.lvls]))
@property
[docs] def int_p_s(self) -> float: return self.num_photons / self.dwell_time_s
@property
[docs] def group_times_ns(self): times_ns = np.array([]) for level in self.lvls: times_ns = np.append(times_ns, level.times_ns) return times_ns
@property
[docs] def group_microtimes(self): microtimes = np.array([]) for level in self.lvls: microtimes = np.append(microtimes, level.microtimes) return microtimes
[docs] class ClusteringStep: def __init__( self, particle: Particle, first: bool = False, seed_groups: List[Group] = None, single_level: bool = False, ): self._particle: Particle = particle if not self._particle.use_roi_for_grouping: self._num_levels = particle.cpts.num_levels else: self._num_levels = particle.num_levels_roi self.first = first self.single_level = single_level self.last = False or single_level self._log_l_em = None self._em_p_mj = None self._em_log_l = None self._ahc_p_mj = None self._ahc_groups = None self.groups = None self.bic = None self.num_groups = None self.level_group_ind = None self.group_levels = None if self.first or self.single_level: self._seed_groups = [Group(lvls_inds=[i], particle=particle, group_ind=i) for i in range(self._num_levels)] self._seed_p_mj = np.identity(n=self._num_levels) if self.single_level: self.groups = self._seed_groups self.group_levels = self._particle.cpts.levels self.num_groups = self._num_levels self.level_group_ind = list(range(self._num_levels)) for level in self.group_levels: level.group_ind = 0 else: assert seed_groups is not None, "ClusteringStep: parameters not provided" self._seed_groups = seed_groups seed_p_mj = np.zeros(shape=(len(self._seed_groups), self._num_levels)) for m, group in enumerate(seed_groups): seed_p_mj[m, group.lvls_inds] = 1 self._seed_p_mj = seed_p_mj self._num_prev_groups = len(self._seed_groups) self.groups_have_hists = False @property
[docs] def group_ints(self) -> List[float]: if self.groups is not None: return [group.int_p_s for group in self.groups]
[docs] def calc_int_bounds(self, order: str = "descending") -> List[Tuple[float, float]]: """Calculates the bounds between the groups. Parameters ---------- order : str Option are 'descending' and 'ascending' """ if self.groups is not None and self.single_level is False: assert order in [ "descending", "ascending", ], "Solution: Order provided not valid" g_ints = self.group_ints.copy() g_ints.sort(reverse=(order == "descending")) int_bounds = [] for i in range(self.num_groups): if i == self.num_groups - 1: if order == "descending": int_bounds.append((0, prev_mid_int)) else: int_bounds.append((prev_mid_int, np.inf)) else: mid_int = (g_ints[i + 1] + g_ints[i]) / 2 if i == 0: if order == "descending": int_bounds.append((mid_int, np.inf)) else: int_bounds.append((0, mid_int)) prev_mid_int = mid_int else: if order == "descending": int_bounds.append((mid_int, prev_mid_int)) else: int_bounds.append((prev_mid_int, mid_int)) prev_mid_int = mid_int return int_bounds
[docs] def ahc(self): """Agglomerative Hierarchical Clustering""" # Calculate merge merit value for each possible merging situation merge_merit = np.full( shape=(self._num_prev_groups, self._num_prev_groups), fill_value=-np.inf ) for j, group_j in enumerate(self._seed_groups): # Row for m, group_m in enumerate(self._seed_groups): # Column if j < m: n_m = group_m.num_photons # Number of photons in group m n_j = group_j.num_photons # Number of photons in group j t_m = group_m.dwell_time_s # Dwell time of group m in seconds t_j = group_j.dwell_time_s # Dwell time of group j in seconds merge_merit[ j, m ] = ( # Log-Likelihood Merge merit function for joining groups j and m, eq. 11 (n_m + n_j) * np.log( (n_m + n_j) / (t_m + t_j) ) # (n_m - n_j)\ln[\frac{n_m + n_j}{T_m + T_j}] - n_m * np.log(n_m / t_m) # - n_m\ln[\frac{n_m}{T_m}] - n_j * np.log(n_j / t_j) # - n_j\ln[\frac{n_j}{T_j}] ) max_j, max_m = max_jm(merge_merit) # Find most probably merge # Perform merging of groups max_j and max_m new_groups = [] p_mj = np.zeros(shape=(self._num_prev_groups - 1, self._num_levels)) group_num = -1 for i, group_i in enumerate(self._seed_groups): if i != max_m: group_num += 1 if i == max_j: # Then merge merge_levels = group_i.lvls_inds.copy() merge_levels.extend(self._seed_groups[max_m].lvls_inds.copy()) merge_levels.sort() # New group made up of all the levels in groups max_j and max_m new_groups.append( Group(lvls_inds=merge_levels, particle=self._particle, group_ind=group_num) ) else: new_groups.append(group_i) # Set up new assignment matrix, to be as initial start for EM clustering for ind in new_groups[-1].lvls_inds: p_mj[group_num, ind] = 1 self._ahc_p_mj = p_mj self._ahc_groups = new_groups
# return ClusteringStep(particle=self.particle, first=False, groups=new_groups, seed_p_mj=new_p_mj)
[docs] def emc(self): """Expectation Maximisation clustering""" p_mj = self._ahc_p_mj.copy() # Initial state, as provided by AHC prev_p_mj = p_mj.copy() if not self._particle.use_roi_for_grouping: levels = self._particle.cpts.levels else: levels = self._particle.levels_roi # M-Step ###################################################################### i = 0 diff_p_mj = 1 p_hat_g = None while diff_p_mj > 1e-5 and i < 50: i += 1 if not self._particle.use_roi_for_grouping: cap_t = self._particle.dwell_time_s else: cap_t = self._particle.dwell_time_roi t_hat = np.zeros(shape=(self._num_prev_groups - 1,)) n_hat = np.zeros_like(t_hat) p_hat = np.zeros_like(t_hat) i_hat = np.zeros_like(t_hat) p_hat_g = np.zeros(shape=(self._num_prev_groups - 1, self._num_levels)) denom = np.zeros(shape=(self._num_levels,)) for m, group in enumerate(self._ahc_groups): # As in Fig. 9 # \hat{T}_m = \sum\limits_{j=1}^{J+1}\bar{p}_{mj}T_j t_hat[m] = np.sum( [p_mj[m, j] * l.dwell_time_s for j, l in enumerate(levels)] ) # \hat{n}_m = \sum\limits_{j=1}^{J+1}\bar{p}_{mj}n_j n_hat[m] = np.sum( [p_mj[m, j] * l.num_photons for j, l in enumerate(levels)] ) # \hat{p}_m = \frac{\hat{T}_m}{T} p_hat[m] = t_hat[m] / cap_t # \hat{I}_m = \sum\limits_{j=1}^{J+1}\frac{\bar{p}_{mj}n_j}{\hat{T}_m} i_hat[m] = np.sum( [ p_mj[m, j] * l.num_photons / t_hat[m] for j, l in enumerate(levels) ] ) # Let \bar{p}_{mj} = \frac{\alpha_{mj}}{\sum_{m=1}^G\alpha_{mj}} # where \alpha_{mj} = \hat{p}_mg(n_j;\hat{I}_m,T_j) for j, l in enumerate(levels): p_hat_g[m, j] = p_hat[m] * poisson.pmf( l.num_photons, i_hat[m] * l.dwell_time_s ) # \alpha_{mj} # \sum_{m=1}^G\alpha_{mj} = \sum_{m=1}^G\hat{p}_mg(n_j;\hat{I}_m,T_j) for j in range(self._num_levels): denom[j] = np.sum(p_hat_g[:, j]) for m in range(self._num_prev_groups - 1): try: p_mj[m, j] = ( p_hat_g[m, j] / denom[j] ) # \bar{p}_{mj}=\frac{\alpha_{mj}}{\sum_{m=1}^G\alpha_{mj}} except: # print('here') # TODO: Fix div by zero pass diff_p_mj = np.sum(np.abs(prev_p_mj - p_mj)) prev_p_mj = p_mj.copy() level_p_max = np.argmax(p_mj, 0) eff_p_mj = np.zeros_like(p_mj) for j in range(self._num_levels): eff_p_mj[level_p_max[j], j] = 1 self._em_p_mj = eff_p_mj # E-Step ###################################################################### log_l = 0 for m in range(self._num_prev_groups - 1): for j in range(self._num_levels): if ( p_hat_g[m, j] > 1e-200 and eff_p_mj[m, j] != 0 ): # Close to smallest value that doesn't result in -inf log_l += eff_p_mj[m, j] * np.log(p_hat_g[m, j]) self._em_log_l = log_l new_groups = [] for m in range(self._num_prev_groups - 1): g_m_levels = list(np.nonzero(self._em_p_mj[m, :])[0]) if len(g_m_levels): new_groups.append(Group(lvls_inds=g_m_levels, particle=self._particle, group_ind=m)) new_groups.sort(key=lambda group: group.int_p_s) self.groups = new_groups self.num_groups = len(new_groups) if self.num_groups == 1: self.last = True level_group_ind = [None] * self._num_levels for group_num, group in enumerate(self.groups): for level in group.lvls_inds: level_group_ind[level] = group_num self.level_group_ind = level_group_ind
[docs] def calc_bic(self): num_cp = self._particle.cpts.num_cpts num_g = self.num_groups self.bic = ( 2 * self._em_log_l - (2 * num_g - 1) * np.log(num_cp) - num_cp * np.log(self._particle.num_photons) )
[docs] def group_2_levels(self): if not self._particle.use_roi_for_grouping: part_levels = self._particle.cpts.levels else: part_levels = self._particle.levels_roi group_levels = [] busy_joining = False start_ind = None is_global = hasattr(self._particle, "is_global") and self._particle.is_global prev_particle_dataset_ind = -1 global_part_levels = set() start_time_ns = None prev_start_time = 0 num_levels = -1 for i, part_level in enumerate(part_levels): if is_global: global_part_levels.add(part_level) if not busy_joining: start_ind = part_level.level_inds[0] if not is_global else None # if is_global: # start_time_ns = prev_start_time if i == len(part_levels) - 1: busy_joining = False else: if self.level_group_ind[i] == self.level_group_ind[i + 1]: busy_joining = True else: busy_joining = False if not busy_joining: num_levels += 1 if not is_global: end_ind = part_level.level_inds[1] group_levels.append( Level( particle=self._particle, particle_ind=num_levels, level_inds=(start_ind, end_ind), int_p_s=self.group_ints[self.level_group_ind[i]], group_ind=self.level_group_ind[i], ) ) else: group_ind = self.level_group_ind[i] parent_particle_dataset_ind = part_level.parent_particle_dataset_ind if parent_particle_dataset_ind != prev_particle_dataset_ind: start_time_offset_ns = ( 0 if len(group_levels) == 0 else group_levels[-1].times_ns[1] ) prev_particle_dataset_ind = parent_particle_dataset_ind group_levels.append( GlobalLevel( global_particle=self._particle, parent_particle_dataset_ind=parent_particle_dataset_ind, particle_levels=list(global_part_levels), int_p_s=self.group_ints[group_ind], group_ind=self.level_group_ind[i], start_time_offset_ns=start_time_offset_ns, dwell_time_ns=self.groups[group_ind].dwell_time_s * 1e9, num_photons=self.groups[group_ind].num_photons, ) ) global_part_levels = set() self.group_levels = group_levels
@property
[docs] def group_level_dwelltimes(self): return [level.dwell_time_s for level in self.group_levels]
@property
[docs] def group_total_dwelltime(self): return np.sum(self.group_level_dwelltimes)
@property
[docs] def group_num_levels(self): return len(self.group_levels)
@property
[docs] def group_level_ints(self): return np.array([level.int_p_s for level in self.group_levels])
[docs] def setup_next_step(self) -> ClusteringStep: return ClusteringStep(self._particle, first=False, seed_groups=self.groups)
[docs] class AHCA: """ Class for executing Agglomerative Hierarchical Clustering Algorithm and storing the results. """ def __init__(self, particle): """ Parameters ---------- particle: smsh5.Particle """ self.backup = None self.has_groups = False self._particle = particle self.uuid = self._particle.uuid self.steps = None self.best_step_ind = None self.bics = None self.selected_step_ind = None self.num_steps = None self.plots_need_to_be_updated = False self.use_roi_for_grouping = True # Changed default to True on GUI as well self.grouped_with_roi = False @property
[docs] def selected_step(self) -> ClusteringStep: if self.has_groups: return self.steps[self.selected_step_ind]
@selected_step.setter def selected_step(self, step_ind): assert 0 < step_ind < self.num_steps, "AHCA: Provided step index out of range." self.selected_step_ind = step_ind @property
[docs] def best_step(self) -> ClusteringStep: if self.has_groups: return self.steps[self.best_step_ind]
@property
[docs] def steps_num_groups(self) -> List[int]: if self.has_groups: return [step.num_groups for step in self.steps]
[docs] def clear_and_backup_results(self) -> None: if self.has_groups: backup = dict() self.has_groups = False backup["best_step_ind"] = self.best_step_ind self.best_step_ind = None backup["bics"] = self.bics self.bics = None backup["num_steps"] = self.num_steps self.num_steps = None backup["selected_step"] = self.selected_step_ind self.selected_step_ind = None backup["steps"] = self.steps self.steps = None self.backup = backup
[docs] def restore_and_delete_backup(self) -> None: if self.backup is not None: self.best_step_ind = self.backup["best_step_ind"] self.bics = self.backup["bics"] self.num_steps = self.backup["num_steps"] self.selected_step_ind = self.backup["selected_step"] self.steps = self.backup["steps"] self.has_groups = True self.backup = None
[docs] def run_grouping(self, feedback_queue: JoinableQueue = None): """ Run grouping Returns ------- """ try: if self.has_groups: self.clear_and_backup_results() if self._particle.has_levels: steps = [] if self._particle.num_levels == 1: self.steps = [ClusteringStep(self._particle, single_level=True)] self.num_steps = 1 self.best_step_ind = 0 self.selected_step_ind = 0 self.has_groups = True logger.info(f"{self._particle.name} has only one level") logger.info(self.steps) else: c_step = ClusteringStep(self._particle, first=True) if not self.use_roi_for_grouping: current_num_groups = self._particle.num_levels else: current_num_groups = self._particle.num_levels_roi if feedback_queue is not None: feedback_queue.put( ProcessProgressTask( task_cmd=ProcessProgressCmd.Start, args=(current_num_groups,), ) ) inital_num_groups = current_num_groups while current_num_groups != 1: c_step.ahc() c_step.emc() c_step.calc_bic() c_step.group_2_levels() # print([lvl.int_p_s for lvl in c_step.group_levels]) steps.append(c_step) current_num_groups = c_step.num_groups if current_num_groups != 1: # print(current_num_groups) c_step = c_step.setup_next_step() if feedback_queue is not None: feedback_queue.put( ProcessProgressTask( task_cmd=ProcessProgressCmd.SetValue, args=(inital_num_groups - current_num_groups,), ) ) if feedback_queue is not None: feedback_queue.put( ProcessProgressTask(task_cmd=ProcessProgressCmd.Complete) ) self.steps = steps self.num_steps = len(steps) self.bics = [step.bic for step in steps] self.best_step_ind = np.argmax(self.bics) self.selected_step_ind = self.best_step_ind self.has_groups = True self.grouped_with_roi = self.use_roi_for_grouping if self.backup is not None: self.backup = None self.plots_need_to_be_updated = True logger.info(f"{self._particle.name} levels grouped") else: logger.info(f"{self._particle.name} has no levels to group") except Exception as e: self.restore_and_delete_backup() logger.error(e) pass
[docs] def set_selected_step(self, step_ind: int): assert 0 <= step_ind < self.num_steps, "AHCA: Provided step index out of range." self.selected_step_ind = step_ind if not all( [lvl.histogram is not None for lvl in self.steps[step_ind].group_levels] ): if not self.selected_step.groups_have_hists: self._particle.makelevelhists(force_group_levels=True) if not all( [group.histogram is not None for group in self.steps[step_ind].groups] ): self._particle.makegrouphists()
[docs] def reset_selected_step(self): self.selected_step_ind = self.best_step_ind
[docs] class GlobalLevel: def __init__( self, global_particle: Union[Particle, GlobalParticle], parent_particle_dataset_ind: int, particle_levels: List[Union[Level, GlobalLevel]], int_p_s: float, group_ind: int, start_time_offset_ns: int, dwell_time_ns: float, num_photons: int, ): self.is_global_level = True self.particle = global_particle self.parent_particle_dataset_ind = parent_particle_dataset_ind # self.particle_levels = particle_levels self.start_time_offset_ns = start_time_offset_ns self.times_ns = ( particle_levels[0].times_ns[0], particle_levels[-1].times_ns[1], ) self.int_p_s = int_p_s self.group_ind = group_ind self.dwell_time_ns = dwell_time_ns self.num_photons = num_photons microtimes_particle_inds_pairs = tuple() for level in particle_levels: if type(particle_levels) is Level: microtimes_particle_inds_pairs = ( level._particle.dataset_ind, particle_levels.level_inds, ) elif type(particle_levels) is GlobalLevel: microtimes_particle_inds_pairs.extend( level.microtimes_particle_inds_pairs ) self.microtimes_particle_inds_pairs = microtimes_particle_inds_pairs # self.microtimes = particle_levels.microtimes # self.histogram = particle_levels.histogram @property
[docs] def times_s(self): return self.times_ns[0] / 1e9, self.times_ns[1] / 1e9
@property
[docs] def dwell_time_s(self): if self.dwell_time_ns is not None: return self.dwell_time_ns / 1e9 else: return None