Last active
August 29, 2015 14:27
-
-
Save cfelton/1365deee1f3d95a2007c to your computer and use it in GitHub Desktop.
A MyHDL block buffer input and output example.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from random import randint | |
import myhdl | |
from myhdl import (Signal, ResetSignal, intbv, always_seq, always, | |
always_comb, delay, instance, traceSignals, | |
Simulation, StopSimulation) | |
class BlockOfData(object):
    """ a block of element signals together with a ``valid`` flag

    Arguments:
        btype: value passed to ``Signal`` for each element of the block
        block_size: how many element signals to create (default 128)

    Attributes (set in ``__init__``):
        size: the ``block_size`` the block was built with
        valid: ``Signal(bool(0))`` flag
        data: list of ``block_size`` signals, each created from ``btype``
    """

    def __init__(self, btype, block_size=128):
        # one Signal per element, every one constructed from ``btype``
        self.data = [Signal(btype) for _ in range(block_size)]
        self.valid = Signal(bool(0))
        self.size = block_size
def block_sum(clock, block, sum_out):
    """ add up all elements of a block whenever the block is valid

    Arguments:
        clock: signal whose rising edge triggers the accumulation
        block: object exposing ``size``, ``valid`` and a ``data`` list
        sum_out: ignored -- it is rebound to a locally created signal
            below, so whatever the caller passes in is never driven

    Returns the clocked generator ``rtl``.
    """
    block_size = block.size
    # bound for the total: element upper limit times number of elements
    smax = block_size * block.data[0].max

    # override the passed signal :)
    sum_out = Signal(intbv(0, min=-smax, max=smax))

    # NOTE: the names ``sum_out`` and ``rtl`` are kept as-is; MyHDL takes
    # signal/generator names from the local names used here
    @always(clock.posedge)
    def rtl():
        if block.valid:
            sums = 0
            for ii in range(block_size):
                sums += block.data[ii]
            sum_out.next = sums

    return rtl
def block_input_buffer(clock, reset, data_in, block_out):
    """ store the incoming word stream into a block, one word per clock

    On every rising clock edge ``data_in`` is written to
    ``block_out.data[cnt]``.  When ``cnt`` is at the last index the
    ``block_out.valid`` flag is set and ``cnt`` returns to 0; on all other
    edges ``valid`` is cleared and ``cnt`` advances by one.

    Arguments:
        clock: clock signal
        reset: reset signal handed to ``always_seq``
        data_in: input word signal
        block_out: ``BlockOfData`` instance that receives the words

    Returns the generators ``rtl_input_block``, ``rtl_mon`` and the
    ``block_sum`` instance.
    """
    assert isinstance(block_out, BlockOfData)
    block_size = block_out.size
    last_index = block_size - 1
    cnt = Signal(intbv(0, min=0, max=block_size))
    is_all_ones = Signal(bool(0))
    # value of data_in with every bit set
    all_ones = (1 << len(data_in)) - 1

    @always_seq(clock.posedge, reset=reset)
    def rtl_input_block():
        block_out.data[cnt].next = data_in
        if cnt != last_index:
            block_out.valid.next = False
            cnt.next = cnt + 1
        else:
            # last word of the block was just captured
            block_out.valid.next = True
            cnt.next = 0

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # this is not used (needed) for block buffer
    @always_comb
    def rtl_mon():
        if data_in != all_ones:
            is_all_ones.next = False
        else:
            is_all_ones.next = True

    sum_out = None
    sum_inst = block_sum(clock, block_out, sum_out)
    # end not needed
    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    return rtl_input_block, rtl_mon, sum_inst
def block_output_buffer(clock, reset, block_in, data_out):
    """ stream the words of a block out on ``data_out``, one per clock

    When ``block_in.valid`` is set, word 0 is driven onto ``data_out`` and
    the word counter starts.  On the following edges the remaining words
    are driven in index order, after which the counter returns to 0 and
    the buffer waits for the next ``valid``.

    Arguments:
        clock: clock signal
        reset: reset signal handed to ``always_seq``
        block_in: ``BlockOfData`` instance to read from
        data_out: output word signal

    Returns the clocked generator ``rtl_output_block``.
    """
    assert isinstance(block_in, BlockOfData)
    block_size = block_in.size
    cnt = Signal(intbv(0, min=0, max=block_size))
    # counter value to load after word 0 has been sent.  A one-word block
    # has no word 1, and 1 would lie outside cnt's [0, block_size) range,
    # so in that case the counter simply stays at 0.
    second_index = 1 if block_size > 1 else 0

    @always_seq(clock.posedge, reset=reset)
    def rtl_output_block():
        if block_in.valid:
            # a new block may only arrive while the buffer is idle
            assert cnt == 0
            cnt.next = second_index
            data_out.next = block_in.data[cnt]
        elif cnt >= 1 and cnt < block_size:
            if cnt < block_size-1:
                cnt.next = cnt + 1
            else:
                # last word is going out now, go back to idle
                cnt.next = 0
            data_out.next = block_in.data[cnt]

    return rtl_output_block
def block_processing(clock, block_i, block_o):
    """ pass a block through unchanged, registered on the clock

    On each rising clock edge ``block_o.valid`` takes the value of
    ``block_i.valid``; when the input is valid every element of
    ``block_i.data`` is also copied to the same index of ``block_o.data``.

    Arguments:
        clock: clock signal
        block_i: input block
        block_o: output block, must have the same ``size`` as ``block_i``

    Returns the clocked generator ``rtl``.
    """
    assert block_i.size == block_o.size
    block_size = block_i.size

    @always(clock.posedge)
    def rtl():
        # the output flag simply follows the input flag
        block_o.valid.next = block_i.valid
        if block_i.valid:
            for ii in range(block_size):
                block_o.data[ii].next = block_i.data[ii]

    return rtl
def block_processing_top(clock, reset, data_in, data_out, block_size=128):
    """ top level: input buffer -> block processing -> output buffer

    Arguments:
        clock: clock signal
        reset: reset signal for the two buffers
        data_in: input word signal; its ``val`` seeds the input block
        data_out: output word signal; its ``val`` seeds the output block
        block_size: number of words per block (default 128)

    Returns the list of the three sub-module instances.
    """
    block_i = BlockOfData(data_in.val, block_size=block_size)
    block_o = BlockOfData(data_out.val, block_size=block_size)

    # the list stays bound to the local name ``mod_insts`` before it is
    # returned, exactly as in the incremental construction it replaces
    mod_insts = [
        block_input_buffer(clock, reset, data_in, block_i),
        block_processing(clock, block_i, block_o),
        block_output_buffer(clock, reset, block_o, data_out),
    ]
    return mod_insts
def test_block_buffer():
    """ simulate block_processing_top with one block and check the output

    A block of 16 words is streamed in (the first word is 0xFFFFFF, the
    rest are random), the words seen on ``data_out`` are recorded starting
    from the first 0xFFFFFF, and the two sequences are compared.  After
    the simulation the design is converted to Verilog and VHDL.

    Raises AssertionError when an output word differs from the input word
    at the same position.
    """
    clock = Signal(bool(0))
    # `async` is a reserved word from Python 3.7 on, so it can no longer be
    # written as a keyword argument; MyHDL 0.10 renamed it to `isasync`.
    try:
        reset = ResetSignal(0, active=0, isasync=True)
    except TypeError:
        # older MyHDL releases only accept the original keyword name
        reset = ResetSignal(0, active=0, **{'async': True})
    data_in, data_out = [Signal(intbv(0)[24:]) for _ in range(2)]
    block_in, block_out = [], []

    def _bench():
        block_size = 16
        tbdut = block_processing_top(clock, reset, data_in, data_out,
                                     block_size=block_size)

        @always(delay(5))
        def tbclk():
            clock.next = not clock

        @instance
        def tbstim():
            # hold reset active for a while, release it on a clock edge
            reset.next = reset.active
            yield delay(33)
            yield clock.posedge
            reset.next = not reset.active

            # first word is all ones: tbmon uses it to find the block start
            di = 0xFFFFFF
            for _ in range(block_size):
                data_in.next = di
                block_in.append(di)
                yield clock.posedge
                di = randint(0, data_in.max-1)

            # keep clocking until the monitor has captured a full block
            while len(block_in) > len(block_out):
                yield clock.posedge

            raise StopSimulation

        @instance
        def tbmon():
            grab = False
            while True:
                yield clock.posedge
                if data_out == 0xFFFFFF and not grab:
                    grab = True
                    block_out.append(int(data_out))
                elif grab:
                    block_out.append(int(data_out))

        return tbdut, tbclk, tbstim, tbmon

    # debug the hierarchy
    # NOTE(review): confirm `dump_hierarchy` exists in the installed MyHDL
    myhdl.dump_hierarchy(_bench)

    # remove previous VCD files
    traceSignals.name = 'test_block_buffer'
    if os.path.isfile(traceSignals.name+'.vcd'):
        os.remove(traceSignals.name+'.vcd')

    # run simulation and check the outputs
    Simulation(traceSignals(_bench)).run()
    for ii, (di, do) in enumerate(zip(block_in, block_out)):
        assert di == do, "{:5d}: {:06X} != {:06X}".format(ii, di, do)

    myhdl.toVerilog(block_processing_top, clock, reset,
                    data_in, data_out)
    myhdl.toVHDL(block_processing_top, clock, reset,
                 data_in, data_out)
# run the testbench when this file is executed as a script
if __name__ == "__main__":
    test_block_buffer()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment