Last active
September 21, 2021 20:21
-
-
Save povik/66f69f539905bcde6f4cacbae1d24ad6 to your computer and use it in GitHub Desktop.
Play audio through the embedded speaker on Mac mini
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# speaker_amp.py -- play audio through the embedded speaker on Mac mini
#
# tested with m1n1 64dab9482
#
# sample usage with sox:
#
#   sox INPUT_FILE -t raw -r 48000 -c 1 -e signed-int -b 32 -L - gain -63 | python3 ./speaker_amp.py
#
# (expects mono, 24-bit signed samples padded to 32 bits on the msb side)
import argparse | |
import os.path | |
import code | |
import re | |
import sys | |
from m1n1.setup import * | |
from m1n1.hw.dart import DART, DARTRegs | |
#from m1n1.hw.admac import ADMAC, ADMACRegs | |
#from m1n1.hw.i2c import I2C | |
# I2C TX FIFO word: the byte to send goes in the low bits, and the
# flags below are set on the same word when it is pushed
class R_FIFO_TX(Register32):
    READ = 10   # set on the word carrying the number of bytes to read back
    STOP = 9    # set on the last word of a transfer
    START = 8   # set on the word carrying the (shifted) device address
# I2C RX FIFO word: the received byte is taken from the low 8 bits
class R_FIFO_RX(Register32):
    # the reader keeps polling while this bit is set
    EMPTY = 8
# I2C status register
class R_STATUS(Register32):
    # polled after a write that ended with STOP
    XFER_READY = 27
# I2C control register
class R_CONTROL(Register32):
    ENABLE = 11
    CLEAR_RX = 10
    CLEAR_TX = 9
    # multi-bit field; the driver in this file always programs 0x4
    CLOCK = (7, 0)
# register layout of the I2C controller (offsets from its ADT base)
class I2CRegs(RegMap):
    FIFO_TX = (0x00, R_FIFO_TX)
    FIFO_RX = (0x04, R_FIFO_RX)
    STATUS = (0x14, R_STATUS)
    CONTROL = (0x1c, R_CONTROL)
class I2C:
    """Minimal polled driver for an I2C controller found through the ADT.

    Transfers are pushed word by word into the TX FIFO and results are
    polled out of the RX FIFO; there is no interrupt handling.
    """

    def __init__(self, u, adt_path):
        """Bind to the controller described by ADT node *adt_path*."""
        self.u = u
        self.p = u.proxy
        self.iface = u.iface
        self.base = u.adt[adt_path].get_reg(0)[0]
        self.regs = I2CRegs(u, self.base)

    def clear_fifos(self):
        """Request a clear of both the TX and the RX FIFO."""
        self.regs.CONTROL.set(CLEAR_TX=1, CLEAR_RX=1)

    def clear_status(self):
        """Write all-ones to the status register."""
        self.regs.STATUS.val = 0xffffffff

    def _fifo_read(self, nbytes):
        """Poll *nbytes* bytes out of the RX FIFO and return them as bytes.

        Raises TimeoutError (an Exception subclass) when the FIFO is still
        empty after the initial read plus 1000 re-polls.
        """
        read = []
        for _ in range(nbytes):
            val = self.regs.FIFO_RX.reg
            timeout = 1000
            while val.EMPTY and timeout > 0:
                val = self.regs.FIFO_RX.reg
                timeout -= 1
            # Decide on the value actually read, not on the counter: a byte
            # that arrives on the very last poll is valid data and must not
            # be reported as a timeout.
            if val.EMPTY:
                raise TimeoutError("timeout")
            read.append(int(val) & 0xff)
        return bytes(read)

    def _fifo_write(self, buf, stop=False):
        """Push *buf* into the TX FIFO, one byte per FIFO word.

        With stop=True the last byte is flagged STOP and the call then
        polls STATUS.XFER_READY, raising TimeoutError if it is still clear
        after 1001 polls.
        """
        for no, byte in enumerate(buf):
            fifo_val = R_FIFO_TX(byte)
            if stop and no == len(buf) - 1:
                fifo_val.STOP = 1
            self.regs.FIFO_TX.reg = fifo_val
        if not stop:
            return
        timeout = 1000
        # Only give up when the flag is still clear once the budget is
        # spent; a flag seen on the final poll counts as success.
        while not self.regs.STATUS.reg.XFER_READY:
            if timeout == 0:
                raise TimeoutError("timeout")
            timeout -= 1

    def write_reg(self, addr, reg, data):
        """Write *data* (iterable of byte values) to register *reg* of device *addr*."""
        self.clear_fifos()
        self.regs.CONTROL.set(ENABLE=1, CLOCK=0x4)
        # address word: 7-bit address shifted left, low bit clear
        self.regs.FIFO_TX.reg = R_FIFO_TX(addr << 1, START=1)
        self._fifo_write(bytes([reg]) + bytes(data), stop=True)
        self.regs.CONTROL.set(ENABLE=0, CLOCK=0x4)

    def read_reg(self, addr, reg, nbytes):
        """Read *nbytes* bytes starting at register *reg* of device *addr*."""
        self.clear_fifos()
        self.regs.CONTROL.set(ENABLE=1, CLOCK=0x4)
        # first transfer: send the register index
        self.regs.FIFO_TX.reg = R_FIFO_TX(addr << 1, START=1)
        self._fifo_write(bytes([reg]), stop=True)
        # second transfer: address word with the low bit set, then the byte count
        self.regs.FIFO_TX.reg = R_FIFO_TX((addr << 1) | 1, START=1)
        self.regs.FIFO_TX.reg = R_FIFO_TX(nbytes, STOP=1, READ=1)
        data = self._fifo_read(nbytes)
        self.regs.CONTROL.set(ENABLE=0, CLOCK=0x4)
        return data
# bit layout for a control register of unknown purpose; the field names
# record guesses only
class R_UNK_CONTROL(Register32):
    UNK1_STOPCOUNT = 0
    UNK2_RESETCOUNT = 1
    UNK3 = 3
    UNK4 = 8
# upper word of the ADMAC counter
class R_COUNTER_HI(Register32):
    FLAG = 31
# state of a channel's descriptor ring
class R_DESC_RING(Register32):
    UNDERFLOW = (31, 16)

    # when READ_SLOT==WRITE_SLOT one of the two is set
    EMPTY = 8
    FULL = 9

    ERR = 10
    UNK1 = 6

    # next slot to read
    READ_SLOT = (5, 4)

    # next slot to be written to
    WRITE_SLOT = (1, 0)
# state of a channel's report ring
class R_REPORT_RING(Register32):
    OVERFLOW = (31, 16)

    # goes through 0, 1, 2, 3 as the pieces of a report
    # are being read through REPORT_READ
    READOUT_PROGRESS = (13, 12)

    # when READ_SLOT==WRITE_SLOT one of the two is set
    EMPTY = 8
    FULL = 9

    ERR = 10

    # next slot to read
    READ_SLOT = (5, 4)

    # next slot to be written to
    WRITE_SLOT = (1, 0)
# per-channel TX status bits; meanings are not known
class R_TX_STATUS1(Register32):
    UNK1 = 1
    UNK2 = 4
    UNK3 = 8
    UNK4 = 9
    UNK5 = 10
# per-channel TX control register
class R_TX_CONTROL(Register32):
    RESET_RINGS = 0
# register layout of the ADMAC block; per-channel registers are arrays of
# 12 entries spaced 0x400 apart
class ADMACRegs(RegMap):
    TX_FLAGS = (0x0, Register32)  # one bit per channel
    TX_FLAGS_CLEAR = (0x4, Register32)
    RX_FLAGS = (0x8, Register32)
    RX_FLAGS_CLEAR = (0xc, Register32)

    # NOTE(review): mapped as a plain Register32 although R_UNK_CONTROL is
    # defined in this file -- confirm which was intended
    UNK_CONTROL = (0x10, Register32)

    STATUS0 = (0x34, Register32)  # bit per channel, exports 0x10 from TX_UNK4 of channels
    STATUS1 = (0x44, Register32)
    STATUS2 = (0x54, Register32)

    # a 24 MHz always-running counter
    COUNTER_LO = (0x70, Register32)
    COUNTER_HI = (0x74, R_COUNTER_HI)

    TX_CONTROL = (irange(0x8000, 12, 0x400), R_TX_CONTROL)
    TX_UNK_STATUS0 = (irange(0x8010, 12, 0x400), Register32)
    TX_STATUS1 = (irange(0x8014, 12, 0x400), R_TX_STATUS1)
    TX_UNK_STATUS2 = (irange(0x8018, 12, 0x400), Register32)
    TX_UNK_STATUS3 = (irange(0x801c, 12, 0x400), Register32)
    # NOTE(review): same offset as TX_UNK_STATUS3 -- possibly a typo, verify
    TX_UNK_STATUS4 = (irange(0x801c, 12, 0x400), Register32)
    TX_UNK3 = (irange(0x8060, 12, 0x400), Register32)
    TX_UNK4 = (irange(0x8024, 12, 0x400), Register32)

    TX_DESC_RING = (irange(0x8070, 12, 0x400), R_DESC_RING)
    TX_REPORT_RING = (irange(0x8074, 12, 0x400), R_REPORT_RING)

    # two-dimensional arrays: channel index, then word index
    TX_UNK2 = ((irange(0x8000, 12, 0x400), irange(0x78, (0x200-0x78)//4, 4)), Register32)
    RX_UNK = ((irange(0x8200, 12, 0x400), irange(0, 0x200//4, 4)), Register32)

    DESC_WRITE = (irange(0x10000, 12, 4), Register32)
    REPORT_READ = (irange(0x10100, 12, 4), Register32)
# flags word of a DMA descriptor (the fourth word written to DESC_WRITE)
class ADMADescriptorFlags(Register32):
    # macos always writes descriptors in pairs,
    # the second descriptor has this bit set
    UNK_LAST = 16

    DESC_ID = (7, 0)
class ADMADescriptor(Reloadable):
    """A DMA descriptor: buffer address, length in bytes and a flags word."""

    def __init__(self, addr, length, flags):
        self.addr = addr
        self.length = length
        self.flags = ADMADescriptorFlags(flags)

    def __repr__(self):
        return f"<descriptor: addr=0x{self.addr:x} len=0x{self.length:x} flags={self.flags}>"

    def ser(self):
        """Return the descriptor as four 32-bit words.

        Order: address low, address high, length, flags.
        """
        mask = (1 << 32) - 1
        return [
            self.addr & mask,
            (self.addr >> 32) & mask,
            self.length & mask,
            int(self.flags),
        ]

    @classmethod
    def deser(cls, seq):
        """Build a descriptor from the four words produced by ser().

        Raises ValueError when *seq* does not hold exactly four items.
        """
        if len(seq) != 4:
            raise ValueError(f"expected 4 words, got {len(seq)}")
        addr_lo, addr_hi, length, flags = seq
        # construct through cls so subclasses deserialize to their own type
        return cls(
            addr_lo | addr_hi << 32,  # addr
            length,                   # length (in bytes)
            flags,                    # flags
        )
# flags word of a report (the fourth word read from REPORT_READ)
class ADMAReportFlags(Register32):
    UNK1 = 24
    UNK2 = 25
    UNK3 = 27

    DESC_ID = (8, 0)
class ADMAReport(Reloadable):
    """A report read back from the report ring: counter value, one unknown word, flags."""

    def __init__(self, countval, unk1, flags):
        self.countval = countval
        self.unk1 = unk1
        self.flags = ADMAReportFlags(flags)

    def __repr__(self):
        return f"<report: countval=0x{self.countval:x} unk1=0x{self.unk1:x} flags={self.flags}>"

    def ser(self):
        """Return the report as four 32-bit words.

        Order: counter low, counter high, unk1, flags.
        """
        mask = (1 << 32) - 1
        return [
            self.countval & mask,
            (self.countval >> 32) & mask,
            self.unk1 & mask,
            int(self.flags),
        ]

    @classmethod
    def deser(cls, seq):
        """Build a report from the four words produced by ser().

        Raises ValueError when *seq* does not hold exactly four items.
        """
        if len(seq) != 4:
            raise ValueError(f"expected 4 words, got {len(seq)}")
        count_lo, count_hi, unk1, flags = seq
        # construct through cls so subclasses deserialize to their own type
        return cls(
            count_lo | count_hi << 32,  # countval
            unk1,                       # unk1
            flags,                      # flags
        )
class ADMAC(Reloadable):
    """Helper for driving the TX channels of the ADMAC register block."""

    def __init__(self, u, devpath, dart=None):
        """Bind to the ADMAC described by ADT node *devpath*."""
        self.u = u
        self.p = u.proxy
        self.base, _ = u.adt[devpath].get_reg(0)
        self.regs = ADMACRegs(u, self.base)
        self.dart = dart

    def tx_enable(self, channo):
        """Write the channel's bit to TX_FLAGS."""
        self.regs.TX_FLAGS.val = 1 << channo

    def tx_disable(self, channo):
        """Write the channel's bit to TX_FLAGS_CLEAR."""
        self.regs.TX_FLAGS_CLEAR.val = 1 << channo

    def tx_reset(self, channo):
        """Pulse the channel's TX_CONTROL register: write 1, then 0."""
        for state in (1, 0):
            self.regs.TX_CONTROL[channo].val = state

    def tx_submit(self, channo, addr, length, flags):
        """Queue one descriptor on the channel.

        Raises Exception when the descriptor ring reports FULL.
        """
        desc = ADMADescriptor(addr, length, flags)
        print(f"submitting: {desc}")

        ring = self.regs.TX_DESC_RING[channo].reg
        if ring.FULL:
            raise Exception("descriptor ring full")

        # a descriptor is handed over as consecutive writes to DESC_WRITE
        for word in desc.ser():
            self.regs.DESC_WRITE[channo].val = word

    def tx_can_submit(self, channo):
        """Return True while the descriptor ring is not FULL."""
        ring = self.regs.TX_DESC_RING[channo].reg
        return not ring.FULL

    def tx_have_report(self, channo):
        """Return True while the report ring is not EMPTY."""
        ring = self.regs.TX_REPORT_RING[channo].reg
        return not ring.EMPTY

    def tx_read_report(self, channo):
        """Read one report from the channel.

        Raises Exception when the report ring reports EMPTY.
        """
        ring = self.regs.TX_REPORT_RING[channo].reg
        if ring.EMPTY:
            raise Exception("report ring empty")

        # a report is fetched as four consecutive reads of REPORT_READ
        words = [self.regs.REPORT_READ[channo].val for _ in range(4)]
        return ADMAReport.deser(words)

    def tx_status(self, channo):
        """Return TX_STATUS1 as read, then write all-ones back to it."""
        snapshot = self.regs.TX_STATUS1[channo].reg
        self.regs.TX_STATUS1[channo].val = 0xffffffff
        return snapshot
class PollingConsole(code.InteractiveConsole):
    """Interactive console that is serviced by calling poll() from a loop.

    A helper thread blocks in the prompt and passes each entered line to
    the polling thread through a queue; the polling thread executes the
    line and passes back whether more input is required.
    """

    def __init__(self, locals=None, filename="<console>"):
        # These are imported here rather than at file level, and published
        # as module globals because the other methods refer to them.
        global patch_stdout, PromptSession, FileHistory
        global Thread, Queue, Empty
        from prompt_toolkit import PromptSession
        from prompt_toolkit.history import FileHistory
        from prompt_toolkit.patch_stdout import patch_stdout
        from threading import Thread
        from queue import Queue, Empty

        super().__init__(locals, filename)

        self._qu_input = Queue()    # prompt thread -> poll(): entered lines, None on EOF
        self._qu_result = Queue()   # poll() -> prompt thread: "more input needed" flag
        self._should_exit = False

        history = FileHistory(os.path.expanduser("~/.m1n1-history"))
        self.session = PromptSession(history=history)

        self._other_thread = Thread(target=self._other_thread_main, daemon=False)
        self._other_thread.start()

    def __enter__(self):
        self._patch = patch_stdout()
        self._patch.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._patch.__exit__(exc_type, exc_val, exc_tb)

    def _other_thread_main(self):
        """Prompt-thread body: read lines until EOF, forwarding each one."""
        more_input = False
        while True:
            prompt_text = "... " if more_input else "(♫♫) "
            try:
                self._qu_input.put(self.session.prompt(prompt_text))
            except EOFError:
                # None tells poll() that the console is finished
                self._qu_input.put(None)
                return
            # wait until the line has been executed before prompting again
            more_input = self._qu_result.get()

    def poll(self):
        """Execute at most one pending line; return False once the console has ended."""
        if self._should_exit:
            return False

        try:
            line = self._qu_input.get(timeout=0.01)
        except Empty:
            return True

        if line is not None:
            self._qu_result.put(self.push(line))
            return True

        self._should_exit = True
        return False
class NoConsole:
    """Stand-in used when no interactive console is requested.

    Offers the same poll() and context-manager interface as
    PollingConsole but only sleeps.
    """

    def poll(self):
        """Sleep for 10 ms and return True (there is never anything to end the loop)."""
        # `time` is not imported at the top of this file; import it here
        # rather than relying on a wildcard import to provide it.
        import time

        time.sleep(0.01)
        return True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
# command line: optional interactive console, optional sample file
argparser = argparse.ArgumentParser()
argparser.add_argument("--console", action="store_true")
argparser.add_argument(
    "-f", "--file", "--input", "--samples",
    type=str,
    default=None,
    help="input filename to take samples from (default: standard input)",
)
args = argparser.parse_args()

# the console cannot be combined with samples coming from standard input
if args.console and args.file is None:
    print("Specify file with samples (option -f) if using console")
    sys.exit(1)

if args.file is None:
    inp = sys.stdin.buffer
else:
    inp = open(args.file, "rb")
# enable clocks for every device touched below
for clock_node in (
    "/arm-io/gpio",
    "/arm-io/i2c1",
    "/arm-io/admac-sio",
    "/arm-io/dart-sio",
    "/arm-io/mca-switch",
):
    p.pmgr_adt_clocks_enable(clock_node)

# TX channel used for playback
channo = 2

admac = ADMAC(u, "/arm-io/admac-sio")
admac.tx_disable(channo)
admac.tx_reset(channo)

admac.regs.UNK_CONTROL.val = 1
admac.regs.UNK_CONTROL.val = 0

# drain reports that are still queued on the channel
while admac.tx_have_report(channo):
    print("stale report: ", admac.tx_read_report(channo))
def pmgr_reset():
    """Write the fixed 0/3/0/3 sequence to four consecutive words at 0x23d10c000."""
    # pmgr-related, unknown meaning,
    # needs to be written for the speaker-amp IC to respond over I2C
    base = 0x23d10c000
    for offset, value in ((0x0, 0), (0x4, 3), (0x8, 0), (0xc, 3)):
        p.write32(base + offset, value)
pmgr_reset()

# each of these registers is written with 0x0 and then 0x2, in this order
for pulse_addr in (
    0x238400000,
    0x238400100,
    0x238400300,
    0x238404300,
    0x238404100,
    0x238404000,
):
    p.write32(pulse_addr, 0x0)
    p.write32(pulse_addr, 0x2)

p.write32(0x238208840, 0x22)
# the same value is written twice in a row
p.write32(0x238208854, 0xc00060)
p.write32(0x238208854, 0xc00060)

mca_switch_base = 0x2_3840_0000

for poke_addr, poke_val in (
    (0x238404004, 0x100),
    (0x238404104, 0x200),
    (0x238404108, 0x0),
    (0x23840410c, 0xfe),
    (0x238408004, 0x100),
    (0x23840c004, 0x100),
    (0x238308000, 0x102048),
):
    p.write32(poke_addr, poke_val)

# bits 0x0000e0 influence clock
# 0x00000f influence sample serialization
p.write32(0x23b0400d8, 0x06000000)  # 48 ksps, zero-out for ~96 ksps

p.write32(0x238400600, 0xe)    # 0x8 or have zeroed samples, 0x6 or have no clock
p.write32(0x238400604, 0x200)  # sensitive in mask 0xf00, any other value disables clock
p.write32(0x238400608, 0x4)    # 0x4 or zeroed samples
# sample buffer: a ring of 16 chunks in the device's address space
chunk_size = 0x10000
heap_start = 0x220000
heap_size = chunk_size * 16
heap_end = heap_start + heap_size
heap = heap_start  # where the next chunk is written

dart_base, _ = u.adt["/arm-io/dart-sio"].get_reg(0)  # stream index 2
dart = DART(iface, DARTRegs(u, dart_base), util=u)
dart.initialize()
# map the whole ring for stream 2
dart.iomap_at(2, heap_start, 0x8_0100_0000, heap_size)
dart.invalidate_streams()

# the counter isn't necessary for anything, really, it just
# demonstrates how descriptor IDs propagate into reports
chunk_counter = 1
def fill_data():
    """Read the next chunk of samples, copy it into the ring and queue it.

    Wraps back to the start of the ring when the next chunk would not
    fit. Every descriptor covers a full chunk, so a short or empty read
    (end of input) is padded with zero bytes; otherwise the leftover
    contents of that ring slot would be sent out again.
    """
    global heap, chunk_counter

    if heap + chunk_size > heap_end:
        heap = heap_start

    samples = inp.read(chunk_size)
    if len(samples) < chunk_size:
        samples = samples.ljust(chunk_size, b"\x00")

    dart.iowrite(2, heap, samples)
    dart.invalidate_streams()

    # the low byte of the flags carries the running chunk number
    admac.tx_submit(channo, heap, chunk_size, 0x100 | (chunk_counter & 0xff))
    chunk_counter += 1
    heap += chunk_size
# toggle the GPIO line driving the speaker-amp IC reset
amp_reset_gpio = 0x23c1002d4
p.write32(amp_reset_gpio, 0x76a02)  # invoke reset
p.write32(amp_reset_gpio, 0x76a03)  # take out of reset

i2c1 = I2C(u, "/arm-io/i2c1")

# queue the first chunk before the channel is switched on
fill_data()
admac.tx_enable(channo)
# accesses to 0x100-sized blocks in the +0x4000 region require
# the associated enable bit cleared, or they cause SErrors
def mca_switch_unk_disable():
    """Write 0 to the first word of the +0x4000, +0x4100 and +0x4300 blocks."""
    for block in (0x4000, 0x4100, 0x4300):
        p.write32(mca_switch_base + block, 0x0)
def mca_switch_unk_enable():
    """Write 1 to the first word of the +0x4000, +0x4100 and +0x4300 blocks."""
    for block in (0x4000, 0x4100, 0x4300):
        p.write32(mca_switch_base + block, 0x1)
p.write32(0x238404104, 0x202)
p.write32(0x238404208, 0x3107)
mca_switch_unk_enable()

# by ADT and leaked schematic, i2c1 contains TAS5770L,
# which is not a public part. but there's e.g. TAS2110
# with similar registers
#
#   https://www.ti.com/product/TAS2110
#
# if the speaker-amp IC loses clock on the serial sample input,
# it automatically switches to software shutdown.
#
for amp_reg, amp_val in (
    (0x08, 0x40),
    (0x0a, 0x06),
    (0x0b, 0x00),
    (0x0c, 0x1a),
    (0x1c, 0x82),
    (0x1d, 0x06),
    (0x16, 0x50),
    (0x17, 0x04),
    (0x1b, 0x01),
    (0x0d, 0x00),
):
    i2c1.write_reg(0x31, amp_reg, [amp_val])

#i2c1.write_reg(0x31, 0x03, [0x14])
# amplifier gain, presumably this is the lowest setting
i2c1.write_reg(0x31, 0x03, [0x0])

# take the IC out of software shutdown
i2c1.write_reg(0x31, 0x02, [0x0c])
# Playback loop: wait for a report, print every pending report, then top
# the descriptor ring up again. Ends when the console ends or on Ctrl-C.
# The shutdown sequence sits in `finally` so the amplifier is muted and
# the channel disabled even if the loop fails with an unexpected error.
try:
    with (PollingConsole(locals()) if args.console else NoConsole()) as cons:
        try:
            while cons.poll():
                # idle (still servicing the console) until a report shows up
                while (not admac.tx_have_report(channo)) and cons.poll():
                    pass
                while admac.tx_have_report(channo):
                    print("report: ", admac.tx_read_report(channo))
                while admac.tx_can_submit(channo):
                    fill_data()
        except KeyboardInterrupt:
            pass
finally:
    # mute
    i2c1.write_reg(0x31, 0x02, [0x0d])
    # software shutdown
    i2c1.write_reg(0x31, 0x02, [0x0e])
    admac.tx_disable(channo)
    # close the sample file if this script opened one (stdin is left alone)
    if args.file is not None:
        inp.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment