Source code for croissance.estimation

import logging
from collections import namedtuple
from operator import attrgetter

import numpy
import pandas

from croissance.estimation.outliers import remove_outliers
from croissance.estimation.ranking import rank_phases
from croissance.estimation.regression import fit_exponential
from croissance.estimation.smoothing.segments import segment_spline_smoothing
from croissance.estimation.util import points_per_hour, savitzky_golay


[docs] class RawGrowthPhase(namedtuple("RawGrowthPhase", ("start", "end"))): __slots__ = () @property def duration(self): return self.end - self.start
[docs] class GrowthPhase( namedtuple( "GrowthPhase", ("start", "end", "slope", "intercept", "n0", "SNR", "rank") ) ): __slots__ = () @property def duration(self): return self.end - self.start
[docs] @staticmethod def pick_best(growth_phases, metric="duration"): growth_phases = sorted(growth_phases, key=attrgetter(metric)) if growth_phases: return growth_phases[-1] return None
AnnotatedGrowthCurve = namedtuple( "AnnotatedGrowthCurve", ("series", "outliers", "growth_phases") )
[docs] class GrowthEstimationParameters: __slots__ = [ "segment_log_n0", "constrain_n0", "n0", "curve_minimum_duration_hours", "phase_minimum_signal_noise_ratio", "phase_minimum_duration_hours", "phase_minimum_slope", "phase_rank_exclude_below", "phase_rank_weights", ] def __init__(self): self.segment_log_n0 = False self.constrain_n0 = False self.n0 = 0.0 self.curve_minimum_duration_hours = 5 self.phase_minimum_signal_noise_ratio = 1.0 self.phase_minimum_duration_hours = 1.5 self.phase_minimum_slope = 0.005 self.phase_rank_exclude_below = 33 self.phase_rank_weights = { "SNR": 50, "duration": 30, "slope": 10, # TODO add 1 - start? }
growth_estimation_defaults = GrowthEstimationParameters()
[docs] def estimate_growth( curve: pandas.Series, *, params=growth_estimation_defaults, name: str = "untitled curve", ) -> AnnotatedGrowthCurve: log = logging.getLogger(__name__) series = curve.dropna() n_hours = int( numpy.round( points_per_hour(series) * max(1, params.curve_minimum_duration_hours) ) ) if n_hours == 0: log.warning( "Fewer than one data-point per hour for %s. Use the command-line " "`--input-time-unit minutes` if times are represented in minutes.", name, ) return AnnotatedGrowthCurve(series, pandas.Series(dtype="float64"), []) if n_hours % 2 == 0: n_hours += 1 series, outliers = remove_outliers(series, window=n_hours, std=3) # NOTE workaround for issue with negative curves if len(series[series > 0]) < 3: log.warning("Fewer than three positive data-points for %s", name) return AnnotatedGrowthCurve(series, outliers, []) if params.segment_log_n0: series_log_n0 = numpy.log(series - params.n0).dropna() smooth_series = segment_spline_smoothing(series, series_log_n0) else: smooth_series = segment_spline_smoothing(series) if len(smooth_series) < n_hours: log.warning("Insufficient smoothed data for %s", name) return AnnotatedGrowthCurve(series, outliers, []) phases = [] for phase in _find_growth_phases(smooth_series, window=n_hours): phase_series = series[phase.start : phase.end] # skip any growth phases that have not enough points for fitting if len(phase_series[phase_series > 0]) < 3: continue # skip any phases with less than minimum duration if phase.duration < max(0.0, params.phase_minimum_duration_hours): continue slope, intercept, n0, snr, _fallback_linear_method = fit_exponential( phase_series, n0=params.n0 if params.constrain_n0 else None ) # skip phases whose actual slope is below the limit if slope < max(0.0, params.phase_minimum_slope): continue # skip phases whose actual signal-noise-ratio is below the limit if snr < max(1.0, params.phase_minimum_signal_noise_ratio): continue phases.append( GrowthPhase( start=phase.start, end=phase.end, slope=slope, intercept=intercept, n0=n0, SNR=snr, rank=None, ) ) ranked_phases = rank_phases( phases, params.phase_rank_weights, thresholds={ "duration": max(0.0, params.phase_minimum_duration_hours), "slope": max(0.0, params.phase_minimum_slope), "SNR": max(1.0, params.phase_minimum_signal_noise_ratio), }, ) return AnnotatedGrowthCurve( series, outliers, [ phase for phase in ranked_phases if phase.rank >= params.phase_rank_exclude_below ], )
def _find_growth_phases(curve: "pandas.Series", window): """ Finds growth phases by locating regions in a series where both the first and second derivatives are positive. """ first_derivative = savitzky_golay(curve, window, 3, deriv=1) second_derivative = savitzky_golay(curve, window, 3, deriv=2) growth = pandas.Series( index=curve.index, data=(first_derivative.values > 0) & (second_derivative.values > 0), ) istart, iend, phases = None, None, [] for i, v in zip(growth.index, growth.values): if v: if istart is None: istart = i elif istart is not None: phases.append(RawGrowthPhase(istart, iend)) istart = None iend = i if istart is not None: phases.append(RawGrowthPhase(istart, iend)) return phases