Created
August 3, 2012 06:15
-
-
Save daeken/3245016 to your computer and use it in GitHub Desktop.
Encoder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Encoder(object):
    """Serial-port driver for a card encoder (Python 2 code).

    The device is located by scanning ``getPorts()`` for a Prolific
    USB-to-serial adapter and is then driven with ESC-prefixed command
    strings at 9600 baud.  One instance is cached on the class and handed
    out by :meth:`get`.

    A ``thread`` lock serialises multi-step exchanges.  ``reset`` takes it
    unless told otherwise, ``readRaw`` holds it for the whole read, and
    ``writeRaw`` holds it from its first step until its last.

    NOTE(review): the command bytes look like the MSR605-family protocol
    (reset, set BPC, set coercivity, raw read/write) -- confirm against the
    device manual before relying on that reading.
    """

    # Process-wide cached encoder; populated lazily by get().
    instance = None

    # Two-byte reply that every command in this class treats as success.
    _OK_REPLY = '\x1B0'

    @staticmethod
    def get():
        """Return the shared Encoder, creating it on first use.

        Returns None when no matching serial port is listed or when the
        port cannot be opened (``init`` returned None).
        """
        if Encoder.instance is not None:
            return Encoder.instance
        for tport in getPorts():
            if (tport[2] == 'Prolific USB-to-Serial Comm Port'
                    and tport[3] == 'Prolific'):
                Encoder.instance = Encoder().init(tport[0])
                return Encoder.instance
        return None

    def init(self, port):
        """Open ``port``, create the lock and reset the device.

        Returns ``self`` on success so the call can be chained, or None if
        the serial port could not be opened.
        """
        try:
            self.conn = serial.Serial(
                port,
                9600,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=None,
            )
        except Exception:
            # Best effort: an unopenable port just means "no encoder".
            return None
        self._lock = thread.allocate_lock()
        self.reset()
        return self

    def write(self, data):
        """Send ``data`` to the device."""
        self.conn.write(data)

    def read(self, size):
        """Read up to ``size`` bytes; may return fewer if the timeout hits."""
        return self.conn.read(size)

    def lock(self):
        """Acquire the device lock (blocking)."""
        self._lock.acquire()

    def unlock(self):
        """Release the device lock."""
        self._lock.release()

    def reset(self, lock=True):
        """Reset the device and re-apply the default configuration.

        Sends ESC 'a', then ESC 'o' 8 8 8, then ESC 'y', then sets 40
        leading zeros, checking each reply that the device sends.

        lock -- take the device lock for the duration; pass False when the
                caller already holds it.

        Returns True only if every checked reply matched.
        """
        if lock:
            self.lock()
        try:
            self.write('\x1Ba')
            self.write('\x1Bo\x08\x08\x08')
            if self.read(5) != '\x1B0\x08\x08\x08':
                return False
            self.write('\x1By')
            if self.read(2) != self._OK_REPLY:
                return False
            return self.setLeadingZeros(40)
        finally:
            # Release on every exit path; the early "return False" cases
            # used to leave the lock held and deadlock the next caller.
            if lock:
                self.unlock()

    def cancel(self):
        """Abort the current operation by resetting; caller holds the lock."""
        self.reset(lock=False)

    def test(self):
        """Probe the device with ESC 'e' and expect ESC 'y' back.

        Makes up to ten attempts with a 0.1 s read timeout, draining the
        input on each attempt.  The previous timeout is always restored.
        If no attempt succeeds the device is reset.  Returns True when the
        expected reply was seen.
        """
        self.write('\x1Ba')
        to = self.conn.timeout
        try:
            self.conn.timeout = 0.1
        except Exception:
            self.reset(lock=False)
            self.conn.timeout = 0.1
        found = False
        try:
            for _ in range(10):
                self.write('\x1Be')
                data = ''
                # Collect everything the device sends until it goes quiet.
                while True:
                    c = self.read(1)
                    if c == '':
                        break
                    data += c
                # NOTE(review): the source's indentation was lost; this
                # check is assumed to follow the drain loop so that a hit
                # ends the retry loop.
                if data == '\x1By':
                    found = True
                    break
        finally:
            self.conn.timeout = to
        if not found:
            self.reset(lock=False)
        return found

    def setLeadingZeros(self, count):
        """Send ESC 'z' with ``count`` twice; True if the device accepts."""
        self.write('\x1Bz' + chr(count) + chr(count))
        return self.read(2) == self._OK_REPLY

    def format(self, data):
        """Pack ``data`` into the 9-bits-per-byte stream written to a card.

        Bit 0 of the stream is a start marker (1).  Each input byte follows
        as 9 bits: the byte itself plus a set marker bit above it.  The
        resulting integer is emitted least-significant byte first.
        """
        out = 1
        off = 1
        for byte in map(ord, data):
            out |= (byte | 0x100) << off
            off += 9
        packed = []
        while out != 0:
            packed.append(chr(out & 0xFF))
            out >>= 8
        return ''.join(packed)

    def initWriteCard(self, card, crypt=True, hico=False):
        """Prepare card data and start a write; returns the first result.

        card  -- plaintext for ``encryptCard`` when ``crypt`` is true,
                 otherwise a hex string of the bytes to write.
        crypt -- encrypt ``card`` with ``state.sitecode`` first.
        hico  -- passed through to ``writeRaw``.

        Returns what the first step of ``writeRaw`` yields: True when the
        device is ready (the write is then completed by
        ``finishWriteCard``), False otherwise.
        """
        if crypt:
            data = encryptCard(state.sitecode, card)
        else:
            data = ''.join(
                chr(int(card[i:i + 2], 16)) for i in range(0, len(card), 2)
            )
        # NOTE(review): indentation was lost in the source; the checksum is
        # assumed to be appended for both the encrypted and raw paths.
        data += checksum(data)
        print(''.join('%02X' % ord(x) for x in data))
        first, last, middle = ord(data[0]), ord(data[-1]), data[1:-1]
        if last & 1:
            # Move the low bit of the final byte into the first byte.
            first |= 1
            last &= 0xFE
        data = chr(first) + middle + chr(last)
        print(''.join('%02X' % ord(x) for x in data))
        # Only the third of the three data slots is populated.
        self.writer = self.writeRaw((None, None, self.format(data)), hico)
        return self.writer()

    def finishWriteCard(self):
        """Run the second step of the write started by ``initWriteCard``."""
        return self.writer()

    @callableGenerator
    def writeRaw(self, data, hico):
        """Two-step raw write of three data chunks.

        data -- sequence of three chunks (str or None for an empty one).
        hico -- send ESC 'x' instead of ESC 'y' before writing
                (presumably high vs. low coercivity -- confirm).

        Step 1 takes the lock, resets the device, selects the mode and
        yields True if the device accepted it, else False (lock released).
        Step 2 sends the write command with a 15 s timeout and yields True
        on success, False on failure, or 'timeout' if nothing came back.

        The lock stays held between the two steps, so a caller that gets
        True from step 1 must run step 2.
        """
        self.lock()
        ready = False
        try:
            self.reset(lock=False)
            self.write('\x1Bx' if hico else '\x1By')
            ready = self.read(2) == self._OK_REPLY
        finally:
            # Nobody resumes the generator after a failed first step, so
            # the lock has to be given back here.
            if not ready:
                self.unlock()
        if not ready:
            yield False
            return
        yield True

        parts = ['\x1Bn\x1Bs']
        for i in range(3):
            parts.append('\x1B')
            parts.append(chr(i + 1))
            chunk = data[i]
            if chunk is None:
                parts.append('\0')
            else:
                parts.append(chr(len(chunk)))
                parts.append(chunk)
        cmd = ''.join(parts)

        res = False
        try:
            to = self.conn.timeout
            try:
                self.conn.timeout = 15
                self.write(cmd + '?\x1C')
                c = self.read(1)
                print('read %r' % (c,))
                # Restore before cancel() so the reset runs with the
                # caller's timeout, as it always has.
                self.conn.timeout = to
                if c == '':
                    self.cancel()
                    res = 'timeout'
                elif c == '\x1B':
                    res = self.read(1) == '0'
            finally:
                self.conn.timeout = to
        except Exception:
            # Best effort: report the failure and yield False.
            import traceback
            traceback.print_exc()
        finally:
            self.unlock()
        yield res

    def countbits(self, val):
        """Return the number of bits needed to represent ``val`` (>= 0)."""
        bits = 0
        while val:
            bits += 1
            val >>= 1
        return bits

    def flip(self, val):
        """Return the low 8 bits of ``val`` in reversed bit order."""
        out = 0
        for _ in range(8):
            out = (out << 1) | (val & 1)
            val >>= 1
        return out

    def unformat(self, data):
        """Decode a 9-bits-per-byte stream as produced by ``format``.

        The stream is read least-significant byte first, and zero bits
        below the start marker are skipped.  The first (possibly partial)
        9-bit group is discarded and replaced by a fixed 0xE0 byte in the
        result.

        Returns the decoded string, or None when the input holds no set
        bits or a group is missing its marker bit.
        """
        val = 0
        for index, byte in enumerate(data):
            val |= ord(byte) << (8 * index)
        if val == 0:
            # No start marker at all; the skip loop below would never end.
            return None
        while val & 1 == 0:
            val >>= 1
        val >>= 1  # drop the start marker
        bits = self.countbits(val) - 1
        val >>= (bits % 9) + 1  # drop the leading partial group
        decoded = []
        while val > 1:
            if val & 0x100 != 0x100:
                return None
            decoded.append(chr(val & 0xFF))
            val >>= 9
        return '\xE0' + ''.join(decoded)

    def _expect(self, expected):
        """Read ``len(expected)`` bytes and require them to match.

        Raises AssertionError on mismatch.  This is an explicit raise, not
        an ``assert`` statement, so the read still happens under
        ``python -O``.
        """
        got = self.read(len(expected))
        if got != expected:
            raise AssertionError(
                'unexpected encoder reply %r, wanted %r' % (got, expected)
            )

    def readRaw(self):
        """Read a card in raw mode and return ``(chunk, status)``.

        Sends ESC 'm' and waits up to 15 s per read for the ESC 's' header.
        On success it returns the third data chunk (each byte bit-reversed)
        and the status byte that follows the trailer.  On timeout it resets
        the device and returns ``((None, None, None), 't')``.

        Raises AssertionError if the framing bytes are not as expected.
        The lock and the previous timeout are restored on every exit path.
        """
        self.lock()
        try:
            to = self.conn.timeout
            try:
                self.conn.timeout = 15
                self.write('\x1Bm')
                # Skip input until the ESC 's' data header shows up.
                while True:
                    c = self.read(1)
                    if c == '':
                        self.conn.timeout = to
                        self.cancel()
                        return (None, None, None), 't'
                    if c == '\x1B' and self.read(1) == 's':
                        break
                data = []
                for i in range(3):
                    self._expect('\x1B')
                    self._expect(chr(i + 1))
                    size = ord(self.read(1))
                    # NOTE(review): sizes over 100 are replaced by 16;
                    # the reason is not visible here.
                    if size > 100:
                        size = 16
                    data.append(
                        ''.join(chr(self.flip(ord(b))) for b in self.read(size))
                    )
                self._expect('?\x1C\x1B')
                x = self.read(1)
                return data[2], x
            finally:
                self.conn.timeout = to
        finally:
            self.unlock()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment