Skip to content

Instantly share code, notes, and snippets.

@daeken
Created August 3, 2012 06:15
Show Gist options
  • Select an option

  • Save daeken/3245016 to your computer and use it in GitHub Desktop.

Select an option

Save daeken/3245016 to your computer and use it in GitHub Desktop.
Encoder
class Encoder(object):
    """Serial driver for a magnetic-stripe card encoder.

    The encoder is reached through a Prolific USB-to-serial adapter at
    9600 baud and is driven with ESC-prefixed command strings.  This is
    Python 2 code: every payload is a byte string (``str``).

    NOTE(review): the command bytes used below look like the MSR206-family
    protocol; the per-command remarks in this class are hedged accordingly
    and should be confirmed against the device manual.

    NOTE(review): the source this class was recovered from had lost all
    indentation, so block nesting was reconstructed from the logic.  The
    two places where more than one reading was plausible carry their own
    NOTE(review) comment.

    Use ``Encoder.get()`` to obtain the shared instance instead of
    constructing the class directly.
    """

    # Shared instance cached by get(); None until a device has been opened.
    instance = None

    @staticmethod
    def get():
        """Return the shared Encoder, opening the serial port on first use.

        Returns None when no matching adapter is listed by getPorts() or
        when the port cannot be opened.
        """
        # A liveness check via Encoder.instance.test() was once part of
        # this condition and is intentionally left disabled.
        if Encoder.instance is not None:
            return Encoder.instance
        port = None
        for tport in getPorts():
            if (tport[2] == 'Prolific USB-to-Serial Comm Port'
                    and tport[3] == 'Prolific'):
                port = tport[0]
                break
        if port is None:
            return None
        # init() returns None on failure, which leaves the cache empty so
        # the next call retries.
        Encoder.instance = Encoder().init(port)
        return Encoder.instance

    def init(self, port):
        """Open *port*, create the lock and reset the device.

        Returns self on success, or None if the serial port could not be
        opened.  The result of the initial reset is not checked.
        """
        try:
            self.conn = serial.Serial(
                port, 9600,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=None)
        except Exception:
            # Best effort: any failure to open means "no encoder".
            return None
        self._lock = thread.allocate_lock()
        self.reset()
        return self

    def write(self, data):
        """Send *data* to the device."""
        self.conn.write(data)

    def read(self, size):
        """Read up to *size* bytes, honouring the connection timeout."""
        return self.conn.read(size)

    def lock(self):
        """Acquire the device lock (blocks until available)."""
        self._lock.acquire()

    def unlock(self):
        """Release the device lock."""
        self._lock.release()

    def reset(self, lock=True):
        """Reset the device and re-apply the default configuration.

        lock -- when true, the device lock is held for the duration of the
                call; pass False when the caller already holds it.

        Returns False as soon as a command is not acknowledged, otherwise
        the result of setLeadingZeros(40).
        """
        if lock:
            self.lock()
        try:
            self.write('\x1Ba')  # presumably: device reset (no reply read)
            self.write('\x1Bo\x08\x08\x08')  # presumably: bits per char
            if self.read(5) != '\x1B0\x08\x08\x08':
                return False
            self.write('\x1By')  # presumably: low-coercivity mode
            if self.read(2) != '\x1B0':
                return False
            return self.setLeadingZeros(40)
        finally:
            # Release on every exit path; the early returns above used to
            # leave the lock held, deadlocking the next lock() call.
            if lock:
                self.unlock()

    def cancel(self):
        """Abort the pending operation by resetting; the caller holds the lock."""
        self.reset(lock=False)

    def test(self):
        """Probe the device and report whether it answers.

        Sends the probe command up to ten times with a 0.1 s read timeout
        and looks for the reply ``'\\x1By'``.  The previous timeout is
        always restored.  If no probe is answered the device is reset.
        Does not take the device lock.
        """
        self.write('\x1Ba')
        saved_timeout = self.conn.timeout
        try:
            self.conn.timeout = 0.1
        except Exception:
            self.reset(lock=False)
            self.conn.timeout = 0.1
        found = False
        try:
            for _ in range(10):
                self.write('\x1Be')
                data = ''
                # Drain everything the device sends until the read times out.
                while True:
                    c = self.read(1)
                    if c == '':
                        break
                    data += c
                # NOTE(review): indentation was lost in the source; this
                # check is taken to belong to the retry loop (stop probing
                # once answered) rather than to the drain loop -- confirm.
                if data == '\x1By':
                    found = True
                    break
        finally:
            self.conn.timeout = saved_timeout
        if not found:
            self.reset(lock=False)
        return found

    def setLeadingZeros(self, count):
        """Send the leading-zero command with *count* given twice.

        Returns True when the device acknowledges with ``'\\x1B0'``.
        """
        self.write('\x1Bz' + chr(count) + chr(count))
        return self.read(2) == '\x1B0'

    def format(self, data):
        """Pack *data* into the raw bit layout sent to the encoder.

        A single 1 bit comes first; each input byte then occupies nine
        bits (its eight data bits plus a set marker bit above them).  The
        resulting integer is returned as a little-endian byte string.
        """
        out = 1
        off = 1
        for byte in map(ord, data):
            out |= (byte | 0x100) << off
            off += 9
        packed = []
        while out != 0:
            packed.append(chr(out & 0xFF))
            out >>= 8
        return ''.join(packed)

    def initWriteCard(self, card, crypt=True, hico=False):
        """Prepare a card write and run its first (mode-select) stage.

        card  -- card contents; passed to encryptCard() when *crypt* is
                 true, otherwise interpreted as a hex string.
        crypt -- encrypt with state.sitecode instead of decoding hex.
        hico  -- forwarded to writeRaw() to choose the coercivity command.

        The payload is printed as hex before and after the first/last byte
        adjustment.  Returns whatever the first step of the writer yields
        (see writeRaw); call finishWriteCard() to perform the write.
        """
        if crypt:
            data = encryptCard(state.sitecode, card)
        else:
            data = ''.join(chr(int(card[i:i + 2], 16))
                           for i in range(0, len(card), 2))
            # NOTE(review): indentation was lost in the source; the
            # checksum is assumed to apply only to the non-encrypted path.
            # Confirm whether encryptCard() output needs it as well.
            data += checksum(data)
        print(''.join('%02X' % ord(x) for x in data))
        first, last, middle = ord(data[0]), ord(data[-1]), data[1:-1]
        # Move a set low bit of the final byte onto the first byte.
        if last & 1:
            first |= 1
            last &= 0xFE
        data = chr(first) + middle + chr(last)
        print(''.join('%02X' % ord(x) for x in data))
        self.writer = self.writeRaw((None, None, self.format(data)), hico)
        return self.writer()

    def finishWriteCard(self):
        """Run the second stage of the writer created by initWriteCard()."""
        return self.writer()

    @callableGenerator
    def writeRaw(self, data, hico):
        """Two-stage raw write of up to three tracks.

        data -- sequence of three track payloads; None means empty track.
        hico -- selects the ``'\\x1Bx'`` mode command instead of ``'\\x1By'``.

        Stage 1 takes the device lock, resets the device and selects the
        mode; it yields True on success.  On failure it releases the lock
        and yields False, and there is no second stage.

        Stage 2 sends the write command and waits up to 15 s for a reply.
        It yields True when acknowledged, ``'timeout'`` when nothing
        arrived (the device is reset), and False otherwise, including when
        an exception occurred (the traceback is printed).  The lock is
        released before the result is yielded.
        """
        self.lock()
        self.reset(lock=False)
        self.write('\x1Bx' if hico else '\x1By')
        if self.read(2) != '\x1B0':
            # Unlock before yielding: nothing guarantees the generator is
            # resumed after a failure, and the lock used to stay held.
            self.unlock()
            yield False
            return
        yield True

        # Command header followed by one ESC <n> <len> <payload> section
        # per track.
        cmd = '\x1Bn\x1Bs'
        for i in range(3):
            chunk = data[i]
            cmd += '\x1B' + chr(i + 1)
            if chunk is None:
                cmd += '\0'
            else:
                cmd += chr(len(chunk)) + chunk

        res = False
        try:
            saved_timeout = self.conn.timeout
            self.conn.timeout = 15
            try:
                self.write(cmd + '?\x1C')
                c = self.read(1)
            finally:
                # Restore even if the write/read raised.
                self.conn.timeout = saved_timeout
            print('read %r' % (c,))
            if c == '':
                self.cancel()
                res = 'timeout'
            elif c == '\x1B':
                res = self.read(1) == '0'
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.unlock()
        yield res

    def countbits(self, val):
        """Return the number of bits needed to represent non-negative *val*."""
        bits = 0
        while val:
            bits += 1
            val >>= 1
        return bits

    def flip(self, val):
        """Return the low eight bits of *val* in reversed bit order."""
        out = 0
        for _ in range(8):
            out = (out << 1) | (val & 1)
            val >>= 1
        return out

    def unformat(self, data):
        """Decode a raw track read back into card bytes.

        *data* is read as a little-endian integer.  Trailing zero bits and
        the first set bit are dropped, the stream is aligned to 9-bit
        groups (discarding the leading partial or first group), and each
        remaining group contributes its low eight bits.

        Returns ``'\\xE0'`` followed by the decoded bytes, or None when a
        group lacks its marker bit or when *data* contains no set bits.
        """
        val = 0
        off = 0
        for byte in data:
            val |= ord(byte) << off
            off += 8
        if val == 0:
            # Nothing to sync on; the strip loop below would never end.
            return None
        while val & 1 == 0:
            val >>= 1
        val >>= 1  # drop the leading 1 bit itself
        bits = self.countbits(val) - 1
        val >>= (bits % 9) + 1
        decoded = []
        while val > 1:
            decoded.append(chr(val & 0xFF))  # noflip
            if val & 0x100 != 0x100:
                return None
            val >>= 9
        return '\xE0' + ''.join(decoded)

    def _expect(self, expected):
        """Read len(expected) bytes; raise AssertionError unless they match."""
        got = self.read(len(expected))
        if got != expected:
            raise AssertionError('expected %r, got %r' % (expected, got))

    def readRaw(self):
        """Read a card in raw mode and return ``(track3, status)``.

        Waits up to 15 s for the ``ESC s`` start marker.  If a read times
        out first, the device is reset and ``((None, None, None), 't')``
        is returned.  Otherwise three track sections are parsed, each byte
        bit-reversed with flip(), and the third track is returned together
        with the status byte that follows the trailer.

        Raises AssertionError when the framing bytes are not as expected.
        The device lock and the previous timeout are restored on every
        exit path.
        """
        self.lock()
        try:
            saved_timeout = self.conn.timeout
            self.conn.timeout = 15
            try:
                self.write('\x1Bm')
                # Skip input until the ESC 's' start marker.
                while True:
                    c = self.read(1)
                    if c == '':
                        # Restore first so the reset runs with the normal
                        # timeout, as before.
                        self.conn.timeout = saved_timeout
                        self.cancel()
                        return (None, None, None), 't'
                    if c != '\x1B':
                        continue
                    if self.read(1) == 's':
                        break
                tracks = []
                for i in range(3):
                    # Explicit checks instead of assert statements: under
                    # -O the asserts, and the reads inside them, would be
                    # dropped and the stream would desynchronise.
                    self._expect('\x1B')
                    self._expect(chr(i + 1))
                    size = ord(self.read(1))
                    if size > 100:  # an earlier threshold noted here: 195
                        size = 16
                    tracks.append(''.join(
                        chr(self.flip(ord(b))) for b in self.read(size)))
                self._expect('?\x1C\x1B')
                status = self.read(1)
            finally:
                self.conn.timeout = saved_timeout
        finally:
            self.unlock()
        return tracks[2], status
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment