Notes and examples, perhaps to use for a blog post or tutorial on change detection.
References:
Michèle Basseville
- "Statistical Methods for Change Detection" (2002)
- "Detecting Changes in Signals and Systems: A Survey" (1988), Automatica, Vol. 24, No. 3, pp. 309-326
Charu Aggarwal
- "Outlier Analysis" (2013)
Also see
Aman Ahuja http://pafnuty.wordpress.com/2013/02/05/reading-log-change-detection-papers-by-basseville/
We want to be able to detect change in a signal, i.e. an ordered/chronological collection of data points. We'll mostly use scalar values for each data point, but we may consider vector values as well.
We'll start with the simplest signal we can imagine and the simplest change detection method(s). From there we'll add signal complexity step by step, in such a way that each step demonstrates the limitations of previously discussed methods and gives us an excuse to consider more sophisticated approaches.
The solutions will be either self-wrought code or imports from open source libraries, whichever seems appropriate at the moment.
Part 0: Introduction
Introduction (what is change detection)
Justification (why does change detection matter)
Offline vs. Online change detection
References and Further Reading
Part 1
A trivial signal
Signal 1
A "static mean" change detector
Framing the problem and some utility code #maybe move this to an appendix?
Part 2
Limitation of the static mean detector
Outliers
Noise
A noisy signal (with outliers?)
Signal 2
Streaming windows
Welford's algorithm
A Z-score based detector
Part 3
Limitations of the Z-score change detector
Trends and slowly moving signals
Signal 3
%matplotlib
import matplotlib.pyplot as plt
import numpy as np
Using matplotlib backend: module://IPython.kernel.zmq.pylab.backend_inline
from collections import defaultdict
np.random.seed(seed=111111)
np.set_printoptions(precision=3, suppress=True)
The simplest signal we might be able to imagine is one that is completely constant except for exactly one increase or decrease in absolute value.
I'll use a signal size of 1000 points.
sig1 = np.ones(1000)
sig1[:500] = sig1[:500] * 50
sig1[500:] = sig1[500:] * 40
plt.plot(sig1)
plt.ylim(0, 100)
plt.title("sig1")
An "offline" change detection algorithm is one in which we are given the entire signal and attempt to "look back" and recognize where the change occurred.
An "online" algorithm is quite different: we imagine our signal as "streaming into" our detector, and at any given moment we only have knowledge of the most recent data point and whatever information we chose to retain about the history of the signal.
Reference: Sequential Analysis http://en.wikipedia.org/wiki/Sequential_analysis
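To make the distinction concrete, here is a minimal sketch of both modes on a noise-free step signal. The jump-threshold rule and the names here are ad-hoc illustrations of my own, not the detectors developed below.

```python
import numpy as np

sig = np.concatenate([np.full(500, 50.0), np.full(500, 40.0)])

# Offline: the whole signal is available, so we can "look back" anywhere,
# e.g. pick the largest jump between consecutive points.
offline_guess = np.argmax(np.abs(np.diff(sig))) + 1
print(offline_guess)  # -> 500

# Online: points arrive one at a time; here we retain only the previous value.
def online_detector(stream, threshold=5.0):
    last = None
    for i, x in enumerate(stream):
        if last is not None and abs(x - last) > threshold:
            return i  # flag the change as soon as it is seen
        last = x
    return None

print(online_detector(sig))  # -> 500
```

On this trivial signal both modes agree; the rest of the notebook is about signals where the online constraint actually bites.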
My interest is in the latter, "online" class of algorithms. That means I want to make sure data points are passed to the algorithm one by one, and that the algorithm sees only the current point plus whatever summary information it has retained. I also want to make sure that I am not cheating when I run my experiments.
To do all this, it is helpful to write some code that realistically simulates the online streaming scenario of interest.
The following code provides this framework and lets us run experiments more conveniently. Feel free to skip this section.
def dict_to_arrays(ddict):
    """
    Convenience function used by the online simulator to convert
    lists of residuals into numpy arrays before returning them.
    """
    new_dict = {}
    for k, v in ddict.iteritems():
        new_dict[k] = np.array(v)
    return new_dict
def online_simulator(signal, change_detector):
    """
    Simulate an online streaming scenario for change detection experiments.
    ---
    Given a signal and a change detector, this simulator passes one signal
    data point at a time to the change detector and processes the results.
    """
    all_residuals = defaultdict(list)
    #Iterate through the signal, passing data points to the algorithm.
    for value in signal:
        #calculate residuals, compare residuals with the stopping rule(s)
        rule_triggered, res = next(change_detector.step(value))
        #store residuals
        for k, v in res.iteritems():
            all_residuals[k].append(v)
        if rule_triggered:
            #stopping rule was triggered
            return (True, dict_to_arrays(all_residuals))
    #Rule wasn't triggered by end of signal
    return (False, dict_to_arrays(all_residuals))
def run_online_simulation(signal, change_detector, scale=True):
    """Run a simulation and print the results."""
    #Run simulation
    results = online_simulator(signal, change_detector)
    #Display results
    print_sim_results(signal, results, scale=scale)
    #Return residuals
    residuals = results[1]
    return residuals
class change_detector(object):
    """
    A change detection algorithm.
    The algorithm calculates residuals and updates them for each new value
    passed. Residuals are checked against stopping rules at each step,
    yielding either True or False accordingly.
    """
    def __init__(self):
        #Interim and calculated values
        self.signal_size = 0
        self.total_val = 0
        self.mean_ = np.nan

    def update_residuals(self, new_signal_value):
        #Update residuals
        self.signal_size += 1
        self.total_val += new_signal_value
        self.mean_ = self.total_val / self.signal_size

    def _get_residual_dict(self):
        """Create a dictionary of residuals to return.
        Includes all instance variables ending in '_'.
        """
        residuals_dict = {}
        for k, v in self.__dict__.iteritems():
            if k.endswith('_'):
                residuals_dict[k] = v
        return residuals_dict

    def check_stopping_rules(self, new_signal_value):
        #No rules implemented in the base class
        return False

    def _step(self, new_signal_value):
        #update residuals
        self.update_residuals(new_signal_value)
        #compare residuals to stopping rules
        rules_triggered = self.check_stopping_rules(new_signal_value)
        yield (rules_triggered, self._get_residual_dict())

    def step(self, new_signal_value):
        return self._step(new_signal_value)
def print_sim_results(signal, results, **kwargs):
    """
    Another convenience function, to print out the results of our experiment.
    """
    #Get results
    stopped, residuals = results
    print "Residuals: {}".format([res for res in residuals.viewkeys()])
    if stopped:
        #the length of any residuals array tells us when the rule was triggered
        stop_point = len(residuals.itervalues().next())
        print "Change detected. Stopping Rule triggered at {}.\n".format(stop_point)
        plot_signal_and_residuals(signal, residuals, stop_point, **kwargs)
    else:
        print "Stopping rule not triggered."
        plot_signal_and_residuals(signal, residuals, **kwargs)
def plot_signal_and_residuals(signal, residuals=None, stop_point=None, scale=True):
    """Convenience function to generate plots of the signal and the residuals"""
    if residuals is None:
        plotcount = 1
    else:
        plotcount = 1 + len(residuals)
    fig, axes = plt.subplots(nrows=plotcount,
                             ncols=1,
                             sharex=True,
                             figsize=(6, plotcount * 3))
    #First plot the signal
    if plotcount > 1:
        ax = axes[0]
    else:
        ax = axes
    ax.plot(signal)
    ax.set_title('Signal')
    #Scale signal
    ax.set_ylim(signal.min() * .5, signal.max() * 1.5)
    ax.set_xlim(0, len(signal))
    #Plot a vertical line where the stop_point is indicated
    if stop_point is not None:
        assert 0 < stop_point < len(signal)
        ax.vlines(x=stop_point, ymin=0, ymax=ax.get_ylim()[1],
                  colors='r', linestyles='dotted')
    #Now plot each residual
    if residuals is not None:
        for ii, (res_name, res_values) in enumerate(residuals.iteritems()):
            ax = axes[ii + 1]
            ax.plot(res_values)
            ax.set_title("Residual #{}: {}".format(ii + 1, res_name))
            if scale:
                ax.set_ylim(res_values.min() * 0.5, res_values.max() * 1.5)
            if stop_point is not None:
                ax.vlines(x=stop_point, ymin=0, ymax=ax.get_ylim()[1],
                          colors='r', linestyles='dotted')
blank_detector = change_detector()
residuals = run_online_simulation(sig1, blank_detector)
Residuals: ['mean_'] Stopping rule not triggered.
Now that we've got all that scaffolding in place, we can get to the good stuff and start experimenting with signals and change detectors.
We can upgrade our "blank" change_detector with actual functionality.
What we'll do is track the running mean, and create a stopping rule based on whether the newest value differs from that mean by more than some percentage.
class cd_static_mean_detector(change_detector):
    """
    A change detector that triggers when a new value differs from the
    running ("static") mean by more than a given percentage threshold.
    """
    def __init__(self, threshold=0.05):
        #hyper-parameter(s)
        self.threshold = threshold
        #Interim and calculated values
        self.signal_size = 0
        self.total_val = 0
        #... and residuals
        self.diff_ = np.nan
        self.mean_ = np.nan

    def update_residuals(self, new_signal_value):
        #Update residuals
        self.signal_size += 1
        self.total_val += new_signal_value
        self.mean_ = self.total_val / self.signal_size
        self.diff_ = np.absolute(self.mean_ - new_signal_value)

    def check_stopping_rules(self, new_signal_value):
        #check if the new value is more than threshold% away from the mean
        threshold_level = self.mean_ * self.threshold
        return self.diff_ > threshold_level
#Create detector
static_mean_detector = cd_static_mean_detector(threshold=0.05)
residuals = run_online_simulation(sig1, static_mean_detector)
Residuals: ['mean_', 'diff_'] Change detected. Stopping Rule triggered at 501.
#Note: this easily accommodates a scaling of sig1,
# as the threshold is given as a percentage.
run_online_simulation(
sig1*100, #scale!
cd_static_mean_detector(threshold=0.05),
);
Residuals: ['mean_', 'diff_'] Change detected. Stopping Rule triggered at 501.
#Create a signal
sig2a = np.ones(1000)
sig2a[:650] = 60
sig2a[650:] = 70
noise = np.random.randn(1000)
sig2a = sig2a + noise
#What does this look like?
plot_signal_and_residuals(sig2a)
#Will our static mean detector work on this signal?
run_online_simulation(
sig2a,
cd_static_mean_detector(threshold=0.05),
scale=True
);
Residuals: ['mean_', 'diff_'] Change detected. Stopping Rule triggered at 190.
#Let's try to adjust the threshold so it is less sensitive
run_online_simulation(
sig2a,
cd_static_mean_detector(threshold=0.2),
scale=False
);
Residuals: ['mean_', 'diff_'] Stopping rule not triggered.
# Here's a signal with some outlier data points
sig2b = np.ones(1000) * 50
sig2b[np.random.randint(0, len(sig2b), size=10)] = 0
sig2b[np.random.randint(0, len(sig2b), size=10)] = 100
#Try running the simulation here
run_online_simulation(
sig2b,
cd_static_mean_detector(threshold=0.5),
scale=True
);
Residuals: ['mean_', 'diff_'] Change detected. Stopping Rule triggered at 35.
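The rule fired at index 35 because of an outlier, not a real level change. One way to blunt isolated spikes, mentioned here only as an aside, is to base residuals on a short running median instead of the raw value: a single outlier can never be the middle value of a 3-point window. A minimal sketch with hand-placed outliers:

```python
import numpy as np

sig = np.ones(1000) * 50.0
sig[100] = 0.0    # isolated low outlier
sig[400] = 100.0  # isolated high outlier

window = 3
# Trailing 3-point median: one bad point per window is always ignored.
medians = np.array([np.median(sig[max(0, i - window + 1):i + 1])
                    for i in range(len(sig))])
print(medians.max())  # -> 50.0
print(medians.min())  # -> 50.0
```

A median window only helps against isolated outliers; it does nothing for noise, which is where we turn next.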
def welford(vals):
    #via https://gist.github.com/alexalemi/2151722
    m = vals[0]
    s = 0
    for k in range(1, len(vals)):
        oldm = m
        x = vals[k]
        newm = oldm + (x - oldm) / (k + 1)
        s = s + (x - newm) * (x - oldm)
        m = newm
    #population variance over the whole sequence
    return s / len(vals)
#Testing
seq = np.random.randn(100)
print welford(seq)
print np.var(seq)
0.86975399197 0.86975399197
class welford_demo(change_detector):
    """
    Testing Welford's algorithm,
    implemented like our other change detectors.
    """
    def __init__(self):
        #Interim and calculated values
        self.k = 0  #k is the signal size
        #For Welford's
        self.mean_ = 0.0
        self.s = 0.0  # var = s / (k + 1)

    def update_residuals(self, new_signal_value):
        #Update residuals using Welford's algorithm
        oldm = self.mean_
        x = new_signal_value
        newm = oldm + (x - oldm) / (self.k + 1)
        self.s = self.s + (x - newm) * (x - oldm)
        self.mean_ = newm
        self.var_ = self.s / (self.k + 1)
        self.k += 1
welford = welford_demo()
residuals = run_online_simulation(
sig2a,
welford,
scale=False,
)
Residuals: ['var_', 'mean_'] Stopping rule not triggered.
# Compare to test
print "{:<40} : {:>}".format(
    "Variance via Welford's algorithm",
    residuals['var_'][-1]  #the last value is the variance after the entire signal
)
print "{:<40} : {:>}".format(
    "Variance via Numpy",
    np.var(sig2a),
)
print "{:<40} : {:>}".format(
    "Mean via Welford's algorithm",
    residuals['mean_'][-1]  #the last value is the mean after the entire signal
)
print "{:<40} : {:>}".format(
    "Mean via Numpy",
    np.mean(sig2a),
)
Variance via Welford's algorithm : 23.5588721526 Variance via Numpy : 23.5588721526 Mean via Welford's algorithm : 63.4650889229 Mean via Numpy : 63.4650889229
welford.__dict__
{'k': 1000, 'mean_': 63.465088922885641, 's': 23558.872152624277, 'var_': 23.558872152624279}
Okay, now let's return to the problem of building a change detector for a signal with both noise and outliers.
#Create a signal
sig2 = np.ones(1000) * 50
sig2[650:] = 70
#Add outliers
sig2[np.random.randint(0, len(sig2), size=10)] = 0
sig2[np.random.randint(0, len(sig2), size=10)] = 100
#Add noise
noise = np.random.randn(1000)
sig2 = sig2 + noise
plot_signal_and_residuals(sig2)
When comparing a sample against a gaussian population model, a z-score is the distance from the sample mean to the population mean in units of the standard error: $$z = \frac{M - \mu}{SE}, \qquad SE = \frac{\sigma}{\sqrt{n}}$$ where $M$ is the sample mean, $\mu$ and $\sigma$ are the population mean and standard deviation, and $n$ is the sample size.
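As a quick sanity check on the formula, with made-up numbers rather than the notebook's signals: a sample whose mean sits half a standard deviation above the population mean should give a z-score of roughly $0.5\sqrt{n}$.

```python
import numpy as np

rng = np.random.RandomState(42)
population = rng.normal(loc=50.0, scale=2.0, size=10000)  # "global" stream
sample = rng.normal(loc=51.0, scale=2.0, size=100)        # recent window

M = sample.mean()                              # sample mean
mu = population.mean()                         # population mean
SE = population.std() / np.sqrt(len(sample))   # standard error of the mean
z = (M - mu) / SE
print(round(z, 2))
```

The shift of 1.0 is only half a standard deviation, yet with $n = 100$ the z-score lands around 5, i.e. the window is clearly out of distribution.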
Our strategy here combines global statistics with local ones: we track the statistics of the whole stream while also keeping a window of recent observations, then use a z-score to compare the two.
from collections import deque
class z_score_detector(change_detector):
    """
    A change detection algorithm based on z-scores.
    ---
    1. We use Welford's algorithm to collect the mean and variance
       of the full stream.
    2. At the same time, we collect a window of recent
       observations and compute local statistics on this sample.
    3. We then compute a z-score comparing the sample (window)
       distribution to the population (global) distribution.
    Our stopping rule is based on this computed z-score.
    """
    def __init__(self, window_size=100, threshold=0.05):
        #hyper-parameter(s)
        self.threshold = threshold
        self.window_size = window_size
        #Interim and calculated values
        self.k = 0  #k is the signal size
        #For Welford's
        self.g_mean = 0.0
        self.s = 0.0  # var = s / (k + 1)
        #For the window
        self.window = deque(maxlen=window_size)
        #... and residuals
        self.z_score_ = np.nan

    def update_residuals(self, new_signal_value):
        """Update residuals."""
        x = new_signal_value
        #Welford's on the global stream
        oldm = self.g_mean
        newm = oldm + (x - oldm) / (self.k + 1)
        self.s = self.s + (x - newm) * (x - oldm)
        self.g_mean = newm
        #Local window (sample)
        self.window.append(x)
        w_mean = np.mean(self.window)
        #Calculate the z-score
        g_std = np.sqrt(self.s / (self.k + 1))  #global std
        SE = g_std / np.sqrt(self.window_size)  #standard error of the window mean
        self.z_score_ = (w_mean - self.g_mean) / SE
        self.window_mean_ = w_mean
        self.global_var_ = self.s / (self.k + 1)
        self.k += 1

    def check_stopping_rules(self, new_signal_value):
        #Stopping rule!
        return np.abs(self.z_score_) > self.threshold
# On Signal 2
residuals = run_online_simulation(
sig2,
z_score_detector(window_size = 100, threshold=10.0),
scale=False,
)
Residuals: ['z_score_', 'window_mean_', 'global_var_'] Change detected. Stopping Rule triggered at 705.
# How does the Zscore detector work on our first signal?
# On Signal 1
residuals = run_online_simulation(
sig1,
z_score_detector(window_size = 100, threshold=10.0),
scale=False,
)
Residuals: ['z_score_', 'window_mean_', 'global_var_'] Change detected. Stopping Rule triggered at 528.
Since we are calculating window statistics instead of operating on the immediate value, our change detector now takes a little bit of time to detect the change in both signal 1 and signal 2.
This is a characteristic of online change detectors: there is a trade-off between the delay for detection and the mean time between false alarms.
"Apart from the tradeoff between the mean time between false alarms and the delay for detection -- both increasing when the sensitivity of the detector to high frequencies decreases -- there exists another tradeoff to be kept in mind which is closely related to the first one: efficiency vs complexity." -- Basseville (Survey)
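The delay side of that trade-off is easy to see with a stand-in detector: a plain window-mean rule (with an arbitrary threshold of 55) applied to a noisy step at index 500. Larger windows suppress noise but need more post-change points before their mean crosses the threshold, so detection comes later. A rough sketch:

```python
import numpy as np
from collections import deque

def first_trigger(signal, window_size, threshold):
    """Index at which the trailing-window mean first exceeds threshold."""
    window = deque(maxlen=window_size)
    for i, x in enumerate(signal):
        window.append(x)
        if len(window) == window_size and np.mean(window) > threshold:
            return i
    return None

rng = np.random.RandomState(0)
step_signal = np.concatenate([np.full(500, 50.0), np.full(500, 60.0)])
step_signal = step_signal + rng.randn(1000)  # the real change is at index 500

# Larger windows smooth out noise (fewer false alarms) but the window mean
# only crosses 55 once roughly half the window holds post-change points.
delays = {w: first_trigger(step_signal, w, threshold=55.0)
          for w in (10, 50, 200)}
print(delays)
```

With these numbers the detection index grows with window size, landing near 505, 525, and 600 respectively; shrinking the window buys speed at the price of sensitivity to noise.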
The optimal design for residuals is such that they are close to zero when no change has occurred, and significantly non-zero after a change. Usually there are then two steps:
- generating residuals that maximally satisfy the above conditions
- designing decision rules based on the residuals