"""Module to calculates, stores and provides access to the sums that are used for the
change point detection algorithm in the change_point module.
Written by Joshua Botha, University of Pretoria, Physics department.
"""
from __future__ import annotations
import os
import numpy as np
import pickle
import dbg
import file_manager as fm
from processes import ProcessProgFeedback, ProcessProgress
from my_logger import setup_logger
# Module-level logger shared by every function and class in this file.
logger = setup_logger(__name__)
def calc_and_store_sums(n_min: int, n_max: int, prog_fb: "ProcessProgFeedback" = None):
    """
    Force a recalculation of all sums and write them to ``all_sums.pickle``.

    Parameters
    ----------
    n_min : int
        Smallest segment size; sums are tabulated for ``n_min < n <= n_max``.
    n_max : int
        Largest segment size to tabulate.
    prog_fb : ProcessProgFeedback, optional
        Progress feedback handle. When given, the calculation reports its
        status and per-``n`` progress through it.

    Notes
    -----
    Constructing ``CPSums`` may itself load or calculate the sums; the
    explicit ``_calc_and_store()`` call afterwards always recalculates and
    overwrites the stored file.
    """
    cp_sums = CPSums(n_min=n_min, n_max=n_max, prog_fb=prog_fb)
    # Attach the handle explicitly so the forced recalculation below reports
    # progress even if the constructor did not keep it.
    cp_sums.prog_fb = prog_fb
    cp_sums._calc_and_store()
