%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import math
import random
default_cycles_per_step = 8
default_amplitude_steps = 4
default_wave_length = 16
random_signal = 0.0
def generate_signal(cycles_per_step = None, amplitude_steps = None, wave_length = None):
if cycles_per_step is None:
cycles_per_step = default_cycles_per_step
if amplitude_steps is None:
amplitude_steps = default_amplitude_steps
if wave_length is None:
wave_length = default_wave_length
return [ (amplitude if step < wave_length / 2 else -amplitude) + \
random.gauss(0, random_signal)
for amplitude in range(amplitude_steps - 1, -1, -1)
for cycle in range(cycles_per_step)
for step in range(wave_length)]
signal = generate_signal(10, 4, 8)
print("length: {}".format(len(signal)))
plt.plot(signal)
length: 320
[<matplotlib.lines.Line2D at 0xbf83e10>]
def plot_signal(l):
N = len(l)
s = sum(l) / N
y = [x - s for x in l]
f1 = np.fft.fft(y)
amplitude = np.sqrt([x.imag * x.imag + x.real * x.real for x in f1])
phase = [math.atan2(x.imag, x.real) for x in f1]
plt.plot([1 * x for x in phase])
plt.plot(amplitude / N * 10)
plt.plot(l)
index = [i for i, v in enumerate(amplitude) if v == max(amplitude)][0]
print("highest index: {} amplitude: {}".format(index, amplitude[index] / N))
if index > N / 2:
print("lower index: {} amplitude: {}".format(N - index, amplitude[N - index] / N))
vs = []
# for index, m in ((22, 3), (30, 4), (37, 5)):
# verschiebung = phase[index] / np.pi / m
# vs.append(verschiebung)
# print("verschiebung: {} amplitude: {} at index {}".format(verschiebung, amplitude[index] / N, index))
# print("verschiebung: {} {}".format((vs[1] - vs[0]) % 1, (vs[2] - vs[0]) % 1))
x = plt.title('wave ftt')
x.phase = phase
x.amplitude = amplitude
return x
plot_signal(generate_signal(10, 1, 12))
highest index: 220 amplitude: 0.3165096029130611 lower index: 20 amplitude: 0.31650960291306096
<matplotlib.text.Text at 0x10803310>
plot_signal(generate_signal(10, 2, 6))
highest index: 30 amplitude: 0.6666666666666667
<matplotlib.text.Text at 0x7e46df0>
plot_signal(generate_signal(10, 3, 4))
highest index: 40 amplitude: 1.0606601717798212
<matplotlib.text.Text at 0x7e7ee50>
class LinearApproximation(object):
def __str__(self):
return "f(x) = {} * x + {}; f({}) = 0".format(self.slope, self.y_at_0, self.x_at_0)
__repr__ = __str__
INFINITY = 1.0
while INFINITY != INFINITY * 2:
INFINITY *= 2
def linear_approximation(l):
l = l[:]
N = len(l)
space = np.arange(N)
A = np.array([ space, np.ones(N)])
# linearly generated sequence
w = np.linalg.lstsq(A.T,l)[0] # obtaining the parameters
slope = w[0]
y_at_0 = w[1]
# my computation
# derivation = [a - b for a, b in zip(l[1:], l[:-1])]
# middle_y = sum(l) / N
# slope = sum(derivation) / (N - 1)
# y_at_0 = middle_y - slope * (N - 1) / 2
result = LinearApproximation()
result.slope = slope
result.y_at_0 = y_at_0
if slope == 0:
x_at_0 = INFINITY
else:
x_at_0 = - y_at_0 / slope
result.x_at_0 = x_at_0
result.function = function = lambda x: result.y_at_0 + result.slope * x
def plot():
plt.plot(space,list(map(result.function, space)),'r-',space,l,'o')
plt.title(str(result))
result.plot = plot
return result
#linear_approximation([1,2,3,4]).plot()
linear_approximation([2.5629154477415055, 2.5629154477415055, 2.5629154477415055, 2.5629154477415055, 1.9221865858061293, 1.9221865858061293, 1.9221865858061293, 1.9221865858061293, 1.2814577238707527, 1.2814577238707527, 1.2814577238707527, 1.2814577238707527, 0.64072886193537637, 0.64072886193537637, 0.64072886193537637, 0.64072886193537637, 0.0, 0.0, 0.0, 0.0]).plot()
def generate_signal_stream():
return generate_signal() * 100
def get_synchronisation_offset():
default_cycles_per_step
default_amplitude_steps
default_wave_length
return default_cycles_per_step * default_wave_length * (default_amplitude_steps * 1.5)
default_window_size = 32
class WindowAnalysis:
pass
def analyze_window(window_offset, window_size = None):
if window_size is None:
window_size = default_window_size
# get signal
window = signal_stream()[window_offset: window_offset + window_size]
# compute signal fft
N = len(window)
assert len(window) == window_size
s = sum(window) / N
y = [x - s for x in window]
f1 = np.fft.fft(y)
# compute amplitude and phase
amplitude = np.sqrt([x.imag * x.imag + x.real * x.real for x in f1])
phase = [math.atan2(x.imag, x.real) for x in f1]
# evaluate and assert assumptions
max_index = round(window_size / default_wave_length)
index = [i for i, v in enumerate(amplitude) if v == max(amplitude)][0]
if index > N / 2:
index = N - index
#if index != 0:
# assert index == max_index, "{} == {}".format(index, round(window_size / default_wave_length))
#print("highest index: {} amplitude: {}".format(index, amplitude[index] / N))
# show signal
#plt.plot([1 * x for x in phase])
#plt.plot(amplitude / N * 10)
#plt.plot(window)
#x = plt.title('analyze signal')
x = WindowAnalysis()
x.amplitude = amplitude[max_index] / N
x.next_offset = window_offset + window_size
x.window_size = window_size
return x
def analyze_stream(window_offset):
window_offset0 = window_offset
stream = generate_signal_stream()
amplitudes = []
last_amplitude = None
# throw away the first incomplete waves
amplitudes = []
for i in range(int(round(default_cycles_per_step * default_amplitude_steps * default_wave_length / default_window_size))):
x = analyze_window(window_offset)
window_offset = x.next_offset
amplitude = x.amplitude
amplitudes.append(amplitude)
window_size = x.window_size
#print(amplitudes)
approximations = []
for i in range(len(amplitudes)):
approximations.append(linear_approximation(amplitudes))
amplitudes.append(amplitudes.pop(0))
best_approximation = min(approximations, key = lambda a: a.slope)
approximate_synchronisation_offset = best_approximation.x_at_0 * x.window_size
#print("{} == {}".format(approximate_synchronisation_offset, original_synchronisation_offset))
return approximate_synchronisation_offset
a = [analyze_stream(o) for o in range(100)]
plt.plot(a)
b = [analyze_stream(0) for o in range(100)]
plt.plot(b)
[<matplotlib.lines.Line2D at 0x11ce2890>]