I have a time-series dataset with two labels (0 and 1). I am using Dynamic Time Warping (DTW) as a similarity measure for classification using k-nearest neighbour (kNN) as described in these two wonderful blog posts:
http://alexminnaar.com/2014/04/16/Time-Series-Classification-and-Clustering-with-Python.html
Arguments
---------
n_neighbors : int, optional (default = 5)
Number of neighbors to use by default for KNN
max_warping_window : int, optional (default = infinity)
Maximum warping window allowed by the DTW dynamic
programming function
subsample_step : int, optional (default = 1)
Step size for the timeseries array. By setting subsample_step = 2,
the timeseries length will be reduced by 50% because every second
item is skipped. Implemented by x[:, ::subsample_step]
"""
def __init__(self, n_neighbors=5, max_warping_window=10000, subsample_step=1):
    """Store the hyper-parameters of the DTW-based kNN classifier.

    Arguments
    ---------
    n_neighbors : int, optional (default = 5)
        Number of nearest training series whose labels are consulted
        when predicting.
    max_warping_window : int, optional (default = 10000)
        Width of the band around the diagonal that the DTW dynamic
        programme is allowed to fill. The default is a large finite
        integer rather than a true infinity because the value is used
        as an argument to ``range`` in ``_dtw_distance``.
    subsample_step : int, optional (default = 1)
        Stride applied along the time axis before distances are
        computed (``x[i, ::subsample_step]``); 2 keeps every second
        sample.
    """
    self.n_neighbors = n_neighbors
    self.max_warping_window = max_warping_window
    self.subsample_step = subsample_step
def fit(self, x, l):
    """Fit the model using x as training data and l as class labels.

    kNN is a lazy learner, so fitting only memorises the training set.

    Arguments
    ---------
    x : array-like of shape [n_samples, n_timepoints]
        Training data set for input into the KNN classifier
    l : array-like of shape [n_samples]
        Training labels for input into the KNN classifier

    Returns
    -------
    self, so that calls can be chained (``model.fit(x, l).predict(t)``).
    """
    # Coerce to ndarrays: prediction indexes the labels with an integer
    # index array and slices the data as x[i, ::step], neither of which
    # works on plain Python lists.
    self.x = np.asarray(x)
    self.l = np.asarray(l)
    return self
def _dtw_distance(self, ts_a, ts_b, d=lambda x, y: abs(x - y)):
    """Return the DTW distance between two 1-D time series.

    Arguments
    ---------
    ts_a, ts_b : array-like of shape [n_timepoints]
        The two series to align. They may have different lengths but
        must each contain at least one sample.
    d : callable (default = abs(x - y))
        Point-wise cost ``d(a_i, b_j)`` used inside the DTW dynamic
        programme.

    Returns
    -------
    Accumulated cost of the cheapest warping path from the first pair
    of samples to the last pair. Cells outside the warping window keep
    the large sentinel value, so a result of that magnitude means the
    end cell was not reachable inside the window.
    """
    ts_a, ts_b = np.array(ts_a), np.array(ts_b)
    M, N = len(ts_a), len(ts_b)

    # Sentinel-filled cost matrix. sys.maxsize replaces the Python 2 only
    # sys.maxint and exists on both Python 2.6+ and Python 3.
    cost = sys.maxsize * np.ones((M, N))

    # First row and column: only one way to reach each cell.
    cost[0, 0] = d(ts_a[0], ts_b[0])
    for i in range(1, M):
        cost[i, 0] = cost[i - 1, 0] + d(ts_a[i], ts_b[0])
    for j in range(1, N):
        cost[0, j] = cost[0, j - 1] + d(ts_a[0], ts_b[j])

    # Remaining cells, restricted to the band around the diagonal.
    # NOTE(review): the upper bound is exclusive, so j runs up to
    # i + window - 1 while the lower bound reaches i - window; the band
    # looks asymmetric. Kept as-is to preserve existing results - confirm
    # whether i + window + 1 was intended.
    window = self.max_warping_window
    for i in range(1, M):
        for j in range(max(1, i - window), min(N, i + window)):
            cheapest_predecessor = min(cost[i - 1, j - 1],
                                       cost[i, j - 1],
                                       cost[i - 1, j])
            cost[i, j] = cheapest_predecessor + d(ts_a[i], ts_b[j])

    # Bottom-right cell holds the total alignment cost.
    return cost[-1, -1]
def _dist_matrix(self, x, y):
    """Compute the pairwise DTW distance matrix between x and y.

    Arguments
    ---------
    x : array of shape [n_samples_x, n_timepoints]
    y : array of shape [n_samples_y, n_timepoints]

    Returns
    -------
    Array of shape [n_samples_x, n_samples_y] whose entry (i, j) is the
    DTW distance between x[i] and y[j], both subsampled along the time
    axis with stride ``self.subsample_step``.
    """
    # Running count of finished pairs, fed to the progress bar.
    dm_count = 0

    # Subsample once up front instead of once per pair.
    step = self.subsample_step
    x_sub = x[:, ::step]
    y_sub = y[:, ::step]

    if np.array_equal(x, y):
        # Identical inputs: the matrix is symmetric with a zero diagonal,
        # so only the upper triangle is computed (condensed form) and
        # then expanded with squareform.
        n = np.shape(x)[0]
        dm = np.zeros((n * (n - 1)) // 2, dtype=np.double)

        p = ProgressBar(dm.shape[0])

        for i in range(0, n - 1):
            for j in range(i + 1, n):
                dm[dm_count] = self._dtw_distance(x_sub[i], y_sub[j])
                dm_count += 1
                p.animate(dm_count)

        # Convert to squareform
        return squareform(dm)

    # Different inputs: every (x[i], y[j]) pair has to be evaluated.
    n_x = np.shape(x)[0]
    n_y = np.shape(y)[0]
    dm = np.zeros((n_x, n_y))

    p = ProgressBar(n_x * n_y)

    for i in range(0, n_x):
        for j in range(0, n_y):
            dm[i, j] = self._dtw_distance(x_sub[i], y_sub[j])
            # Update progress bar
            dm_count += 1
            p.animate(dm_count)

    return dm
def predict(self, x):
    """Predict class labels and vote fractions for the provided data.

    Arguments
    ---------
    x : array of shape [n_samples, n_timepoints]
        Array containing the testing data set to be classified

    Returns
    -------
    2 arrays of length n_samples representing:
      (1) the predicted class label (most common label among the
          nearest neighbours)
      (2) the number of neighbours carrying that label divided by
          ``n_neighbors``
    """
    # Rows index the test series, columns index the training series.
    dm = self._dist_matrix(x, self.x)

    # Column indices of the k smallest distances in every row.
    knn_idx = dm.argsort()[:, :self.n_neighbors]

    # Labels of those neighbours; asarray lets labels stored as a plain
    # list be indexed with the 2-D integer array.
    knn_labels = np.asarray(self.l)[knn_idx]

    # Majority vote per row: mode_data[0] is the winning label,
    # mode_data[1] how many neighbours voted for it.
    mode_data = mode(knn_labels, axis=1)
    mode_label = np.asarray(mode_data[0])
    # float() forces true division so integer counts are not floored to
    # 0 or 1 on Python 2.
    mode_proba = np.asarray(mode_data[1]) / float(self.n_neighbors)

    return mode_label.ravel(), mode_proba.ravel()
However, for classification with kNN the two posts use their own kNN algorithms.
I want to use sklearn's options such as GridSearchCV in my classification. Therefore, I would like to know how I can use Dynamic Time Warping (DTW) with sklearn's kNN.
Note: I am not limited to sklearn and am happy to receive answers using other libraries as well.
I am happy to provide more details if needed.
You can use a custom metric for kNN: sklearn's KNeighborsClassifier accepts a callable as its `metric` argument. Therefore, you only need to implement DTW yourself (or use/adapt any existing DTW implementation in Python) [gist of this code].
import numpy as np
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
# Toy dataset: 100 random series of length 10 with random binary labels (0/1).
# The data itself is unseeded, so the scores printed below vary between runs.
X = np.random.random((100,10))
y = np.random.randint(0,2, (100))
# Hold out 33% of the samples for the final evaluation; random_state fixes the split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
#custom metric
def DTW(a, b):
    """Return the unconstrained DTW distance between two 1-D series.

    Suitable as a callable ``metric`` for KNeighborsClassifier, which
    calls it with two 1-D sample vectors.

    Arguments
    ---------
    a, b : array-like of numbers
        The two series to align; they may differ in length.

    Returns
    -------
    Accumulated absolute-difference cost of the cheapest warping path
    (``inf`` if exactly one of the series is empty).
    """
    # Column vectors so cdist yields the |a_i - b_j| matrix in one call.
    a = np.asarray(a, dtype=float).reshape(-1, 1)
    b = np.asarray(b, dtype=float).reshape(-1, 1)
    an, bn = a.shape[0], b.shape[0]
    pointwise_distance = distance.cdist(a, b)

    # Cumulative cost table with a one-cell border of inf so that the
    # recurrence needs no special-casing on the first row/column. A plain
    # ndarray is used instead of the discouraged np.matrix class.
    cumdist = np.full((an + 1, bn + 1), np.inf)
    cumdist[0, 0] = 0.0

    for ai in range(an):
        for bi in range(bn):
            minimum_cost = min(cumdist[ai, bi + 1],
                               cumdist[ai + 1, bi],
                               cumdist[ai, bi])
            cumdist[ai + 1, bi + 1] = pointwise_distance[ai, bi] + minimum_cost

    return cumdist[an, bn]
# Train: grid-search the number of neighbours with 3-fold cross-validation,
# using the DTW function above as the kNN distance metric.
parameters = {'n_neighbors':[2, 4, 8]}
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, verbose=1)
clf.fit(X_train, y_train)
# Evaluate: predict on the held-out split and print per-class
# precision/recall/F1.
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))
Which yields:
Fitting 3 folds for each of 3 candidates, totalling 9 fits
[Parallel(n_jobs=1)]: Done 9 out of 9 | elapsed: 29.0s finished
             precision    recall  f1-score   support
          0       0.57      0.89      0.70        18
          1       0.60      0.20      0.30        15
avg / total       0.58      0.58      0.52        33