%pylab inline
rcParams['figure.figsize'] = (10, 4) #wide graphs by default
from __future__ import print_function
from __future__ import division
Populating the interactive namespace from numpy and matplotlib
from scipy.io import wavfile
sr, e = wavfile.read('e.wav')
sr
16000
from IPython.display import Audio
Audio(e, rate=sr)
plot(e)
[<matplotlib.lines.Line2D at 0x7fa91b2a1950>]
start = 4000
fourier_trans = rfft(e[start:start + 2048]* hanning(2048), n=4096)
mag_spectrum_e = abs(fourier_trans)
plot(mag_spectrum_e)
[<matplotlib.lines.Line2D at 0x7fa91b27dfd0>]
log_mag_spec_e = log(mag_spectrum_e)
plot(log_mag_spec_e[:1000])
[<matplotlib.lines.Line2D at 0x7fa91b220750>]
freqs = linspace(0, sr/2, 2049)
plot(freqs, log_mag_spec_e)
xlabel('Frequency (Hz)')
ylabel('Log amplitude')
xlim((0, 4000))
grid()
sr, a = wavfile.read('a.wav')
plot(a)
[<matplotlib.lines.Line2D at 0x7fa91af03b10>]
Audio(a, rate=sr)
start = 4000
fourier_trans = rfft(a[start:start + 2048]* hanning(2048), n=4096)
mag_spectrum_a = abs(fourier_trans)
log_mag_spec_a = log(mag_spectrum_a)
freqs = linspace(0, sr/2, 2049)
plot(freqs, log_mag_spec_a)
xlabel('Frequency (Hz)')
ylabel('Log amplitude')
xlim((0, 4000))
grid()
sr, o = wavfile.read('o.wav')
plot(o)
[<matplotlib.lines.Line2D at 0x7fa91add95d0>]
Audio(o, rate=sr)
start = 4000
fourier_trans = rfft(o[start:start + 2048]* hanning(2048), n=4096)
mag_spectrum_o = abs(fourier_trans)
log_mag_spec_o = log(mag_spectrum_o)
plot(log_mag_spec_o[:1000])
figure()
plot(mag_spectrum_o[:1000])
[<matplotlib.lines.Line2D at 0x7fa91b152850>]
plot(log_mag_spec_a[:1000], alpha=0.5)
plot(log_mag_spec_e[:1000], alpha=0.5)
plot(log_mag_spec_o[:1000], alpha=0.5)
legend(['a','e','o'])
<matplotlib.legend.Legend at 0x7fa91ad5bb90>
where $s$ is the voice signal, $x$ is the source or excitation signal and $y$ is the filter.
$$S(f) = X(f)Y(f)$$$$|S(f)| = |X(f)||Y(f)|$$$$\ln|S(f)| = \ln|X(f)| + \ln|Y(f)|$$$$\mathcal{F^{-1}}\big[\ln|S(f)|\big] = \mathcal{F^{-1}}\big[\ln|X(f)|\big] + \mathcal{F^{-1}}\big[\ln|Y(f)|\big]$$cepstrum_a = ifft(log_mag_spec_a)
plot(cepstrum_a[1:])
/usr/lib/python2.7/dist-packages/numpy/core/numeric.py:460: ComplexWarning: Casting complex values to real discards the imaginary part return array(a, dtype, copy=False, order=order)
[<matplotlib.lines.Line2D at 0x7fa9194938d0>]
plot(cepstrum_a[:])
[<matplotlib.lines.Line2D at 0x7fa9193d6e50>]
Now we separate the source and the filter by finding the first peak in the cepstrum.
pulse = list(r_[1, zeros(50)])*40
plot(pulse)
[<matplotlib.lines.Line2D at 0x7fa9192ff7d0>]
plot(abs(rfft(pulse)))
[<matplotlib.lines.Line2D at 0x7fa919297390>]
filtered = abs(rfft(pulse)) * cos(linspace(0, 0.5*pi, len(rfft(pulse)), endpoint=False))
plot(filtered)
[<matplotlib.lines.Line2D at 0x7fa9191bdd50>]
plot(abs(rfft(filtered)))
[<matplotlib.lines.Line2D at 0x7fa919166910>]
Separate source and filter:
n0 = argmax(cepstrum_a[1:])+ 1
n0
69
Everything before that point represents the filter.
cepstrum_filter_a = abs(fft.fft(cepstrum_a[:n0 - 1], n=2048))
plot(cepstrum_filter_a[:1000])
grid()
plot(cepstrum_filter_a, 'green', lw=3)
twinx()
plot(log_mag_spec_a)
xlim((0,1000))
(0, 1000)
Everything after represents the source (excitation).
source_coeffs = r_[zeros(n0),cepstrum_a[n0:]]
cepstrum_source_a = fft.fft(source_coeffs, n=2048)
plot(abs(cepstrum_source_a)[:1025])
[<matplotlib.lines.Line2D at 0x7fa91900ba10>]
source_spec = np.e ** (abs(cepstrum_source_a)[:1025])
plot(source_spec)
[<matplotlib.lines.Line2D at 0x7fa918f3d0d0>]
source = fft.irfft(source_spec)
plot(source)
[<matplotlib.lines.Line2D at 0x7fa918ede5d0>]
source_cycled = list(source * 500) * 50
plot(source_cycled)
Audio(source_cycled, rate=sr*2)
cepstrum_e = ifft(log_mag_spec_e)
n0 = argmax(cepstrum_e[1:])+ 1
cepstrum_filter_e = abs(fft.fft(cepstrum_e[:n0 - 1], n=2048))
plot(abs(cepstrum_filter_e), 'green', lw=3)
twinx()
plot(log_mag_spec_e)
xlim((0,1000))
(0, 1000)
cepstrum_o = ifft(log_mag_spec_o)
n0 = argmax(cepstrum_o[10:1024])+ 10 # had to do some minor manual adjustments here!
cepstrum_filter_o = abs(fft.fft(cepstrum_o[:n0 - 1], n=2048))
plot(cepstrum_filter_o, 'green', lw=3)
twinx()
plot(log_mag_spec_o)
xlim((0,1000))
(0, 1000)
freqs = linspace(0, sr/2, 2048)
plot(freqs, cepstrum_filter_a)
plot(freqs, cepstrum_filter_e)
plot(freqs, cepstrum_filter_o)
legend(['a','e','o'])
xlabel('Frequency (Hz)')
grid()
xlim((0, 4000))
title('Cepstra extracted filter for vowels');
Different amount of detail can be preserved by using more or less cepstral coefficients.
num_coeffs = [10, 15, 30, 50]
for n in num_coeffs:
cepstrum_filter = abs(fft.fft(cepstrum_a[:n], n=512))
plot(abs(cepstrum_filter)[:250])
legend(num_coeffs)
grid()
title('Different number of coefficients for Cepstral filter ("a")');
num_coeffs = [10, 15, 30,50]
for n in num_coeffs:
cepstrum_filter = abs(fft.fft(cepstrum_e[:n], n=512))
plot(abs(cepstrum_filter)[:250])
legend(num_coeffs)
grid()
title('Different number of coefficients for Cepstral filter ("e")');
num_coeffs = [10, 15, 30,50]
for n in num_coeffs:
cepstrum_filter = abs(fft.fft(cepstrum_o[:n], n=512))
plot(abs(cepstrum_filter)[:250])
legend(num_coeffs)
grid()
title('Different number of coefficients for Cepstral filter ("o")');
DCT type II is used.
$$X_k = \sum_{n=0}^{N-1} x_n \cos \left[\frac{\pi}{N} \left(n+\frac{1}{2}\right) k \right] \quad \quad k = 0, \dots, N-1.$$http://en.wikipedia.org/wiki/Discrete_cosine_transform
The DCT is another type of harmonic analisys, but (for DCT type II), each harmonic is shifted by 0.5 "steps" within the analysis window.
N = 1024
k = 0
phs = linspace(k * 0.5*pi/N, (k * pi *(N-0.5))/N, N)
plot(cos(phs))
[<matplotlib.lines.Line2D at 0x7fa918e37cd0>]
k = 1
phs = linspace(k * 0.5*pi/N, (k * pi *(N-0.5))/N, N)
plot(cos(phs))
[<matplotlib.lines.Line2D at 0x7fa918db0bd0>]
k = 2
phs = linspace(k * 0.5*pi/N, (k * pi *(N-0.5))/N, N)
plot(cos(phs))
[<matplotlib.lines.Line2D at 0x7fa918cd2990>]
This produces some assymetrical aliasing on the second half of the spectrum (i.e. it's not symmetrical like the Fourier Transform for real input)
k = 1023
phs = linspace(k * 0.5*pi/N, (k * pi *(N-0.5))/N, N)
plot(cos(phs))
[<matplotlib.lines.Line2D at 0x7fa91898d8d0>]
phs = linspace(k * 0.5*pi/N, (k * pi *(N-0.5))/N, N)
plot(cos(phs)[0:100])
[<matplotlib.lines.Line2D at 0x7fa91892f750>]
from scipy.fftpack import dct
cepstrum_dct_o = dct(log_mag_spec_o)
n0 = argmax(cepstrum_dct_o[10:1024])+ 10 # had to do some minor manual adjustments here!
cepstrum_dct_o = abs(fft.fft(cepstrum_dct_o[:n0 - 1], n=4096))
plot(cepstrum_dct_o, 'green', lw=3)
twinx()
plot((cepstrum_filter_o), 'r')
twinx()
plot(log_mag_spec_o)
xlim((0,1000))
(0, 1000)
plot(cepstrum_a[1:200])
[<matplotlib.lines.Line2D at 0x7fa9190b1850>]
len(cepstrum_a)
2049
plot(cepstrum_e[1:200])
[<matplotlib.lines.Line2D at 0x7fa91ae83490>]
plot(cepstrum_o[1:200])
[<matplotlib.lines.Line2D at 0x7fa918727190>]
The x-axis in a Ceptrum plot is called Quefrency. But it is in fact a time axis.
argmax(cepstrum_a[50:100])+ 50
69
argmax(cepstrum_e[50:100])+ 50
67
argmax(cepstrum_o[50:100])+ 50
64
# Cepstrum was oversampled
f_a = sr /(2.0*(argmax(cepstrum_a[50:100])+ 50))
f_a
115.94202898550725
f_e = sr /(2.0 * (argmax(cepstrum_e[50:100])+ 50))
f_e
119.40298507462687
f_o = sr /(2.0 * (argmax(cepstrum_o[50:100])+ 50))
f_o
125.0
freqs = linspace(0, sr/2, 2049)
plot(freqs, log_mag_spec_a)
xlim((0, 500))
vlines(f_a, 0, 14)
grid()
freqs = linspace(0, sr/2, 2049)
plot(freqs, log_mag_spec_e)
xlim((0, 500))
vlines(f_e, 0, 14)
grid()
freqs = linspace(0, sr/2, 2049)
plot(freqs, log_mag_spec_o)
xlim((0, 500))
vlines(f_o, 0, 16)
grid()
noise = 5000.0 * (random.random(2048) - 0.5)
fourier_trans = rfft(noise * hanning(2048), n=4096)
mag_spectrum_noise = abs(fourier_trans)
log_mag_spec_noise = log(mag_spectrum_noise)
plot(log_mag_spec_noise)
[<matplotlib.lines.Line2D at 0x7fa918164810>]
cepstrum_noise = ifft(log_mag_spec_noise)
plot(abs(cepstrum_noise[1:]))
[<matplotlib.lines.Line2D at 0x7fa918773810>]
sinsig = 2500.0 * ((sin(linspace(0, 20*2*pi,2048))) + (sin(linspace(0, 40*2*pi,2048))))
fourier_trans = rfft(sinsig * hanning(2048), n=4096)
mag_spectrum_sinsig = abs(fourier_trans)
log_mag_spec_sinsig = log(mag_spectrum_sinsig)
plot(log_mag_spec_sinsig)
[<matplotlib.lines.Line2D at 0x7fa918a1a850>]
cepstrum_sinsig = ifft(log_mag_spec_sinsig)
plot(abs(cepstrum_sinsig[1:]))
[<matplotlib.lines.Line2D at 0x7fa918b230d0>]
The simplest way can be setting a threshold for the maximum value of the cepstrum, but other techniques to detect flatness or peakedness of the cepstrum can be used.
A signal is modeled as a sum of time varying sinusoids:
$$P_k(n) = \alpha_k(n)\sin(\phi_k(n))$$The signal is the sum of each individual sinusoid:
$$ s(n) = \sum\limits_{k}P_k(n)$$n is the point in time (sample number) and k is the index to each sinusoidal component.
Pxx, freqs, bins, im = specgram(e, 2048, 16000, noverlap=1024, pad_to=8192)
Pxx.shape
(4097, 10)
plot(Pxx[:,0])
[<matplotlib.lines.Line2D at 0x7fa91903af90>]
First we find the local maxima to identify peaks:
maxima = argwhere((Pxx[:-2,0] < Pxx[1:-1,0]) & (Pxx[2:,0] < Pxx[1:-1,0])) + 1
plot(Pxx[:,0])
plot(maxima, Pxx[maxima, 0], 'o')
[<matplotlib.lines.Line2D at 0x7fa91874dc10>]
Now filter by threshold (let's choose 1000):
peaks = [index for index in maxima if Pxx[index, 0] > 1000]
peaks
[array([68]), array([136]), array([204]), array([272])]
plot(Pxx[:,0])
plot(peaks, Pxx[peaks, 0], 'o')
[<matplotlib.lines.Line2D at 0x7fa9180bcfd0>]
peak_list = []
for spec in Pxx.T:
maxima = argwhere((spec[:-2] < spec[1:-1]) & (spec[2:] < spec[1:-1])) + 1
peaks = [(freqs[index][0], spec[index][0]) for index in maxima if spec[index] > 1000]
peak_list.append(peaks)
A = (4,5)
A[0] = 2
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-74-3863c4c11a92> in <module>() 1 A = (4,5) ----> 2 A[0] = 2 TypeError: 'tuple' object does not support item assignment
peak_list[0]
[(132.8125, 13259.500682785412), (265.625, 5598.2166931424135), (398.4375, 11918.611861673435), (531.25, 12480.112370777058)]
array(peak_list[0])
array([[ 132.8125 , 13259.50068279], [ 265.625 , 5598.21669314], [ 398.4375 , 11918.61186167], [ 531.25 , 12480.11237078]])
array(peak_list[0])[0]
array([ 132.8125 , 13259.50068279])
array(peak_list[0])[0, 0]
132.8125
array(peak_list[0])[:,0]
array([ 132.8125, 265.625 , 398.4375, 531.25 ])
array(peak_list[0])[:,1] # amplitudes
array([ 13259.50068279, 5598.21669314, 11918.61186167, 12480.11237078])
for i, peaks in enumerate(peak_list):
freqs = array(peaks)[:,0]
plot(ones(len(freqs))*i, freqs, 'o')
specgram(e, 2048, 16000, noverlap=1024, pad_to=8192)
for i, peaks in enumerate(peak_list):
freqs = array(peaks)[:,0]
plot(ones(len(freqs))*bins[i], freqs, 'o')
specgram(e, 2048, 16000, noverlap=1024, pad_to=8192, interpolation='nearest')
for i, peaks in enumerate(peak_list):
freqs = array(peaks)[:,0]
plot(ones(len(freqs))*bins[i], freqs, 'o')
ylim((0, 1000))
(0, 1000)
Top part of the spectrum:
specgram(e, 2048, 16000, noverlap=1024, pad_to=8192, interpolation='nearest')
for i, peaks in enumerate(peak_list):
freqs = array(peaks)[:,0]
plot(ones(len(freqs))*bins[i], freqs, 'o')
ylim((1500, 3000))
(1500, 3000)
Now connect the dots. First start tracks at initial peak list:
tracks = [[r_[freq, amp, bins[0]]] for freq, amp in peak_list.pop(0)]
tracks
[[array([ 1.32812500e+02, 1.32595007e+04, 6.40000000e-02])], [array([ 2.65625000e+02, 5.59821669e+03, 6.40000000e-02])], [array([ 3.98437500e+02, 1.19186119e+04, 6.40000000e-02])], [array([ 5.31250000e+02, 1.24801124e+04, 6.40000000e-02])]]
tracks_ = array(tracks)
tracks_, tracks_.shape
(array([[[ 1.32812500e+02, 1.32595007e+04, 6.40000000e-02]], [[ 2.65625000e+02, 5.59821669e+03, 6.40000000e-02]], [[ 3.98437500e+02, 1.19186119e+04, 6.40000000e-02]], [[ 5.31250000e+02, 1.24801124e+04, 6.40000000e-02]]]), (4, 1, 3))
Then start connecting frame by frame:
new_peaks = peak_list.pop(0)
new_peaks
[(128.90625, 43415.969559712321), (257.8125, 10136.461353220735), (388.671875, 12396.216214466369), (513.671875, 28229.64572344625), (1814.453125, 1062.6965352678799), (1898.4375, 1376.5386563369461), (1917.96875, 1572.4372651563174), (1943.359375, 3003.1438876970128), (1957.03125, 1337.0451035829653), (2572.265625, 1448.3072079608921), (2595.703125, 1168.6898589921368), (2724.609375, 1000.7178254947365)]
f = new_peaks[0][0]
f
128.90625
tracks[:]
[[array([ 1.32812500e+02, 1.32595007e+04, 6.40000000e-02])], [array([ 2.65625000e+02, 5.59821669e+03, 6.40000000e-02])], [array([ 3.98437500e+02, 1.19186119e+04, 6.40000000e-02])], [array([ 5.31250000e+02, 1.24801124e+04, 6.40000000e-02])]]
last_bps_freq = []
last_bps_time = []
for bps in tracks:
last_bps_freq.append(bps[-1][0]) # get last breakpoint for all tracks
last_bps_time.append(bps[-1][2]) # get last breakpoint for all tracks
print(last_bps_freq)
[132.8125, 265.625, 398.4375, 531.25]
tracks[:]
[[array([ 1.32812500e+02, 1.32595007e+04, 6.40000000e-02])], [array([ 2.65625000e+02, 5.59821669e+03, 6.40000000e-02])], [array([ 3.98437500e+02, 1.19186119e+04, 6.40000000e-02])], [array([ 5.31250000e+02, 1.24801124e+04, 6.40000000e-02])]]
But last breakpoint must be from the previous frame!
previous_frame_time = bins[0]
previous_frame_time
0.064000000000000001
active_tracks = argwhere(last_bps_time == previous_frame_time)
active_tracks = array(active_tracks)
active_tracks
array([[0], [1], [2], [3]])
prev_freqs = array(tracks)[active_tracks,-1,0]
prev_freqs
array([[ 132.8125], [ 265.625 ], [ 398.4375], [ 531.25 ]])
dists = abs(prev_freqs - f)
dists
array([[ 3.90625], [ 136.71875], [ 269.53125], [ 402.34375]])
argmin(dists)
0
active_tracks[argmin(dists)]
array([0])
best_matches = []
for peak in new_peaks:
f = peak[0]
dists = abs(prev_freqs - f)
best_matches.append([active_tracks[argmin(dists)], dists.min()])
best_matches = array(best_matches)
best_matches
array([[array([0]), 3.90625], [array([1]), 7.8125], [array([2]), 9.765625], [array([3]), 17.578125], [array([3]), 1283.203125], [array([3]), 1367.1875], [array([3]), 1386.71875], [array([3]), 1412.109375], [array([3]), 1425.78125], [array([3]), 2041.015625], [array([3]), 2064.453125], [array([3]), 2193.359375]], dtype=object)
Now that we have the best match for each of the new points, we need to decide which ones get connected.
argwhere(best_matches[:, 0] == 0)
array([[0]])
argwhere(best_matches[:, 0] == 3)
array([[ 3], [ 4], [ 5], [ 6], [ 7], [ 8], [ 9], [10], [11]])
best_matches[argwhere(best_matches[:, 0] == 3)]
array([[[array([3]), 17.578125]], [[array([3]), 1283.203125]], [[array([3]), 1367.1875]], [[array([3]), 1386.71875]], [[array([3]), 1412.109375]], [[array([3]), 1425.78125]], [[array([3]), 2041.015625]], [[array([3]), 2064.453125]], [[array([3]), 2193.359375]]], dtype=object)
best_matches[argwhere(best_matches[:, 0] == 3)][:,:,1]
array([[17.578125], [1283.203125], [1367.1875], [1386.71875], [1412.109375], [1425.78125], [2041.015625], [2064.453125], [2193.359375]], dtype=object)
best_next = argmin(best_matches[argwhere(best_matches[:, 0] == 3)][:,:,1])
best_next
0
best_next += argwhere(best_matches[:, 0] == 3)[0]
best_next
array([3])
Check if close enough
dist_th = 500 # Set maximum distance allowed for connection
if best_matches[best_next,1] < dist_th:
print("Match!")
Match!
Now, all together:
Pxx, freqs, bins, im = specgram(e, 2048, 16000, noverlap=1024, pad_to=8192)
peak_list = []
for spec in Pxx.T:
maxima = argwhere((spec[:-2] < spec[1:-1]) & (spec[2:] < spec[1:-1])) + 1
peaks = [(freqs[index], spec[index]) for index in maxima if spec[index] > 1000]
peak_list.append(peaks)
tracks = [[r_[freq, amp, bins[0]]] for freq, amp in peak_list.pop(0)] #inital tracks from initial peaks
tracks = array(tracks)
new_peaks = peak_list.pop(0)
last_bps = tracks[:,-1,:]
last_bps[:,2]
previous_frame_time = 0.064
active_tracks = argwhere(last_bps[:,2] == previous_frame_time)
active_tracks = array(active_tracks)
prev_freqs = tracks[active_tracks,-1,0]
best_matches = []
for peak in new_peaks:
f = peak[0]
dists = abs(prev_freqs - f)
best_matches.append([active_tracks[argmin(dists)][0], dists.min()])
best_matches = array(best_matches)
connections = dict()
for i in set(best_matches[:, 0]):
best_next = argmin(best_matches[argwhere(best_matches[:, 0] == i)][:,:,1])
best_next += argwhere(best_matches[:, 0] == i)[0]
connections[int(i)] = best_next[0]
connections
{0: 0, 1: 1, 2: 2, 3: 3}
Finally place breakpoints in tracks. (Exercise left to the reader)
Try sinusoidal modeling:
http://www.klingbeil.com/spear/
http://mtg.upf.edu/technologies/sms
http://www.cerlsoundgroup.org/Loris/
http://www.csounds.com/manual/html/SpectralATS.html
Streaming real-time:
By: Andrés Cabrera mantaraya36@gmail.com For MAT course MAT 201A at UCSB
This ipython notebook is licensed under the CC-BY-NC-SA license: http://creativecommons.org/licenses/by-nc-sa/4.0/