Last active
June 13, 2019 14:35
-
-
Save daanzu/f17d2cbcb60b3bc1cd7c8819756e42db to your computer and use it in GitHub Desktop.
blink(1) driver utility class -- can use either a CLI util or a native USB HID driver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import time | |
import contextlib | |
class Blink1(object):
    """Drive a blink(1) LED through the ``blink1-tool`` CLI or a ctypes driver.

    The instance remembers the last requested colour (``color``), the colour
    actually sent after brightness scaling (``raw_color``), the brightness
    percentage (``brightness``) and the selected LED index (``led``).

    By default every command is executed by spawning the CLI tool given in
    ``args``; call ``set_driver('blink1_ctypes')`` to use the in-process
    driver for fades instead.
    """

    # Text type used to recognise hex colour strings on Python 2 and 3 alike.
    try:
        _string_types = basestring  # noqa: F821 -- only exists on Python 2
    except NameError:
        _string_types = str

    def __init__(self, args=None):
        """Create a wrapper.

        args -- argv prefix used to invoke the CLI tool; defaults to
                ``['blink1-tool.exe', '-q']``.
        """
        if not args:
            args = ['blink1-tool.exe', '-q']
        self.args = args
        self.color = (0, 0, 0)
        self.raw_color = self.color
        self.brightness = 100
        self.led = None
        self.driver = None
        # The ctypes driver is opt-in: self.set_driver('blink1_ctypes')

    def set_driver(self, driver):
        """Select the backend used by fade(); 'blink1_ctypes' loads that module."""
        if driver == 'blink1_ctypes':
            # Imported lazily so the CLI-only path does not need the library.
            import blink1_ctypes
            self.blink1_ctypes = blink1_ctypes.Blink1()
        self.driver = driver

    def emitCommand(self, command):
        """Run the CLI tool with ``command`` (a space-separated option string).

        Returns the result of ``subprocess.check_call`` on success, or None
        after printing the error if anything goes wrong (best effort).
        """
        args = list(self.args) + command.split()
        try:
            output = subprocess.check_call(args)
        except Exception as e:
            # Deliberately non-fatal: a missing tool or device must not crash
            # the caller, so report and carry on.
            print("Exception in emitCommand(): %s" % (e,))
            return None
        return output

    @staticmethod
    def process_set(curr, seq):
        """Resolve a "set" argument that may be a single value or a cycle.

        If ``seq`` is a list/tuple, return the element following ``curr``
        (wrapping around), or the first element when ``curr`` is not in it.
        Any other value is returned unchanged.
        """
        if isinstance(seq, (list, tuple)):
            if curr in seq:
                seq = seq[(seq.index(curr) + 1) % len(seq)]
            else:
                seq = seq[0]
        return seq

    def set_brightness(self, brightness):
        """Set the brightness percentage (or advance through a list of them).

        Only the stored value changes; the LED is not re-faded here.
        """
        self.brightness = Blink1.process_set(self.brightness, brightness)

    def set_led(self, led):
        """Switch to another LED index (or advance through a list of them).

        The current LED is faded to black first, then the remembered colour
        is faded in on the newly selected LED.
        """
        led = Blink1.process_set(self.led, led)
        color = self.color
        self.fade('#000')
        self.led = led
        self.fade(color)

    def set_color(self, color, raw=None):
        """Record ``color`` as the current colour without touching the device."""
        self.color = self.parse_color(color)
        self.raw_color = self.process_color(self.color, raw=raw)

    def parse_color(self, color):
        """Normalise ``color`` to an ``(r, g, b)`` tuple.

        Accepts a falsy value (black), a hex string ``'#rgb'`` / ``'#rrggbb'``
        (the leading ``#`` is optional), or any 3-item sequence.
        Raises AssertionError if the value does not have the expected length.
        """
        if not color:
            color = (0, 0, 0)
        elif isinstance(color, self._string_types):
            if color.startswith('#'):
                color = color[1:]
            if len(color) == 3:
                # Expand shorthand: 'abc' -> 'aabbcc'.
                color = (color[0] * 2) + (color[1] * 2) + (color[2] * 2)
            assert len(color) == 6
            color = (color[0:2], color[2:4], color[4:6])
            color = [int(c, base=16) for c in color]
        assert len(color) == 3
        r, g, b = color
        return (r, g, b)

    def process_color(self, color, raw=None):
        """Return the ``(r, g, b)`` to send: scaled by brightness unless ``raw``."""
        color = self.parse_color(color)
        if raw:
            r, g, b = color
        else:
            # Floor division keeps integer channel values on Python 3 as well
            # (plain `/` would yield floats there).
            r, g, b = [int(c * self.brightness // 100) for c in color]
        return (r, g, b)

    def fade(self, color=None, delay=100, pause=None, colors=None, async_=False, raw=None, **legacy):
        """Fade the LED to ``color``.

        color  -- anything parse_color() accepts.
        delay  -- fade time in milliseconds.
        pause  -- extra milliseconds to sleep after the fade.
        colors -- optional iterable of colours to fade through in turn (after
                  ``color`` if given); when present nothing else is done.
        async_ -- if true, do not sleep for ``delay`` after issuing the fade.
                  The historical keyword ``async`` is still accepted (it is a
                  reserved word on Python 3.7+, hence the rename).
        raw    -- if true, skip brightness scaling and LED selection.
        """
        if 'async' in legacy:
            async_ = legacy.pop('async')
        if legacy:
            raise TypeError("fade() got unexpected keyword argument(s): %s"
                            % ', '.join(sorted(legacy)))

        if colors:
            if color:
                self.fade(color)
            for c in colors:
                self.fade(c, delay=delay)
            return

        color = self.parse_color(color)
        r, g, b = self.process_color(color, raw)
        if self.driver == 'blink1_ctypes':
            self.blink1_ctypes.fade_to_rgbn(delay, r, g, b, (not raw and self.led) or 0)
        else:
            arg = "--rgb %d,%d,%d -m %d" % (r, g, b, delay)
            if self.led and not raw:
                arg += " --led=%d" % self.led
            # Use this instance's own runner so custom `args` are honoured.
            self.emitCommand(arg)
        self.color = color
        self.raw_color = (r, g, b)
        if not async_:
            time.sleep(delay / 1000.0)
        if pause:
            time.sleep(pause / 1000.0)

    def fade_lazy(self, color, force=None, raw=None, *args, **kwargs):
        """Fade only if ``color`` differs from the current colour (or ``force``)."""
        if force or self.parse_color(color) != self.color:
            print("self.fade()")
            self.fade(color, raw=raw, *args, **kwargs)

    @contextlib.contextmanager
    def fading(self, color='#fff', *args, **kwargs):
        """Context manager: show ``color`` inside the block, then restore."""
        old_color = self.color
        try:
            self.fade(color, *args, **kwargs)
            yield
        finally:
            self.fade(old_color)

    @contextlib.contextmanager
    def fading_result(self, color='#fff', color_fail='#f00', color_pass=None, *args, **kwargs):
        """Context manager that also signals the outcome of the block.

        Shows ``color`` while the block runs.  If the block raises, shows
        ``color_fail`` for half a second, restores the previous colour and
        re-raises.  Otherwise optionally shows ``color_pass`` for half a
        second and restores the previous colour.
        """
        old_color = self.color
        try:
            self.fade(color, *args, **kwargs)
            yield
        except Exception:
            self.fade(color_fail)
            time.sleep(0.5)
            self.fade(old_color)
            raise
        else:
            if color_pass:
                self.fade(color_pass)
                time.sleep(0.5)
            self.fade(old_color)

    @contextlib.contextmanager
    def Brightness(self, brightness):
        """Context manager: temporarily use ``brightness``, then restore."""
        old_brightness = self.brightness
        self.brightness = brightness
        try:
            yield
        finally:
            self.set_brightness(old_brightness)

    def with_fading(self, func, *args, **kwargs):
        """Call ``func()`` inside fading(*args, **kwargs)."""
        with self.fading(*args, **kwargs):
            func()

    def with_fading_result(self, func, *args, **kwargs):
        """Call ``func()`` inside fading_result(*args, **kwargs)."""
        with self.fading_result(*args, **kwargs):
            func()

    def pulse(self, color=None, brightness=None):
        """Blink once: to black if currently lit, otherwise to ``color``.

        ``brightness`` is accepted for compatibility but is not used.
        """
        if self.raw_color != (0, 0, 0):
            color = (0, 0, 0)
        else:
            assert color
        self.blink(color)

    def blink(self, color, count=1, delay=0.5, color_end=None, *args, **kwargs):
        """Flash ``color`` ``count`` times, returning to ``color_end`` each time.

        delay     -- seconds to hold each phase (note: seconds, unlike fade()).
        color_end -- colour shown between/after flashes, sent raw; defaults
                     to the current ``raw_color``.
        Extra arguments are forwarded to fade() for the flash colour.
        """
        if not color_end:
            color_end = self.raw_color
        for i in range(1, count + 1):
            self.fade(color, *args, **kwargs)
            time.sleep(delay)
            self.fade(color_end, raw=True)
            if i < count:
                time.sleep(delay)

    def random(self, count=4):
        """Run the CLI tool with ``--running=<count> --delay=250``, then restore
        the remembered colour."""
        arg = "--running=%d --delay=%d" % (count, 250)
        self.emitCommand(arg)
        self.fade(self.color, delay=1)

    def running(self, color, count=1):
        """Run the CLI tool with ``--rgb <color> --running=<count>``, then
        restore the remembered colour.  Brightness scaling is not applied."""
        r, g, b = self.parse_color(color)
        arg = "--rgb %d,%d,%d --running=%d" % (r, g, b, count)
        self.emitCommand(arg)
        self.fade(self.color, delay=1)
# Module-level convenience: a shared wrapper built with the default CLI
# arguments, and a plain-function alias for its command runner.
_blink1 = Blink1()
emitCommand = _blink1.emitCommand
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
blink1_ctypes.py -- blink(1) Python library | |
Uses ctypes wrapper around blink1-lib C library (which in turn wraps HIDAPI) | |
Make sure you have the blink1-lib shared library in the same directory | |
as blink1_ctypes.py or in your LD_LIBRARY_PATH | |
Based on Stephen Youndt's script on how to wrap the C library | |
2013, Tod E. Kurt, http://thingm.com/ | |
""" | |
import time | |
from ctypes import * | |
from ctypes.util import find_library | |
import inspect, os | |
import glob | |
# Locate the blink1-lib C library, trying progressively looser lookups.
localpath = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))

# 1. Library search under the usual names.
libname = None
for _name in ("blink1", "Blink1"):
    libname = find_library(_name)
    if libname is not None:
        break

# 2. daanzu: bitness-specific build located next to this module.
if libname is None:
    import platform
    bitness = platform.architecture()[0][0:2]  # '32' or '64'
    libname = find_library(os.path.join(localpath, 'blink1-lib-%s' % bitness))

# 3. Generic library name.
if libname is None:
    libname = find_library("blink1-lib")

# 4. Files sitting next to this module: unix, then windows, then mac naming.
for _pattern in ('[Bb]link1-lib.so', 'blink1-lib.dll', 'lib[Bb]link1*'):
    if libname is not None:
        break
    pathlist = glob.glob(os.path.join(localpath, _pattern))
    if pathlist:
        libname = pathlist[-1]

# Load the library, or fail the import when nothing was found.
if not libname:
    raise ImportError("no blink1-lib shared library found")
libblink1 = CDLL(libname)
def _bind(symbol, restype=None, argtypes=None):
    """Fetch ``symbol`` from libblink1 and attach the given prototype.

    ``restype`` / ``argtypes`` are only assigned when provided, so a function
    bound without them keeps the ctypes defaults.
    """
    func = getattr(libblink1, symbol)
    if restype is not None:
        func.restype = restype
    if argtypes is not None:
        func.argtypes = argtypes
    return func


# Device discovery.  NOTE: `enumerate` and `open` below intentionally shadow
# the builtins at module level; that is the module's existing public API.
enumerate = _bind("blink1_enumerate", c_int)
enumerateByVidPid = _bind("blink1_enumerateByVidPid", c_int, [c_int, c_int])
getCachedPath = _bind("blink1_getCachedPath", c_char_p, [c_int])
getCachedSerial = _bind("blink1_getCachedSerial", c_char_p, [c_int])
getCachedCount = _bind("blink1_getCachedCount", c_int)

# Opening / closing a device handle (an opaque pointer).
open = _bind("blink1_open", c_void_p)
openByPath = _bind("blink1_openByPath", c_void_p, [c_char_p])
openBySerial = _bind("blink1_openBySerial", c_void_p, [c_char_p])
openById = _bind("blink1_openById", c_void_p, [c_int])
close = _bind("blink1_close", argtypes=[c_void_p])

# Operations on an open device handle.
getVersion = _bind("blink1_getVersion", c_int, [c_void_p])
fadeToRGB = _bind("blink1_fadeToRGB", c_int,
                  [c_void_p, c_ushort, c_ubyte, c_ubyte, c_ubyte])
fadeToRGBN = _bind("blink1_fadeToRGBN", c_int,
                   [c_void_p, c_ushort, c_ubyte, c_ubyte, c_ubyte, c_ubyte])
setRGB = _bind("blink1_setRGB", c_int, [c_void_p, c_ubyte, c_ubyte, c_ubyte])
serverdown = _bind("blink1_serverdown", c_int, [c_void_p, c_ubyte, c_ushort])
play = _bind("blink1_play", c_int, [c_void_p, c_ubyte, c_ubyte])
writePatternLine = _bind("blink1_writePatternLine", c_int,
                         [c_void_p, c_ushort, c_ubyte, c_ubyte, c_ubyte, c_ubyte])
readPatternLine = _bind("blink1_readPatternLine", c_int,
                        [c_void_p, c_void_p, c_void_p, c_void_p, c_void_p, c_void_p])

# Bound without an explicit prototype.
enableDegamma = _bind("blink1_enableDegamma")
disableDegamma = _bind("blink1_disableDegamma")

#################################################################################
debug_rw = False
class Blink1:
    """Thin object wrapper around the module-level blink1-lib bindings.

    Holds one device handle in ``self.dev`` (None when no device is open).
    The methods call the module-level ctypes functions (`open`, `close`,
    `enumerate`, ...), which shadow the builtins of the same name.
    """

    def __init__(self):
        self.dev = None
        self.open()

    def find(self):
        """Alias for open(); returns whatever open() returns (None)."""
        return self.open()

    def enumerate(self):
        """Call blink1_enumerate and return its integer result."""
        return enumerate()

    def open(self):
        """Close any current handle, then open the default device."""
        self.close()
        self.dev = open()

    def open_by_id(self, id):
        """Close any current handle, then open the device with the given id."""
        # Release the previous handle first so it is not leaked.
        self.close()
        self.dev = openById(id)

    def close(self):
        """Close the current handle, if any, and forget it."""
        if self.dev is not None:
            close(self.dev)
            self.dev = None

    def notfound(self):
        return None  # fixme what to do here

    def fade_to_rgbn(self, fade_millis, red, green, blue, ledn):
        """
        Command blink(1) to fade to RGB color on LED number ``ledn``
        """
        return fadeToRGBN(self.dev, fade_millis, red, green, blue, ledn)

    def fade_to_rgb(self, fade_millis, red, green, blue):
        """
        Command blink(1) to fade to RGB color
        """
        return self.fade_to_rgbn(fade_millis, red, green, blue, 0)

    def playloop(self, play, startpos, endpos, count):
        """
        Call blink1_playloop on the open device and return its result.

        NOTE(review): the module declares no binding for blink1_playloop, so
        it is looked up here on demand; the prototype below is assumed from
        the blink1-lib header -- confirm against the library in use. Raises
        AttributeError if the loaded library does not export the symbol.
        """
        func = libblink1.blink1_playloop
        func.restype = c_int
        # An explicit pointer argtype keeps the handle from being truncated
        # to a C int on 64-bit platforms.
        func.argtypes = [c_void_p, c_ubyte, c_ubyte, c_ubyte, c_ubyte]
        return func(self.dev, play, startpos, endpos, count)

    def play(self, play, startpos):
        """
        Call blink1_play on the open device and return its result.
        """
        # The `play` parameter shadows the module-level `play` binding, so
        # the C function is reached through the library object instead.
        func = libblink1.blink1_play
        func.restype = c_int
        func.argtypes = [c_void_p, c_ubyte, c_ubyte]
        return func(self.dev, play, startpos)

    def get_version(self):
        """
        Get blink(1) firmware version
        """
        return str(getVersion(self.dev))

    def get_serialnumber(self):
        '''
        Get blink(1) serial number (of cached device 0; '00000000' if empty)
        '''
        sernum = getCachedSerial(0)
        if not sernum:
            sernum = '00000000'
        return sernum

    def get_serialnumbers(self):  # FIXME:
        """Return the cached serial of every enumerated device, in index order."""
        return [getCachedSerial(i) for i in range(0, getCachedCount())]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment