Skip to content

Instantly share code, notes, and snippets.

@teburd
Created April 12, 2018 20:05
Show Gist options
  • Save teburd/e14678821a05c81ff33538ef7b201673 to your computer and use it in GitHub Desktop.
Save teburd/e14678821a05c81ff33538ef7b201673 to your computer and use it in GitHub Desktop.
import sys
import csv
import select
import numpy as np
from numpy import *
import quaternion as quat
# Rotation quaternions, one per accelerometer, matched by position: Decoder
# rotates the i-th accelerometer's (x, y, z) vector by QUATERNIONS[i].
# Components are listed in numpy-quaternion's constructor order (w, x, y, z).
#
# Alternative set of quaternions, left disabled (provenance is not recorded
# in this file):
#     (0.999999998971134, -2.90833843472273e-05, -8.09302082679352e-06, 3.38584042287428e-05),
#     (0.240001876668282, 0.601447508933827, 0.16885365153494, -0.743066913242888),
#     (0.986804155073817, 0.0990620143376974, 0.018555479056279, -0.126727941044586),
#     (0.913411113326214, -0.262175933338282, -0.0903565876158233, 0.297959066156395),
QUATERNIONS = [
    quat.quaternion(*wxyz)
    for wxyz in (
        (0.999999997146134, 4.66205271884476e-05, -4.9686485709746e-05, -3.26421814177937e-05),
        (0.399840713223264, -0.580898194789104, 0.584561518802914, 0.401213810922184),
        (0.9509085600237, -0.206502410567555, 0.182092441200399, 0.141322354074371),
        (0.926023166872184, -0.268973506879747, 0.216331923461484, 0.152757474135564),
    )
]
class Accelerometer(object):
    """A single accelerometer reading: a status field plus x, y, z values."""

    def __init__(self, status, x, y, z):
        """Store the reading's fields unchanged.

        Args:
            status: Status value for this reading; stored as given.
            x: First vector component.
            y: Second vector component.
            z: Third vector component.
        """
        self.status = status
        self.x = x
        self.y = y
        self.z = z

    def __repr__(self):
        """Return a constructor-style string listing all four fields."""
        return "Accelerometer(status={}, x={}, y={}, z={})".format(
            self.status, self.x, self.y, self.z
        )
class Sample(object):
    """One decoded row: a timestamp and the readings taken at that time."""

    def __init__(self, timestamp, accelerometers):
        """Store the timestamp and the list of readings unchanged.

        Args:
            timestamp: Time of the sample. When built by Decoder this is the
                row's time minus the time of the first row decoded.
            accelerometers: Sequence of per-accelerometer readings
                (Decoder passes a list of Accelerometer objects).
        """
        self.timestamp = timestamp
        self.accelerometers = accelerometers

    def __repr__(self):
        """Return a constructor-style string, mirroring Accelerometer."""
        return "Sample(timestamp={}, accelerometers={})".format(
            self.timestamp, self.accelerometers
        )
class Decoder(object):
    """Iterator that turns CSV rows of accelerometer data into Samples.

    Row layout as read by ``__next__``: column 0 is the time, followed by
    five columns per accelerometer. Within each group of five, the first
    column is ignored and the remaining four are read as status, x, y, z.
    """

    def __init__(self, f, accelerometers, quats=QUATERNIONS):
        """Wrap *f* in a CSV reader and reset the decoding state.

        Args:
            f: Open text source of CSV rows. It is handed to both
                ``csv.reader`` and ``select.select``, so it must be
                iterable by line and usable with ``select`` (i.e. expose
                ``fileno()``) if the timed ``next`` method is used.
            accelerometers: Number of accelerometers encoded in each row.
            quats: Rotation quaternions, matched to accelerometers by
                index. Accelerometers with no matching quaternion are
                left unrotated.
        """
        self.f = f
        self.reader = csv.reader(f)
        self.quats = quats
        self.accelerometers = accelerometers
        self.read_count = 0  # number of rows successfully decoded
        self.initial_time = None  # time of the first decoded row

    def next(self, timeout):
        """Return the next Sample if input becomes readable in time.

        Args:
            timeout: Seconds to wait for input; values below 0.01 are
                raised to 0.01.

        Returns:
            The next Sample, or None if nothing was readable before the
            timeout expired.

        Raises:
            StopIteration: Propagated from ``__next__`` when the input is
                exhausted or a row is too short.
        """
        timeout = max(timeout, 0.01)
        # NOTE(review): select() only observes the underlying descriptor.
        # If the file object buffers ahead, rows already held in that
        # buffer may not be reported as readable -- confirm how f is
        # opened by the caller.
        readable, _, _ = select.select([self.f], [], [], timeout)
        if not readable:
            return None
        return self.__next__()

    def __next__(self):
        """Decode one CSV row into a Sample.

        Returns:
            A Sample whose timestamp is relative to the first decoded row
            and whose readings have been rotated by the matching
            quaternion where one is configured.

        Raises:
            StopIteration: When the reader is exhausted, or when a row has
                fewer than ``1 + 5 * accelerometers`` columns.
            ValueError: If a time or x/y/z field is not a valid float.
        """
        row = next(self.reader)
        # A short (or blank) row ends the iteration rather than erroring.
        if len(row) < 1 + self.accelerometers * 5:
            raise StopIteration
        timestamp = float(row[0])
        if self.initial_time is None:
            self.initial_time = timestamp
        readings = []
        for i in range(self.accelerometers):
            base = i * 5  # row[base + 1] is present in the data but unused
            status = row[base + 2]
            x = float(row[base + 3])
            y = float(row[base + 4])
            z = float(row[base + 5])
            vector = np.array([x, y, z])
            if i < len(self.quats):
                vector = quat.rotate_vectors(self.quats[i], vector)
            readings.append(
                Accelerometer(status, vector[0], vector[1], vector[2])
            )
        self.read_count += 1
        return Sample(timestamp - self.initial_time, readings)

    def __iter__(self):
        """Return self; the decoder is its own iterator."""
        return self
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment