Select data vectors by similarity using a metric score¶
Marcos Duarte
Laboratory of Biomechanics and Motor Control
Federal University of ABC, Brazil
import numpy as np
import matplotlib.pyplot as plt
import sys
sys.path.insert(1, r'./../functions')
from simila import similarity, mse
%load_ext autoreload
%autoreload 2
%load_ext line_profiler
%load_ext watermark
%watermark -u -t -d -m -v --iversions
np.set_printoptions(precision=3)
Last updated: 2023-08-08 18:09:03

Python implementation: CPython
Python version       : 3.11.4
IPython version      : 8.14.0

Compiler    : GCC 12.2.0
OS          : Linux
Release     : 6.2.0-26-generic
Machine     : x86_64
Processor   : x86_64
CPU cores   : 16
Architecture: 64bit

numpy     : 1.25.2
sys       : 3.11.4 | packaged by conda-forge | (main, Jun 10 2023, 18:08:17) [GCC 12.2.0]
matplotlib: 3.7.2
help(similarity)
Help on function similarity in module simila:

similarity(y: numpy.ndarray, axis1: int = 0, axis2: int = 1, threshold: float = 0, nmin: int = 3, repeat: bool = True, metric: Callable = <function mse at 0x7f982168dbc0>, drop=True, msg: bool = True, **kwargs: Callable) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]

    Select vectors in array by their similarity using a metric score.

    For example, if `y` is a 2-D numpy.ndarray with shape (n, m), `axis1`=0
    (n is the number of rows) and `axis2`=1 (m is the number of columns), this
    function will select the vectors along the columns that are more similar
    to a `central` statistic of `y` or to a `target`, using a `metric` score.
    The metric score can be calculated repeatedly until all selected vectors
    have a `metric` score not greater than a `threshold`, but the minimum
    number of vectors to keep or the maximum number of vectors to discard
    can be specified with parameter `nmin`.

    The default `metric` and target are the mean squared error (`mse`) of `y`
    w.r.t. the median of `y` along `axis2`. The `mse` metric is equivalent to
    the squared Euclidean distance and is preferred because it penalizes
    large differences more than the Euclidean distance. But any other
    `metric` that can be calculated with a function can be used.

    A possible use of this function is to discard the time-series data from
    bad trials (treated as outliers) stored in a 2-D array (each trial as a
    column and instants as rows), where the criterion is the similarity of
    each trial w.r.t. the median trial (the median is more robust than the
    mean for removing outliers in case there are very bad trials). After the
    bad trials are discarded, the mean of all trials can then be calculated
    more reliably.

    Parameters
    ----------
    y : numpy.ndarray
        Array for the calculation of similarity (defined by a `metric`) of
        its vectors w.r.t. a `target` or a `central` statistic of `y`.
    axis1 : integer, optional, default = 0
        Axis of `y` for which the `metric` will be calculated at each value
        and possibly averaged in the `metric` calculation.
    axis2 : integer, optional, default = 1
        Axis of `y` along which the different vectors are going to be
        compared with the `threshold` for their similarity (using their
        `metric` score).
    threshold : float, optional, default = 0
        If greater than 0, vectors with a `metric` score above this value
        will be discarded. If 0, the threshold will be automatically
        calculated as the minimum of [qs[2] + 1.5*(qs[2]-qs[0]), score[-2],
        3], where qs are the quartiles and score[-2] is the second-largest
        `metric` score among vectors, calculated once at the start and not
        updated by the `repeat` option.
    nmin : integer, optional, default = 3
        If greater than 0, minimum number of vectors to keep.
        If lower than 0, maximum number of vectors to discard.
    repeat : bool, optional, default = True
        Whether to calculate the similarity `metric` repeatedly, updating
        the score calculation each time a vector is discarded.
        With `repeat` True, the output `scores` will contain at each row the
        updated score values of the used `metric` for each data vector. The
        first row will contain the original scores before any vector was
        discarded. On the subsequent rows, the discarded vectors are
        represented by NaN values and the kept vectors by their updated
        scores. The last row will contain the updated scores of the final
        kept vectors.
        With `repeat` False, the comparison of score values with `threshold`
        is made only once and vectors are discarded accordingly at once. In
        this case, the output `scores` will contain only two rows: the first
        with the original scores before any vectors were discarded, and the
        second with the discarded vectors represented by NaN values and the
        kept vectors by their updated scores.
    metric : optional, default = mse
        Function to use as the metric to compute similarity.
    drop : bool, optional, default = True
        Whether to drop (delete) the discarded vectors from `y` in the
        output. If False, the values of the discarded vectors will be
        replaced by nans and the returned array will have the same
        dimensions as the original array.
    msg : bool, optional, default = True
        Whether to print some messages.
    kwargs : optional
        Options for the metric function (e.g., see the `mse` function).

    Returns
    -------
    y : numpy.ndarray
        Array similar to the input `y` but with the discarded vectors
        deleted if option `drop` is True, or with all values of the
        discarded vectors replaced by nans if option `drop` is False.
    ikept : numpy.ndarray
        Indexes of kept vectors.
    inotkept : numpy.ndarray
        Indexes of not kept (discarded or replaced by nan) vectors.
    scores : 2-D numpy.ndarray
        Metric score values of each vector (as columns) for each round of
        vector selection (one row per round plus the final values).

    References
    ----------
    .. [1] https://nbviewer.org/github/BMClab/BMC/blob/master/notebooks/Similarity.ipynb

    Examples
    --------
    >>> import matplotlib.pyplot as plt
    >>> rng = np.random.default_rng()
    >>> t, n = 100, 10
    >>> y = rng.random((t, n)) / 2
    >>> y = y + np.atleast_2d(2*np.sin(2*np.pi*np.linspace(0, 1, t))).T
    >>> for i in range(0, n, 2):
    ...     j = rng.integers(t-20)
    ...     p = rng.integers(20)
    ...     y[j:j+p, i] = y[j:j+p, i] + rng.integers(10) - 5
    ...     y[:, i] += rng.integers(4) - 2
    >>> ysr, ikeptr, inotkeptr, scoresr = similarity(y)
    >>> ysn, ikeptn, inotkeptn, scoresn = similarity(y, repeat=False)
    >>> fig, axs = plt.subplots(3, 1, sharex=True, figsize=(8, 8))
    >>> axs[0].plot(y, label=list(range(n)))
    >>> axs[0].legend(loc=(1.01, 0))
    >>> axs[0].set_title(f'Original vectors (n={n})')
    >>> axs[1].plot(ysr, label=ikeptr.tolist())
    >>> axs[1].set_title(f'Vectors maintained with repeat selection (n={len(ikeptr)})')
    >>> axs[1].legend(loc=(1.01, 0))
    >>> axs[2].plot(ysn, label=ikeptn.tolist())
    >>> axs[2].set_title(f'Vectors maintained without repeat selection (n={len(ikeptn)})')
    >>> axs[2].legend(loc=(1.01, 0))
    >>> plt.show()

    Version history
    ---------------
    '1.0.0':
        First release version
help(mse)
Help on function mse in module simila:

mse(y: numpy.ndarray, target: numpy.ndarray | None = None, axis1: int = 0, axis2: int = 1, central: Callable = <function nanmedian at 0x7f98cc1059b0>, normalization: Callable = <function nanmedian at 0x7f98cc1059b0>) -> numpy.ndarray

    Mean Squared Error of `y` w.r.t. `target` or `central` along `axis2` at `axis1`.

    Parameters
    ----------
    y : numpy.ndarray
        At least a 2-D numpy.ndarray of data for the calculation of the mean
        squared error w.r.t. a `target` or a `central` statistic of the data.
    target : 1-D numpy.ndarray of length `axis1`, optional, default = None
        Reference vector to calculate the mean squared error of `y` w.r.t.
        this vector. If None, the mse value will be calculated w.r.t. a
        `central` statistic calculated along `axis2` of `y`.
    axis1 : integer, optional, default = 0
        Axis of `y` for which the mse will be calculated at each value.
    axis2 : integer, optional, default = 1
        Axis of `y` along which the `central` statistic might be calculated
        or along which the target will be subtracted.
    central : Python function, optional, default = np.nanmedian
        Function to calculate the statistic on `y` w.r.t. which the mse is
        computed.
    normalization : Python function, optional, default = np.nanmedian
        Function to normalize the calculated mse values.

    Returns
    -------
    score : numpy.ndarray
        Mean Squared Error values.

    References
    ----------
    .. [1] https://nbviewer.org/github/BMClab/BMC/blob/master/notebooks/Similarity.ipynb

    Examples
    --------
    >>> import numpy as np
    >>> rng = np.random.default_rng()
    >>> y = rng.random((100, 10))
    >>> y = y + np.atleast_2d(np.sin(2*np.pi*np.linspace(0, 1, 100))).T
    >>> mse(y, axis1=0, axis2=1, central=np.nanmedian, normalization=np.nanmedian)

    Version history
    ---------------
    '1.0.0':
        First release version
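The core of this metric is short. Below is a minimal, self-contained sketch of the idea (a re-implementation for illustration, not the module's code): the MSE of each column w.r.t. a central statistic of the columns, optionally normalized so the scores become dimensionless and comparable.

```python
import numpy as np

def mse_sketch(y, central=np.nanmedian, normalization=np.nanmedian):
    """Sketch of the metric: per-column MSE w.r.t. a central statistic."""
    ref = central(y, axis=1, keepdims=True)    # reference vector (e.g., median trial)
    score = np.nanmean((y - ref)**2, axis=0)   # one MSE value per column
    if normalization is not None:
        score = score / normalization(score)   # median-normalized: typical score ~ 1
    return score

rng = np.random.default_rng(0)
y = rng.random((100, 10))
score = mse_sketch(y)
print(score.shape, float(np.nanmedian(score)))  # median of scores is ~1 by construction
```

The normalization step is why a fixed cap such as 3 can serve as a sensible default threshold: a score of 3 means three times the typical (median) deviation.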
>>> import matplotlib.pyplot as plt
>>> rng = np.random.default_rng()
>>> t, n = 100, 10
>>> y = rng.random((t, n)) / 2
>>> y += np.atleast_2d(2*np.sin(2*np.pi*np.linspace(0, 1, t))).T
>>> for i in range(0, n, 2):
...     j = rng.integers(t-20)
...     p = rng.integers(20)
...     y[j:j+p, i] = y[j:j+p, i] + rng.integers(10) - 5
...     y[:, i] += rng.integers(4) - 2
>>> ysr, ikeptr, inotkeptr, scoresr = similarity(y)
>>> ysn, ikeptn, inotkeptn, scoresn = similarity(y, repeat=False)
>>> fig, axs = plt.subplots(3, 1, sharex=True, figsize=(8, 8))
>>> axs[0].plot(y, label=list(range(n)))
>>> axs[0].legend(loc=(1.01, 0))
>>> axs[0].set_title(f'Original vectors (n={n})')
>>> axs[1].plot(ysr, label=ikeptr.tolist())
>>> axs[1].set_title(f'Vectors maintained with repeat selection (n={len(ikeptr)})')
>>> axs[1].legend(loc=(1.01, 0))
>>> axs[2].plot(ysn, label=ikeptn.tolist())
>>> axs[2].set_title(f'Vectors maintained without repeat selection (n={len(ikeptn)})')
>>> axs[2].legend(loc=(1.01, 0))
>>> plt.show()
Calculated threshold: 3.0
Vectors discarded (dimension 1, n=5): [8 4 2 0 6]
Calculated threshold: 3.0
Vectors discarded (dimension 1, n=2): [8 0]
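The printed threshold of 3.0 comes from the automatic rule described in the docstring. A self-contained sketch of that rule (my reading of it; the score values below are made up for illustration):

```python
import numpy as np

def auto_threshold(score):
    # min(upper Tukey fence, second-largest score, 3), per the docstring rule
    s = np.sort(score[~np.isnan(score)])
    q1, _, q3 = np.nanquantile(s, [.25, .50, .75])
    return min(q3 + 1.5*(q3 - q1), s[-2], 3.0)

# hypothetical scores: two clearly deviant vectors (56 and 60)
score = np.array([0.04, 0.04, 0.05, 0.05, 0.05, 1.0, 2.9, 3.1, 56.0, 60.0])
print(auto_threshold(score))  # → 3.0, the fixed cap wins here
```

The `score[-2]` term guarantees that at least the single worst vector can always be discarded, while the cap of 3 keeps the threshold from being inflated by a heavy-tailed score distribution.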
ys, ikept, inotkept, scores = similarity(y, threshold=0, nmin=3, repeat=True, metric=mse,
msg=1, central=np.nanmedian, normalization=np.nanmedian)
Calculated threshold: 3.0
Vectors discarded (dimension 1, n=5): [8 4 2 0 6]
ys.shape
(100, 5)
ikept
array([1, 3, 5, 7, 9])
inotkept
array([8, 4, 2, 0, 6])
scores
array([[3.064e+00, 3.714e-02, 2.942e+00, 4.471e-02, 2.952e+00, 4.792e-02,
        1.952e+00, 4.131e-02, 3.177e+00, 4.547e-02],
       [5.671e+01, 7.606e-01, 5.623e+01, 8.209e-01, 5.938e+01, 1.000e+00,
        4.111e+01, 8.213e-01,       nan, 9.374e-01],
       [6.461e+01, 8.569e-01, 6.616e+01, 8.682e-01,       nan, 9.941e-01,
        4.917e+01, 1.006e+00,       nan, 9.440e-01],
       [6.780e+01, 8.077e-01,       nan, 9.623e-01,       nan, 8.091e-01,
        4.691e+01, 1.070e+00,       nan, 1.000e+00],
       [      nan, 9.317e-01,       nan, 9.755e-01,       nan, 8.965e-01,
        4.548e+01, 1.025e+00,       nan, 1.126e+00],
       [      nan, 8.394e-01,       nan, 1.000e+00,       nan, 8.408e-01,
              nan, 1.112e+00,       nan, 1.039e+00]])
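With `repeat=True`, each row of `scores` is one selection round, and the column that turns NaN in a given row is the vector discarded in that round. A sketch of how to recover the discard order from such a matrix (using a small hypothetical scores array with the same structure, not the one printed above):

```python
import numpy as np

# hypothetical 4-round scores matrix: rows are rounds, columns are vectors
scores = np.array([[3.0,  0.04, 2.9,  0.05, 3.2],
                   [56.,  0.76, 56.,  0.82, np.nan],
                   [64.,  0.86, np.nan, 0.87, np.nan],
                   [np.nan, 0.84, np.nan, 1.0, np.nan]])
# vector discarded at round r = column that is NaN in row r but not in row r-1
for r in range(1, scores.shape[0]):
    new_nan = np.isnan(scores[r]) & ~np.isnan(scores[r-1])
    print(f'round {r}: discarded column(s) {np.where(new_nan)[0]}')
```

This reproduces `inotkept` in discard order, whereas `inotkept` itself already stores that order directly.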
ys, ikept, inotkept, scores = similarity(y, threshold=0, nmin=3, repeat=True, metric=mse,
drop=False, msg=True, central=np.nanmedian, normalization=np.nanmedian)
Calculated threshold: 3.0
Vectors discarded (dimension 1, n=5): [8 4 2 0 6]
ys.shape
(100, 10)
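With `drop=False` the discarded vectors are kept as all-NaN columns, which plays well with NumPy's nan-aware reductions. A self-contained sketch of the use case from the docstring, computing a reliable mean trial after masking outlier trials (`similarity` itself is not called here; the outlier indexes are hardcoded for illustration):

```python
import numpy as np

rng = np.random.default_rng(1)
y = rng.random((100, 10))                # 10 trials as columns
inotkept = np.array([8, 4, 2, 0, 6])     # indexes flagged as outlier trials
ys = y.copy()
ys[:, inotkept] = np.nan                 # what drop=False does: replace, not delete
mean_trial = np.nanmean(ys, axis=1)      # NaN columns are simply ignored
print(ys.shape, mean_trial.shape)        # → (100, 10) (100,)
```

Keeping the original shape is convenient when the column index encodes the trial number and must stay aligned with external metadata.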
ys, ikept, inotkept, scores = similarity(y, threshold=0, nmin=3, repeat=False, metric=mse,
msg=True, central=np.nanmedian, normalization=np.nanmedian)
Calculated threshold: 3.0
Vectors discarded (dimension 1, n=2): [8 0]
%%timeit
ys, ikept, inotkept, score_all = similarity(y, repeat=True, msg=False)
2.31 ms ± 20 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%%timeit
ys, ikept, inotkept, score_all = similarity(y, repeat=False, msg=False)
887 µs ± 21 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
The computation of the metric score (in the example above, `mse`) takes about 86% of the function's time. Let's look at that.
%%timeit
score = mse(y, central=np.nanmedian, normalization=np.nanmedian)
328 µs ± 3.24 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
%%timeit
score = mse(y, central=np.nanmean, normalization=np.nanmean)
77.4 µs ± 4.13 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
Using `median` to calculate the statistics slows down the code by about 4 times compared to using `mean`.
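The difference can be measured directly on the two NumPy reducers (timings are machine dependent; this only illustrates the measurement). If robustness to outliers is not critical, passing `central=np.nanmean, normalization=np.nanmean` to `similarity` (forwarded via `**kwargs` to `mse`) trades robustness for speed.

```python
import timeit

import numpy as np

rng = np.random.default_rng(3)
y = rng.random((100, 10))
# time the central-statistic step of the metric in isolation
t_median = timeit.timeit(lambda: np.nanmedian(y, axis=1), number=1000)
t_mean = timeit.timeit(lambda: np.nanmean(y, axis=1), number=1000)
print(f'nanmedian: {t_median*1e3:.1f} ms, nanmean: {t_mean*1e3:.1f} ms (1000 calls each)')
```

The gap comes from `nanmedian` needing a partial sort (plus NaN handling) per row, while `nanmean` is a single vectorized reduction.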
%lprun -f similarity similarity(y, msg=False)
Timer unit: 1e-09 s

Total time: 0.00471052 s
File: /home/marcos/adrive/Python/BMC/notebooks/./../functions/simila.py
Function: similarity at line 87

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    87                                           def similarity(y: np.ndarray, axis1: int = 0, axis2: int = 1, threshold: float = 0,
    88                                                          nmin: int = 3, repeat: bool = True, metric: Callable = mse,
    89                                                          drop=True, msg: bool = True, **kwargs: Callable
    90                                                          ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    92                                               """Select vectors in array by their similarity using a metric score.
   ...                                               (docstring lines 93-214 omitted; see the function's help above)
   215                                               """
   217         1       9085.0   9085.0      0.2      logger.debug('Similarity...')
   219         1       1620.0   1620.0      0.0      if y.ndim < 2:
   220                                                   raise ValueError('The input array must be at least a 2-D array.')
   221         1      10706.0  10706.0      0.2      y = y.copy()
   222         1    1153430.0 1153430.0     24.5      score: np.ndarray = metric(y, axis1=axis1, axis2=axis2, **kwargs)
   223         1       8608.0   8608.0      0.2      scores: np.ndarray = np.atleast_2d(score)
   224         1       4393.0   4393.0      0.1      ikept: np.ndarray = np.where(~np.isnan(score))[0]  # indexes of kept vectors
   225         1       1715.0   1715.0      0.0      inotkept: np.ndarray = np.where(np.isnan(score))[0]  # indexes of discarded vectors
   226         1       7560.0   7560.0      0.2      idx: np.ndarray = np.argsort(score)
   227         1        791.0    791.0      0.0      score = score[idx]
   228         1       3164.0   3164.0      0.1      nkept: int = np.count_nonzero(~np.isnan(score))  # number of kept vectors
   229         1        368.0    368.0      0.0      if nkept < 3:
   230                                                   logger.debug('nkept: %s', nkept)
   231                                                   raise ValueError('The input array must have at least 3 valid vectors.')
   232         1        222.0    222.0      0.0      if nmin < 0:
   233                                                   nmin = np.max([3, nkept + nmin])
   234         1        167.0    167.0      0.0      if threshold == 0:  # automatic threshold calculation
   235         1     186565.0 186565.0      4.0          qs: np.ndarray = np.nanquantile(a=score, q=[.25, .50, .75])
   236         1      16580.0  16580.0      0.4          threshold = np.min([qs[2] + 1.5*(qs[2]-qs[0]), score[-2], 3.])
   237         1        223.0    223.0      0.0      if msg:
   238                                                   print(f'Calculated threshold: {threshold}')
   240         1        332.0    332.0      0.0      if not repeat:  # discard all vectors at once
   241                                                   idx2: np.ndarray = np.nonzero(score > threshold)[0]  # vectors to discard
   ...                                                   (untimed branch, lines 242-248 omitted)
   249                                               else:  # discard vector with largest updated score one by one
   250         5       5300.0   1060.0      0.1          while nkept > nmin and score[nkept-1] > threshold:
   251         5     135611.0  27122.2      2.9              inotkept = np.r_[inotkept, idx[nkept-1]]
   252         5      14350.0   2870.0      0.3              y.swapaxes(0, axis2)[inotkept[-1], ...] = np.nan
   253         5    2891765.0 578353.0     61.4              score = metric(y, axis1=axis1, axis2=axis2, **kwargs)
   254         5      67722.0  13544.4      1.4              scores = np.vstack((scores, score))
   255         5      29507.0   5901.4      0.6              idx = np.argsort(score)
   256         5       3727.0    745.4      0.1              score = score[idx]
   257         5       1410.0    282.0      0.0              nkept = nkept - 1
   258         5       7003.0   1400.6      0.1              logger.debug('repeat - nkept: %s, score: %s', nkept, score)
   260         1        528.0    528.0      0.0      if len(inotkept):
   261         1     140764.0 140764.0      3.0          ikept = np.setdiff1d(ikept, inotkept)
   262         1        211.0    211.0      0.0      if drop:
   263         1       6709.0   6709.0      0.1          y = y.swapaxes(0, axis2)[ikept, ...].swapaxes(0, axis2)
   264         1        155.0    155.0      0.0      if msg:
   265                                                   print(f'Vectors discarded (dimension {axis2}, n={len(inotkept)}): {inotkept}')
   268         1        232.0    232.0      0.0      return y, ikept, inotkept, scores
similarity¶
# %load ./../functions/simila.py
"""Select vectors in array by their similarity using a metric score.
"""
import logging
from typing import Callable
import numpy as np
__author__ = 'Marcos Duarte, https://github.com/demotu/BMC'
__version__ = 'simila.py v.1.0.0 2023/07/31'
__license__ = "MIT"
# set logger and configure it not to confuse with matplotlib debug messages
logger = logging.getLogger(__name__)
logger.setLevel(20) # 0: NOTSET, 10: DEBUG, 20: INFO, 30: WARNING, 40: ERROR, 50: CRITICAL
ch = logging.StreamHandler()
formatter = logging.Formatter('{name}: {levelname}: {message}', style='{')
ch.setFormatter(formatter)
logger.addHandler(ch)
def mse(y: np.ndarray, target: np.ndarray | None = None, axis1: int = 0, axis2: int = 1,
central: Callable = np.nanmedian, normalization: Callable = np.nanmedian
) -> np.ndarray:
"""Mean Squared Error of `y` w.r.t. `target` or `central` along `axis2` at `axis1`.
Parameters
----------
y : numpy.ndarray
At least a 2-D numpy.ndarray of data for the calculation of mean squared
error w.r.t. a `target` or a `central` statistics of the data.
target : 1-D numpy.ndarray of length `axis1`, optional, default = None
Reference value to calculate the mean squared error of `y` w.r.t. this
vector. If it is None, the mse value will be calculated w.r.t. a `central`
calculated along `axis2` of `y`.
axis1 : integer, optional, default = 0
Axis of `y` for which the mse will be calculated at each value.
axis2 : integer, optional, default = 1
Axis of `y` along which the `central` statistics might be calculated or
along which the target will be subtracted.
central : Python function, optional, default = np.nanmedian
    Function to calculate the statistic on `y` w.r.t. which the mse is computed.
normalization : Python function, optional, default = np.nanmedian
Function to normalize the calculated mse values
Returns
-------
score : numpy.ndarray
Mean Squared Error values
References
----------
.. [1] https://nbviewer.org/github/BMClab/BMC/blob/master/notebooks/Similarity.ipynb
Examples
--------
>>> import numpy as np
>>> rng = np.random.default_rng()
>>> y = rng.random((100, 10))
>>> y = y + np.atleast_2d(np.sin(2*np.pi*np.linspace(0, 1, 100))).T
>>> mse(y, axis1=0, axis2=1, central=np.nanmedian, normalization=np.nanmedian)
Version history
---------------
'1.0.0':
First release version
"""
logger.debug('mse...')
score: np.ndarray = np.empty((y.shape[axis2]), dtype=float)
score.fill(np.nan)
idx: np.ndarray = np.where(~np.all(np.isnan(y), axis=axis1))[0]
y = y.swapaxes(0, axis2)[idx, ...].swapaxes(0, axis2) # faster than .take
if target is not None:
logger.debug('target shape: %s', target.shape)
score[idx] = np.nanmean((y - target)**2, axis=axis1)
else:
score[idx] = np.nanmean((y - central(y, axis=axis2, keepdims=True))**2, axis=axis1)
if normalization is not None:
score = score/normalization(score)
logger.debug('idx: %s, score: %s', idx, score)
return score
def similarity(y: np.ndarray, axis1: int = 0, axis2: int = 1, threshold: float = 0,
nmin: int = 3, repeat: bool = True, metric: Callable = mse,
drop=True, msg: bool = True, **kwargs: Callable
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Select vectors in array by their similarity using a metric score.
For example, if `y` is a 2-D numpy.ndarray, with shape (n, m), `axis1`=0
(n is the number of rows) and `axis2`=1 (m is the number of columns), this
function will select the vectors along the columns, that are more similar
to a `central` statistics of `y` or to a `target`, using a `metric` score.
The metric score can be calculated repeatedly until all selected vectors
have a `metric` score not greater than a `threshold`, but the minimum
number of vectors to keep or the maximum number of vectors to discard
can be specified with parameter `nmin`.
The default `metric` and target are the mean squared error (`mse`) of `y`
w.r.t. the median of `y` along `axis2`. The `mse` metric is equivalent
to the squared Euclidean distance and is preferred because it
penalizes large differences more than the Euclidean distance. But any
other `metric` that can be calculated with a function can be used.
A possible use of this function is to discard the time-series data from bad
trials (treated as outliers) stored in a 2-D array (each trial as a column
and instants as rows), where the criterion is the similarity of each trial
w.r.t. the median trial (the median is more robust than the mean for
removing outliers in case there are very bad trials). After the bad
trials are discarded, the mean of all trials can then be calculated more
reliably.
Parameters
----------
y : numpy.ndarray
Array for the calculation of similarity (defined by a `metric`) of its
vectors w.r.t. a `target` or a `central` statistics of `y`.
axis1 : integer, optional, default = 0
Axis of `y` for which the `metric` will be calculated at each value and
possibly averaged in the `metric` calculation.
axis2 : integer, optional, default = 1
Axis of `y` along which the different vectors are going to be compared
with the `threshold` for their similarity (using their `metric` score).
threshold : float, optional, default = 0
If greater than 0, vector with `metric` score above this value will be
discarded. If 0, threshold will be automatically calculated as the
    minimum of [qs[2] + 1.5*(qs[2]-qs[0]), score[-2], 3], where qs are the
    quartiles and score[-2] is the second-largest `metric` score among
    vectors, calculated once at the start and not updated by the `repeat`
    option.
nmin : integer, optional, default = 3
    If greater than 0, minimum number of vectors to keep.
If lower than 0, maximum number of vectors to discard.
repeat : bool, optional, default = True
Whether to calculate similarity `metric` repeatedly, updating the
score calculation each time a vector is discarded.
With `repeat` True, the output `scores` will contain at each row
the updated score values for the used `metric` for each data vector.
The first row will contain the calculated original scores before any
    vector was discarded. On the subsequent rows, the vectors discarded
are represented by NaN values and the kept vectors by their updated
scores.
The last row will contain the updated scores of the final vectors kept.
With the `repeat` False, the comparison of score values with
`threshold` are made only once and vectors are discarded accordingly at
once. In this case, the output `scores` will contain only two rows, the
first row will contain the calculated original scores before any
vectors were discarded. At the second row, the vectors discarded are
represented with NaN values and the kept vectors by their updated
scores.
metric : optional, default=mse
Function to use as metric to compute similarity.
drop : bool, optional, default = True
Whether to drop (delete) the discarded vectors from `y` in the output.
If False, the values of the vectors discarded will be replaced by nans
and the returned array will have the same dimensions as the original
array.
msg : bool, optional, default = True
Whether to print some messages.
kwargs : optional
Options for the metric function (e.g., see `mse` function).
Returns
-------
y : numpy.ndarray
Array similar to input `y` but with vectors discarded (deleted) if
option `drop` is True or with all values of vectors discarded replaced
by nans if option `drop` is False.
ikept : numpy.ndarray
Indexes of kept vectors.
inotkept : numpy.ndarray
Indexes of not kept (discarded or replaced by nan) vectors.
scores : 2-D numpy.ndarray
Metric score values of each vector (as columns) for each round of
vector selection (one row per round plus the final values).
References
----------
.. [1] https://nbviewer.org/github/BMClab/BMC/blob/master/notebooks/Similarity.ipynb
Examples
--------
>>> import matplotlib.pyplot as plt
>>> rng = np.random.default_rng()
>>> t, n = 100, 10
>>> y = rng.random((t, n)) / 2
>>> y = y + np.atleast_2d(2*np.sin(2*np.pi*np.linspace(0, 1, t))).T
>>> for i in range(0, n, 2):
...     j = rng.integers(t-20)
...     p = rng.integers(20)
...     y[j:j+p, i] = y[j:j+p, i] + rng.integers(10) - 5
...     y[:, i] += rng.integers(4) - 2
>>> ysr, ikeptr, inotkeptr, scoresr = similarity(y)
>>> ysn, ikeptn, inotkeptn, scoresn = similarity(y, repeat=False)
>>> fig, axs = plt.subplots(3, 1, sharex=True, figsize=(8, 8))
>>> axs[0].plot(y, label=list(range(n)))
>>> axs[0].legend(loc=(1.01, 0))
>>> axs[0].set_title(f'Original vectors (n={n})')
>>> axs[1].plot(ysr, label= ikeptr.tolist())
>>> axs[1].set_title(f'Vectors maintained with repeat selection (n={len(ikeptr)})')
>>> axs[1].legend(loc=(1.01, 0))
>>> axs[2].plot(ysn, label= ikeptn.tolist())
>>> axs[2].set_title(f'Vectors maintained without repeat selection (n={len(ikeptn)})')
>>> axs[2].legend(loc=(1.01, 0))
>>> plt.show()
Version history
---------------
'1.0.0':
First release version
"""
logger.debug('Similarity...')
if y.ndim < 2:
raise ValueError('The input array must be at least a 2-D array.')
y = y.copy()
score: np.ndarray = metric(y, axis1=axis1, axis2=axis2, **kwargs)
scores: np.ndarray = np.atleast_2d(score)
ikept: np.ndarray = np.where(~np.isnan(score))[0] # indexes of kept vectors
inotkept: np.ndarray = np.where(np.isnan(score))[0] # indexes of discarded vectors
idx: np.ndarray = np.argsort(score)
score = score[idx]
nkept: int = np.count_nonzero(~np.isnan(score)) # number of kept vectors
if nkept < 3:
logger.debug('nkept: %s', nkept)
raise ValueError('The input array must have at least 3 valid vectors.')
if nmin < 0:
nmin = np.max([3, nkept + nmin])
if threshold == 0: # automatic threshold calculation
qs: np.ndarray = np.nanquantile(a=score, q=[.25, .50, .75])
threshold = np.min([qs[2] + 1.5*(qs[2]-qs[0]), score[-2], 3.])
if msg:
print(f'Calculated threshold: {threshold}')
if not repeat: # discard all vectors at once
idx2: np.ndarray = np.nonzero(score > threshold)[0] # vectors to discard
if len(idx2) > 0:
if nkept > nmin: # keep at least nmin vectors
inotkept = np.r_[inotkept, idx[idx2[-(y.shape[axis2] - nmin):]][::-1]]
y.swapaxes(0, axis2)[inotkept, ...] = np.nan
score = metric(y, axis1=axis1, axis2=axis2, **kwargs)
scores = np.vstack((scores, score))
logger.debug('not repeat - score: %s', score)
else: # discard vector with largest updated score one by one
while nkept > nmin and score[nkept-1] > threshold:
inotkept = np.r_[inotkept, idx[nkept-1]]
y.swapaxes(0, axis2)[inotkept[-1], ...] = np.nan
score = metric(y, axis1=axis1, axis2=axis2, **kwargs)
scores = np.vstack((scores, score))
idx = np.argsort(score)
score = score[idx]
nkept = nkept - 1
logger.debug('repeat - nkept: %s, score: %s', nkept, score)
if len(inotkept):
ikept = np.setdiff1d(ikept, inotkept)
if drop:
y = y.swapaxes(0, axis2)[ikept, ...].swapaxes(0, axis2)
if msg:
print(
f'Vectors discarded (dimension {axis2}, n={len(inotkept)}): {inotkept}')
return y, ikept, inotkept, scores