Last active
December 16, 2022 03:19
-
-
Save tzechienchu/43d85b837c6b5e42eb55b697e6d536ca to your computer and use it in GitHub Desktop.
Amaranth HDL I2C Slave via amaranth-community-unofficial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from amaranth import Signal, Module, Cat, Elaboratable, Record, Mux | |
from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT | |
from amaranth.hdl.ast import Rose, Fell | |
from amaranth.compat import TSTriple | |
from amaranth.lib.cdc import FFSynchronizer | |
class I2CTarget(Elaboratable): | |
""" | |
Simple I2C target. | |
Clock stretching is not supported. | |
Builtin responses (identification, general call, etc.) are not provided. | |
Note that the start, stop, and restart strobes are transaction delimiters rather than direct | |
indicators of bus conditions. A transaction always starts with a start strobe and ends with | |
either a stop or a restart strobe. That is, a restart strobe, similarly to a stop strobe, may | |
be only followed by another start strobe (or no strobe at all if the device is not addressed | |
again). | |
:attr address: | |
The 7-bit address the target will respond to. | |
:attr start: | |
Start strobe. Active for one cycle immediately after acknowledging address. | |
:attr stop: | |
Stop stobe. Active for one cycle immediately after a stop condition that terminates | |
a transaction that addressed this device. | |
:attr restart: | |
Repeated start strobe. Active for one cycle immediately after a repeated start condition | |
that terminates a transaction that addressed this device. | |
:attr write: | |
Write strobe. Active for one cycle immediately after receiving a data octet. | |
:attr data_i: | |
Data octet received from the initiator. Valid when ``write`` is high. | |
:attr ack_o: | |
Acknowledge strobe. If active for at least one cycle during the acknowledge bit | |
setup period (one half-period after write strobe is asserted), acknowledge is asserted. | |
Otherwise, no acknowledge is asserted. May use combinatorial feedback from ``write``. | |
:attr read: | |
Read strobe. Active for one cycle immediately before latching ``data_o``. | |
:attr data_o: | |
Data octet to be transmitted to the initiator. Latched immediately after receiving | |
a read command. | |
""" | |
def __init__(self): | |
self.scl_t = TSTriple() | |
self.sda_t = TSTriple() | |
self.scl_i0 = Signal(name="scl_i") #self.scl_t.i | |
self.scl_o = Signal(name="scl_o",reset=1) | |
self.scl_oe = Signal(name="scl_oe") #self.scl_t.oe | |
self.sda_i0 = Signal(name="sda_i") #self.sda_t.i | |
self.sda_o = Signal(name="sda_o",reset=1) | |
self.sda_oe = Signal(name="sda_oe") #self.sda_t.oe | |
self.scl_i = Signal() | |
self.sda_i = Signal() | |
self.sample = Signal(name="bus_sample") | |
self.setup = Signal(name="bus_setup") | |
self.bus_start = Signal(name="bus_start") | |
self.bus_stop = Signal(name="bus_stop") | |
self.address = Signal(7) | |
self.busy = Signal() # clock stretching request (experimental, undocumented) | |
self.start = Signal() | |
self.stop = Signal() | |
self.restart = Signal() | |
self.write = Signal() | |
self.data_i = Signal(8) | |
self.ack_o = Signal() | |
self.read = Signal() | |
self.data_o = Signal(8) | |
self.ack_i = Signal() | |
def elaborate(self, platform): | |
m = Module() | |
bitno = Signal(3) | |
shreg_i = Signal(8) | |
shreg_o = Signal(8) | |
scl_r = Signal(reset=1,name="scl_r") | |
sda_r = Signal(reset=1,name="sda_r") | |
m.d.comb += [ | |
self.scl_t.i.eq(self.scl_i0), | |
self.scl_t.o.eq(self.scl_o), | |
self.scl_t.oe.eq(self.scl_oe), | |
#self.scl_o.eq(0), | |
self.scl_oe.eq(~self.scl_o), | |
self.sda_t.i.eq(self.sda_i0), | |
self.sda_t.o.eq(self.sda_o), | |
self.sda_t.oe.eq(self.sda_oe), | |
#self.sda_o.eq(0), | |
self.sda_oe.eq(~self.sda_o), | |
self.sample.eq(~scl_r & self.scl_i), | |
self.setup.eq(scl_r & ~self.scl_i), | |
self.bus_start.eq(self.scl_i & sda_r & ~self.sda_i), | |
self.bus_stop.eq(self.scl_i & ~sda_r & self.sda_i), | |
] | |
m.d.sync += [ | |
scl_r.eq(self.scl_i), | |
sda_r.eq(self.sda_i), | |
] | |
m.submodules += [ | |
FFSynchronizer(self.scl_t.i, self.scl_i, reset=1), | |
FFSynchronizer(self.sda_t.i, self.sda_i, reset=1) | |
] | |
with m.FSM(name="i2c_fsm") as fsm: | |
with m.State('IDLE'): | |
with m.If(self.bus_start): | |
m.next = 'START' | |
with m.State('START'): | |
with m.If(self.bus_stop): | |
m.next = 'IDLE' | |
with m.Elif(self.setup): | |
m.next = 'ADDR-SHIFT' | |
m.d.sync += bitno.eq(0) | |
with m.State('ADDR-SHIFT'): | |
with m.If(self.bus_stop): | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.next = 'START' | |
with m.Elif(self.sample): | |
m.d.sync += shreg_i.eq((shreg_i << 1) | self.sda_i) | |
with m.Elif(self.setup): | |
m.d.sync += bitno.eq(bitno + 1) | |
with m.If(bitno == 7): | |
with m.If(shreg_i[1:] == self.address): | |
m.d.comb += self.start.eq(1) | |
m.d.sync += [ | |
self.sda_o.eq(0), | |
bitno.eq(0) | |
] | |
m.next = 'ADDR-ACK' | |
with m.Else(): | |
m.next = 'IDLE' | |
with m.State('ADDR-ACK'): | |
with m.If(self.bus_stop): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.d.comb += self.restart.eq(1) | |
m.next = 'START' | |
with m.Elif(self.setup): | |
with m.If(~shreg_i[0]): | |
m.d.sync += self.sda_o.eq(1) | |
m.next = 'WRITE-SHIFT' | |
with m.Elif(self.sample): | |
with m.If(shreg_i[0]): | |
m.d.sync += shreg_o.eq(self.data_o) | |
m.d.comb += self.read.eq(1) #Befre Enter READ-STRETCH | |
m.next = 'READ-STRETCH' | |
with m.State('WRITE-SHIFT'): | |
with m.If(self.bus_stop): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.d.comb += self.restart.eq(1) | |
m.next = 'START' | |
with m.Elif(self.sample): | |
m.d.sync += shreg_i.eq((shreg_i << 1) | self.sda_i) ###SDA | |
with m.If(self.setup): | |
m.d.sync += bitno.eq(bitno + 1) | |
with m.If(bitno == 7): | |
m.d.sync += self.data_i.eq(shreg_i) | |
m.next = 'WRITE-ACK' | |
with m.State('WRITE-ACK'): | |
m.d.comb += self.write.eq(1) #After Enter Write Ack | |
with m.If(self.bus_stop): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.d.comb += self.restart.eq(1) | |
m.next = 'START' | |
with m.Elif(self.setup): | |
m.d.sync += self.sda_o.eq(1) | |
m.next = 'WRITE-SHIFT' | |
with m.Elif(~self.scl_i): | |
m.d.sync += self.scl_o.eq(~self.busy) | |
with m.If(self.ack_o): | |
m.d.sync += self.sda_o.eq(0) | |
with m.State('READ-STRETCH'): | |
with m.If(self.busy): | |
m.d.sync += shreg_o.eq(self.data_o) | |
with m.If(self.bus_stop): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.next = 'START' | |
with m.Elif(self.busy): | |
with m.If(~self.scl_i): | |
m.d.sync += self.scl_o.eq(0) | |
with m.Else(): | |
with m.If(~self.scl_i): | |
m.d.sync += self.sda_o.eq(shreg_o[7]) | |
m.d.sync += self.scl_o.eq(1) | |
m.next = "READ-SHIFT" | |
with m.State('READ-SHIFT'): | |
with m.If(self.bus_stop): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.d.comb += self.restart.eq(1) | |
m.next = 'START' | |
with m.Elif(self.setup): | |
m.d.sync += [ | |
self.sda_o.eq(shreg_o[7]), | |
#shreg_o.eq(shreg_o << 1), | |
] | |
with m.Elif(self.sample): | |
m.d.sync += [ | |
shreg_o.eq(shreg_o << 1), | |
bitno.eq(bitno + 1) | |
] | |
with m.If(bitno == 7): | |
m.next = 'READ-ACK' | |
with m.State('READ-ACK'): | |
with m.If(self.bus_stop): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
with m.Elif(self.bus_start): | |
m.d.comb += self.restart.eq(1) | |
m.next = 'START' | |
with m.Elif(self.setup): | |
m.d.sync += self.sda_o.eq(1) | |
with m.Elif(self.sample): | |
with m.If(~self.sda_i): | |
m.d.sync += shreg_o.eq(self.data_o) | |
m.d.comb += self.read.eq(1) | |
m.next = 'READ-STRETCH' | |
with m.Else(): | |
m.d.comb += self.stop.eq(1) | |
m.next = 'IDLE' | |
return m | |
if __name__ == "__main__": | |
from amaranth.sim import Simulator | |
top = I2CTarget() | |
sim = Simulator(top) | |
period = 8 | |
wait_cycle = period // 4 | |
def half_period(): | |
for _ in range(period): | |
yield | |
def start(bus): | |
yield bus.sda_i0.eq(0) | |
yield from half_period() | |
def wait(): | |
for _ in range(wait_cycle): | |
yield | |
def rep_start(bus): | |
yield bus.scl_i0.eq(0) | |
yield # tHD;DAT | |
yield bus.sda_i0.eq(1) | |
yield from half_period() | |
yield bus.scl_i0.eq(1) | |
yield from half_period() | |
yield from start() | |
def stop(bus): | |
yield bus.scl_i0.eq(0) | |
yield # tHD;DAT | |
yield bus.sda_i0.eq(0) | |
yield from half_period() | |
yield bus.scl_i0.eq(1) | |
yield from half_period() | |
yield bus.sda_i0.eq(1) | |
yield from half_period() | |
def write_bit(bus, bit): | |
yield bus.scl_i0.eq(0) | |
yield # tHD;DAT | |
yield bus.sda_i0.eq(bit) | |
yield from half_period() | |
yield bus.scl_i0.eq(1) | |
yield from half_period() | |
yield bus.sda_i0.eq(1) | |
def write_octet(bus, octet): | |
for bit in range(8)[::-1]: | |
yield from write_bit(bus, (octet >> bit) & 1) | |
def read_bit(bus): | |
yield bus.scl_i0.eq(0) | |
yield from half_period() | |
yield bus.scl_i0.eq(1) | |
bit = (yield bus.sda_o) | |
yield from half_period() | |
return bit | |
def read_octet(bus): | |
octet = 0 | |
for bit in range(8): | |
octet = (octet << 1) | (yield from read_bit(bus)) | |
return octet | |
def start_addr(bus, read): | |
yield from start(bus) | |
yield from write_octet(bus, 0b01010000 | read) | |
yield from read_bit(top) | |
def startup(): | |
yield top.scl_i0.eq(1) | |
yield from half_period() | |
yield top.sda_i0.eq(1) | |
yield from half_period() | |
yield from half_period() | |
yield from half_period() | |
yield top.address.eq(0b0101000) | |
#Read | |
def test_bench(): | |
yield from startup() | |
yield top.data_o.eq(0b10100101) | |
yield from start_addr(top, read=True) | |
yield from read_octet(top) | |
yield top.data_o.eq(0b00110011) | |
yield from write_bit(top, 0) | |
yield from read_octet(top) | |
yield from write_bit(top, 0) | |
#yield from self.assertState(tb, "READ-SHIFT") | |
#Read | |
def test_bench1(): | |
yield from startup() | |
yield top.data_o.eq(0b10100101) | |
yield from start_addr(top, read=True) | |
yield from read_octet(top) | |
yield from write_bit(top, 0) | |
#yield from self.assertState(tb, "READ-SHIFT") | |
#Write | |
def test_bench2(): | |
yield from startup() | |
yield from start_addr(top, read=False) | |
yield from write_octet(top,0b10100101) | |
yield from read_bit(top) | |
yield from write_octet(top,0b00100100) | |
yield from read_bit(top) | |
yield top.scl_i0.eq(0) | |
yield top.sda_i.eq(0) | |
yield from half_period() | |
yield top.scl_i0.eq(1) | |
yield from half_period() | |
yield top.sda_i0.eq(1) | |
yield from wait() | |
#yield from self.assertCondition(tb, lambda: (yield tb.dut.stop)) | |
yield | |
#yield from self.assertState(tb, "IDLE") | |
#Read | |
def test_bench3(): | |
yield from startup() | |
yield top.data_o.eq(0b10100101) | |
yield from start_addr(top, read=True) | |
yield from read_octet(top) | |
yield top.data_o.eq(0b00110011) | |
yield from write_bit(top,0) | |
yield from read_octet(top) | |
yield from write_bit(top,0) | |
#yield from self.assertState(tb, "READ-SHIFT") | |
yield from wait() | |
yield from wait() | |
sim.add_clock(10e-6) # 10 MHz | |
sim.add_sync_process(test_bench3) | |
with sim.write_vcd("i2cslave.vcd"): | |
sim.run() | |
from amaranth.back import verilog | |
with open("i2cslave.v", "w") as f: | |
f.write(verilog.convert(top, ports=[ | |
top.start, | |
top.scl_i, | |
top.scl_o, | |
top.scl_oe, | |
top.sda_i, | |
top.sda_o, | |
top.sda_oe, | |
])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment