from luminaire.model.model_utils import LADHolidays
[docs]
class DataExplorationError(Exception):
"""
Exception class for Luminaire Data Exploration.
"""
def __init__(self, message):
message = f'Data exploration failed. Error: {message}'
super(DataExplorationError, self).__init__(message)
[docs]
class DataExploration(object):
"""
This is a general class for time series data exploration and pre-processing.
:param str freq: The frequency of the time-series. A `Pandas offset`_ such as 'D', 'H', or 'M'. Luminaire currently
supports the following pandas frequency types: 'H', 'D', 'W', 'W-SUN', 'W-MON', 'W-TUE', 'W-WED', 'W-THU',
'W-FRI', 'W-SAT'.
:param float sig_level: The significance level to use for any statistical test withing data profile. This should be
a number between 0 and 1.
:param float min_ts_mean: The minimum mean value of the time series required for the model to run. For data that
originated as integers (such as counts), the ARIMA model can behave erratically when the numbers are small. When
this parameter is set, any time series whose mean value is less than this will automatically result in a model
failure, rather than a mostly bogus anomaly.
:param float fill_rate: Minimum proportion of data availability in the recent data window. Should be a fraction
between 0 and 1.
:param int max_window_length: The maximum size of the sub windows for input data segmentation.
:param int window_length: The size of the sub windows for input data segmentation.
:param int min_ts_length: The minimum required length of the time series for training.
:param int max_ts_length: The maximum required length of the time series for training.
:param bool is_log_transformed: A flag to specify whether to take a log transform of the input data. If the data
contain negatives, is_log_transformed is ignored even though it is set to True.
:param bool data_shift_truncate: A flag to specify whether left side of the most recent change point needs to
be truncated from the training data.
:param int min_changepoint_padding_length: A padding length between two change points. This parameter makes sure
that two consecutive change points are not close to each other.
:param float change_point_threshold: Minimum threshold (a value > 0) to flag change points based on KL divergence.
This parameter can be used to tune the sensitivity of the change point detection method.
.. _Pandas offset: https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects
"""
__version__ = "0.1"
def __init__(self,
freq='D',
min_ts_mean=None,
fill_rate=None,
sig_level=0.05,
min_ts_length=None,
max_ts_length=None,
is_log_transformed=None,
data_shift_truncate=True,
min_changepoint_padding_length=None,
change_point_threshold=2,
window_length=None,
*args,
**kwargs):
self._target_index = 'index'
self._target_metric = 'raw'
# Assigning a default min_ts_length if not provided by the user. Assigning two cycles.
# Keys are same as pandas frequency.
min_ts_length_dict = {
'H': 144,
'D': 21,
'W': 16, 'W-SUN': 16, 'W-MON': 16, 'W-TUE': 16, 'W-WED': 16, 'W-THU': 16, 'W-FRI': 16, 'W-SAT': 16,
'M': 24, 'MS': 24,
}
self.min_ts_length = min_ts_length or min_ts_length_dict.get(freq)
# Assigning a default max_ts_length if not provided by the user.
# Keys are same as pandas frequency.
max_ts_length_dict = {
'H': 90 * 24,
'D': 365 * 3,
'W': 100, 'W-SUN': 100, 'W-MON': 100, 'W-TUE': 100, 'W-WED': 100, 'W-THU': 100, 'W-FRI': 100, 'W-SAT': 100,
'M': 120, 'MS': 120,
}
self.max_ts_length = max_ts_length or max_ts_length_dict.get(freq)
# Pre-specification of the window lengths for different window frequencies with their min and max
window_length_dict = {
'S': 60 * 60,
'T': 60 * 24,
'15T': 4 * 24,
'H': 24,
'D': 28,
}
if freq in ['S', 'T', '15T', 'H', 'D']:
window_length = window_length_dict.get(freq)
min_num_train_windows = 6
max_num_train_windows = 10000
min_window_length = 10
max_window_length = 100000
self.freq = freq
self.min_ts_mean = min_ts_mean
self.fill_rate = fill_rate
self.sig_level = sig_level
self.is_log_transformed = is_log_transformed
self.data_shift_truncate = data_shift_truncate
self.change_point_threshold = change_point_threshold
self.min_changepoint_padding_length = min_changepoint_padding_length
self.min_window_length = min_window_length
self.max_window_length = max_window_length
self.min_num_train_windows = min_num_train_windows
self.max_num_train_windows = max_num_train_windows
self.window_length = window_length
# Assigning different padding based on the time series frequency
min_changepoint_padding_length_dict = {
'H': 7 * 24,
'D': 10,
'W': 12, 'W-SUN': 12, 'W-MON': 12, 'W-TUE': 12, 'W-WED': 12, 'W-THU': 12, 'W-FRI': 12, 'W-SAT': 12,
'M': 24, 'MS': 24,
}
self.min_changepoint_padding_length = min_changepoint_padding_length or min_changepoint_padding_length_dict.get(
freq)
tc_window_len_dict = {
'H': 24,
'D': 7,
'W':4
}
self.tc_window_length = tc_window_len_dict.get(freq) if freq in ['H', 'D', 'W'] else None
self.tc_max_window_length = 24
[docs]
def add_missing_index(self, df=None, freq=None):
"""
This function reindexes a pandas dataframe with missing dates for a given time series frequency.
Note: If duplicate dates dates are present in the dataframe, this function takes average of the duplicate
data dates and merges them as a single data date.
:param pandas.DataFrame df: Input pandas dataframe containing the time series
:param str freq: The frequency of the time-series. A `Pandas offset`_ such as 'D', 'H', or 'M'
:return: pandas dataframe after reindexing missing data dates
:rtype: pandas.DataFrame
"""
import pandas as pd
# Adding a group by logic for duplicate index
df = df.groupby(df.index).mean()
# Create a new Pandas data frame based on the first valid index and
# current date using the frequency defined by the use
idx = pd.date_range(start=df.first_valid_index(), end=df.last_valid_index(), freq=freq)
df_reindexed = df.reindex(idx)
return df_reindexed
def _kalman_smoothing_imputation(self, df=None, target_metric=None, imputed_metric=None, impute_only=False):
"""
This function performs a missing data imputation using the kalman smoothing.
:param pandas.Dataframe df: A pandas dataframe containing the time series
:param str target_metric: A string among the dataframe column name that contains the time series
:param str imputed_metric: A string among the dataframe column that stores the imputed time series
:return: A pandas dataframe containing the raw and the imputed time series
.. Note: missing data are imputed using moving average if position of the missing data does not satisfy the minimum
length requirement for Kalman smoothing
"""
import numpy as np
from pykalman import KalmanFilter
time_series = np.array(df[target_metric], dtype=np.float64)
missing_idx = np.where(np.isnan(time_series))[0]
if len(missing_idx) > 0:
if missing_idx[0] == 0:
time_series[0] = np.nanmean(time_series)
missing_idx = np.delete(missing_idx, 0)
if missing_idx[-1] == len(time_series) - 1:
time_series[-1] = np.nanmean(time_series[-min(10, len(time_series)):])
if np.isnan(time_series[-1]):
raise ValueError("Interpolation failed due to too many recent missing data.")
missing_idx = np.delete(missing_idx, -1)
if len(missing_idx) > 0:
masked_series = np.ma.array(np.copy(time_series))
for i in missing_idx:
masked_series[i] = np.ma.masked
kf = KalmanFilter()
ts_smoothed, ts_cov = kf.em(masked_series).smooth(masked_series)
for i in missing_idx:
time_series[i] = ts_smoothed[i]
if impute_only:
df[target_metric] = time_series
else:
df[imputed_metric] = time_series
return df
def _moving_average(self, series=None, window_length=None, train_subwindow_len=None):
"""
This function calculates the moving average of a series based on a given window length.
:param list series: The list containing the training data.
:param int window_length: The length of the moving average window.
:return: Smoothed input series using moving averages
:rtype: list
"""
import numpy as np
moving_averages = []
iter_length = len(series) - window_length
for i in range(0, iter_length):
ma_current = np.mean(series[i:i + window_length])
moving_averages.append(ma_current)
# Moving average shrinkes w.r.t the actual series based on the moving average window. Hence, to keep the
# length of the moving average equal to the series, we append proxy padding which are the moving averages
# from the closest representative training sub-window.
moving_averages_padded = moving_averages[(train_subwindow_len - (
window_length // 2)):train_subwindow_len] + moving_averages + moving_averages[-train_subwindow_len:-(
train_subwindow_len - (window_length // 2))]
return moving_averages_padded
@classmethod
def _get_exog_data(cls, exog_start, exog_end, index):
"""
This function gets the exogenous data for the specified index.
:param pandas.Timestamp exog_start: Start date for the exogenous data
:param pandas.Timestamp exog_end: End date for the exogenous data
:param list[pandas.Timestamp] index: List of indices
:return: Exogenous data for the given list of index
:rtype: pandas.DataFrame
"""
import pandas as pd
holiday_calendar = LADHolidays()
holiday_series = holiday_calendar.holidays(start=exog_start, end=exog_end, return_name=True)
return (pd.DataFrame({'Holiday': holiday_series, 'Ones': 1})
.pivot(columns='Holiday', values='Ones')
.reindex(index)
.fillna(0)
)
@classmethod
def _stationarizer(cls, endog=None, diff_min=1, diff_max=2, significance_level=0.01, obs_incl=True):
"""
This function tests the stationarity of the given time series and performs the required differencing.
:param pandas.Series endog: The list containing the training data. Note: endog can include the
raw actual for specific models (e.g. State Space)
:param int diff_min: Minimum order of differencing for non-stationarity.
:param int diff_max: Maximum order of differencing for non-stationarity.
:param float significance_level: Significance level for the adfuller test for checking non-stationarity
:param bool obs_incl: A flag indicating whether the raw actual is included in the endog.
:return: Difference time series, the order of differencing based on the stationarity test and the
raw actuals in every differencing step (for prediction adjustment in the actual
:rtype: tuple(numpy.array, int, list)
"""
import numpy as np
from statsmodels.tsa.stattools import adfuller
endog_diff = np.array(endog)
if diff_min == 0:
# Performing the adfuller test over the raw and the aggregated data to test for stationarity
adf_pvalue = adfuller(endog_diff)[1]
take_diff_flag = adf_pvalue > significance_level
else:
take_diff_flag = True
diff_order = 0
actual_previous_per_diff = []
# Take the difference until the difference data is stationarity based on the adfuller test
while take_diff_flag and diff_order < diff_max:
diff_order = diff_order + 1
if obs_incl:
actual_previous_per_diff.append(endog_diff[-2])
else:
actual_previous_per_diff.append(endog_diff[-1])
endog_diff = np.diff(endog_diff)
adf_pvalue = adfuller(endog_diff)[1]
take_diff_flag = adf_pvalue > significance_level
return endog_diff, diff_order, actual_previous_per_diff
def _partition(self, training_data, window_length, value_column=None):
"""
This function slices a list from the end of the list based on the size of the slice. Any remainder part of the
list from the beginning is ignored
:param pandas.DataFrame/list training_data: Pandas dataframe or a list containing the training data.
:param int window_length: The length of every slice of the list.
:param str value_column: Value column in the training dataframe.
:return: Sliced lists of input time series and aggregated timestamps
:rtype: tuple
"""
import collections
import operator
if not isinstance(training_data, list):
lst = list(training_data[value_column])
idx = training_data.index.normalize()
else:
lst = training_data
n = int(len(lst) / float(window_length))
# Performing pertition
lst_sliced = [lst[::-1][int(round(window_length * i)):
int(round(window_length * (i + 1)))][::-1] for i in range(n)][::-1]
if not isinstance(training_data, list):
idx_truncated = idx[-(n * window_length):]
aggregated_datetime = []
for i in range(n):
current_date_window = idx_truncated[(i * window_length): ((i + 1) * window_length)]
dates_freq_dist = dict(collections.Counter(current_date_window))
aggregated_datetime.append(max(dates_freq_dist.items(), key=operator.itemgetter(1))[0])
return lst_sliced, aggregated_datetime
else:
return lst_sliced, None
def _detrender(self, training_data_sliced=None, detrend_order_max=2, significance_level=0.01,
detrend_method=None, agg_datetime=None, past_model=None):
"""
This function tests the stationarity of the given time series and performs the required differencing.
:param list training_data_sliced: The list of list containing the training data.
:param int detrend_order_max: Maximum number of differencing for non-stationarity.
:param float significance_level: Significance level for the adfuller test for checking non-stationarity.
:param list agg_datetime: List of aggregated date times.
:param luminaire.model.window_density.WindowDensityModel past_model: Past stored window density model.
:return: Difference time series and the order of differencing based on the stationarity test.
:rtype: tuple(list, int)
"""
import numpy as np
import pandas as pd
from itertools import chain
from statsmodels.tsa.stattools import adfuller
from luminaire.model.lad_structural import LADStructuralModel
agg_data_model = None
# Flattening the training data using for the stationarity test
training_data_flattened = list(chain.from_iterable(training_data_sliced))
# Obtaining the aggregated series for modeling longer term patterns
avg_series = np.median(np.array(training_data_sliced), axis=1).tolist()
avg_series_df = pd.DataFrame({'index': agg_datetime, 'raw': avg_series}).set_index('index')
if past_model:
past_df = pd.DataFrame(past_model._params['AggregatedData'][
list(past_model._params['AggregatedData'].keys())[0]], columns=[
self._target_index, self._target_metric]).set_index(self._target_index)
current_agg_data_dict = avg_series_df['raw'].to_dict()
agg_data_dict = past_df['raw'].to_dict()
agg_data_dict.update(current_agg_data_dict)
avg_series_df = pd.DataFrame({'raw': agg_data_dict})
# Performing the adfuller test over the raw and the aggregated data to test for stationarity
adf_pvalue_raw_series = adfuller(training_data_flattened)[1]
adf_pvalue_avg_series = adfuller(avg_series)[1]
detrend_flag = adf_pvalue_raw_series > significance_level or adf_pvalue_avg_series > significance_level
detrend_order = 0
if detrend_method == 'diff':
training_data_sliced_stationarized = training_data_sliced
elif detrend_method == 'modeling':
if not detrend_flag:
return training_data_sliced, detrend_order, agg_data_model, agg_data_model
# Take the difference until the difference data is stationarity based on the adfuller test
while detrend_flag and detrend_order < detrend_order_max:
detrend_order = detrend_order + 1
if detrend_method == 'diff':
for i in range(0, len(training_data_sliced)):
training_data_sliced_stationarized[i] = np.diff(training_data_sliced_stationarized[i])
training_data_stationarized_flattened = list(chain.from_iterable(training_data_sliced_stationarized))
avg_series = np.array(training_data_sliced_stationarized).mean(0).tolist()
adf_pvalue_raw_series = adfuller(training_data_stationarized_flattened)[1]
adf_pvalue_avg_series = adfuller(avg_series)[1]
detrend_flag = (
adf_pvalue_raw_series > significance_level or adf_pvalue_avg_series > significance_level)
elif detrend_method == 'modeling':
training_data_sliced_stationarized = (np.array(training_data_sliced) /
(np.array(avg_series).reshape(-1, 1))).tolist()
agg_struct_model_config = {"include_holidays_exog": 1, "is_log_transformed": 0,
"max_ft_freq": 3, "p": 3, "q": 3}
de_obj = DataExploration(freq='D', data_shift_truncate=False, is_log_transformed=False, fill_rate=0.9)
avg_series_df = avg_series_df.groupby(level=0).max()
agg_cleaned_data, pre_prc = de_obj.profile(avg_series_df)
if pre_prc['success']:
lad_struct_obj = LADStructuralModel(hyper_params=agg_struct_model_config, freq='D')
success, model_date, agg_data_model = lad_struct_obj.train(data=agg_cleaned_data, **pre_prc)
if not success:
if 'AR coefficients' in agg_data_model._params['ErrorMessage']:
agg_struct_model_config = {"include_holidays_exog": 1, "is_log_transformed": 0,
"max_ft_freq": 3, "p": 0, "q": 1}
lad_struct_obj = LADStructuralModel(hyper_params=agg_struct_model_config, freq='D')
success, model_date, agg_data_model = lad_struct_obj.train(data=agg_cleaned_data, **pre_prc)
elif 'MA coefficients' in agg_data_model._params['ErrorMessage']:
agg_struct_model_config = {"include_holidays_exog": 1, "is_log_transformed": 0,
"max_ft_freq": 3, "p": 1, "q": 0}
lad_struct_obj = LADStructuralModel(hyper_params=agg_struct_model_config, freq='D')
success, model_date, agg_data_model = lad_struct_obj.train(data=agg_cleaned_data, **pre_prc)
agg_data_model = agg_data_model if success else None
detrend_flag = False
return training_data_sliced_stationarized, detrend_order, \
agg_data_model, avg_series_df.reset_index().values.tolist()
def _ma_detrender(self, series=None, padded_series=None, ma_window_length=None):
"""
This function detrends a the values from a target time window w.r.t a padding around the target window.
Note: This function is only been used for detrending the scoring window.
:param list series: The series containing the values from the scoring window.
:param list padded_series: The series containing the values for the padded scoring window
:param int ma_window_length: Size of the padding
:return: Returns 'series' after removing the trend
:rtype: list
"""
import numpy as np
moving_averages = []
iter_length = len(padded_series) - ma_window_length
for i in range(0, iter_length):
ma_current = np.mean(padded_series[i:i + ma_window_length])
moving_averages.append(ma_current)
stationarized_series = (np.array(series) / np.array(moving_averages)).tolist()
return stationarized_series
def _detect_window_size(self, series=None, streaming=False):
"""
This function detects the ideal window size based on the seasonality pattern of the data
:param pandas.DataFrame series: The input sequence of data.
:param bool streaming: Glag to update the logic for streaming anomaly detection models.
:return: An int containing the optimal window size
:rtype: int
"""
import numpy as np
n = len(series)
if not streaming:
series = np.diff(series, 1)
# Generating the indices based on odd and event number of terms in the time series
if int(n) % 2 != 0:
all_idx = np.arange(1, n // 2 + 1)
else:
all_idx = np.arange(1, n // 2)
# Performing Fourier transformation
yf = np.real(np.fft.rfft(series))
# Spectral density for the fourier transformation (to identify the significant frequencies)
psd = abs(yf[all_idx]) ** 2 + abs(yf[-all_idx]) ** 2
sig_freq_idx = np.argsort(psd[: int(len(psd) / 2)])
if streaming:
return int(np.rint(float(n) / max(1, sig_freq_idx[-1] + 1)))
return min(self.tc_max_window_length,
max(int(np.rint(float(n) / max(1, sig_freq_idx[-1] + 1))),
int(np.rint(float(n) / max(1, sig_freq_idx[-2] + 1)))))
def _local_minima(self, input_dict=None, window_length=None):
"""
This function finds the index corresponding to the local minimas for detected consecutive trend changes
:param dict input_dict: A dictionary containing the timestamps as keys for potential trend changes
and the values being the corresponding p-values
:param int window_length: The size of the sub windows for input data segmentation.
:return: List of local minimas
:rtype: list
"""
import numpy as np
import collections
ordered_dict = collections.OrderedDict(sorted(input_dict.items()))
key_list = np.array(list(ordered_dict.keys()))
value_list = np.array(list(ordered_dict.values()))
diff2_series = np.diff(np.sign(np.diff(value_list)))
local_min_loc = np.where(diff2_series > 0)[0]
local_min_loc = local_min_loc + 1 if len(local_min_loc) > 0 else np.zeros(1)
if len(local_min_loc) > 1:
min_keys = [key_list[int(loc)] for loc in local_min_loc]
elif len(local_min_loc) <= 1:
min_keys = [key_list[0], key_list[-1]] if len(input_dict) > window_length else [key_list[int(loc)] for loc in
local_min_loc]
return min_keys
def _shift_intensity(self, change_points=None, df=None, metric=None):
"""
This function computes the Kullback_Leibler divergence of the the time series around a changepoint detected by the
pelt_change_point_detection() function. This considers Gaussian assumption on the underlying data distribution.
:param list change_points: A list storing indices of the potential change points
:param pandas.dataframe df: A pandas dataframe containing time series ignoring the top 5% volatility
:param str metric: A string in the dataframe column names that contains the time series
:return: A list containing the magnitude of changes for every corresponding change points
:rtype: list
"""
import numpy as np
min_changepoint_padding_length = self.min_changepoint_padding_length
mag_change = []
float_min = 1e-10
c_count = 0
# This loop iterates over all the change points obtained through PELT
for dates in change_points:
# KL divergence is measured for the datapoints on the left side of the change point versus the right side
# of the change point.
if len(change_points) == 1:
window_l = df[metric].iloc[:change_points[c_count]].values
window_r = df[metric].iloc[change_points[c_count]:].values
elif c_count == 0:
window_l = df[metric].iloc[:change_points[c_count]].values
window_r = df[metric].iloc[change_points[c_count]:change_points[c_count + 1]].values
elif c_count < len(change_points) - 1:
window_l = df[metric].iloc[change_points[c_count - 1]:change_points[c_count]].values
window_r = df[metric].iloc[change_points[c_count]:change_points[c_count + 1]].values
else:
window_l = df[metric].iloc[change_points[c_count - 1]:change_points[c_count]].values
window_r = df[metric].iloc[change_points[c_count]:].values
if len(window_r) <= min_changepoint_padding_length:
mag_change.append(0)
else:
window = np.concatenate((window_l, window_r), axis=0)
# Mean and standard deviation of the data from the combined window
w_mean = np.mean(window)
w_std = np.std(window, ddof=1) if len(window) > 1 else float_min
# Mean and standard deviation of the data from the window after the change point
wr_mean = np.mean(window_r)
wr_std = np.std(window_r, ddof=1) if len(window_r) > 1 else float_min
# Kullback-Leibler divergence between two normal densities
mag_change.append(
np.log(wr_std / w_std) + ((w_std ** 2 + (w_mean - wr_mean) ** 2) / (2 * (wr_std ** 2))) - 0.5)
c_count = c_count + 1
return mag_change
def _pelt_change_point_detection(self, df=None, metric=None, min_ts_length=None, max_ts_length=None):
"""
This function computes the significant change points based on PELT and the Kullback-Leibler divergence method.
:param pandas.dataframe df: A pandas dataframe containing the time series
:param pandas.dataframe metric: The metric in the dataframe that contains the time series
:param int min_ts_length: Specifying the minimum required length of the time series for training
:param int max_ts_length: Specifying the maximum required length of the time series for training.
The training time series length truncates accordingly based on minimum between max_ts_length and the
length of the input time series.
:return: A pandas dataframe containing the time series after the last changepoint
:rtype: tuple
>>> df
raw interpolated
2016-01-02 1753421.0 14.377080
2016-01-03 1879108.0 14.446308
2016-01-04 1462725.0 14.195812
2016-01-05 1525162.0 14.237612
2016-01-06 1424264.0 14.169166
... ... ...
2018-10-14 2185230.0 14.597232
2018-10-15 1825539.0 14.417386
2018-10-16 1776778.0 14.390313
2018-10-17 1792899.0 14.399345
2018-10-18 1738657.0 14.368624
>>> copy_df, change_point_list
( raw interpolated
2016-01-02 1753421.0 14.377080
2016-01-03 1879108.0 14.446308
2016-01-04 1462725.0 14.195812
2016-01-05 1525162.0 14.237612
2016-01-06 1424264.0 14.169166
... ... ...
2018-10-14 2185230.0 14.597232
2018-10-15 1825539.0 14.417386
2018-10-16 1776778.0 14.390313
2018-10-17 1792899.0 14.399345
2018-10-18 1738657.0 14.368624
[1021 rows x 2 columns], ['2016-12-26 00:00:00', '2018-09-10 00:00:00'])
"""
import numpy as np
import pandas as pd
from changepy import pelt
from changepy.costs import normal_var
change_point_threshold = self.change_point_threshold
df_copy = pd.DataFrame(df[metric])
counts = df_copy[metric].values
mean = np.mean(counts)
# Performing changepoint detection with respect to the data variablity shift through PELT
cdate = pelt(normal_var(counts, mean), len(counts))
# If PELT detects the first datapoint to be a change point, then we ignore that change point
if cdate:
if cdate[0] == 0:
cdate.remove(0)
if len(cdate) > 0:
# Finding the magnitude of divergence around every change point (detected by PELT) by comparing the
# distributions of the data points on the left and the right side of the change point
entrp = self._shift_intensity(change_points=cdate, df=df_copy, metric=metric)
df_change_points = pd.DataFrame({'c_point': cdate, 'entropy': entrp})
# Narrowing down to the change points which satisfies a required lower bound of divergence
df_change_points = df_change_points[df_change_points['entropy'] > change_point_threshold]
cdate = df_change_points['c_point'].values
# Set the start date of the time series based on the min_ts_length and the max_ts_length and the change
# points
if len(cdate) > 0:
df_subset = df_copy.iloc[cdate]
change_point_list = [i.__str__() for i in df_subset.index]
index = df_subset.index[-1]
copy_df = df.loc[index: df.last_valid_index()] if self.data_shift_truncate else df
if copy_df.shape[0] < min_ts_length:
# Return None, If time series after the change point contains less number of data points
# than the minimum required length
return None, change_point_list
elif copy_df.shape[0] < max_ts_length:
# Return the time series after the change point if it's length lies between the minimum and the
# maximum required length
pass
elif copy_df.shape[0] > max_ts_length:
# Truncate the time series after change point if it contains more than required data for
# training
copy_df = df.iloc[-max_ts_length:]
else:
change_point_list = None
if df.shape[0] < min_ts_length:
return None, change_point_list
else:
if df.shape[0] < max_ts_length:
copy_df = df
else:
copy_df = df.iloc[-max_ts_length:]
else:
change_point_list = None
if df.shape[0] < min_ts_length:
return None, change_point_list
else:
if df.shape[0] < max_ts_length:
copy_df = df
else:
copy_df = df.iloc[-max_ts_length:]
return copy_df, change_point_list
def _trend_changes(self, input_df=None, value_column=None):
"""
This function detects the trend changes of the input time series
:param pandas.DataFrame input_df: The input sequence of data.
:param str value_column: A string containing the column name containing the target series
:return: list of strings with the potential trend changes.
:rtype: list[str]
>>> input_df
raw interpolated
2016-01-02 1753421.0 14.377080
2016-01-03 1879108.0 14.446308
2016-01-04 1462725.0 14.195812
2016-01-05 1525162.0 14.237612
2016-01-06 1424264.0 14.169166
... ... ...
2018-10-14 2185230.0 14.597232
2018-10-15 1825539.0 14.417386
2018-10-16 1776778.0 14.390313
2018-10-17 1792899.0 14.399345
2018-10-18 1738657.0 14.368624
>>> global_trend_changes
['2016-04-16 00:00:00', '2016-05-28 00:00:00', '2016-08-06 00:00:00', '2016-11-05 00:00:00',
'2016-12-31 00:00:00', '2017-02-25 00:00:00', '2017-03-25 00:00:00', '2017-06-03 00:00:00',
'2017-07-01 00:00:00', '2017-08-26 00:00:00', '2017-09-30 00:00:00', '2017-10-28 00:00:00',
'2017-12-23 00:00:00', '2018-02-17 00:00:00', '2018-03-17 00:00:00', '2018-05-19 00:00:00',
'2018-06-30 00:00:00', '2018-09-08 00:00:00']
"""
import numpy as np
from scipy import stats
from statsmodels.tsa.stattools import acf
min_float = 1e-10
window_length = self.tc_window_length
sig_level = self.sig_level
series = input_df[value_column].tolist()
timestamps = input_df.index.tolist()
if not window_length:
window_length = self._detect_window_size(series=series)
# Creating a crude estimation of the required window size
nwindows = int(window_length * 1.5)
current_mid_point = window_length * nwindows
past_trend_change = 0
global_trend_changes = []
local_trend_changes = {}
past_p_value = -1
# If the remaining part of the time series is less than the (window_length * nwindows) we terminate the while loop
current_reminder = len(series) if (current_mid_point + (window_length * nwindows)) < len(series) else 0
while current_reminder >= window_length:
# Creating the left and the right window for slope detection
l_window = series[current_mid_point - (window_length * nwindows): current_mid_point]
r_window = series[current_mid_point: current_mid_point + (window_length * nwindows)]
l_window_length = len(l_window)
r_window_length = len(r_window)
N = l_window_length + r_window_length
# Finding the effective degrees of freedom
auto_corr = acf(l_window + r_window, nlags=N)
auto_corr[np.isnan(auto_corr)] = 1
eff_df = 0
for i in range(1, N):
eff_df = eff_df + (((N - i) / float(N)) * auto_corr[i])
eff_df = max(1, int(np.rint(1 / ((1 / float(N)) + ((2 / float(N)) * eff_df)))) - 4)
# Creating the left and right indices for running the regression
l_x, r_x = np.arange(l_window_length), np.arange(r_window_length)
# Linear regression on the left and the right window
l_slope, l_intercept, l_r_value, l_p_value, l_std_err = stats.linregress(l_x, l_window)
r_slope, r_intercept, r_r_value, r_p_value, r_std_err = stats.linregress(r_x, r_window)
# t-test for slope shift
l_window_hat = (l_slope * l_x) + l_intercept
r_window_hat = (r_slope * r_x) + r_intercept
l_sse = np.sum((l_window - l_window_hat) ** 2)
r_sse = np.sum((r_window - r_window_hat) ** 2)
l_const = np.sum((np.arange(1, l_window_length + 1) - ((l_window_length + 1) / 2.0)) ** 2)
r_const = np.sum((np.arange(1, r_window_length + 1) - ((r_window_length + 1) / 2.0)) ** 2)
prop_const = (l_const * r_const) / (l_const + r_const)
total_sse = l_sse + r_sse
std_err = max(np.sqrt(total_sse / (prop_const * (l_window_length + r_window_length - 4))), min_float)
t_stat = abs(l_slope - r_slope) / std_err
p_value = (1 - stats.t.cdf(t_stat, df=eff_df)) * 2
if p_value < sig_level:
# Check if the same shift detected multiple times
if p_value == past_p_value:
local_trend_changes.pop(past_trend_change)
if current_reminder - window_length < window_length:
if len(local_trend_changes) > 2:
# _local_minima function is called to detec the optimal trend change(s) among a group of local
# trend changes
current_trend_change = self._local_minima(input_dict=local_trend_changes,
window_length=window_length)
for key in current_trend_change:
global_trend_changes.append(timestamps[key].__str__())
else:
# Handling the trend changes at the tail of the time series
local_trend_changes[current_mid_point] = p_value
past_trend_change = current_mid_point
past_p_value = p_value
else:
# Handling the trend changes at the tail of the time series
if (current_mid_point - past_trend_change) <= window_length and len(local_trend_changes) > 2:
current_trend_change = self._local_minima(input_dict=local_trend_changes, window_length=window_length)
for key in current_trend_change:
global_trend_changes.append(timestamps[key].__str__())
local_trend_changes = {}
current_mid_point = current_mid_point + window_length
current_reminder = len(series) - current_mid_point
return global_trend_changes
[docs]
def kf_naive_outlier_detection(self, input_series, idx_position):
"""
This function detects outlier for the specified index position of the series.
:param numpy.array input_series: Input time series
:param int idx_position: Target index position
:return: Anomaly flag
:rtype: bool
>>> input_series = [110, 119, 316, 248, 451, 324, 241, 275, 381]
>>> self.kf_naive_outlier_detection(input_series, 6)
False
"""
import numpy as np
from pykalman import KalmanFilter
kf = KalmanFilter()
truncated_series = input_series[-(self.min_ts_length * 3):] \
if idx_position == len(input_series) - 1 else input_series
idx_position = len(truncated_series) - 1
filtered_state_means, filtered_state_covariance = kf.em(truncated_series).filter(truncated_series)
residuals = truncated_series - filtered_state_means[:, 0]
# Catching marginal anomalies to avoid during training
is_anomaly = residuals[idx_position] < np.mean(residuals) \
- (1 * np.sqrt(filtered_state_covariance)[idx_position][0][0]) \
or residuals[idx_position] > np.mean(residuals) \
+ (1 * np.sqrt(filtered_state_covariance)[idx_position][0][0])
return is_anomaly
def _truncate_by_data_gaps(self, df, target_metric):
"""
This function truncates time series after large data gaps.
:param pandas.DataFrame df: Input time series in pandas data frame
:param str target_metric: Target value column in the input data frame
:return: Pandas dataframe with truncated pandas data frame
:rtype: pandas.DataFrame
"""
import numpy as np
max_data_gap = abs(self.min_ts_length / 3.0)
gap_len = 0
last_avl_idx = None
for row in df[::-1].iterrows():
if np.isnan(row[1][target_metric]) or row[1][target_metric] is None:
gap_len = gap_len + 1
else:
gap_len = 0
last_avl_idx = row[0]
if gap_len >= max_data_gap and last_avl_idx:
truncated_df = df[last_avl_idx:]
return truncated_df
return df
def _prepare(self, df, impute_only, streaming=False, **kwargs):
"""
This function performs a basic data preparation before performing a full profiling
:param pandas.DataFrame/list df: Input time series in pandas data frame or in list format
:param bool impute_only: A flag to decide whether to return just after imputation only
:param streaming: A flag to identify the logic based on streaming vs non-streaming data
:param kwargs: Other input parameters
:return: Pandas dataframe with prepared data (with identified frequency for streaming)
:rtype: tuple
"""
import pandas as pd
min_ts_length = self.min_ts_length
max_ts_length = self.max_ts_length
target_metric = 'raw'
imputed_metric = 'interpolated'
# if input dimension/metric combination timeseries is null, skip modeling
if len(df) == 0:
raise ValueError("No model ran because dimension/metric combination is null")
if not streaming and len(df) < min_ts_length:
raise ValueError("Current time series length of {}{} is less tha minimum requires "
"length {}{} for training".format(len(df), self.freq, min_ts_length, self.freq))
if isinstance(df, list):
df = (pd.DataFrame(df, columns=[self._target_index, self._target_metric]).set_index(self._target_index))
if not self.freq:
if streaming and len(df) > 1:
freq = (pd.DatetimeIndex(df.index[1:]) - pd.DatetimeIndex(df.index[:-1])).value_counts().index[0]
else:
freq = self.freq
freq_delta = pd.Timedelta("1" + freq) if not any(char.isdigit() for char in str(freq)) else pd.Timedelta(freq)
df.index = pd.DatetimeIndex(df.index)
df = self.add_missing_index(df=df, freq=freq_delta)
if not streaming:
df = df.iloc[-min(max_ts_length, len(df)):]
df = self._truncate_by_data_gaps(df=df, target_metric=target_metric)
if len(df) < min_ts_length:
raise ValueError("Due to a recent data gap, training is waiting for more data to populate")
if not streaming and len(df) < min_ts_length:
raise ValueError("The training data observed continuous missing data near the end. Require more stable "
"data to train")
if streaming and 'impute_zero' in kwargs and kwargs['impute_zero']:
df = df.fillna(0)
else:
df = self._kalman_smoothing_imputation(df=df, target_metric=target_metric, imputed_metric=imputed_metric,
impute_only=impute_only)
if streaming:
return df, freq
else:
return df
[docs]
def profile(self, df, impute_only=False, **kwargs):
"""
This function performs required data profiling and pre-processing before hyperparameter optimization or time
series model training.
:param list/pandas.DataFrame df: Input time series.
:param bool impute_only: Flag to perform preprocessing until imputation OR full preprocessing.
:return: Preprocessed dataframe with batch data summary.
:rtype: tuple[pandas.dataFrame, dict]
>>> de_obj = DataExploration(freq='D', data_shift_truncate=1, is_log_transformed=0, fill_rate=0.9)
>>> data
raw
index
2020-01-01 1326.0
2020-01-02 1552.0
2020-01-03 1432.0
2020-01-04 1470.0
2020-01-05 1565.0
... ...
2020-06-03 1934.0
2020-06-04 1873.0
2020-06-05 1674.0
2020-06-06 1747.0
2020-06-07 1782.0
>>> data, summary = de_obj.profile(data)
>>> data, summary
( raw interpolated
2020-03-16 1371.0 1371.0
2020-03-17 1325.0 1325.0
2020-03-18 1318.0 1318.0
2020-03-19 1270.0 1270.0
2020-03-20 1116.0 1116.0
... ... ...
2020-06-03 1934.0 1934.0
2020-06-04 1873.0 1873.0
2020-06-05 1674.0 1674.0
2020-06-06 1747.0 1747.0
2020-06-07 1782.0 1782.0
[84 rows x 2 columns], {'success': True, 'trend_change_list': ['2020-04-01 00:00:00'], 'change_point_list':
['2020-03-16 00:00:00'], 'is_log_transformed': 0, 'min_ts_mean': None, 'ts_start': '2020-01-01 00:00:00',
'ts_end': '2020-06-07 00:00:00'})
"""
import numpy as np
min_ts_length = self.min_ts_length
max_ts_length = self.max_ts_length
is_log_transformed = self.is_log_transformed
target_metric = 'raw'
imputed_metric = 'interpolated'
try:
processed_df= self._prepare(df, impute_only)
if impute_only:
summary = {'success': True}
return processed_df, summary
if self.freq == 'D':
train_end_anomaly_flag = self.kf_naive_outlier_detection(input_series=processed_df['interpolated'].values,
idx_position=len(processed_df['interpolated']) - 1)
if train_end_anomaly_flag:
processed_df = processed_df.iloc[:-1]
if is_log_transformed and not processed_df[processed_df[target_metric] < 0].empty:
is_log_transformed = False
# We want to make sure the time series does not contain any negatives in case of log transformation
if is_log_transformed:
min_ts_mean = np.log(max(self.min_ts_mean, 1)) if self.min_ts_mean is not None else None
else:
min_ts_mean = self.min_ts_mean
if np.sum(np.isnan(processed_df[target_metric].values[-(2 * min_ts_length):])) / float(2 * min_ts_length) \
> 1 - self.fill_rate:
if np.sum(np.isnan(processed_df[target_metric].values[-min_ts_length:])) > 0:
raise ValueError('Too few observed data near the prediction date')
if 'ts_start' not in kwargs:
ts_start = processed_df.index.min()
else:
ts_start = max(processed_df.index.min(), kwargs['ts_start'])
if 'ts_end' not in kwargs:
ts_end = processed_df.index.max()
else:
ts_end = min(processed_df.index.max(), kwargs['ts_end'])
processed_df = processed_df[ts_start:]
processed_df = processed_df.loc[:ts_end]
if is_log_transformed:
processed_df[imputed_metric] = np.log(processed_df[imputed_metric] + 1) \
if is_log_transformed else processed_df[imputed_metric]
data, change_point_list = self._pelt_change_point_detection(df=processed_df, metric=imputed_metric,
min_ts_length=min_ts_length,
max_ts_length=max_ts_length)
trend_change_list = self._trend_changes(input_df=processed_df, value_column=imputed_metric)
summary = {'success': True,
'trend_change_list': trend_change_list if len(trend_change_list) > 0 else None,
'change_point_list': change_point_list,
'is_log_transformed': is_log_transformed,
'min_ts_mean': min_ts_mean,
'ts_start': str(ts_start),
'ts_end': str(ts_end)}
except Exception as e:
# Data exploration failed
summary = {'success': False,
'ErrorMessage': str(e)}
data = None
return data, summary
[docs]
def stream_profile(self, df, impute_only=False, **kwargs):
"""
This function performs data preparation for streaming data.
:param df: list/pandas.DataFrame df: Input time series.
:param impute_only: Flag to perform preprocessing until imputation OR full preprocessing.
:param kwargs: Other input parameters.
:return: Prepared ppandas dataframe with profile information.
:rtype: tuple[pandas.dataFrame, dict]
"""
from random import sample
import datetime
import numpy as np
import pandas as pd
from scipy import stats
try:
processed_df, freq = self._prepare(df, impute_only=impute_only, streaming=True, **kwargs)
if impute_only:
return processed_df, None
training_end = processed_df.index[-1]
training_end_time = training_end.time()
train_start_search_flag = True
idx_date_list = []
for idx in processed_df.index:
if idx.time() == training_end_time and train_start_search_flag:
delta = "1" + freq if not any(char.isdigit() for char in str(freq)) else freq
training_start = idx + pd.Timedelta(delta)
training_start_time = training_start.time()
train_start_search_flag = False
if idx.date() not in idx_date_list:
idx_date_list.append(idx.date())
trunc_df = processed_df[training_start: training_end]
if not self.window_length:
window_length_list = []
# If the window size is not specified, the following logic makes several random segments of the
# time series which obtains a list of optimal window sizes
for i in range(100):
rand_date = sample(idx_date_list, 1)[0]
rand_start_idx = pd.Timestamp(datetime.datetime.combine(rand_date, training_start_time))
if rand_date in idx_date_list[:int(len(idx_date_list) / 2)]:
time_series_i = trunc_df.loc[rand_start_idx:]['interpolated'].values
else:
time_series_i = trunc_df.loc[:rand_start_idx]['interpolated'].values
window_length_i = self._detect_window_size(time_series_i, streaming=True) if not self.window_length \
else self.window_length
window_length_list.append(window_length_i)
window_length_list = np.array(window_length_list)
# From the list of optimal window sizes, if it is a list of constants, we take the constant as the
# window size. Otherwise, we obtain the window size that is most frequently observed in the list.
if np.all(window_length_list == min(window_length_list)):
window_length = window_length_list[0]
else:
bin_count = max(1, int((max(window_length_list) - min(window_length_list)) / 12))
bins = np.linspace(min(window_length_list) - 1, max(window_length_list) + 1, bin_count)
if len(bins) == 1:
window_length = int(stats.mode(window_length_list).mode[0])
else:
digitized = np.digitize(window_length_list, bins)
arg_mode = np.argmax([len(window_length_list[digitized == i]) for i in range(1, len(bins))]) + 1
window_length = int(stats.mode(window_length_list[digitized == arg_mode]).mode[0])
if window_length < self.min_window_length:
raise ValueError('Training window too small')
if window_length > self.max_window_length:
raise ValueError('Training window too large')
n_windows = len(trunc_df) // window_length
if n_windows < self.min_num_train_windows:
raise ValueError('Too few training windows')
if n_windows > self.max_num_train_windows:
raise ValueError('Too many training windows')
else:
window_length = self.window_length
summary = {'success': True,
'freq': str(freq),
'window_length': window_length,
'min_window_length': self.min_window_length,
'max_window_length': self.max_window_length}
except Exception as e:
# Streaming data exploration failed
summary = {'success': False,
'ErrorMessage': str(e)}
trunc_df = None
return trunc_df, summary