Source code for change_point

"""Module for handling analysis of change points and creation of consequent levels.

Based on 'Detection of Intensity Change Points in Time-Resolved Single-Molecule Measurements'
from Watkins nad Yang, J. Phys. Chem. B 2005, 109, 617-628 (http://pubs.acs.org/doi/abs/10.1021/jp0467548)

Joshua Botha
University of Pretoria
2019
"""

from __future__ import annotations

[docs] __docformat__ = "NumPy"
import os from typing import Tuple, Optional, TYPE_CHECKING, List import numpy as np from h5py import Dataset from statsmodels.stats.weightstats import DescrStatsW import dbg import file_manager as fm from my_logger import setup_logger from settings_dialog import Settings import sys import smsh5_file_reader as sms_fr if TYPE_CHECKING: from smsh5 import Particle from generate_sums import CPSums # replaced by settings_dialog # MIN_PHOTONS = 20 # MIN_EDGE_PHOTONS = 7 # BURST_MIN_DWELL = 0.001 # Seconds # BURST_INT_SIGMA = 3
[docs] logger = setup_logger(__name__)
[docs] class ParticleMicrotimesSubset: """A container for a custom list that accesses a subset of a H5 dataset""" def __init__(self, particle: Particle, start_ind: int, end_ind: int): self._particle = particle if not 0 <= start_ind < len(self._micro_dataset) - 1: raise IndexError("Start index out of bounds") if not 0 < end_ind <= len(self._micro_dataset) - 1: raise IndexError("End index out of bounds") self._start_ind = start_ind self._end_ind = end_ind @property
[docs] def _micro_dataset(self): if self._particle.file is not None: return sms_fr.microtimes(self._particle)
[docs] def __len__(self): return 1 + self._end_ind - self._start_ind
[docs] def _get_ind(self, ind) -> int: if ind >= 0: return self._start_ind + ind else: return self._end_ind + ind + 1
[docs] def __getitem__(self, i): if type(i) is int: ind = self._get_ind(i) if not self._start_ind <= ind <= self._end_ind: raise IndexError("Index out of defined range") return self._micro_dataset[ind] elif type(i) is slice: slice_start = i.start slice_stop = i.stop slice_step = i.step if slice_start is not None: slice_start = self._get_ind(slice_start) if not self._start_ind <= slice_start <= self._end_ind: raise IndexError("Index out of defined dataset range") else: slice_start = self._start_ind if slice_stop is not None: slice_stop = self._get_ind(slice_stop) if not self._start_ind <= slice_stop <= self._end_ind + 1: raise IndexError("Index out of defined dataset range") else: slice_stop = self._end_ind + 1 new_slice = slice(slice_start, slice_stop, slice_step) return self._micro_dataset[new_slice]
[docs] def __iter__(self): for elem in self._micro_dataset[self._start_ind : self._end_ind]: yield elem
[docs] def __repr__(self): return ( f"<{self.__class__.__name__} with index range ({self._start_ind}, " f"{self._end_ind}) of {self._micro_dataset.__repr__()[1:-1]}>" )
[docs] def __eq__(self, other): return self.__getitem__(slice(None)) == other
[docs] class ChangePoints: """Contains all the attributes to describe the found change points in an analysed particles.""" def __init__(self, particle, confidence=None, run_levels: bool = None): """ Creates an instance of ChangePoints for the particle object provided. If the confidence argument is given the change point analysis will be run. If run_levels is set to True the analysed change points will be used to define the resulting levels. Parameters ---------- particle: smsh5.Particle An the parent Particle instance. confidence: run_levels """ self.bursts_deleted = None self._particle = particle self.uuid = self._particle.uuid # self.cpa_has_run = False self._cpa = ChangePointAnalysis(particle, confidence) self.has_burst = False self.burst_levels = None if run_levels is not None: self._run_levels = run_levels else: self._run_levels = False @property
[docs] def has_levels(self): return self._cpa.has_levels
@property
[docs] def levels(self) -> List[Level]: return self._cpa.levels
@property
[docs] def num_levels(self): return self._cpa.num_levels
@property
[docs] def level_ints(self): return self._cpa.level_ints
@property
[docs] def level_dwelltimes(self): return self._cpa.level_dwelltimes
@property
[docs] def cpa_has_run(self): return self._cpa.has_run
# Confidence property ###################### @property
[docs] def confidence(self): return self._cpa.confidence
@confidence.setter def confidence(self, confidence): self._cpa.confidence = confidence # Change Point Indexes property ################################ @property
[docs] def cpt_inds(self): return self._cpa.cpt_inds
@cpt_inds.setter def cpt_inds(self, cpt_inds): self._cpa.cpt_inds = cpt_inds # Number of change points property ############################## @property
[docs] def num_cpts(self): return self._cpa.num_cpts
@num_cpts.setter def num_cpts(self, num_cpts): self._cpa.num_cpts = num_cpts # Confidence regions property ############################## @property
[docs] def conf_regions(self): return self._cpa.conf_regions
@conf_regions.setter def conf_regions(self, conf_regions): self._cpa.cpt_inds = conf_regions # Time uncertainty property ############################## # @property # def dt_uncertainty(self): # if self.cpa_has_run: # return self._cpa.dt_uncertainty # else: # return None # # @dt_uncertainty.setter # def dt_uncertainty(self, dt_uncertainty): # if self.cpa_has_run: # self._cpa.dt_uncertainty = dt_uncertainty
[docs] def run_cpa( self, all_sums: CPSums, confidence=None, run_levels=None, end_time_s=None ): """ Run change point analysis. Performs the change point analysis on the parent particle object with the confidence either provided as an argument here or in the __init__ method. Parameters ---------- confidence : Confidence level with which to resolve the change points with. Must be 0.99, 0.95, 0.90 or 0.69. run_levels : If true the change point analysis will be used to add a list of levels to the parent particle object by running its add_levels method. end_time_s """ if run_levels is not None: self._run_levels = run_levels if confidence is not None: if confidence in [69, 90, 95, 99]: confidence = confidence / 100 self.confidence = confidence assert ( self.confidence is not None ), "ChangePoint\tConfidence not set, can not run cpa" if self.cpa_has_run: self._cpa.reset(confidence) self._cpa.run_cpa( all_sums=all_sums, confidence=confidence, end_time_s=end_time_s ) if self._run_levels: self._cpa.define_levels() if self.has_levels: self.check_burst() # self.level_ints, self.level_dwelltimes logger.info(msg=f"{self._particle.name} levels resolved")
[docs] def calc_mean_std(self): # , intensities: np.ndarray = None assert ( self.has_levels ), "ChangePoints\tNeeds to have levels to calculate mean and standard deviation." # num_levels = self._particle.num_levels # if intensities is None: # intensities = np.array([level.int_p_s for level in self._particle.levels]) dwell_weights = np.array(self.level_dwelltimes) / np.sum(self.level_dwelltimes) weighted_stats = DescrStatsW(self.level_ints, dwell_weights) self._particle.avg_int_weighted = weighted_stats.mean self._particle.int_std_weighted = weighted_stats.std
[docs] def check_burst(self): # , intensities: np.ndarray = None, dwell_times: list = None # settings min_dwell_time = self._cpa.settings.pb_min_dwell_time use_sigma = self._cpa.settings.pb_use_sigma_thresh sigam_int_thresh = self._cpa.settings.pb_sigma_int_thresh defined_int_thresh = self._cpa.settings.pb_defined_int_thresh # assert ( # self._particle.has_levels # ), "ChangePoints\tNeeds to have levels to check photon bursts." if self._particle.has_levels: if self.bursts_deleted is not None: self.restore_bursts() if use_sigma: self.calc_mean_std() # self.level_ints burst_def = self._particle.avg_int_weighted + ( self._particle.int_std_weighted * sigam_int_thresh ) else: burst_def = defined_int_thresh # Chaning to logical_and, instead of logical_or burst_bools = np.logical_and( self.level_ints > burst_def, np.array(self.level_dwelltimes) < min_dwell_time, ) burst_levels = np.where(burst_bools)[0] if len(burst_levels): self.has_burst = True self.burst_levels = burst_levels
[docs] def remove_bursts(self): assert self.has_burst, "Particle\tNo bursts to remove." try: cpt_inds = self.cpt_inds conf_regions = self.conf_regions num_ctps_max = self.num_cpts photon_bursts_deleted = [] for ind, burst_ind in enumerate(np.flip(self.burst_levels)): # print(burst_ind) merge_left = None if burst_ind == num_ctps_max: merge_left = True elif burst_ind == 0: merge_left = False elif self.level_ints[burst_ind + 1] > self.level_ints[burst_ind - 1]: merge_left = False else: merge_left = True merge_ind = 0 if merge_left: del_ind = burst_ind - 1 num_ctps_max -= 1 else: del_ind = burst_ind deleted_cpt = cpt_inds[del_ind] cpt_inds = np.delete(cpt_inds, del_ind) deleted_conf_region = conf_regions.pop(del_ind) photon_bursts_deleted.append( { "pos_ind": del_ind, "cpt_ind": deleted_cpt, "conf_region": deleted_conf_region, } ) except IndexError as e: logger.error( f"Index error while removing photon burst for {self._particle}" ) logger.error(e) else: self.cpt_inds = cpt_inds self._cpa.conf_regions = conf_regions self.num_cpts = len(self.cpt_inds) self._cpa.define_levels(remove_prev=True) self.bursts_deleted = photon_bursts_deleted
[docs] def restore_bursts(self): if self.bursts_deleted is not None: for burst in self.bursts_deleted: self._cpa.cpt_inds = np.insert( self.cpt_inds, burst["pos_ind"], burst["cpt_ind"] ) self._cpa.conf_regions.insert(burst["pos_ind"], burst["conf_region"]) self.bursts_deleted = None self.num_cpts = len(self.cpt_inds) self._cpa.define_levels(remove_prev=True)
[docs] class Err: def __init__(self, lower: float, upper: float): self.lower = lower self.upper = upper self.sum = lower + upper
[docs] class Level: """Defines the start, end and intensity of a single level.""" def __init__( self, particle: Particle, particle_ind: int, level_inds: Tuple[int, int], int_p_s: float = None, group_ind: int = None, ): """ Initiate Level Initiates attributes that define a single resolved level. These attributes are the start and end indexes, the start and end time in ns, the number of photons in the level, the dwell time and the intensity. :param abs_times: Dataset of absolute arrival times (ns) in a h5 file as read by h5py. :type abs_times: HDF5 Dataset :param level_inds: A tuples that contain the start and end of the level (ns). :type level_inds: tuple """ assert particle is not None, 'Levels:\tParameter "particle" not provided' self._particle = particle # assert abs_times is not None, "Levels:\tParameter 'abstimes' not given." assert level_inds is not None, "Levels:\tParameter 'level_inds' not given." assert type(level_inds) is tuple, ( "Level:\tLevel indexes argument is not a " "tuple (start, end)." ) self.particle_ind = particle_ind self.level_inds = level_inds # (first_ind, last_ind) self.num_photons = self.level_inds[1] - self.level_inds[0] + 1 if len(self.abs_times) < self.level_inds[1]: print("oops") self.times_ns = ( self.abs_times[self.level_inds[0]], self.abs_times[self.level_inds[1]], ) self.dwell_time_ns = self.times_ns[1] - self.times_ns[0] if int_p_s is not None: self.int_p_s = int_p_s else: try: self.int_p_s = self.num_photons / self.dwell_time_s except RuntimeWarning as e: print("here") self.microtimes = ParticleMicrotimesSubset( particle=self._particle, start_ind=self.level_inds[0], end_ind=self.level_inds[1], ) # self.microtimes = microtimes[self.level_inds[0]:self.level_inds[1]] self.group_ind = group_ind self.histogram = None # TODO: Incorporate error margins # conf_ind_lower = conf_regions[0] # conf_ind_upper = conf_regions[1] # # dwell_err_lower = abs_times[conf_ind_lower] - self.times_ns[0] # dwell_err_upper = abs_times[conf_ind_upper] - self.times_ns[0] # self.dwell_err_ns = Err(lower=dwell_err_lower, upper=dwell_err_upper) # # num_ph_err_lower = conf_ind_lower - self.level_inds[0] # + 1 # num_ph_err_upper = conf_ind_upper - self.level_inds[0] # + 1 # int_err_lower = self.int_p_s - (1E9 * num_ph_err_lower / dwell_err_lower) # int_err_upper = self.int_p_s - (1E9 * num_ph_err_upper / dwell_err_upper) # self.int_err_p_s = Err(lower=int_err_lower, upper=int_err_upper) @property
[docs] def abs_times(self): if self._particle.file is not None: return sms_fr.abstimes(particle=self._particle)
@property
[docs] def times_s(self): return self.times_ns[0] / 1e9, self.times_ns[1] / 1e9
@property
[docs] def dwell_time_s(self): if self.dwell_time_ns is not None: return self.dwell_time_ns / 1e9 else: return None
[docs] class TauData: """Loads and stores the tau_a and tau_b files from text files for specific confidence and stores as attributes.""" def __init__(self, confidence=None): """ Initialise TauData instance. Reads local tau_a and tau_b text files and stores data in attributes for a specific confidence interval. :param confidence: Confidence of tau_a and tau_b to retrieve. Valid values are 0.99, 0.95, 0.90 and 0.69. :type confidence: float """ try: tau_data_path = fm.folder_path( folder_name="tau_data", resource_type=fm.Type.Data ) assert os.path.isdir( tau_data_path ), "TauData:\tTau data directory not found." tau_data_files = { "99_a": "Ta-99.txt", "99_b": "Tb-99.txt", "95_a": "Ta-95.txt", "95_b": "Tb-95.txt", "90_a": "Ta-90.txt", "90_b": "Tb-90.txt", "69_a": "Ta-69.txt", "69_b": "Tb-69.txt", } assert confidence in [ 0.99, 0.95, 0.90, 0.69, ], "TauData:\tInvalid confidence provided. Can not provide confidence key." if confidence == 0.99: conf_keys = ["99_a", "99_b"] elif confidence == 0.95: conf_keys = ["95_a", "95_b"] elif confidence == 0.90: conf_keys = ["90_a", "90_b"] else: conf_keys = ["69_a", "69_b"] for tau_type in conf_keys: file_name = tau_data_files[tau_type] full_path = tau_data_path + os.path.sep + file_name assert os.path.isfile(full_path), ( "TauData:\tTau data file " + file_name + " does not exist. Look in" + full_path + "." ) data = np.loadtxt(full_path, usecols=1) if tau_type[-1] == "a": self._a = data else: self._b = data except Exception as err: logger.error(err)
[docs] def get_tau_a(self, num_data_points=None): """ Get tau_a value for n = num_data_points. Retrieve the a tau data for the given number of data points. :param num_data_points: Number of data points that the tau data is needed for. **Note**, only use values up to and smaller than 1000 for accuracy. :return: tau_a value :rtype: float """ assert num_data_points is not None, "TauData:\tNumber of data points not given." return self._a[num_data_points]
[docs] def get_tau_b(self, num_data_points=None): """ Get tau_a value for n = num_data_points. Retrieve the a tau data for the given number of data points. :param num_data_points: Number of data points that the tau data is needed for. **Note**, only use values up to and smaller than 1000 for accuracy. :return: tau_a value :rtype: float """ assert num_data_points is not None, "TauData:\tNumber of data points not given." return self._b[num_data_points]
[docs] class ChangePointAnalysis: """Perform analysis of particle abstimes data to resolve change points.""" def __init__(self, particle: Particle = None, confidence=None): """ Initiate ChangePointAnalysis instance. :param particle: Object containing particle data :type particle: Class Particle in smsh5 module :param confidence: Confidence interval. Valid values are 0.99, 0.95, 0.90 and 0.69. :type confidence: float """ # assert h5file is not None, "No HDF5 has been given" # To ensure that a h5file is given # assert type(particle) is smsh5.Particle, "ChangePoints:\tNo Particle object given." # assert confidence is not None, "ChangePoints:\tNo confidence parameter given." self._particle = particle self.has_run = False self.end_at_photon = None self.num_photons = particle.num_photons self.confidence = confidence self.cpt_inds = np.array([], dtype=int) self.conf_regions = list(tuple()) # [(start, end)] # self.dt_uncertainty = np.array([]) # dt self._finding = False self.found_cpts = False self.num_cpts = None self.has_levels = False self.levels = None self.num_levels = None self._i = None if confidence is not None: self._tau = TauData(self.confidence) self.settings = Settings() @property
[docs] def _abstimes(self): if self._particle.file is not None and self._particle.file.__bool__() is True: return self._particle.abstimes
@property
[docs] def _microtimes(self): if self._particle.file is not None and self._particle.file.__bool__() is True: return self._particle.microtimes
[docs] def load_settings(self): settings_file_path = fm.path("settings.json", fm.Type.ProjectRoot) with open(settings_file_path, "r") as settings_file: if not hasattr(self, "settings"): self.settings = Settings() self.settings.load_settings_from_file(file_or_path=settings_file)
[docs] def reset(self, confidence: float = None): self.has_run = False self.confidence = confidence self.cpt_inds = np.array([], dtype=int) self.conf_regions = list(tuple()) # [(start, end)] # self.dt_uncertainty = np.array([]) # dt self.has_levels = False self.levels = None self.num_levels = None self._finding = False self.found_cpts = False self.num_cpts = None self._i = None if confidence is not None: self._tau = TauData(self.confidence)
@property
[docs] def level_ints(self): if self.has_run: return np.array([level.int_p_s for level in self.levels]) else: return None
@property
[docs] def level_dwelltimes(self): if self.has_run: return [level.dwell_time_s for level in self.levels] else: return None
[docs] def __weighted_likelihood_ratio( self, all_sums: CPSums, seg_inds=None ) -> Tuple[bool, Optional[int]]: """ Calculates the Weighted & Standardised Likelihood ratio and detects the possible change point. Based on 'Detection of Intensity Change Points in Time-Resolved Single-Molecule Measurements' from Watkins nad Yang, J. Phys. Chem. B 2005, 109, 617-628 (http://pubs.acs.org/doi/abs/10.1021/jp0467548) If the possible change point is greater than the tau_a value for the corresponding confidence interval and number of data points the detected change points, it's confidence region (as defined by tau_b), and the corresponding uncertainty in time is added to this instance of ChangePointAnalysis. Parameters ---------- seg_inds : (int, int), optional Segment indexes (start, end). Returns ------- cpt_found : bool True if a change point was detected. cpt : int, Optional The index of the change point, if one was detected. """ # settings min_num_photons = self.settings.cpa_min_num_photons min_boundary_offset = self.settings.cpa_min_boundary_offset assert ( type(seg_inds) is tuple ), "ChangePointAnalysis:\tSegment index's not given." start_ind, end_ind = seg_inds n = end_ind - start_ind assert ( n <= 1000 ), "ChangePointAnalysis:\tIndex's given result in more than a segment of more than 1000 points." if n < min_num_photons: cpt_found = False return cpt_found, None time_data = self._abstimes[start_ind:end_ind] ini_time = time_data[0] period = time_data[-1] - ini_time wlr = np.zeros(n, float) # sig_e = np.pi ** 2 / 6 - sum(1 / j ** 2 for j in range(1, (n - 1) + 1)) sig_e = all_sums.get_sig_e(n) for k in range(2, (n - 2) + 1): # Remember!!!! range(1, N) = [1, ... , N-1] sum_set = all_sums.get_set(n, k) cap_v_k = (time_data[k] - ini_time) / period # Just after eq. 4 # u_k = -sum(1 / j for j in range(k, (n - 1) + 1)) # Just after eq. 6 u_k = sum_set["u_k"] # u_n_k = -sum(1 / j for j in range(n - k, (n - 1) + 1)) # Just after eq. 6 u_n_k = sum_set["u_n_k"] l0_minus_expec_l0 = ( -2 * k * np.log(cap_v_k) + 2 * k * u_k - 2 * (n - k) * np.log(1 - cap_v_k) + 2 * (n - k) * u_n_k ) # Just after eq. 6 # v_k2 = sum(1 / j ** 2 for j in range(k, (n - 1) + 1)) # Just before eq. 7 v2_k = sum_set["v2_k"] # v2_n_k = sum(1 / j ** 2 for j in range(n - k, (n - 1) + 1)) # Just before eq. 7 v2_n_k = sum_set["v2_n_k"] sigma_k = np.sqrt( 4 * (k**2) * v2_k + 4 * ((n - k) ** 2) * v2_n_k - 8 * k * (n - k) * sig_e ) # Just before eq. 7, and note errata w_k = (1 / 2) * np.log((4 * k * (n - k)) / n**2) # Just after eq. 6 wlr.itemset( k, l0_minus_expec_l0 / sigma_k + w_k ) # Eq. 6 and just after eq. 6 max_ind_local = int(wlr.argmax()) cpt = None if ( max_ind_local >= min_boundary_offset and n - max_ind_local >= min_boundary_offset and wlr[max_ind_local] >= self._tau.get_tau_a(n) ): cpt = max_ind_local + start_ind self.cpt_inds = np.append(self.cpt_inds, max_ind_local + start_ind) tau_b_inv = wlr.max() - self._tau.get_tau_b(n) region_all_local = np.where(wlr >= tau_b_inv)[0] region = (region_all_local[0] + start_ind, region_all_local[-1] + start_ind) dt = self._abstimes[region[1]] - self._abstimes[region[0]] self.conf_regions.append(region) # list, not ndarray # self.dt_uncertainty = np.append(self.dt_uncertainty, dt) cpt_found = True else: cpt_found = False return cpt_found, cpt
[docs] def _next_seg_ind( self, prev_seg_inds: Tuple[int, int] = None, side: str = None, rights_cpt: int = None, ) -> Tuple[int, int]: """ Calculates the next segments indexes. Uses the indexes of the previous segment, as well as the latest change point to calculate the index values of the next segment. See code2flow.com for flow diagram of if statements. https://code2flow.com/svLn85 Parameters ---------- prev_seg_inds : (int, int) Contains the start and end of the previous segment (start, end) side: str, Optional If a change point was detected in the previous segment choose left or right of it. Possible values are 'left' or 'right'. rights_cpt : int, Optional The index of the change point for the right leg (the next_seg_start if side = 'left'). Returns ------- next_seg : (int, int) The next segments indexes. """ min_num_photons = self.settings.cpa_min_num_photons if self.end_at_photon is not None: last_photon_ind = self.end_at_photon else: last_photon_ind = self.num_photons - 1 if prev_seg_inds is None: # Data sets need to be larger than 200 photons # assert self.num_photons >= 200, 'ChangePointAnalysis:\tData set needs to ' \ # 'be at least 200 photons for change point detection.' if self.num_photons < 200: logger.info( "ChangePointAnalysis:\tData set needs to be at least 200 photons for change point detection." ) next_start_ind, next_end_ind = None, None return next_start_ind, next_end_ind if self.num_photons > 1000: next_start_ind, next_end_ind = 0, 1000 else: next_start_ind, next_end_ind = 0, last_photon_ind else: prev_start_ind, prev_end_ind = prev_seg_inds if len(self.cpt_inds) == 0: if last_photon_ind >= prev_end_ind + 800: next_start_ind, next_end_ind = ( prev_end_ind - 200, prev_end_ind + 800, ) elif ( last_photon_ind - prev_end_ind >= self.settings.cpa_min_num_photons ): # Next segment needs to be at least 10 photons large. next_start_ind, next_end_ind = prev_end_ind - 200, last_photon_ind else: next_start_ind, next_end_ind = None, None dbg.p( f"Warning, last photon segment smaller than {self.settings.cpa_min_num_photons} photons and was not tested", "Change Point", ) elif side is not None: # or prev_start_ind < self.cpts[-1] < prev_end_ind if side is not None: assert side in [ "left", "right", ], "ChangePointAnalysis:\tSide of change point invalid or not specified" if side == "left": next_start_ind, next_end_ind = prev_start_ind, int( self.cpt_inds[-1] - 1 ) else: assert ( rights_cpt is not None ), "ChangePointAnalysis\tRight side's change point not provided." next_start_ind = rights_cpt # if len(self.cpt_inds) > 1: # i = -1 # while self.cpt_inds[i - 1] > self.cpt_inds[i]: # i -= 1 # if self.cpt_inds[i] == prev_end_ind + 1: # break # next_start_ind = self.cpt_inds[i] next_end_ind = prev_end_ind elif last_photon_ind >= prev_end_ind + 800: if prev_end_ind - 200 < self.cpt_inds[-1] < prev_end_ind: if last_photon_ind >= self.cpt_inds[-1] + 1000: next_start_ind, next_end_ind = ( int(self.cpt_inds[-1]), int(self.cpt_inds[-1]) + 1000, ) else: next_start_ind, next_end_ind = ( int(self.cpt_inds[-1]), last_photon_ind, ) else: next_start_ind, next_end_ind = ( prev_end_ind - 200, prev_end_ind + 800, ) elif ( last_photon_ind - prev_end_ind >= 10 ): # Next segment needs to be at least 10 photons large. if prev_end_ind - 200 < self.cpt_inds[-1] < prev_end_ind: next_start_ind, next_end_ind = ( int(max(self.cpt_inds)), last_photon_ind, ) else: next_start_ind, next_end_ind = prev_end_ind - 200, last_photon_ind else: next_start_ind, next_end_ind = None, None if prev_end_ind != last_photon_ind: dbg.p( "Warning, last photon segment smaller than 10 photons and was not tested", "Change Point", ) return next_start_ind, next_end_ind
[docs] def _find_all_cpts( self, all_sums: CPSums, _seg_inds: Tuple[int, int] = None, _side: str = None, _right_cpt: int = None, ): """ Find all change points in particle. Recursive function that finds all change points that meets the confidence criteria. .. note:: The first call doesn't need to be called with any parameters. .. note:: The top level assigns the number of detected change points to the .num_cpts attribute of this instance of ChangePointAnalysis. Parameters ---------- _right_cpt : int, Optional The index of the change point for the right leg (the next_seg_start if side = 'left'). _seg_inds : (int, int), Optional The index of the segment that is to be searched. Calculated by _next_seg_ind method. _side : str, Optional Determines current segment is left or right of a previously detected change point. Valid values are 'left' or 'right'. """ is_top_level = False if self._finding is False: is_top_level = True self._finding = True assert _seg_inds is None, ( "ChangePointAnalysis:\tDo not provide seg_inds when calling, it's used for " "recursive calling only. " ) if is_top_level: _seg_inds = self._next_seg_ind() self._i = 0 else: _seg_inds = self._next_seg_ind( prev_seg_inds=_seg_inds, side=_side, rights_cpt=_right_cpt ) self._i += 1 if _seg_inds != (None, None): cpt_found, _right_cpt = self.__weighted_likelihood_ratio( all_sums, _seg_inds ) if cpt_found: # Left side of change point self._find_all_cpts(all_sums, _seg_inds, _side="left") # Right side of change point self._find_all_cpts( all_sums, _seg_inds, _side="right", _right_cpt=_right_cpt ) pass # Exits if recursive if self.end_at_photon is not None: end_ind = self.end_at_photon else: end_ind = self.num_photons if _seg_inds[1] <= end_ind + 9 and _side is None: self._find_all_cpts(all_sums, _seg_inds) if is_top_level: self._finding = False self.found_cpts = True sort_inds = np.argsort(self.cpt_inds) cpt_inds = np.zeros_like(self.cpt_inds) conf_regions = list() # dt_uncertainty = np.zeros_like(cpt_inds) for i, sort_i in enumerate(sort_inds): cpt_inds[i] = self.cpt_inds[sort_i] conf_regions.append(self.conf_regions[sort_i]) # dt_uncertainty[i] = self.dt_uncertainty[sort_i] # TODO: Revisit necessity of duplicate removal cpt_inds, unique_inds = np.unique(cpt_inds, return_inverse=True) dups = np.append( [False], [ unique_inds[i] == unique_inds[i - 1] for i in range(len(unique_inds)) if i > 0 ], ) dups = np.flip(np.where(dups)[0].tolist()) if len(dups) != 0: for i in dups: del conf_regions[i] self.num_cpts = len(cpt_inds) self.cpt_inds = cpt_inds self.conf_regions = conf_regions
[docs] def define_levels(self, remove_prev: bool = None) -> None: """ Creates a list of levels as defined by the detected change points. Uses the detected change points to create a list of Level instances that contain attributes that define each resolved level. This method populate the .levels attribute of the parent particle instance by using its add_levels method. Parameters ---------- remove_prev : bool, Optional If true, previous levels will be removed. """ if len(self.cpt_inds) != 0: if remove_prev is None: remove_prev = False if remove_prev: self.levels = None self.num_levels = None assert self.found_cpts, ( "ChangePointAnalysis:\tChange point analysis " "not done, or found no change points. " ) self.num_levels = self.num_cpts + 1 self.levels = [object()] * self.num_levels if self.num_cpts != 1: for num, cpt in enumerate(self.cpt_inds): if num == 0: # First change point level_inds = (0, cpt - 1) elif num == self.num_cpts - 1: # Last change point if self.end_at_photon is not None: end_ind = self.end_at_photon else: end_ind = self.num_photons level_inds = (cpt, end_ind - 1) self.levels[num + 1] = Level( particle=self._particle, particle_ind=num + 1, level_inds=level_inds, ) level_inds = (self.cpt_inds[num - 1], cpt - 1) else: level_inds = (self.cpt_inds[num - 1], cpt) self.levels[num] = Level( particle=self._particle, particle_ind=self._particle.dataset_ind, level_inds=level_inds, ) else: self.levels[0] = Level( particle=self._particle, particle_ind=0, level_inds=(0, self.cpt_inds[0] - 1), ) self.levels[1] = Level( particle=self._particle, particle_ind=1, level_inds=(self.cpt_inds[0], self.num_photons - 1), ) self.has_levels = True elif self.has_run: # Has run, no cpts -> One single level if self.num_photons >= 200: self.levels = [ Level( particle=self._particle, particle_ind=0, level_inds=(0, self.num_photons - 1), ) ] self.num_levels = 1 self.num_cpts = 0 self.has_levels = True else: self.num_levels = 0 self.num_cpts = 0 self.has_levels = False
# @dbg.profile
[docs] def run_cpa(self, all_sums: CPSums, confidence=None, end_time_s=None): """ Runs the change point analysis. If the ChangePointAnalysis wasn't initialised with a confidence interval, or if the analysis is to be rerun with a new confidence interval, this method starts said analysis. Parameters ---------- end_time_s: float Time at which to end analysis. If not provided the whole trace will be used. confidence: float Confidence interval. Valid values are 0.99, 0.95, 0.90 and 0.69. Returns ------- num_cpts: int Number of change points detected cpt_inds: ndarray Indexes of change points conf_regions: list(tuple(int, int)) Index region corresponding to confidence interval dt_uncertainty: ndarray Array of uncertainty in time corresonding to confidence interval """ self.load_settings() if confidence is not None: assert confidence in [ 0.99, 0.95, 0.90, 0.69, ], "ChangePointAnalysis:\tConfidence value given not valid." self.confidence = confidence self._tau = TauData(confidence) else: assert ( self.confidence is not None ), "ChangePointAnalysis:\tNo confidence value provided." if end_time_s is not None: if self._abstimes.size != 0: self.end_at_photon = np.argmax(self._abstimes[:] > (end_time_s * 1e9)) else: self.end_at_photon = 0 if self.end_at_photon == 0: self.end_at_photon = self.num_photons self._find_all_cpts(all_sums=all_sums) self.has_run = True
[docs] def main(): """ Tests ChangePoints init """ pass
if __name__ == "__main__": main()