Skip to content

Instantly share code, notes, and snippets.

@hardbyte
Created July 7, 2013 22:43
Show Gist options
  • Save hardbyte/5945252 to your computer and use it in GitHub Desktop.
Save hardbyte/5945252 to your computer and use it in GitHub Desktop.
Frequency analysis of fast changing data.
import numpy as np
from collections import deque
import time
import threading
import matplotlib.pyplot as plt
def rollingFFT(s, n, dt):
fy = np.fft.fft(s)
# Frequencies associated with each samples
freqs = np.fft.fftfreq(n, d=dt)
hz = np.fft.fftshift(freqs)
ampl = np.fft.fftshift(abs(fy))
index = hz > 0
hz = hz[index]
ampl = ampl[index]
return hz, ampl
def plotFFT(fig, n, dt, data, title="Frequencies Observed", show_data=True):
"""A thread to run the fft as fast as it can.
data is a shared dequeue - filled from somewhere else
"""
if show_data:
ax = fig.add_subplot(211)
ax2 = fig.add_subplot(212)
line2, = ax2.plot(data)
else:
ax = fig.add_subplot(111)
f, a = rollingFFT(data, n, dt)
line1, = ax.plot(f, a, 'r--')
ax.set_xlabel("Hz")
ax.set_xlim([0, f[-1]])
ax.set_ylim([-500, 10000])
plt.title(title)
while True:
f, a = rollingFFT(data, n, dt)
line1.set_ydata(a)
if show_data:
line2.set_ydata(data)
plt.draw()
if __name__ == "__main__":
# simulate generation
def sim(fig, l=20):
n = 200
dt = 0.003
t = np.arange(start=0, stop=l, step=dt)
noise = 25*np.random.normal(0, 2, len(t))
motor_speed = 4000 * np.ones_like(t)
ripple = 70 * np.sin(2*np.pi*(5*t)*t) + \
(40 * np.cos(2*np.pi*t*30)) + \
(80 * (t/10) * np.cos(2*np.pi*t*60))
signal = noise + motor_speed + ripple
signal_queue = deque(maxlen=n)
for i in range(n+1):
signal_queue.append(signal[i])
t = threading.Thread(target=plotFFT, args=(fig, n, dt, signal_queue))
t.start()
for new_value in signal:
signal_queue.append(new_value)
time.sleep(dt)
t.join()
fig = plt.figure()
t = threading.Thread(target=sim, args=(fig,))
t.start()
plt.show()
t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment