Skip to content

Instantly share code, notes, and snippets.

@takaswie
Created March 5, 2022 12:10
Show Gist options
  • Save takaswie/2ba5c0c0d7219aaca1ddc700fa8ae00a to your computer and use it in GitHub Desktop.
Save takaswie/2ba5c0c0d7219aaca1ddc700fa8ae00a to your computer and use it in GitHub Desktop.
Asynchronous transaction tool with libhinawa2
#!/usr/bin/env python3
import gi
gi.require_version('GLib', '2.0')
gi.require_version('Hinawa', '3.0')
from gi.repository import GLib, Hinawa
from sys import argv, exit
from threading import Thread
from struct import unpack, pack
def print_help():
    """Print the command-line usage of this tool to standard output.

    The program name shown in the two synopsis lines is taken from argv[0].
    """
    prog = argv[0]
    usage = (
        f'{prog} <CDEV> read <ADDR>',
        f'{prog} <CDEV> write <ADDR> <VALUE>',
        ' CDEV: path to Linux FireWire character device for the target device.',
        ' ADDR: address of the target device, hexadecimal',
        ' VALUE: value to write to the target device, hexadecimal',
    )
    for line in usage:
        print(line)
# Both the character device path and the mode are mandatory; without them
# there is nothing to do except show the usage and fail.
if len(argv) < 3:
    print_help()
    exit(1)

# argv[1] is the character device path, argv[2] selects the operation.
path = argv[1]
mode = argv[2]
def handle_read_operation(req, node, addr):
    """Read one quadlet at ``addr`` from the node and print it.

    Args:
        req: the Hinawa.FwReq used to issue the transaction.
        node: the Hinawa.FwNode the request is sent to.
        addr: address on the target device, as an integer.

    Raises:
        Whatever ``req.transaction_sync()`` raises when the transaction
        fails, and ``struct.error`` if the returned frame is not exactly
        four bytes long.
    """
    # The frame is handed to transaction_sync() as a buffer of four bytes and
    # the call returns the frame holding the response payload.
    frame = [0] * 4
    frame = req.transaction_sync(node, Hinawa.FwTcode.READ_QUADLET_REQUEST,
                                 addr, len(frame), frame, 100)
    # unpack() needs a bytes-like object; bytes() is a no-op conversion when
    # the binding already returned bytes and also copes with a list of ints.
    quad = unpack('>I', bytes(frame))[0]
    print('read value: 0x{:08x}, addr: 0x{:012x}'.format(quad, addr))
def handle_write_operation(req, node, addr, value):
    """Write one quadlet ``value`` to ``addr`` on the node.

    The current content of ``addr`` is read and printed first, then the new
    value is written as a big-endian 32-bit quantity and echoed.

    Args:
        req: the Hinawa.FwReq used to issue the transactions.
        node: the Hinawa.FwNode the requests are sent to.
        addr: address on the target device, as an integer.
        value: value to write; must fit in an unsigned 32-bit integer.

    Raises:
        ``struct.error`` if ``value`` does not fit in 32 bits, plus whatever
        ``req.transaction_sync()`` raises when a transaction fails.
    """
    # Show what is about to be overwritten.
    handle_read_operation(req, node, addr)

    frame = pack('>I', value)
    # The frame returned by a write transaction carries nothing this tool
    # uses, so the return value is deliberately ignored.
    req.transaction_sync(node, Hinawa.FwTcode.WRITE_QUADLET_REQUEST,
                         addr, len(frame), frame, 100)
    print('write: 0x{:08x}'.format(value))
# Dispatch table for the command line: mode name -> (handler, number of
# operands the handler expects after the mode on the command line).
MODES = dict(
    read=(handle_read_operation, 1),
    write=(handle_write_operation, 2),
)
# Validate the mode and its operands before touching the device.
if mode not in MODES:
    print_help()
    exit(1)

op, arg_count = MODES[mode]
operands = argv[3:]
# Require exactly the operands the handler takes: too few cannot work and
# surplus ones would otherwise only surface as a TypeError from the handler.
if len(operands) != arg_count:
    print_help()
    exit(1)

# All operands are given in hexadecimal; reject anything else with the usage
# text instead of an uncaught traceback.
try:
    args = [int(a, 16) for a in operands]
except ValueError:
    print_help()
    exit(1)

node = Hinawa.FwNode.new()
node.open(path)

# Events of the node are dispatched from a private main context which is
# iterated by a main loop running in a separate thread, while this thread
# issues the transaction.
ctx = GLib.MainContext.new()
src = node.create_source()
src.attach(ctx)

dispatcher = GLib.MainLoop.new(ctx, False)
th = Thread(target=dispatcher.run)
th.start()

req = Hinawa.FwReq.new()
failed = False
try:
    op(req, node, *args)
except Exception as e:
    print('Error:', argv)
    print(e)
    failed = True
finally:
    # Always stop the dispatcher, even on KeyboardInterrupt/SystemExit;
    # otherwise the non-daemon thread keeps the process alive forever.
    # NOTE(review): quit() issued before the thread has entered run() would
    # presumably be lost; the handlers block in a transaction first, so this
    # is not expected in practice -- confirm if new handlers are added.
    dispatcher.quit()
    th.join()

# Report failure to the caller through the exit status.
if failed:
    exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment