Last active
August 29, 2015 14:09
-
-
Save aldur/f2ae69167f4003324f76 to your computer and use it in GitHub Desktop.
Wireless Lab - Sliding window tracing parsing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy | |
import os | |
import pickle | |
from itertools import islice | |
TRACES_DIRECTORY = "traces" | |
OUTPUT_DIRECTORY = "output" | |
WINDOW_SIZE = 256 | |
SAMPLING_RATE = 32 | |
FREQ_RANGE_MIN = 1 | |
FREQ_RANGE_MAX = 3 | |
def getMagnitudeList(window): | |
"""Return a list of magnitudes from to the window.""" | |
magnitudes = list() | |
for line in window: | |
_, x, y, z = line.split("\t") | |
x, y, z = (float(i) for i in (x, y, z)) | |
magnitude = numpy.sqrt(x ** 2 + y ** 2 + z ** 2) | |
magnitudes.append(magnitude) | |
return magnitudes | |
def filterFrequencies(fft_x_half, freq_falf): | |
filtered_fft_x_half, filtered_freq_falf = list(), list() | |
for i, f in enumerate(freq_falf): | |
if f >= FREQ_RANGE_MIN and f <= FREQ_RANGE_MAX: | |
filtered_fft_x_half.append(freq_falf[i]) | |
filtered_freq_falf.append(f) | |
return filtered_fft_x_half, filtered_freq_falf | |
def window(seq, n=2): | |
"Returns a sliding window (of width n) over data from the iterable" | |
" s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... " | |
it = iter(seq) | |
result = tuple(islice(it, n)) | |
if len(result) == n: | |
yield result | |
for elem in it: | |
result = result[1:] + (elem,) | |
yield result | |
def transformateWindow(magnitudes): | |
fft_x = numpy.fft.fft(magnitudes) | |
l = len(fft_x) | |
freq = numpy.fft.fftfreq(l, 1.0 / 20) | |
ftt_x_shifted = numpy.fft.fftshift(fft_x) | |
half_l = numpy.ceil(l / 2.0) | |
fft_x_half = numpy.abs((2.0 / l) * fft_x[:half_l]) | |
freq_falf = freq[:half_l] | |
return filterFrequencies(fft_x_half[1:], freq_falf[1:]) | |
def processWindow(w): | |
w = getMagnitudeList(w) | |
var = numpy.var(w) | |
fft_x_half, freq_falf = transformateWindow(w) | |
pf_index = freq_falf[fft_x_half.index(max(fft_x_half))] | |
return [var, pf_index] + freq_falf | |
def main(): | |
if not os.path.isdir(OUTPUT_DIRECTORY): | |
os.mkdir(OUTPUT_DIRECTORY) | |
for f in os.listdir(TRACES_DIRECTORY): | |
with open(os.path.join(TRACES_DIRECTORY, f), 'r') as input: | |
content = input.readlines() | |
lists = list() | |
lists.append(processWindow(content[:WINDOW_SIZE])) | |
for w in window(content[WINDOW_SIZE:], SAMPLING_RATE): | |
lists.append(processWindow(w)) | |
with open(os.path.join(OUTPUT_DIRECTORY, f), 'wb') as output: | |
pickle.dump(lists, output) | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment