"""Module for handling SMS data from HDF5 files
Bertus van Heerden and Joshua Botha
University of Pretoria
2018
"""
from __future__ import annotations
import ast
import os
import re
import traceback
from typing import List, Union, TYPE_CHECKING, Tuple
from uuid import uuid1
import h5pickle
import h5py
import numpy as np
from pyqtgraph import ScatterPlotItem, SpotItem
import dbg
import grouping
import tcspcfit
from change_point import ChangePoints
from generate_sums import CPSums
from grouping import AHCA, GlobalLevel
from my_logger import setup_logger
from processes import ProcessProgFeedback, ProcessProgress, PassSigFeedback
from tcspcfit import FittingParameters
import smsh5_file_reader as h5_fr
from antibunching import AntibunchingAnalysis
if TYPE_CHECKING:
from change_point import Level
from grouping import GlobalLevel
[docs]
logger = setup_logger(__name__)
[docs]
class H5dataset:
"""Represents an entire HDF5 dataset
Parameters
----------
filename : str
HDF5 file path
sig_fb : PassSigFeedback, optional
feedback queue for signals
prog_fb : ProcessProgFeedback, optional
feedback queue for updating progress bar
"""
def __init__(
self,
filename,
sig_fb: PassSigFeedback = None,
prog_fb: ProcessProgFeedback = None,
):
[docs]
self.cpa_has_run = False
[docs]
self.use_parallel = False
if prog_fb is not None:
prog_fb.set_status(status="Reading file...")
[docs]
self._file = h5pickle.File(self.name, "r")
[docs]
self.file_version = h5_fr.file_version(dataset=self)
all_keys = self.file.keys()
part_keys = [part_key for part_key in all_keys if "Particle " in part_key]
natural_p_names = [None] * len(part_keys)
natural_key = []
for name in part_keys:
for seg in re.split("(\d+)", name):
if seg.isdigit():
natural_key.append(int(seg))
for num, key_num in enumerate(natural_key):
natural_p_names[key_num - 1] = part_keys[num]
[docs]
self.all_sums = CPSums(n_min=10, n_max=1000, prog_fb=prog_fb)
[docs]
self.all_raster_scans = list()
map_particle_indexes = self.get_all_raster_scans(particle_names=natural_p_names)
if map_particle_indexes is not None:
self.has_raster_scans = True
else:
self.has_raster_scans = False
[docs]
self.particles = []
for num, particle_name in enumerate(natural_p_names):
if map_particle_indexes is not None:
this_raster_scan_index = map_particle_indexes[num]
this_raster_scan = self.all_raster_scans[map_particle_indexes[num]]
else:
this_raster_scan_index = None
this_raster_scan = None
prim_part = Particle(
name=particle_name,
dataset_ind=num,
dataset=self,
raster_scan_dataset_index=this_raster_scan_index,
raster_scan=this_raster_scan,
)
if (
h5_fr.abstimes2(prim_part) is not None
): # if second card data exists, create secondary particle
# print(particle_name)
sec_part = Particle(
name=particle_name,
dataset_ind=num,
dataset=self,
raster_scan_dataset_index=this_raster_scan_index,
raster_scan=this_raster_scan,
is_secondary_part=True,
prim_part=prim_part,
)
prim_part.sec_part = sec_part
self.particles.append(prim_part)
self.particles.append(sec_part)
else:
self.particles.append(prim_part)
self.num_parts += 1
assert self.num_parts == h5_fr.num_parts(dataset=self)
[docs]
self.channelwidth = None
[docs]
self.save_selected = None
[docs]
self.has_levels = False
[docs]
self.has_groups = False
[docs]
self.has_lifetimes = False
[docs]
self.has_spectra = False
[docs]
self.global_particle = None
@property
[docs]
def file(self):
"""HDF5 file."""
if self._file is not None and self._file.__bool__() is True:
return self._file
else:
raise Warning("File not set")
[docs]
def unload_file(self, should_close: bool = True, should_delete: bool = True):
"""Remove file reference and close and/or delete the file."""
if should_close:
if self._file is not None and self._file.__bool__() is True:
self._file.close()
if should_delete:
del self._file
self._file = None
[docs]
def load_file(self, new_file: h5pickle.File):
"""Load pickled HDF file."""
if type(new_file) is h5pickle.File and new_file.__bool__() is True:
self._file = new_file
self.name = new_file.filename
return True
else:
if type(new_file) is h5pickle.File and new_file.__bool__() is False:
logger.error("Provided H5 file file is closed.")
else:
logger.error("Provided H5 file invalid.")
[docs]
def get_all_raster_scans(self, particle_names: List[str]) -> list:
"""Get all the raster scans from the file and handle appropriately.
Each raster scan has one or more particles associated with it. In the HDF5
file, this is manifested as having the same raster scan duplicated across
particle objects. This function reads out all unique raster scans and assigns
them each their particles. The return value is a map from particle numbers to
'raster scan numbers - if there's a raster scan for particles 1 and 2
and another one for particles 3-5, the map would be: [1, 1, 2, 2, 2]
Arguments
---------
particle_names : list[str]
list of particle names to read raster scan data from
Returns
-------
map_particle_index : list or None
maps particle numbers to raster scan numbers.
"""
raster_scans = list()
file_keys = self.file.keys()
for num, particle_name in enumerate(particle_names):
if particle_name in file_keys:
particle = h5_fr.particle(particle_num=num, dataset=self)
if h5_fr.has_raster_scan(particle=particle):
raster_scans.append((h5_fr.raster_scan(particle=particle), num))
if len(raster_scans) != 0:
prev_raster_scan = None
group_indexes = list()
map_particle_index = list()
raster_scan_counter = 0
for raster_scan, num in raster_scans:
if raster_scan == prev_raster_scan and num != len(raster_scans) - 1:
# Must not add new RasterScan, because same raster scan as previous and not last
group_indexes.append(num)
raster_scan_num = len(self.all_raster_scans)
else: # Must add new RasterScan, because raster scan different from previous, or last
if num != 0: # Not first one
if num != len(raster_scans) - 1: # Not last one
# Save previous raster scan with previous group indexes
print(raster_scan_num)
print(group_indexes)
self.all_raster_scans.append(
RasterScan(
h5dataset=self,
particle_num=num-1,
h5dataset_index=raster_scan_num,
particle_indexes=group_indexes,
)
)
raster_scan_counter += 1
raster_scan_num = len(self.all_raster_scans)
else: # Last one
# Save this raster scan with updated group indexes
if raster_scan == prev_raster_scan:
# Last one is part of previous group
group_indexes.append(num)
raster_scan_num = len(self.all_raster_scans)
else:
# Last one part of new group
self.all_raster_scans.append(
RasterScan(
h5dataset=self,
particle_num=num,
h5dataset_index=raster_scan_num,
particle_indexes=group_indexes,
)
)
raster_scan_counter += 1
group_indexes = [num]
raster_scan_num = len(self.all_raster_scans)
self.all_raster_scans.append(
RasterScan(
h5dataset=self,
particle_num=num,
h5dataset_index=raster_scan_counter,
particle_indexes=group_indexes,
)
)
raster_scan_counter += 1
else: # first one
raster_scan_num = 0
if len(raster_scans) == 1:
group_indexes = [0]
self.all_raster_scans.append(
RasterScan(
h5dataset=self,
particle_num=num,
h5dataset_index=raster_scan_counter,
particle_indexes=group_indexes,
)
)
group_indexes = [num]
prev_raster_scan = raster_scan
map_particle_index.append(raster_scan_num)
return map_particle_index
else:
return None
[docs]
def makehistograms(self, remove_zeros=True, startpoint=None, channel=True):
"""Put the (micro) arrival times into histograms."""
for particle in self.particles:
particle.makehistograms(remove_zeros, startpoint, channel)
[docs]
def bin_all_ints(
self,
binsize: float,
sig_fb: PassSigFeedback = None,
prog_fb: ProcessProgFeedback = None,
):
"""Bin the absolute times into intensity traces.
Arguments
---------
binsize : float
Time bin size in ms
sig_fb : PassSigFeedback, optional
feedback queue for signals
prog_fb : ProcessProgFeedback, optional
feedback queue for updating progress bar
"""
if prog_fb:
proc_tracker = ProcessProgress(
prog_fb=prog_fb, num_iterations=len(self.particles)
)
# if proc_tracker:
# if not proc_tracker.has_num_iterations:
# proc_tracker.num_iterations = len(self.particles)
for particle in self.particles:
particle.binints(binsize)
if prog_fb:
proc_tracker.iterate()
if prog_fb:
prog_fb.end()
# dbg.p('Binning all done', 'H5Dataset')
[docs]
def save_particles(self, file_path, selected_nums: List[int]):
"""Save selected particle to a new or existing HDF5 file.
Arguments
---------
file_path : str
Path to existing file, or to file that will be created.
selected_nums : List[int]
Particle numbers to be written to HDF5 file.
"""
add = os.path.exists(file_path)
if add:
new_h5file = h5pickle.File(file_path, mode="r+")
num_existing = new_h5file.attrs.get("# Particles")
else:
new_h5file = h5pickle.File(file_path, mode="w")
num_existing = 0
for i, selected in enumerate(selected_nums):
new_h5file.copy(
self.file[f"/Particle {selected}"],
new_h5file,
name=f"/Particle {num_existing + i + 1}",
)
if add:
new_h5file.attrs.modify("# Particles", num_existing + len(selected_nums))
else:
new_h5file.attrs.create("# Particles", len(selected_nums))
new_h5file.close()
[docs]
class Particle:
"""Represents a particle in an `H5dataset`.
Parameters
----------
name : str
The name of the particle
dataset_ind : H5dataset
The index of the particle in the dataset
dataset : H5dataset
The instance of the dataset to which this particle belongs
raster_scan_dataset_index : int
The index of the raster scan connected to the particle
raster_scan : RasterScan
The raster scan object this particle is connected to
is_secondary_part : bool
Whether this is a "secondary particle" that contains the data from a second TCSCPC card
prim_part : Particle
If this particle is a secondary particle, the corresponding primary particle
sec_part : Particle
If this particle is a primary particle, the corresponding secondary particle
tmin : int, optional
Minimum photon micro time in ns
tmax : int, optional
Maximum photon micro time in ns
channelwidth : float, optional
TCSPC histogram channelwidth in ns. Normally automatically determined.
is_global : bool = False
TODO: What is this?
"""
def __init__(
self,
name: str,
dataset_ind: int,
dataset: H5dataset,
raster_scan_dataset_index: int = None,
raster_scan: RasterScan = None,
is_secondary_part: bool = False,
prim_part: Particle = None,
sec_part: Particle = None,
tmin=None,
tmax=None,
channelwidth=None,
is_global: bool = False,
):
[docs]
self.uuid = uuid1()
[docs]
self.dataset = dataset
[docs]
self.dataset_ind: int = dataset_ind
# self.get_file = lambda: dataset.get_file()
[docs]
self.file_version = h5_fr.file_version(dataset=dataset)
# self.dataset_particle = self.file[self.name]
if not is_global:
self.is_secondary_part = is_secondary_part
self.prim_part = prim_part
self.sec_part = sec_part
if not self.is_secondary_part:
# self.microtimes = h5_fr.microtimes(particle=self)
# self.abstimes = h5_fr.abstimes(particle=self)
self.num_photons = len(self.abstimes)
else:
# self.microtimes = h5_fr.microtimes2(particle=self)
# self.abstimes = h5_fr.abstimes2(particle=self)
self.num_photons = len(self.abstimes)
self.tcspc_card = h5_fr.tcspc_card(particle=self)
self.int_trace = h5_fr.int_trace(particle=self)
self.cpts = ChangePoints(
self
) # Added by Josh: creates an object for Change Point Analysis (cpa)
self.ahca = AHCA(
self
) # Added by Josh: creates an object for Agglomerative Hierarchical Clustering Algorithm
self.avg_int_weighted = None
self.int_std_weighted = None
if self.is_secondary_part:
self.spectra = self.prim_part.spectra
self._raster_scan_dataset_index = (
self.prim_part._raster_scan_dataset_index
)
self.raster_scan = self.prim_part.raster_scan
self.has_raster_scan = self.prim_part.has_raster_scan
self.description = self.prim_part.description
# self.ab_analysis = self.prim_part.ab_analysis
else:
self.spectra = Spectra(self)
self._raster_scan_dataset_index = raster_scan_dataset_index
self.raster_scan = raster_scan
self.has_raster_scan = raster_scan is not None
self.description = h5_fr.description(particle=self)
self._ab_analysis = AntibunchingAnalysis(self)
self.irf = None
if channelwidth is None and not (
len(self.microtimes) == 0 and len(self.abstimes) == 0
):
differences = np.diff(np.sort(self.microtimes[:]))
possible_channelwidths = np.unique(np.diff(np.unique(differences)))
print(possible_channelwidths)
# filter out very small values that are due to rounding errors:
possible_channelwidths = possible_channelwidths[possible_channelwidths>1e-4]
if len(possible_channelwidths) != 1:
# channelwidth = np.float64(0.01220703125)
channelwidth = possible_channelwidths[0]
logger.warning(
f"Channel width might be wrong. Inspect {self.name}. Used channel width of {channelwidth} ns."
)
else:
channelwidth = possible_channelwidths[0]
else:
channelwidth = np.float64(0.01220703125)
logger.warning(
f"Channel width could not be determined. Inspect {self.name}. A default of {channelwidth} used."
)
self.channelwidth = channelwidth
if tmin is None:
self.tmin = 0
else:
self.tmin = tmin
if tmax is None:
self.tmax = 25
else:
self.tmax = tmax
self.measured = None
self.t = None
self.ignore = False
self.bg = False
self._histogram = None
self._histogram_roi = None
self.use_roi_for_histogram = False
self.binnedtrace = None
self.bin_size = None
try:
self.roi_region = (0, self.abstimes[-1] / 1e9)
except IndexError:
self.roi_region = (0, 0)
self.startpoint = None
self.level_or_group_selected = None
self.using_group_levels = False
self.has_fit_a_lifetime = False
self.has_exported = False
self.is_global = False
else:
self.cpts = ChangePoints(
self
) # Added by Josh: creates an object for Change Point Analysis (cpa)
self.ahca = AHCA(
self
) # Added by Josh: creates an object for Agglomerative Hierarchical Clustering Algorithm
self.is_global = False
@property
[docs]
def file(self):
"""HDF5 file."""
return self.dataset.file
@property
[docs]
def file_group(self):
"""The particle's group in the HDF5 file."""
if self.file is not None:
return self.file[self.name]
@property
[docs]
def microtimes(self) -> h5pickle.Dataset:
"""The particle's microtimes."""
if self.file is not None and self.file.__bool__() is True:
if not self.is_secondary_part:
return h5_fr.microtimes(particle=self)
else:
return h5_fr.microtimes2(particle=self)
@property
[docs]
def abstimes(self) -> h5pickle.Dataset:
"""The particle's absolute times."""
if self.file is not None and self.file.__bool__() is True:
if not self.is_secondary_part:
return h5_fr.abstimes(particle=self)
else:
return h5_fr.abstimes2(particle=self)
@property
[docs]
def histogram(self) -> Histogram:
"""The particle's `Histogram`."""
if not self.use_roi_for_histogram:
hist = self._histogram
else:
hist = self._histogram_roi
return hist
# TODO: These 2 functions have no usages
[docs]
def set_histgram(self, histogram: Histogram) -> None:
self._histogram = histogram
[docs]
def set_histgram_roi(self, histogram: Histogram) -> None:
self._histogram_roi = histogram
@property
[docs]
def use_roi_for_grouping(self) -> bool:
"""Whether to use the particle's ROI for grouping."""
if not self.is_secondary_part:
return self.ahca.use_roi_for_grouping
@property
[docs]
def grouped_with_roi(self) -> bool:
"""Whether the particle's `ahca` used ROI."""
if not self.is_secondary_part:
return self.ahca.grouped_with_roi
@property
[docs]
def roi_region_photon_inds(self) -> Tuple[int, int]:
"""Indices of photons at ROI boundaries."""
first_photon = 0
last_photon = self.num_photons - 1
roi_start = self.roi_region[0]
roi_end = self.roi_region[1]
epsilon_t = 0.1
end_t = self.abstimes[-1] / 1e9
if (
roi_start >= 0 + epsilon_t / 2
or not roi_end - epsilon_t / 2 <= end_t <= roi_end + epsilon_t / 2
):
times = self.abstimes[:] / 1e9
first_photon = np.argmin(roi_start > times)
last_photon = np.argmin(roi_end > times)
return first_photon, last_photon
@property
[docs]
def num_photons_roi(self) -> int:
"""Number of photons in the particle ROI."""
first_photon, last_photon = self.roi_region_photon_inds
return last_photon - first_photon
@property
[docs]
def has_spectra(self) -> bool:
"""Whether the particle has spectra."""
return self.spectra._has_spectra
@property
[docs]
def microtimes_roi(self) -> np.ndarray:
"""Microtimes from the particle ROI."""
times = np.array(self.abstimes) / 1e9
if self.roi_region[0] == 0:
first_ind = 0
else:
where_start = np.where(self.roi_region[0] <= times)
if len(where_start[0]):
first_ind = where_start[0][0]
else:
first_ind = 0
where_end = np.where(self.roi_region[1] <= times)
if len(where_end[0]):
last_ind = where_end[0][0] + 1
else:
last_ind = len(times)
return self.microtimes[first_ind:last_ind]
@property
[docs]
def first_level_ind_in_roi(self):
"""Index of the first level in the particle's ROI."""
if self.has_levels:
end_times = np.array([level.times_s[1] for level in self.cpts.levels])
first_roi_ind = np.argmax(end_times > self.roi_region[0])
return int(first_roi_ind)
@property
[docs]
def last_level_ind_in_roi(self):
"""Index of the last level in the particle's ROI."""
if len(self.roi_region) == 3:
last_roi_ind = self.roi_region[2]
else:
end_times = np.array([level.times_s[1] for level in self.cpts.levels])
last_roi_ind = np.argmax(
np.round(end_times, 3) >= np.round(self.roi_region[1], 3)
)
return int(last_roi_ind)
@property
[docs]
def first_group_level_ind_in_roi(self):
"""Index of the first grouped level in the particle's ROI."""
if not self.is_secondary_part:
if self.has_groups and self.group_levels is not None:
end_times = np.array([level.times_s[1] for level in self.group_levels])
first_group_roi_ind = np.argmax(end_times > self.roi_region[0])
return int(first_group_roi_ind)
@property
[docs]
def last_group_level_ind_in_roi(self):
"""Index of the last grouped level in the particle's ROI."""
# if self.has_groups and self.group_levels is not None:
if not self.is_secondary_part:
if self.group_levels is not None:
# if len(self.roi_region) == 3:
# last_roi_ind = self.roi_region[2]
# else:
end_times = np.array([level.times_s[1] for level in self.group_levels])
last_group_roi_ind = np.argmax(
np.round(end_times, 3) >= np.round(self.roi_region[1], 3)
)
return int(last_group_roi_ind)
@property
[docs]
def raster_scan_coordinates(self) -> tuple:
"""The particle's RS coordinates."""
if self.has_raster_scan:
coords = h5_fr.raster_scan_coord(particle=self)
return coords[1], coords[0]
else:
return None, None
@property
[docs]
def has_levels(self):
"""Whether the particle has levels."""
return self.cpts.has_levels
@property
[docs]
def has_groups(self):
"""Whether the particle has groups."""
if not self.is_secondary_part:
return self.ahca.has_groups
@property
[docs]
def has_corr(self):
"""Whether the particle has a second-order correlation."""
return self.ab_analysis.has_corr
@property
[docs]
def ab_analysis(self):
if self.is_secondary_part:
return self.prim_part._ab_analysis
else:
return self._ab_analysis
@ab_analysis.setter
def ab_analysis(self, ab_analysis):
if not self.is_secondary_part:
self._ab_analysis = ab_analysis
@property
[docs]
def groups(self) -> List[grouping.Group]:
"""The particle's grouped levels."""
if not self.is_secondary_part and self.has_groups:
return self.ahca.selected_step.groups
@property
[docs]
def num_groups(self):
"""The number of level groups."""
if not self.is_secondary_part and self.has_groups:
return self.ahca.selected_step.num_groups
@property
[docs]
def groups_bounds(self):
"""The particle's groups' bounds."""
if not self.is_secondary_part:
return self.ahca.selected_step.calc_int_bounds()
@property
[docs]
def groups_ints(self):
"""The particle's group intensities."""
if not self.is_secondary_part:
return self.ahca.selected_step.group_ints
@property
[docs]
def grouping_bics(self):
"""The particle's group BIC's."""
if not self.is_secondary_part:
return self.ahca.bics
@property
[docs]
def grouping_selected_ind(self):
"""The currently selected group index."""
if not self.is_secondary_part:
return self.ahca.selected_step_ind
@property
[docs]
def best_grouping_ind(self):
"""The index of the grouping step with max BIC."""
if not self.is_secondary_part:
return self.ahca.best_step_ind
@grouping_selected_ind.setter
def grouping_selected_ind(self, ind: int):
"""The index of the selected grouping step."""
if not self.is_secondary_part:
self.ahca.selected_step_ind = ind
@property
[docs]
def grouping_num_groups(self):
"""The number of groups in each AHCA step"""
if not self.is_secondary_part:
return self.ahca.steps_num_groups
[docs]
def reset_grouping_ind(self):
"""Reset the grouping step selection."""
if not self.is_secondary_part:
self.ahca.reset_selected_step()
@property
[docs]
def levels(self):
"""The particle's raw or grouped levels."""
if not self.is_secondary_part and self.has_groups and self.using_group_levels:
return self.group_levels
else:
return self.cpts.levels
# TODO: The following 4 functions don't seem to be entirely sensible
@property
[docs]
def levels_roi(self):
"""The particle's raw or grouped levels, from ROI."""
if not self.is_secondary_part and self.has_groups and self.using_group_levels:
return self.group_levels
else:
return self.cpts.levels[
self.first_level_ind_in_roi : self.last_level_ind_in_roi + 1
]
@property
[docs]
def levels_roi_force(self):
"""The particle's raw levels, from ROI."""
return self.cpts.levels[
self.first_level_ind_in_roi : self.last_level_ind_in_roi + 1
]
@property
[docs]
def group_levels(self) -> List[Union[Level, GlobalLevel]]:
"""The particle's grouped levels."""
if not self.is_secondary_part and self.has_groups:
return self.ahca.selected_step.group_levels
@property
[docs]
def group_levels_roi(self) -> List[Level]:
"""The particle's grouped levels, from ROI."""
if not self.is_secondary_part and self.has_groups:
group_levels = self.group_levels
return group_levels[
self.first_group_level_ind_in_roi : self.last_group_level_ind_in_roi + 1
]
@property
[docs]
def num_levels(self):
"""Number of raw or grouped particle levels."""
if self.has_groups and self.using_group_levels:
return self.ahca.selected_step.group_num_levels
else:
return self.cpts.num_levels
@property
[docs]
def num_levels_roi(self):
"""Number of raw particle levels in ROI."""
return (self.last_level_ind_in_roi - self.first_level_ind_in_roi) + 1
@property
[docs]
def dwell_time_s(self):
"""The particle's total measurement time."""
return (self.abstimes[-1] - self.abstimes[0]) / 1e9
@property
[docs]
def dwell_time_roi(self):
"""The particle's ROI measurement time."""
if self.has_levels:
return self.levels_roi[-1].times_s[1] - self.levels_roi[0].times_s[0]
else:
first_photon_ind, last_photon_ind = self.roi_region_photon_inds
return (
self.abstimes[last_photon_ind] - self.abstimes[first_photon_ind]
) / 1e9
@property
[docs]
def level_ints(self):
"""The particle's raw or grouped level intensities."""
if not self.is_secondary_part and self.has_groups and self.using_group_levels:
return self.ahca.selected_step.group_level_ints
else:
return self.cpts.level_ints
@property
[docs]
def level_ints_roi(self):
"""The particle's ROI raw or grouped level intensities."""
# TODO: shouldn't grouped levels also use ROI?
if not self.is_secondary_part and self.has_groups and self.using_group_levels:
return self.ahca.selected_step.group_level_ints
else:
return self.cpts.level_ints[
self.first_level_ind_in_roi : self.last_level_ind_in_roi
]
@property
[docs]
def level_dwelltimes(self):
"""The particle's raw or grouped level dwelltimes."""
if not self.is_secondary_part and self.has_groups and self.using_group_levels:
return self.ahca.selected_step.group_level_dwelltimes
else:
return self.cpts.level_dwelltimes
@property
[docs]
def level_dwelltimes_roi(self):
"""The particle's ROI raw or grouped level dwelltimes."""
if not self.is_secondary_part and self.has_groups and self.using_group_levels:
return self.ahca.selected_step.group_level_dwelltimes
else:
return self.cpts.level_dwelltimes[
self.first_level_ind_in_roi : self.last_level_ind_in_roi
]
@property
[docs]
def has_burst(self) -> bool:
"""Whether the particle has a 'photon burst'."""
return self.cpts.has_burst
@property
[docs]
def burst_levels(self) -> np.ndarray:
"""Levels containing a 'photon burst'."""
return self.cpts.burst_levels
@has_burst.setter
def has_burst(self, value: bool):
self.cpts.has_burst = value
@burst_levels.setter
def burst_levels(self, value: np.ndarray):
self.cpts.burst_levels = value
@property
[docs]
def numexp(self):
"""Number of exponents in fitted decay model."""
return self.histogram.numexp
@property
[docs]
def unique_name(self):
"""Unique particle name in the case of dual-channel measurement."""
if self.is_secondary_part:
return self.name + "_2"
else:
return self.name
@property
[docs]
def has_global_grouping(self) -> bool:
"""Whether the H5dataset has global groups."""
if hasattr(self, "dataset") and hasattr(self.dataset, "global_particle"):
gp = self.dataset.global_particle
return (
gp is not None
and self.dataset_ind in gp.contributing_particles_dataset_inds
)
else:
return False
@property
[docs]
def global_particle(self) -> GlobalParticle:
"""The H5dataset's global particle."""
if self.has_global_grouping:
return self.dataset.global_particle
@property
[docs]
def global_group_levels(self):
"""The H5dataset's globally grouped levels."""
if self.has_global_grouping:
gp = self.global_particle
return list(
filter(
lambda l: l.parent_particle_dataset_ind == self.dataset_ind,
gp.global_levels,
)
)
[docs]
def remove_and_reset_grouping(self):
"""Re-initialize grouping and remove current data."""
if not self.is_secondary_part:
self.ahca = AHCA(particle=self)
self.using_group_levels = False
[docs]
def levels2data(
self,
use_grouped: bool = None,
use_roi: bool = False,
use_global_groups: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
"""Level plotting data.
Uses the Particle objects' levels to generate two arrays for
plotting the levels.
Arguments
---------
use_grouped : bool, optional
Whether to use grouped levels - if not provided, defaults to
True if groups exist and false if they don't.
use_global_groups : bool = False
whether to use globally grouped levels
use_roi : bool = False
whether to use ROI
Returns
-------
ints, times : Tuple[np.ndarray, np.ndarray]
Intensities as a function of time for plotting.
"""
assert self.has_levels, "ChangePointAnalysis:\tNo levels to convert to data."
# if self.is_secondary_part:
# return None, None
if use_grouped is None:
use_grouped = self.has_groups and self.using_group_levels
did_use_global = False
if use_grouped or use_global_groups:
if use_global_groups:
levels = self.global_group_levels
did_use_global = True
elif not use_roi:
levels = self.group_levels
else:
levels = self.group_levels_roi
else:
if not use_roi:
levels = self.cpts.levels
else:
levels = self.levels_roi
# if not use_global_groups:
times = np.array([[level.times_s[0], level.times_s[1]] for level in levels])
# else:
# times = np.array(
# [
# [
# level.times_s[0] - level.start_time_offset_ns / 1e9,
# level.times_s[1] - level.start_time_offset_ns / 1e9,
# ]
# for level in levels
# ]
# )
times = times.flatten()
ints = np.array([[level.int_p_s, level.int_p_s] for level in levels])
ints = ints.flatten()
return ints, times
[docs]
def lifetimes2data(
self, use_grouped: bool = None, use_roi: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
"""Level lifetime plotting data.
Uses the Particle object's levels to generate two arrays for
plotting the level lifetimes.
Arguments
---------
use_grouped : bool, optional
Whether to use grouped levels - if not provided, defaults to
True if groups exist and false if they don't.
use_roi : bool = False
Whether to use ROI.
Returns
-------
lifetimes, times : Tuple[np.ndarray, np.ndarray]
Lifetime as a function of time for plotting.
"""
assert (
self.has_fit_a_lifetime
), "ChangePointAnalysis:\tNo levels to convert to data."
if use_grouped is None:
use_grouped = self.has_groups and self.using_group_levels
if not use_grouped:
if not use_roi:
levels = self.cpts.levels
else:
levels = self.levels_roi
else:
if not use_roi:
levels = self.group_levels
else:
levels = self.group_levels_roi
times = np.array(
[
[level.times_s[0], level.times_s[1]]
for level in levels
if level.histogram.fitted
]
)
times = times.flatten()
lifetimes = np.array(
[
[level.histogram.avtau, level.histogram.avtau]
for level in levels
if level.histogram.fitted
]
)
lifetimes = lifetimes.flatten()
return lifetimes, times
[docs]
def current2data(
self, level_ind: int, use_roi: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
"""Current level plotting data.
Uses the Particle object's selected level to generate
two arrays for plotting level.
Arguments
---------
level_ind : int
Index of the level to be plotted.
use_roi : bool = False
Whether to use the ROI.
Returns
-------
levels_data, times : Tuple[np.ndarray, np.ndarray]
Intensity as a function of time for plotting.
"""
# TODO: Cleanup this function anc the one above it
assert self.has_levels, "ChangePointAnalysis:\tNo levels to convert to data."
if not use_roi:
level = self.levels[level_ind]
else:
level = self.levels_roi[level_ind]
times = np.array(level.times_ns) / 1e9
levels_data = np.array([level.int_p_s, level.int_p_s])
return levels_data, times
[docs]
def current_group2data(self, group_ind: int) -> [np.ndarray, np.ndarray]:
"""Current group plotting data.
Uses the Particle object's selected group to generate
two arrays for plotting the group.
Arguments
---------
group_ind : int
Index of the level to be plotted.
Returns
-------
group_int, times : Tuple[np.ndarray, np.ndarray]
Intensity as a function of time for plotting.
"""
assert self.has_groups, "ChangePointAnalysis:\tNo groups to convert to data."
if self.is_secondary_part:
return
group = self.groups[group_ind]
times = np.array([self.abstimes[0], self.abstimes[-1]]) / 1e9
group_int = np.array([group.int_p_s, group.int_p_s])
return group_int, times
[docs]
def makehistograms(self, remove_zeros, startpoint, channel):
"""Make all histograms - whole trace and levels.
Arguments
---------
remove_zeros : bool
Whether to remove zeros at the start of the decay.
startpoint : int
Startpoint of the decay in number of time steps.
channel : bool
Whether to use the hardware channelwidth
TODO: remove this parameter including downstream as it is never used
"""
self.startpoint = startpoint
self.makehistogram(channel=channel, add_roi=True)
self.makelevelhists(channel=channel)
if remove_zeros:
maxim = 0
try:
maxim = max(self.histogram.decaystart, maxim)
except AttributeError:
maxim = 0
self.histogram.decay = self.histogram.decay[maxim:]
self.histogram.t = self.histogram.t[maxim:]
[docs]
def makehistogram(self, channel=True, add_roi: bool = False):
"""Put the arrival times into a histogram.
Arguments
---------
channel : bool = True
Whether to use the hardware channelwidth.
add_roi : bool = True
Whether to create the ROI Histogram as well.
"""
self._histogram = Histogram(self, start_point=self.startpoint, channel=channel)
if add_roi:
self._histogram_roi = Histogram(
self, start_point=self.startpoint, channel=channel, is_for_roi=True
)
[docs]
def makelevelhists(
self,
channel: bool = True,
force_cpts_levels: bool = False,
force_group_levels: bool = False,
):
"""Make level histograms.
Arguments
---------
channel : bool = True
Whether to use the hardware channelwidth.
force_cpts_levels : bool = False
Use self.cpts.levels instead of self.levels.
force_group_levels : bool = False
Use self.group_levels instead of self.levels.
"""
if self.has_levels:
if force_cpts_levels or force_group_levels:
levels = list()
if force_cpts_levels:
levels.extend(self.cpts.levels)
if force_group_levels:
levels.extend(self.group_levels)
else:
levels = self.levels
for level in levels:
level.histogram = Histogram(
self, level, self.startpoint, channel=channel
)
[docs]
def makegrouplevelhists(self):
"""Make grouped level histograms."""
if (
not self.is_secondary_part
and self.has_groups
and self.ahca.selected_step.groups_have_hists
):
if self.ahca.num_steps == 1:
self.groups[0].histogram = self.ahca.steps[0].groups[0].histogram
else:
groups = self.groups
for group_level in self.group_levels:
g_ind = group_level.group_ind
group_level.histogram = groups[g_ind].histogram
[docs]
def makegrouphists(self, channel=True):
"""Make group histograms."""
if not self.is_secondary_part and self.has_groups:
for group in self.groups:
group.histogram = Histogram(
self, group.lvls_inds, self.startpoint, channel=channel
)
self.ahca.selected_step.groups_have_hists = True
[docs]
def binints(self, binsize: int):
"""Bin the absolute times into intensity trace.
Arguments
---------
binsize : int
Size of intensity trace time bins.
"""
self.bin_size = binsize
self.binnedtrace = Trace(self, self.bin_size)
[docs]
def trim_trace(
self, min_level_dwell_time: float, min_level_int: int, reset_roi: bool = True
):
"""Trim the intensity trace.
This function trims the intensity trace to remove the photobleached end.
Arguments
---------
min_level_dwell_time : float
Minimum dwell time of bleached level for trimming.
min_level_int : int
Minimum intensity to classify level as not bleached.
reset_roi : bool = True
Whether to update the ROI to the trimmed area.
"""
trimmed = None
if self.has_levels and self.level_ints[-1] < min_level_int:
trimmed = False
trim_time_total = 0
first_valid_reversed_ind = None
for ind_reverse in reversed(range(0, self.num_levels)):
if self.level_ints[ind_reverse] <= min_level_int:
trim_time_total += self.level_dwelltimes[ind_reverse]
first_valid_reversed_ind = ind_reverse
else:
first_valid_reversed_ind = ind_reverse
break
if trim_time_total >= min_level_dwell_time and first_valid_reversed_ind > 1:
last_active_time = self.levels[first_valid_reversed_ind].times_s[1]
min_time = 0
if not reset_roi:
if (
last_active_time > self.roi_region[0]
and last_active_time - self.roi_region[0] > 0.5
):
min_time = self.roi_region[0]
else:
min_time = last_active_time - 0.5
if last_active_time > self.roi_region[1]:
last_active_time = self.roi_region[1]
if min_time >= 0:
self.roi_region = (
min_time,
last_active_time,
first_valid_reversed_ind,
)
trimmed = True
return trimmed
[docs]
class FakeCpts:
"""Fake ChangePoints object for GlobalParticle.
Parameters
----------
num_levels : int
number of intensity levels
levels : List[GlobalLevel]
list of global levels
"""
def __init__(self, num_levels: int, levels: list):
[docs]
self.num_levels = num_levels
[docs]
self.num_cpts = num_levels - 1
[docs]
class GlobalParticle:
"""Particle-like object containing levels from all particles.
Parameters
----------
particles : List[Particle]
Particles to include in global particle.
use_roi : bool = False
Whether to use ROI's.
"""
def __init__(self, particles: List[Particle], use_roi: bool = False):
[docs]
self.is_global = True
[docs]
self.name = "Global Particle"
levels = []
start_time_offset_ns = 0
for p in particles:
p_levels = p.levels_roi if use_roi else p.levels
for l in p_levels:
level = GlobalLevel(
global_particle=self,
parent_particle_dataset_ind=p.dataset_ind,
particle_levels=[l],
int_p_s=l.int_p_s,
group_ind=l.group_ind,
start_time_offset_ns=start_time_offset_ns,
dwell_time_ns=l.dwell_time_ns,
num_photons=l.num_photons,
)
levels.append(level)
start_time_offset_ns += p_levels[-1].times_ns[1]
[docs]
self.contributing_particles_dataset_inds = [p.dataset_ind for p in particles]
[docs]
self.levels = levels
[docs]
self.num_levels = len(levels)
[docs]
self.num_levels_roi = self.num_levels
[docs]
self.dwell_time = np.sum([l.dwell_time_s for l in self.levels])
[docs]
self.num_photons = np.sum([l.num_photons for l in self.levels])
[docs]
self.uuid = uuid1()
[docs]
self.use_roi_for_grouping = False
self.cpts = FakeCpts(num_levels=self.num_levels, levels=self.levels)
[docs]
self.ahca = AHCA(particle=self)
@property
[docs]
def has_levels(self):
return self.cpts.has_levels
@property
[docs]
def has_groups(self):
return self.ahca.has_groups
@property
[docs]
def groups(self):
if self.has_groups:
return self.ahca.selected_step.groups
@property
[docs]
def num_groups(self):
if self.has_groups:
return self.ahca.selected_step.num_groups
@property
[docs]
def groups_bounds(self):
return self.ahca.selected_step.calc_int_bounds()
@property
[docs]
def groups_ints(self):
return self.ahca.selected_step.group_ints
@property
[docs]
def grouping_bics(self):
return self.ahca.bics
@property
[docs]
def grouping_selected_ind(self):
return self.ahca.selected_step_ind
@property
[docs]
def best_grouping_ind(self):
return self.ahca.best_step_ind
@grouping_selected_ind.setter
def grouping_selected_ind(self, ind: int):
self.ahca.selected_step_ind = ind
@property
[docs]
def grouping_num_groups(self):
return self.ahca.steps_num_groups
@property
[docs]
def level_particle_dataset_inds(self):
return [l.parent_particle_dataset_ind for l in self.global_levels]
@property
[docs]
def global_levels(self) -> list:
if self.ahca.has_groups:
return self.ahca.selected_step.group_levels
[docs]
def run_grouping(self):
self.ahca.run_grouping()
# all_times = []
# all_ints = []
# all_particle_names = []
# for l in levels:
# all_times.extend(l.times_s)
# all_ints.extend([l.int_p_s, l.int_p_s])
# all_particle_names.extend([l.particle.name, l.particle.name])
# df = pd.DataFrame(data={
# "times": all_times,
# "ints": all_ints,
# "particle": all_particle_names
# })
[docs]
class Trace:
"""Binned intensity trace.
Parameters
----------
particle : Particle
The Particle which creates the Trace.
binsize : int
Size of time bin in ms.
"""
def __init__(self, particle: Particle, binsize: int):
data = particle.abstimes[:]
binsize_ns = binsize * 1e6 # Convert ms to ns
if len(data) == 0:
self.intdata = np.array([])
self.inttimes = np.array([])
return
# Use numpy.histogram to bin the data
max_time = np.max(data)
num_bins = int(np.ceil(max_time / binsize_ns))
bin_edges = np.arange(0, (num_bins + 1) * binsize_ns, binsize_ns)
binned, _ = np.histogram(data, bins=bin_edges)
# try:
# endbin = int(np.max(data) / binsize_ns)
# except ValueError:
# endbin = 0
#
# binned = np.zeros(endbin + 1, dtype=int)
# for step in range(endbin):
# binned[step + 1] = np.size(
# data[((step + 1) * binsize_ns > data) * (data > step * binsize_ns)]
# )
# if step == 0:
# binned[step] = binned[step + 1]
# binned *= (1000 / 100)
[docs]
self.inttimes = bin_edges[:-1] / 1e6 # Convert back to ms
[docs]
class Histogram:
"""TCSPC histogram.
This class represents histogrammed TCSPC arrival times (micro times)
as well as multi-exponential fits thereof.
Parameters
----------
particle : Particle
The parent Particle of this object.
level : Level or List = None
The possible parent level of this object.
start_point : float = None
Start point for lifetime fit.
channel : bool = True
Whether to use hardware channel width.
trim_start : bool = False
Whether to trim zeros at the start of the histogram.
is_for_roi : bool = False
Whether this histogram is from a trace ROI.
"""
def __init__(
self,
particle: Particle,
level: Union[Level, List[int]] = None,
start_point: float = None,
channel: bool = True,
trim_start: bool = False,
is_for_roi: bool = False,
):
assert not (level is not None and is_for_roi), "ROI can't be used for a Level"
[docs]
self.is_for_roi = is_for_roi
[docs]
self.fitted_with_roi = None
[docs]
self.roi_region_used = None
no_sort = False
[docs]
self._particle = particle
[docs]
self.original_kwargs = {
"start_point": start_point,
"channel": channel,
"trim_start": trim_start,
}
self.setup(level=level, use_roi=is_for_roi, **self.original_kwargs)
self.convd = None
self.convd_t = None
[docs]
self.decay_roi_start_ns = None
[docs]
self.decay_roi_end_ns = None
[docs]
self.num_photons_used = None
[docs]
def setup(
self,
level: Union[Level, List[int]] = None,
start_point: float = None,
channel: bool = True,
trim_start: bool = False,
use_roi: bool = False,
):
"""Set up the object.
This method can be called to re-do setup steps without reinitializing.
Arguments
---------
level : Level or List = None
The possible parent level of this object.
start_point : float = None
Start point for lifetime fit.
channel : bool = True
Whether to use hardware channel width.
trim_start : bool = False
Whether to trim zeros at the start of the histogram.
use_roi : bool = False
Whether this histogram is from a trace ROI.
"""
no_sort = False
if level is None:
if not use_roi:
self.microtimes = self._particle.microtimes[:]
else:
self.microtimes = self._particle.microtimes_roi
self.roi_region_used = self._particle.roi_region
elif type(level) is list:
if not self._particle.has_groups:
logger.error("Multiple levels provided, but has no groups")
raise RuntimeError("Multiple levels provided, but has no groups")
self.microtimes = np.array([])
for ind in level:
self.microtimes = np.append(
self.microtimes, self._particle.cpts.levels[ind].microtimes
)
else:
self.microtimes = self.level.microtimes[:]
if self.microtimes.size == 0:
self.decay = np.empty(1)
self.t = np.empty(1)
else:
# tmin = self.microtimes.min()
tmin = min(self._particle.tmin, self.microtimes.min())
tmax = max(self._particle.tmax, self.microtimes.max())
if start_point is None:
pass
else:
if channel:
start_point = int(start_point)
t = np.arange(tmin, tmax, self._particle.channelwidth)
tmin = t[start_point]
no_sort = True
else:
tmin = start_point
tmax = max(self._particle.tmax, self.microtimes.max())
sorted_micro = np.sort(self.microtimes)
if not no_sort and trim_start:
tmin = sorted_micro[
np.searchsorted(sorted_micro, tmin)
] # Make sure bins align with TCSPC bins
tmax = sorted_micro[
np.searchsorted(sorted_micro, tmax) - 1
] # - 1 # Fix if max is end
window = tmax - tmin
numpoints = int(window // self._particle.channelwidth)
t = np.arange(tmin, tmax, self._particle.channelwidth)
self.decay, self.t = np.histogram(self.microtimes, bins=t)
self.t = self.t[:-1] # Remove last value so the arrays are the same size
where_neg = np.where(self.t <= 0)
self.t = np.delete(self.t, where_neg)
self.decay = np.delete(self.decay, where_neg)
assert len(self.t) == len(self.decay), (
"Time series must be same length as decay " "histogram"
)
if start_point is None and trim_start:
try:
self.decaystart = np.nonzero(self.decay)[0][0]
except IndexError: # Happens when there is a level with no photons
pass
else:
if level is not None:
self.decay, self.t = start_at_value(
self.decay, self.t, neg_t=False, decaystart=self.decaystart
)
else:
self.decaystart = 0
try:
self.t -= self.t.min()
except ValueError:
dbg.p(
f"Histogram object of {self._particle.name} does not have a valid"
f" self.t attribute",
"Histogram",
)
[docs]
def update_roi(self):
"""Rerun setup to update ROI."""
self.setup(level=self.level, use_roi=True, **self.original_kwargs)
@property
[docs]
def t(self):
return self._t.copy()
@t.setter
def t(self, value):
self._t = value
[docs]
def fit(
self,
numexp,
tauparam,
ampparam,
shift,
decaybg,
irfbg,
boundaries,
addopt,
irf,
fwhm=None,
normalize_amps=True,
maximum_likelihood=False
):
"""Fit a multiexponential decay to the histogram.
This function mainly calls the relevant code from `tcspcfit`.
Arguments
---------
numexp : int
Number of exponentials in fit function (1-3).
tauparam : array_like
Initial guess times (in ns). This is either in the format
[tau1, tau2, ...] or [[tau1, min1, max1, fix1], [tau2, ...], ...].
When the "fix" value is False, the min and max values are ignored.
ampparam : array_like
Initial guess amplitude. Format [amp1, amp2, ...] or [[amp1, fix1],
[amp2, fix2], ...]
shift : array_like
Initial guess IRF shift. Either a float, or [shift, min, max, fix].
decaybg : float
Background value for decay. Will be estimated if not given.
irfbg : float
Background value for IRF. Will be estimated if not given.
boundaries : list
Start and end of fitting range as well as options for automatic
determination of the parameters as used by `FluoFit.calculate_boundaries`.
irf : ndarray
Instrumental Response Function
fwhm : float = None
Full-width at half maximum of simulated irf. IRF is not simulated if fwhm is None.
addopt : Dict = None
Additional options for `scipy.optimize.curve_fit` (such as optimization parameters).
normalize_amps : bool = True
Whether to use normalized lifetime amplitudes.
maximum_likelihood : bool = False
Whether to use maximum likelihood fitting (otherwise use least squares).
"""
if addopt is not None:
addopt = ast.literal_eval(addopt)
self.numexp = numexp
if maximum_likelihood:
method = 'ml'
decaybg = [5, 0, 50, 0]
else:
method = 'ls'
# TODO: debug option that would keep the fit object (not done normally to conserve memory)
try:
if numexp == 1:
fit = tcspcfit.OneExp(
irf,
self.decay,
self.t,
self._particle.channelwidth,
tauparam,
None,
shift,
decaybg,
irfbg,
boundaries,
addopt,
fwhm=fwhm,
method=method
)
elif numexp == 2:
fit = tcspcfit.TwoExp(
irf,
self.decay,
self.t,
self._particle.channelwidth,
tauparam,
ampparam,
shift,
decaybg,
irfbg,
boundaries,
addopt,
fwhm=fwhm,
normalize_amps=normalize_amps,
method=method
)
elif numexp == 3:
fit = tcspcfit.ThreeExp(
irf,
self.decay,
self.t,
self._particle.channelwidth,
tauparam,
ampparam,
shift,
decaybg,
irfbg,
boundaries,
addopt,
fwhm=fwhm,
normalize_amps=normalize_amps,
method=method
)
except Exception as e:
trace_string = ""
for trace_part in traceback.format_tb(e.__traceback__):
trace_string = trace_string + trace_part
logger.error(e.args[0] + "\n\n" + trace_string[:-1])
# print(traceback.format_exc().split('\n')[-2])
return False
else:
self.fit_decay = fit.measured
self.convd = fit.convd
self.convd_t = fit.t
self.tau = fit.tau
self.amp = fit.amp
self.shift = fit.shift
self.bg = fit.bg
self.irfbg = fit.irfbg
self.fwhm = fit.fwhm
self.stds = fit.stds
self.avtaustd = fit.avtaustd
self.chisq = fit.chisq
self.dw = fit.dw
self.dw_bound = fit.dw_bound
self.residuals = fit.residuals
self.fitted = True
if numexp == 1:
self.avtau = self.tau
else:
self.avtau = sum(np.array(self.tau) * np.array(self.amp)) / sum(
self.amp
)
self.decay_roi_start_ns = fit.startpoint * self._particle.channelwidth
self.decay_roi_end_ns = fit.endpoint * self._particle.channelwidth
self.num_photons_used = np.sum(fit.measured_not_normalized)
return True
[docs]
def levelhist(self, level):
# TODO: Remove this function as it is never used
levelobj = self._particle.levels[level]
tmin = levelobj.microtimes[:].min()
tmax = levelobj.microtimes[:].max()
window = tmax - tmin
numpoints = int(window // self._particle.channelwidth)
t = np.linspace(0, window, numpoints)
decay, t = np.histogram(levelobj.microtimes[:], bins=t)
t = t[:-1] # Remove last value so the arrays are the same size
return decay, t
@staticmethod
[docs]
def start_at_value(decay, t, neg_t=True, decaystart=None):
"""Helper method for setting decay startpoint."""
if decaystart is None:
decaystart = np.nonzero(decay)[0][0]
if neg_t:
t -= t[decaystart]
t = t[decaystart:]
decay = decay[decaystart:]
return decay, t
[docs]
class ParticleAllHists:
"""Class containing all Histograms from Particle.
Parameters
----------
particle : Particle
The parent particle of this object.
"""
def __init__(self, particle: Particle):
[docs]
self.part_uuid = particle.uuid
[docs]
self.numexp = None
[docs]
self.part_hist = particle.histogram
[docs]
self.has_level_hists = particle.has_levels
[docs]
self.level_hists = list()
if particle.has_levels:
for level in particle.cpts.levels:
if hasattr(level, "histogram") and level.histogram is not None:
self.level_hists.append(level.histogram)
[docs]
self.has_group_hists = particle.has_groups
[docs]
self.group_hists = list()
if particle.has_groups:
for group in particle.groups:
if group.histogram is None:
group.histogram = Histogram(
particle=particle,
level=group.lvls_inds,
start_point=particle.startpoint,
)
self.group_hists.append(group.histogram)
[docs]
def fit_part_and_levels(
self, channelwidth, start, end, fit_param: FittingParameters
):
"""Fit all Histograms in this object.
Arguments
---------
channelwidth : float
TCSPC channelwidth (time step size) in ns.
start : int
Fitting startpoint in number of time steps.
end : int
Fitting endpoint in number of time steps.
fit_param : FittingParameters
Object containing fit parameters.
"""
self.numexp = fit_param.numexp
all_hists = [self.part_hist]
all_hists.extend(self.level_hists)
all_hists.extend(self.group_hists)
shift = fit_param.shift[:-1] / channelwidth
shiftfix = fit_param.shift[-1]
shift = [*shift, shiftfix]
boundaries = [start, end, fit_param.autostart, fit_param.autoend]
for hist in all_hists:
if hist.microtimes.size > 10:
try:
if not hist.fit(
fit_param.numexp,
fit_param.tau,
fit_param.amp,
shift,
fit_param.decaybg,
fit_param.irfbg,
boundaries,
fit_param.addopt,
fit_param.irf,
fit_param.fwhm,
fit_param.normalize_amps,
fit_param.maximum_likelihood
):
pass # fit unsuccessful
except AttributeError:
logger.info("Level or trace not fitted. No decay.")
[docs]
class RasterScan:
"""Class containing raster scan data.
A raster scan is a 2D intensity scan used to visualize particles
before measurement.
Parameters
----------
h5dataset : H5dataset
The parent HDF5 dataset object.
particle_num : int
Number of particles connected to this raster scan.
h5dataset_index : int
Index of this raster scan in the dataset.
particle_indexes : List[int]
Dataset indices of the particles in this raster scan.
"""
def __init__(
self,
h5dataset: H5dataset,
particle_num: int,
h5dataset_index: int,
particle_indexes: List[int],
):
[docs]
self.h5dataset = h5dataset
[docs]
self.particle_num = particle_num
[docs]
self.h5dataset_index = h5dataset_index
[docs]
self.particle_indexes = particle_indexes
[docs]
self.integration_time = h5_fr.rs_integration_time(part_or_rs=self)
[docs]
self.pixel_per_line = h5_fr.rs_pixels_per_line(part_or_rs=self)
[docs]
self.range = h5_fr.rs_range(part_or_rs=self)
[docs]
self.x_start = h5_fr.rs_x_start(part_or_rs=self)
[docs]
self.y_start = h5_fr.rs_y_start(part_or_rs=self)
[docs]
self.x_axis_pos = np.linspace(
self.x_start, self.x_start + self.range, self.pixel_per_line
)
[docs]
self.y_axis_pos = np.linspace(
self.y_start, self.y_start + self.range, self.pixel_per_line
)
@property
[docs]
def dataset(self) -> h5pickle.Dataset:
if self.h5dataset.file is not None and self.h5dataset.file.__bool__() is True:
return h5_fr.raster_scan(h5_fr.particle(self.particle_num, self.h5dataset))
[docs]
class Spectra:
"""Class containing spectral data.
Spectra are recorded as a time scan, with a certain integration time
over which a single spectrum is recorded using a grating and CCD.
Parameters
----------
particle : Particle
The parent particle object.
"""
def __init__(self, particle: Particle):
[docs]
self._particle = particle
@property
[docs]
def _has_spectra(self) -> bool:
if self._particle.file is not None and self._particle.file.__bool__() is True:
return h5_fr.has_spectra(particle=self._particle)
@property
[docs]
def data(self) -> h5pickle.Dataset:
if self._particle.file is not None and self._particle.file.__bool__() is True:
return h5_fr.spectra(particle=self._particle) if self._has_spectra else None
@property
[docs]
def wavelengths(self) -> np.ndarray:
if self._particle.file is not None and self._particle.file.__bool__() is True:
return (
h5_fr.spectra_wavelengths(particle=self._particle)
if self._has_spectra
else None
)
@property
[docs]
def series_times(self) -> np.ndarray:
if self._particle.file is not None and self._particle.file.__bool__() is True:
return (
h5_fr.spectra_abstimes(particle=self._particle)
if self._has_spectra
else None
)