Last active
August 6, 2022 11:11
-
-
Save michalmonday/4c6cfd40031025563e06ae2ff148f18c to your computer and use it in GitHub Desktop.
Efficient (hopefully) pynq dma receiver with multiple buffers that allows to get the data as a list.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' Example usage with dma.sendchannel supplying the data: | |
import os, subprocess, sys, re, time | |
from pathlib import Path | |
from pynq import Overlay | |
from pynq import allocate | |
from threading import Thread | |
from dma_receiver import DmaReceiver | |
BASE_DIR = Path('/home/xilinx/Django/pynq') | |
PATH = BASE_DIR / 'tools/make' | |
base = Overlay(str(BASE_DIR / 'dma_test.bit')) # design like this: https://www.youtube.com/watch?v=K4OkNH17hiA | |
#base = Overlay(str(BASE_DIR / 'onboard.bit')) | |
output_buffer = allocate(shape=(16,), dtype='u8') | |
dma_send = base.axi_dma_0.sendchannel | |
from dma_receiver import DmaReceiver | |
def receiver_thread(base): | |
dma_receiver = DmaReceiver(base.axi_dma_0) | |
while True: | |
print( dma_receiver.get() ) | |
rt = Thread(target=receiver_thread, args=[base], daemon=True) | |
rt.start() | |
output_buffer[:] = [i for i in range(1, len(output_buffer)+1)] | |
dma_send.transfer(output_buffer) | |
dma_send.wait() | |
output_buffer[:] = [i + 0xFF for i in range(1, len(output_buffer)+1)] | |
dma_send.transfer(output_buffer) | |
dma_send.wait() | |
''' | |
from queue import Queue | |
from threading import Thread, Lock | |
import re | |
import time | |
from pynq import allocate | |
class DmaReceiver: | |
def __init__(self, dma, items_limit=10000, buffer_count=2, allocate_args=[], allocate_kwargs={'shape':(16,), 'dtype':'u8'}): | |
''' Dma receiver will not read more data while items_limit is reached. | |
items_limit limits the self.out_data. ''' | |
self.dma = dma | |
self.buffer_count = buffer_count | |
# item_size = the number of bytes in a single FIFO item | |
self.item_size = int( re.search('\d+$', allocate_kwargs.get('dtype') ).group(0) ) | |
# input buffers | |
self.ibs = [allocate(*allocate_args, **allocate_kwargs) for _ in range(buffer_count)] | |
# Non-empty queue indicate that particular input buffer transferred data which | |
# wasn't copied yet. Queues are used because they're thread safe and provide blocking "get" method | |
self.ib_queues = [Queue() for _ in range(buffer_count)] | |
self.rec_len_queue = Queue() | |
self.recv_thread = Thread(target=self.recv_thread, daemon=True) | |
self.copying_thread = Thread(target=self.copying_thread, daemon=True) | |
self.items_limit = items_limit | |
self.out_data_lock = Lock() | |
self.out_data = [] | |
self.recv_thread.start() | |
self.copying_thread.start() | |
def recv_thread(self): | |
dma_rec = self.dma.recvchannel | |
active_ib = 0 | |
while True: | |
ib = self.ibs[active_ib] | |
ib_queue = self.ib_queues[active_ib] | |
active_ib += 1 | |
active_ib %= self.buffer_count | |
# if input buffer was used previously and received data wasn't copied to "self.out_data" | |
# then wait until that happens before issuing another transfer | |
while not ib_queue.empty(): | |
time.sleep(0.001) | |
self.wait_until_under_storage_limit() | |
dma_rec.transfer(ib) | |
dma_rec.wait() | |
bytes_count = self.get_last_recv_length() | |
ib_queue.put(1) # make queue non-empty, indicating the data it received wasn't copied yet | |
self.rec_len_queue.put(bytes_count) | |
def copying_thread(self): | |
active_ib = 0 | |
while True: | |
ib = self.ibs[active_ib] | |
ib_queue = self.ib_queues[active_ib] | |
active_ib += 1 | |
active_ib %= self.buffer_count | |
bytes_count = self.rec_len_queue.get(block=True) # blocking until recv_thread receives data | |
items_count = bytes_count // self.item_size | |
#print(f'bytes_count={bytes_count}') | |
data = list(ib[0:items_count]) # make a copy of data | |
# with self.out_data_lock: | |
# self.out_data.extend(ib[:bytes_count].tolist()) | |
#print('ib slice type is =',type(ib[:2])) | |
ib_queue.get(block=True) # allow another transfer for the same input buffer | |
with self.out_data_lock: | |
self.out_data.extend(data) | |
#if len(self.out_data) >= self.items_limit: | |
# print(f'Storage limit reached (out_data length={len(self.out_data)})') | |
def get_last_recv_length(self): | |
# dma = base.axi_dma_0 | |
# 0x58 is the address of S2MM_length as shown in the table from: | |
# https://docs.xilinx.com/r/en-US/pg021_axi_dma/AXI-DMA-Register-Address-Map | |
# return self.dma.mmio.read(0x58) // self.item_size | |
return self.dma.recvchannel.transferred | |
def available(self): | |
with self.out_data_lock: | |
return len(self.out_data) != 0 | |
def get(self): | |
while not self.available(): | |
time.sleep(0.001) | |
with self.out_data_lock: | |
data = list(self.out_data) | |
self.out_data = [] | |
return data | |
def storage_limit_reached(self): | |
with self.out_data_lock: | |
out_data_len = len(self.out_data) | |
return out_data_len >= self.items_limit | |
def wait_until_under_storage_limit(self): | |
while self.storage_limit_reached(): | |
time.sleep(0.001) # this delay allows other threads to use the lock to update self.out_data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment