from hyperopt import fmin, tpe, hp, STATUS_OK
from luminaire.model import LADStructuralModel, LADStructuralHyperParams, LADFilteringModel, LADFilteringHyperParams
from luminaire.exploration.data_exploration import DataExploration
from luminaire.utils.random_state_validation import check_random_state
import warnings
warnings.filterwarnings('ignore')
class HyperparameterOptimization(object):
"""
Hyperparameter optimization for LAD outlier detection configuration for batch data.
:param str freq: The frequency of the time-series. A `Pandas offset`_ such as 'D', 'H', or 'M'. Luminaire currently
supports the following pandas frequency types: 'H', 'D', 'W', 'W-SUN', 'W-MON', 'W-TUE', 'W-WED', 'W-THU',
'W-FRI', 'W-SAT'.
:param str detection_type: Luminaire anomaly detection type. Only Outlier detection for batch data is currently
supported.
:param float min_ts_mean: Minimum average value in the most recent window of the time series. This optional parameter
can be used to avoid over-alerting from noisy low volume time series.
:param int max_ts_length: The maximum required length of the time series for training. The input time series will be
truncated if its length is greater than this value.
:param int min_ts_length: The minimum required length of the time series for training.
:param int scoring_length: Number of innovations to be scored after the training window, with respect to the frequency.
:param int random_state: Seed to initialize a np.random.RandomState instance.
.. _Pandas offset: https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects
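A minimal instantiation sketch (the arguments shown are illustrative):
>>> hopt_obj = HyperparameterOptimization(freq='D', detection_type='OutlierDetection')
>>> hopt_obj.scoring_length
10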
"""
def __init__(self,
freq,
detection_type='OutlierDetection',
min_ts_mean=None,
max_ts_length=None,
min_ts_length=None,
scoring_length=None,
random_state=None,
**kwargs):
self.freq = freq
self.detection_type = detection_type
self.min_ts_mean = min_ts_mean
self._target_index = 'index'
self._target_metric = 'raw'
self.max_ts_length = max_ts_length
self.min_ts_length = min_ts_length
self.anomaly_intensity_list = [-0.6, -0.3, -0.1, 0.1, 0.3, 0.6]
scoring_length_dict = {
'H': 36, 'D': 10, 'W': 8, 'M': 6,
}
self.scoring_length = scoring_length or scoring_length_dict.get(freq, 30)
self.random_state = random_state
def _mape(self, actuals, predictions):
"""
This function computes the mean absolute percentage error for the observed vs the predicted values.
:param numpy.array actuals: Observed values
:param numpy.array predictions: Predicted values
:return: Mean absolute percentage error
:rtype: float
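A small illustration with hypothetical values; indices with zero actuals or missing predictions are dropped
before averaging:
>>> float(HyperparameterOptimization(freq='D')._mape([100.0, 200.0, 0.0], [120.0, 150.0, 5.0]))
0.225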
"""
import numpy as np
actuals = np.array(actuals)
predictions = np.array(predictions)
non_zero_idx = set(np.argwhere(actuals)[:,0].tolist())
non_none_idx = set(np.where(~np.isnan(predictions.astype(float)))[0].tolist())
filtered_idx = list(non_zero_idx.intersection(non_none_idx))
filtered_actuals = actuals[filtered_idx]
filtered_predictions = predictions[filtered_idx]
mape = np.nanmean(np.abs((filtered_actuals - filtered_predictions) / filtered_actuals))
return mape if not np.isnan(mape) else None
def _synthetic_anomaly_check(self, observation, prediction, std_error):
"""
This function performs anomaly detection based on synthetic anomalies for a given anomaly intensity list
:param float observation: Observed value
:param float prediction: Predicted value
:param float std_error: Standard error for the predictive model
:return: return the list of anomaly flags with the corresponding probabilities
:rtype: tuple[list, list]
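A sketch with hypothetical inputs. The observation is passed as a one-element array, mirroring the
array-valued points of the Kalman-smoothed series (this is also what makes ``probability[0]`` indexable).
Note that a fixed ``random_state`` draws the same trial probability for every intensity:
>>> import numpy as np
>>> ho = HyperparameterOptimization(freq='D', random_state=42)
>>> flags, probs = ho._synthetic_anomaly_check(np.array([100.0]), 98.0, 5.0)
>>> flags
[1, 1, 1, 1, 1, 1]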
"""
import numpy as np
import scipy.stats as st
float_min = 1e-10
anomaly_flags = []
anomaly_probabilities = []
# Anomaly detection based on synthetic anomalies generated through a given intensity list
for prop in self.anomaly_intensity_list:
rnd = check_random_state(self.random_state)
trial_prob = rnd.uniform(0, 1, 1)
if trial_prob < 0.4:
synthetic_value = observation + (prop * observation)
anomaly_flags.append(1)
else:
synthetic_value = observation
anomaly_flags.append(0)
zscore_abs = abs((synthetic_value - prediction) / max(float(std_error), float_min))
probability = (2 * st.norm(0, 1).cdf(zscore_abs)) - 1
anomaly_probabilities.append(probability[0])
return anomaly_flags, anomaly_probabilities
def _objective_part(self, data, smoothed_series, args):
"""
This is the objective function that outputs the loss for a given set of hyperparameters for optimization
through hyperopt.
:param pandas.DataFrame data: Input time series data
:param list smoothed_series: Input time series after smoothing
:param args: Sampled hyperparameters (is_log_transformed, data_shift_truncate, fill_rate, model specification)
:return: Loss based on the observed (synthetic) and predicted anomalies
:rtype: dict
>>> data
raw
2016-01-02 1753421.0
2016-01-03 1879108.0
2016-01-04 1462725.0
2016-01-05 1525162.0
2016-01-06 1424264.0
... ...
2018-10-24 1726884.0
2018-10-25 1685651.0
2018-10-26 1632952.0
2018-10-27 1850912.0
2018-10-28 2021929.0
>>> {'loss': cost, 'status': STATUS_OK}
{'loss': 0.3917824074074072, 'status': 'ok'}
"""
import numpy as np
import pandas as pd
from sklearn.metrics import log_loss
import copy
is_log_transformed = args[0]
data_shift_truncate = args[1]
fill_rate = args[2]
# Getting hyperparameters for lad structural model
if args[3]['model'] == 'LADStructuralModel':
max_ft_freq = args[3]['param']['max_ft_freq']
include_holidays_exog = args[3]['param']['include_holidays_exog']
p = args[3]['param']['p']
q = args[3]['param']['q']
ts_start = data.index.min()
ts_end = data.index.max()
max_ts_length = self.max_ts_length
min_ts_length = self.min_ts_length
freq = self.freq
scoring_length = self.scoring_length
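# Hold out the most recent scoring_length periods for scoring; everything before becomes the training window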
train_end = (pd.Timestamp(ts_end) - pd.Timedelta("{}".format(scoring_length) + freq)).to_pydatetime()
score_start = (pd.Timestamp(train_end) + pd.Timedelta("1" + freq)).to_pydatetime()
training_data = data.loc[ts_start:train_end]
scoring_data = data.loc[score_start:ts_end]
try:
# Required data preprocessing before training and scoring
de_obj = DataExploration(freq=self.freq, min_ts_length=self.min_ts_length,
min_ts_mean=self.min_ts_mean,
max_ts_length=self.max_ts_length,
is_log_transformed=is_log_transformed,
data_shift_truncate=data_shift_truncate,
detection_type=self.detection_type,
fill_rate=fill_rate)
training_data, preprocess_summary = de_obj.profile(df=training_data)
is_log_transformed = preprocess_summary['is_log_transformed']
# Getting De-noised smoothed series for generating synthetic anomalies
smoothed_scoring_series = smoothed_series[-len(scoring_data):]
labels = []
probs = []
if args[3]['model'] == 'LADStructuralModel':
# LAD structural training and scoring
hyper_params = LADStructuralHyperParams(is_log_transformed=is_log_transformed, max_ft_freq=max_ft_freq,
include_holidays_exog=include_holidays_exog,
p=p, q=q)
lad_struct = LADStructuralModel(hyper_params.params, max_ts_length=max_ts_length,
min_ts_length=min_ts_length, freq=freq)
success, model_date, model = lad_struct.train(data=training_data, optimize=True, **preprocess_summary)
scr_idx = 0
obs = []
preds = []
# Scoring and anomaly classification for synthetic anomalies
for i, row in scoring_data.iterrows():
observed_value = row.raw
obs.append(observed_value)
result = model.score(observed_value, i)
prediction = result['Prediction']
preds.append(prediction)
std_error = result['StdErr']
observation = smoothed_scoring_series[scr_idx]
scr_idx = scr_idx + 1
anomaly_flags, anomaly_probabilities = self._synthetic_anomaly_check(prediction=prediction,
std_error=std_error,
observation=observation)
labels = labels + anomaly_flags
probs = probs + anomaly_probabilities
mape = self._mape(obs, preds)
elif args[3]['model'] == 'LADFilteringModel':
# LAD filtering training and scoring
hyper_params = LADFilteringHyperParams(is_log_transformed=is_log_transformed)
lad_filtering = LADFilteringModel(hyper_params.params, max_ts_length=max_ts_length,
min_ts_length=min_ts_length, freq=freq)
success, model_date, stable_model = lad_filtering.train(training_data, **preprocess_summary)
# Scoring and anomaly classification for synthetic anomalies
for prop in self.anomaly_intensity_list:
anomaly_flags_list = []
anomaly_probabilities_list = []
local_model = copy.deepcopy(stable_model)
for i, row in scoring_data.iterrows():
rnd = check_random_state(self.random_state)
trial_prob = rnd.uniform(0, 1, 1)
observed_value = row.raw
synthetic_actual = observed_value
if trial_prob < 0.4:
synthetic_actual = observed_value + (prop * observed_value)
anomaly_flags_list.append(1)
else:
anomaly_flags_list.append(0)
result, local_model = local_model.score(observed_value=observed_value, pred_date=i,
synthetic_actual=synthetic_actual)
anomaly_probabilities_list.append(result['AnomalyProbability'])
labels = labels + anomaly_flags_list
probs = probs + anomaly_probabilities_list
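# For the structural model, blend forecast accuracy with classification quality: sqrt(MAPE * log-loss);
# otherwise (filtering model, or MAPE unavailable) fall back to plain log-loss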
if args[3]['model'] == 'LADStructuralModel' and mape:
cost = np.sqrt(mape * log_loss(labels, probs))
else:
cost = log_loss(labels, probs)
except Exception:
return {'loss': 1e100, 'status': STATUS_OK}
return {'loss': cost, 'status': STATUS_OK}
def _optimize(self, data, objective_part, algo=tpe.suggest, max_evals=50):
"""
Optimization function that calls the hyperopt for a given set of hyperparameters
:param pandas.DataFrame data: Input time series data
:param function objective_part: Partial objective function with the hyperparameters only
:param str algo: hyperopt optimization algorithm
:param int max_evals: Maximum number of evaluations
:return: Optimal hyperparameters
:rtype: dict
"""
import numpy as np
from functools import partial
from pykalman import KalmanFilter
# detection_type: [OutlierDetection, LaggedOutlierDetection, VolumeDropDetection]
if self.detection_type == 'OutlierDetection':
hyper_param_list = [
{'model': 'LADStructuralModel', 'param': {'max_ft_freq': hp.randint('max_ft_freq', 6) + 2,
'include_holidays_exog': hp.randint('include_holidays_exog', 1) + 1
if self.freq == 'D' else hp.randint('include_holidays_exog', 1),
'p': hp.randint('p', 6) + 1,
'q': hp.randint('q', 6) + 1}
},
{'model': 'LADFilteringModel'}]
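# Joint search space: preprocessing choices (log transform, data-shift truncation, fill rate) plus the
# model choice with its conditional hyperparameters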
space = [hp.randint('is_log_transformed',2),
hp.randint('data_shift_truncate', 2),
hp.uniform('fill_rate', 0.7, 1.0),
hp.choice('LuminaireModel', [
{'model': hyper_param_list[0]['model'], 'param': hyper_param_list[0]['param']},
{'model': hyper_param_list[1]['model']}])]
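# Kalman-smooth the raw series once up front; the de-noised values seed the synthetic anomalies in the objective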
try:
series = data[self._target_metric].values
kf = KalmanFilter(random_state=self.random_state)
smoothed_series, cov_series = kf.em(series).smooth(series)
except Exception:
raise ValueError('Kalman Smoothing requires more than one data point')
objective = partial(objective_part, data, smoothed_series)
else:
raise ValueError('Only `detection_type=OutlierDetection` is supported in hyperparameter optimization right now')
# Calling the optimization function
hyper_param = fmin(objective, space=space, algo=algo, max_evals=max_evals, show_progressbar=True, rstate=np.random.default_rng(self.random_state))
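# fmin returns raw randint draws; re-apply the offsets used in the search space to recover the model values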
hyper_param['LuminaireModel'] = hyper_param_list[hyper_param['LuminaireModel']]['model']
if 'max_ft_freq' in hyper_param:
hyper_param['max_ft_freq'] = hyper_param['max_ft_freq'] + 2
if 'include_holidays_exog' in hyper_param and self.freq == 'D':
hyper_param['include_holidays_exog'] = hyper_param['include_holidays_exog'] + 1
if 'p' in hyper_param:
hyper_param['p'] = hyper_param['p'] + 1
if 'q' in hyper_param:
hyper_param['q'] = hyper_param['q'] + 1
return hyper_param
def run(self, data, max_evals=50):
"""
This function runs hyperparameter optimization for LAD batch outlier detection models.
:param list[list] data: Input time series.
:param int max_evals: Number of iterations for hyperparameter optimization.
:return: Optimal hyperparameters.
:rtype: dict
>>> data
[[Timestamp('2020-01-01 00:00:00'), 1326.0],
[Timestamp('2020-01-02 00:00:00'), 1552.0],
[Timestamp('2020-01-03 00:00:00'), 1432.0],
. . . ,
[Timestamp('2020-06-06 00:00:00'), 1747.0],
[Timestamp('2020-06-07 00:00:00'), 1782.0]]
>>> hopt_obj = HyperparameterOptimization(freq='D', detection_type='OutlierDetection')
>>> hyper_params = hopt_obj.run(data=data, max_evals=5)
>>> hyper_params
{'LuminaireModel': 'LADStructuralModel', 'data_shift_truncate': 0, 'fill_rate': 0.8409249603686499,
'include_holidays_exog': 1, 'is_log_transformed': 1, 'max_ft_freq': 3, 'p': 4, 'q': 3}
"""
# Calling data exploration to perform imputation only
de_obj = DataExploration(freq=self.freq, detection_type=self.detection_type)
data, summary = de_obj.profile(df=data, impute_only=True)
if summary['success']:
return self._optimize(data=data, objective_part=self._objective_part, max_evals=max_evals)
else:
return None