class CPSums:
    """
    Calculates, stores and gets the sum values used for change point detection.

    For every segment size ``n`` with ``n_min < n <= n_max`` and every point
    number ``k`` with ``1 <= k < n`` the following partial sums are tabulated:

    * ``u_k``    = -sum(1 / j      for j in k .. n-1)
    * ``u_n_k``  = -sum(1 / j      for j in n-k .. n-1)
    * ``v2_k``   =  sum(1 / j**2   for j in k .. n-1)
    * ``v2_n_k`` =  sum(1 / j**2   for j in n-k .. n-1)
    * ``sig_e``  =  pi**2 / 6 - sum(1 / j**2 for j in 1 .. n-1)   (per ``n`` only)

    The two-dimensional tables are indexed as ``[k - 1, n - n_min - 1]``.
    """

    def __init__(
        self,
        n_max: int = None,
        n_min: int = None,
        only_pickle: bool = False,
        prog_fb: "ProcessProgFeedback" = None,
    ):
        """
        Load the sums from ``all_sums.pickle`` or calculate and store them.

        Parameters
        ----------
        n_max : int, optional
            Largest segment size to tabulate. Defaults to 1000.
        n_min : int, optional
            Sums are tabulated for segment sizes larger than this. Defaults to 10.
        only_pickle : bool, optional
            If True, skip loading and always calculate and store the sums.
        prog_fb : ProcessProgFeedback, optional
            Progress feedback handle used while calculating during construction.
        """
        # NOTE(review): SUMS_VERSION is not defined in the visible part of this
        # module; it is assumed to be a module-level constant - confirm.
        self._version = SUMS_VERSION
        self.n_max = 1000 if n_max is None else n_max
        self.n_min = 10 if n_min is None else n_min

        # Must be set before any _calc_and_store() call below, because
        # _calc_sums() reads self.prog_fb.
        self.prog_fb = prog_fb

        n_range = self.n_max - self.n_min
        table_shape = (self.n_max, n_range)
        self._sums_u_k = np.empty(shape=table_shape, dtype=np.float64)
        self._sums_u_n_k = np.empty(shape=table_shape, dtype=np.float64)
        self._sums_v2_k = np.empty(shape=table_shape, dtype=np.float64)
        self._sums_v2_n_k = np.empty(shape=table_shape, dtype=np.float64)
        self._sums_sig_e = np.empty(shape=n_range, dtype=np.float64)

        if only_pickle:
            self._calc_and_store()
        else:
            all_sums_path = fm.path("all_sums.pickle", fm.Type.Data)
            if os.path.isfile(all_sums_path):
                # NOTE: pickle.load executes arbitrary code from the file; this
                # is only safe because the file is a cache written by this class.
                with open(all_sums_path, "rb") as all_sums_file:
                    all_sums: dict = pickle.load(all_sums_file)
                if ("version" not in all_sums) or all_sums["version"] != self._version:
                    # Missing or outdated version tag: rebuild the stored file.
                    self._calc_and_store()
                else:
                    # NOTE(review): the stored tables are used as-is; their
                    # n_min / n_max are not checked against this instance.
                    self._sums_u_k = all_sums["sums_u_k"]
                    self._sums_u_n_k = all_sums["sums_u_n_k"]
                    self._sums_v2_k = all_sums["sums_v2_k"]
                    self._sums_v2_n_k = all_sums["sums_v2_n_k"]
                    self._sums_sig_e = all_sums["sums_sig_e"]
                    logger.info("all_sums.pickle found")
            else:
                self._calc_and_store()

        # The feedback handle is only needed while constructing; release it.
        self.prog_fb = None

    def _calc_sums(self) -> None:
        """
        Calculates all the possible sums that might be used in the change_point
        module to detect change points.

        For each ``n`` the sums for all ``k`` are obtained at once from reversed
        cumulative sums of ``1 / j`` and ``1 / j**2`` (``j = 1 .. n-1``), so one
        ``n`` costs O(n) instead of O(n**2). Table rows with ``k >= n`` are not
        written and keep whatever the arrays contained before.
        """
        if self.prog_fb:
            self.prog_fb.set_status(status="Calculating sums...")
            progress = ProcessProgress(prog_fb=self.prog_fb, num_iterations=self.n_max - self.n_min)

        sig_e_limit = (np.pi**2) / 6
        for n in range(self.n_min + 1, self.n_max + 1):
            column = n - self.n_min - 1
            j = np.arange(1, n, dtype=np.float64)  # j = 1 .. n-1 (empty for n <= 1)
            num_k = j.size
            inv = 1.0 / j
            inv_sq = 1.0 / j**2

            # tail[k - 1] == sum of the terms for j = k .. n-1
            tail_inv = np.cumsum(inv[::-1])[::-1]
            tail_inv_sq = np.cumsum(inv_sq[::-1])[::-1]

            self._sums_sig_e[column] = sig_e_limit - inv_sq.sum()
            self._sums_u_k[:num_k, column] = -tail_inv
            self._sums_v2_k[:num_k, column] = tail_inv_sq
            # A sum starting at j = n-k is the "k" sum evaluated at n-k, so the
            # "n_k" columns are the "k" columns in reverse order.
            self._sums_u_n_k[:num_k, column] = -tail_inv[::-1]
            self._sums_v2_n_k[:num_k, column] = tail_inv_sq[::-1]

            if self.prog_fb:
                progress.iterate()

    def _calc_and_store(self):
        """Calculate all sums and pickle them, tagged with the current version."""
        logger.info("Calculating all_sums.pickle")
        self._calc_sums()
        all_sums = {
            "version": self._version,
            "sums_u_k": self._sums_u_k,
            "sums_u_n_k": self._sums_u_n_k,
            "sums_v2_k": self._sums_v2_k,
            "sums_v2_n_k": self._sums_v2_n_k,
            "sums_sig_e": self._sums_sig_e,
        }
        with open(fm.path("all_sums.pickle", fm.Type.Data), "wb") as all_sums_file:
            pickle.dump(all_sums, all_sums_file)
        logger.info("all_sums.pickle calculated and saved")

    def get_u_k(self, n, k) -> np.float64:
        """
        Used to get the sum value for u_k.

        Parameters
        ----------
        n : int
            The number of points in segment.
        k : int
            The number of the point in the segment being considered.

        Returns
        -------
        sum_u_k : np.float64
            As defined just after eq. 6
        """
        row = k - 1
        column = n - self.n_min - 1
        return self._sums_u_k[row, column]

    def get_u_k_n(self, n, k) -> np.float64:
        """
        Used to get the sum value for u_k_n.

        Parameters
        ----------
        n : int
            The number of points in segment.
        k : int
            The number of the point in the segment being considered.

        Returns
        -------
        sum_u_n_k : np.float64
            As defined just after eq. 6
        """
        row = k - 1
        column = n - self.n_min - 1
        return self._sums_u_n_k[row, column]

    def get_v2_k(self, n, k) -> np.float64:
        """
        Used to get the sum value for v2_k.

        Parameters
        ----------
        n : int
            The number of points in segment.
        k : int
            The number of the point in the segment being considered.

        Returns
        -------
        sum_v2_k : np.float64
            As defined just before eq. 7
        """
        row = k - 1
        column = n - self.n_min - 1
        return self._sums_v2_k[row, column]

    def get_v2_k_n(self, n, k) -> np.float64:
        """
        Used to get the sum value for v2_k_n.

        Parameters
        ----------
        n : int
            The number of points in segment.
        k : int
            The number of the point in the segment being considered.

        Returns
        -------
        sum_v2_n_k : np.float64
            As defined just before eq. 7
        """
        row = k - 1
        column = n - self.n_min - 1
        return self._sums_v2_n_k[row, column]

    def get_sig_e(self, n) -> np.float64:
        """
        Used to get the sum value for sig_e.

        Parameters
        ----------
        n : int
            The number of points in segment.

        Returns
        -------
        sum_sig_e : np.float64
            As defined just before eq. 7
        """
        return self._sums_sig_e[n - self.n_min - 1]

    def get_set(self, n, k):
        """
        Returns a dict with all the sums, except for sig_e.

        Parameters
        ----------
        n : int
            The number of points in segment.
        k : int
            The number of the point in the segment being considered.

        Returns
        -------
        set_sums : dict
            Dict of sums with the keys "u_k", "u_n_k", "v2_k" and "v2_n_k".
        """
        row = k - 1
        column = n - self.n_min - 1
        set_sums = {
            "u_k": self._sums_u_k[row, column],
            "u_n_k": self._sums_u_n_k[row, column],
            "v2_k": self._sums_v2_k[row, column],
            "v2_n_k": self._sums_v2_n_k[row, column],
        }
        return set_sums
def main():
    """
    Just a test: prints every sum for one fixed (n, k) pair.
    """
    test = CPSums()
    n = 853
    k = 522
    print(f"Test for n={n} and k={k}")
    print("u_k:\t" + str(test.get_u_k(n, k)))
    print("u_k_n:\t" + str(test.get_u_k_n(n, k)))
    print("v2_k:\t" + str(test.get_v2_k(n, k)))
    print("v2_k_n:\t" + str(test.get_v2_k_n(n, k)))
    # get_sig_e() takes the segment size only; also passing k raises a TypeError.
    print("sig_e:\t" + str(test.get_sig_e(n)))


# Run the manual check only when this file is executed as a script.
if __name__ == "__main__":
    main()