-
-
Save whitequark/62e83edd46bb4fd78b7241377615f2f0 to your computer and use it in GitHub Desktop.
from amaranth import * | |
from amaranth.lib import data, wiring, stream, io | |
from amaranth.lib.wiring import In, Out | |
from amaranth.sim import Simulator | |
class BitSerializer(wiring.Component): | |
def __init__(self, *, width, length): | |
self._length = length | |
super().__init__({ | |
"en": In(1), | |
"stream": In(stream.Signature(data.StructLayout({ | |
"o": data.ArrayLayout(width, length), | |
"oe": 1 | |
}))), | |
"buffer": Out(io.FFBuffer.Signature("o", width)), | |
}) | |
def elaborate(self, platform): | |
m = Module() | |
o_reg = Signal.like(self.stream.p.o) | |
oe_reg = Signal.like(self.stream.p.oe) | |
m.d.comb += self.buffer.o.eq(o_reg[0]) | |
m.d.comb += self.buffer.oe.eq(oe_reg) | |
count = Signal(range(self._length)) | |
with m.If(self.en): | |
with m.If(count == 0): | |
with m.If(self.stream.valid): | |
m.d.comb += self.stream.ready.eq(1) | |
m.d.sync += count.eq(self._length - 1) | |
m.d.sync += o_reg.eq(self.stream.p.o) | |
m.d.sync += oe_reg.eq(self.stream.p.oe) | |
with m.Else(): | |
m.d.sync += count.eq(count - 1) | |
m.d.sync += o_reg.eq(o_reg.as_value()[len(o_reg[0]):]) | |
return m | |
class BitDeserializer(wiring.Component): | |
def __init__(self, *, width, length): | |
self._length = length | |
super().__init__({ | |
"en": In(1), | |
"stream": Out(stream.Signature(data.StructLayout({ | |
"i": data.ArrayLayout(width, length), | |
}))), | |
"buffer": Out(io.FFBuffer.Signature("i", width)), | |
}) | |
def elaborate(self, platform): | |
m = Module() | |
i_reg = Signal.like(self.stream.p.i) | |
m.d.comb += self.stream.p.i.eq(i_reg) | |
count = Signal(range(self._length)) | |
with m.If(self.stream.valid): | |
with m.If(self.stream.ready): | |
m.d.sync += self.stream.valid.eq(0) | |
with m.Elif(self.en): | |
with m.If(count == self._length - 1): | |
m.d.sync += count.eq(0) | |
m.d.sync += self.stream.valid.eq(1) | |
with m.Else(): | |
m.d.sync += count.eq(count + 1) | |
m.d.sync += i_reg.eq(Cat(i_reg.as_value()[len(i_reg[0]):], self.buffer.i)) | |
return m | |
class BitEnableGenerator(wiring.Component): | |
cycles: In(stream.Signature(8)) # how many cycles to produce | |
clk_en: Out(1) # high for clock negedge and clock posedge | |
o_en: Out(1) # high for clock negedge | |
i_en: Out(1) # high for clock posedge, delayed by `latency` cycles | |
def __init__(self, *, half_period, latency): | |
self._half_period = half_period | |
self._latency = latency | |
super().__init__() | |
def elaborate(self, platform): | |
m = Module() | |
negedge = Signal() | |
posedge = Signal() | |
m.d.comb += self.clk_en.eq(negedge | posedge) | |
m.d.comb += self.o_en.eq(negedge) | |
i_en = posedge | |
for _ in range(self._latency): | |
i_en_delay = Signal() | |
m.d.sync += i_en_delay.eq(i_en) | |
i_en = i_en_delay | |
m.d.comb += self.i_en.eq(i_en) | |
count = Signal.like(self.cycles.payload) | |
timer = Signal(range(self._half_period)) | |
phase = Signal() | |
with m.If(count == 0): | |
m.d.comb += self.cycles.ready.eq(1) | |
with m.If(self.cycles.valid): | |
m.d.sync += count.eq(self.cycles.payload) | |
m.d.sync += timer.eq(0) | |
m.d.sync += phase.eq(0) | |
with m.Else(): | |
# meow | |
with m.If(timer == self._half_period - 1): | |
m.d.sync += count.eq(Mux(phase, count - 1, count)) | |
m.d.sync += timer.eq(0) | |
m.d.sync += phase.eq(~phase) | |
m.d.comb += negedge.eq(phase == 0) | |
m.d.comb += posedge.eq(phase == 1) | |
with m.Else(): | |
m.d.sync += timer.eq(timer + 1) | |
return m | |
class SPIControllerBus(wiring.Component): | |
o_stream: In(stream.Signature(8)) | |
i_stream: Out(stream.Signature(8)) | |
sck_buffer: Out(io.FFBuffer.Signature("o", 1)) | |
copi_buffer: Out(io.FFBuffer.Signature("o", 1)) | |
cipo_buffer: Out(io.FFBuffer.Signature("i", 1)) | |
def __init__(self, *, half_period): | |
self._half_period = half_period | |
super().__init__() | |
def elaborate(self, platform): | |
m = Module() | |
m.submodules.en_gen = en_gen = \ | |
BitEnableGenerator(half_period=self._half_period, latency=1) | |
m.d.comb += [ | |
en_gen.cycles.p.eq(8), | |
en_gen.cycles.valid.eq(self.o_stream.valid & self.i_stream.ready), | |
] | |
m.submodules.sck_ser = sck_ser = BitSerializer(width=1, length=16) | |
wiring.connect(m, clk=sck_ser.buffer, buf=wiring.flipped(self.sck_buffer)) | |
m.d.comb += [ | |
sck_ser.en.eq(en_gen.clk_en), | |
sck_ser.stream.p.o.eq(0b1010101010101010), | |
sck_ser.stream.p.oe.eq(1), | |
sck_ser.stream.valid.eq(1), | |
] | |
m.submodules.copi_ser = copi_ser = BitSerializer(width=1, length=8) | |
wiring.connect(m, ser=copi_ser.buffer, buf=wiring.flipped(self.copi_buffer)) | |
m.d.comb += [ | |
copi_ser.en.eq(en_gen.o_en), | |
copi_ser.stream.p.o.eq(self.o_stream.p), | |
copi_ser.stream.p.oe.eq(1), | |
copi_ser.stream.valid.eq(self.o_stream.valid), | |
self.o_stream.ready.eq(copi_ser.stream.ready), | |
] | |
m.submodules.cipo_des = cipo_des = BitDeserializer(width=1, length=8) | |
wiring.connect(m, des=cipo_des.buffer, buf=wiring.flipped(self.cipo_buffer)) | |
m.d.comb += [ | |
cipo_des.en.eq(en_gen.i_en), | |
self.i_stream.p.eq(cipo_des.stream.p), | |
self.i_stream.valid.eq(cipo_des.stream.valid), | |
cipo_des.stream.ready.eq(self.i_stream.ready), | |
] | |
return m | |
dut = Module() | |
dut.submodules.bus = bus = SPIControllerBus(half_period=10) | |
dut.d.sync += bus.cipo_buffer.i.eq(bus.copi_buffer.o) | |
async def stream_get(ctx, stream): | |
ctx.set(stream.ready, 1) | |
payload, = await ctx.tick().sample(stream.payload).until(stream.valid) | |
ctx.set(stream.ready, 0) | |
return payload | |
async def stream_put(ctx, stream, payload): | |
ctx.set(stream.valid, 1) | |
ctx.set(stream.payload, payload) | |
await ctx.tick().until(stream.ready) | |
ctx.set(stream.valid, 0) | |
async def testbench_ser(ctx): | |
await stream_put(ctx, bus.o_stream, 0x69) | |
await stream_put(ctx, bus.o_stream, 0xAA) | |
async def testbench_des(ctx): | |
assert await stream_get(ctx, bus.i_stream) == 0x69 | |
assert await stream_get(ctx, bus.i_stream) == 0xAA | |
sim = Simulator(dut) | |
sim.add_clock(1e-6) | |
sim.add_testbench(testbench_ser) | |
sim.add_testbench(testbench_des) | |
with sim.write_vcd("test.vcd"): | |
sim.run() |
This should be addressed in SPIControllerBus itself; am I mistaken?
Ah, yeah, that handles it. In that case I think the code in BitDeserializer will never cover the valid && !ready
branch, but that is fine as it's a somewhat generic module, and it may sometimes be safe to pause sampling on backpressure. Looking at the Stream RFC I think there is still a safety issue here, since present ready does not imply future ready:
- Once the receiver asserts
ready
it may deassert it at any time.
But this is just part of a larger implied contract for using the SPIControllerBus. The example is 100% fine as is and gets the point across. Sorry for the noise.
Looking at the Stream RFC I think there is still a safety issue here, since present ready does not imply future ready:
I don't think that's a problem since there's a 1-element FIFO in the payload/valid registers and that'll hold the data until the ready is asserted gain.
Or in other words: the contract of
BitSerializer
/BitDeserializer
is that at the time you start strobingen
, you need to have the data or the space for data available. In a proper reusable module rather than just a demo this would be written out explicitly.