Skip to content

Instantly share code, notes, and snippets.

@Jim-Holmstroem
Last active February 5, 2017 21:20
Show Gist options
  • Save Jim-Holmstroem/46e7ae51ba9640308de5db301214361a to your computer and use it in GitHub Desktop.
Save Jim-Holmstroem/46e7ae51ba9640308de5db301214361a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2
from __future__ import print_function, division
"""
Made for: "PlayStation 3 Dual Shock Controller"
Based on:
https://gist.github.com/rdb/8864666
"""
import os, struct, array
from fcntl import ioctl
from operator import methodcaller
def get_devices():
devices = map(
"/dev/input/{}".format,
filter(
methodcaller('startswith', 'js'),
os.listdir("/dev/input")
)
)
return devices
devices = get_devices()
print("devices={}".format(devices))
def device_name(device):
with open(device, 'rb') as jsdev:
buf = array.array('c', ['\0', ] * 64)
ioctl(jsdev, 0x80006a13 + (0x10000 * len(buf)), buf)
js_name = buf.tostring()
return js_name
print("name=\"{}\"".format(device_name(devices[0])))
def get_n_buttons(device):
with open(device, 'rb') as jsdev:
buf = array.array('B', [0, ])
ioctl(jsdev, 0x80016a12, buf)
n_buttons = buf[0]
return n_buttons
print("n_buttons={}".format(get_n_buttons(devices[0])))
def get_n_axes(device):
with open(device, 'rb') as jsdev:
buf = array.array('B', [0, ])
ioctl(jsdev, 0x80016a11, buf)
n_axes = buf[0]
return n_axes
print("n_axes={}".format(get_n_axes(devices[0])))
def get_button_map(device):
n_buttons = get_n_buttons(device)
with open(device, 'rb') as jsdev:
buf = array.array('H', [0, ] * 200)
ioctl(jsdev, 0x80406a34, buf)
buttons = buf[:n_buttons]
button_names = {
0x00: 'select',
0x01: 'L3',
0x02: 'R3',
0x03: 'start',
0x04: 'up',
0x05: 'right',
0x06: 'down',
0x07: 'left',
0x10: 'PS',
}
# TODO for axis in axes: axis_names.get(axis, 'unknown(0x%02x)' % axis)
return button_names
buttons = get_button_map(devices[0])
inv_buttons = {v: k for k, v in buttons.items()}
print("buttons={}".format(buttons))
def get_axis_map(device):
n_axes = get_n_axes(device)
with open(device, 'rb') as jsdev:
buf = array.array('B', [0, ] * 0x40)
ioctl(jsdev, 0x80406a32, buf)
axes = buf[:n_axes]
axis_names = {
0x00: 'Lx',
0x01: 'Ly',
0x02: 'Rx',
0x03: 'Ry',
0x0c: 'L2',
0x0d: 'R2',
0x0e: 'L1',
0x0f: 'R1',
0x10: 'triangle',
0x11: 'circle',
0x12: 'cross',
0x13: 'square',
}
# TODO for axis in axes: axis_names.get(axis, 'unknown(0x%02x)' % axis)
return axis_names
axes = get_axis_map(devices[0])
inv_axes = {v: k for k, v in axes.items()}
print("axes={}".format(axes))
def main(device, listeners=[]):
print("Start listen to Joystick: {}".format(device))
with open(device, 'rb') as jsdev:
while True:
evbuf = jsdev.read(8)
if evbuf:
time, value, type_, id_ = struct.unpack('IhBB', evbuf)
list(map(
lambda listener: listener(time=time, value=value, type_=type_, id_=id_),
listeners
))
#if type_ & 0x01 and number in buttons:
#print(
# "{}: {}".format(
# buttons[number],
# value,
# )
#)
#if type_ & 0x02 and number in axes:
#if axes[number] == "Lx":
# max_point = 128
# point = int(max_point*(1+value/32767)/2)
# print("[{}x{}]".format(" "*point, " "*(max_point-point)))
#print(
# "{}: {:.3f}".format(
# axes[number],
# value / 32767,
# )
#)
class Listener(object):
def __init__(self, name, f):
BUTTON, AXIS = 1, 2
is_button = name in buttons.values()
self.name = name
self.f = f
self.type_ = BUTTON if is_button else AXIS
self.id_ = (inv_buttons if is_button else inv_axes)[name]
print(self.type_, self.id_)
def __call__(self, time, value, type_, id_):
if (type_, id_) == (self.type_, self.id_):
return self.f(time=time, value=value, type_=type_, id_=id_)
import requests
def post_axis(post_id, domain, codomain):
domain_low, domain_high = domain
codomain_low, codomain_high = codomain
def post_axis_(time, value, type_, id_):
in_domain = value >= domain_low and value <= domain_high
if in_domain:
value_unit = (value - domain_low) / (domain_high - domain_low)
value_codomain = int((codomain_high - codomain_low) * value_unit + codomain_low)
try:
requests.post(
"http://192.168.1.69/output/{post_id}".format(
post_id=post_id,
),
timeout=0.1,
data={"value": value_codomain},
)
print("{}: {}".format(post_id, value_codomain))
except Exception as e:
print("timeout")
return post_axis_
main(
devices[0],
listeners=[
Listener("Lx", post_axis(13, domain=(-32767, 32767), codomain=(0, 4095))),
]
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment