Created
April 12, 2020 18:28
-
-
Save devanlai/4f56c11e24efcfeefa73cc7a9a48d7fe to your computer and use it in GitHub Desktop.
WIP pure-python sigrok decoder host
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ["Decoder", "OUTPUT_ANN", "OUTPUT_PYTHON", "OUTPUT_BINARY", "OUTPUT_META", "SRD_CONF_SAMPLERATE"] | |
from .decoder import Decoder | |
from .constants import OUTPUT_ANN, OUTPUT_PYTHON, OUTPUT_BINARY, OUTPUT_META, SRD_CONF_SAMPLERATE |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ["OUTPUT_ANN", "OUTPUT_PYTHON", "OUTPUT_BINARY", "OUTPUT_META", "SRD_CONF_SAMPLERATE"] | |
OUTPUT_ANN, OUTPUT_PYTHON, OUTPUT_BINARY, OUTPUT_META = range(4) | |
SRD_CONF_SAMPLERATE = 10000 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__all__ = ["Decoder"] | |
from array import array | |
from typing import Any, Optional, Sequence, Tuple | |
from queue import Queue | |
from .constants import * | |
class BaseDecoder(object): | |
def __init__(self) -> None: | |
pass | |
def put(self, startsample:int, endsample:int, output_id:int, data:Any) -> None: | |
print("Put", startsample, endsample, data) | |
def register(self, output_type:int, proto_id:str = None, meta:Tuple[int, str, str] = None) -> int: | |
pass | |
def has_channel(self, index: int) -> bool: | |
return self._channel_map.get(index) is not None | |
def start(self) -> None: | |
raise NotImplementedError | |
def reset(self) -> None: | |
raise NotImplementedError | |
def metadata(self, key:int, value:Any) -> None: | |
pass | |
class RawDecoder(BaseDecoder): | |
def __init__(self) -> None: | |
pass | |
def _initialize(self, options: dict, channel_map: dict, pin_initializer: Optional[int]) -> None: | |
self._chunk = None | |
self._sample_offset = 0 | |
self._sample_base = 0 | |
self._queue = Queue() | |
self._channel_map = channel_map | |
self._initial_pins = pin_initializer | |
self._prev_pins = pin_initializer | |
self._num_channels = 0 | |
if hasattr(self, "channels"): | |
self._num_channels += len(self.channels) | |
if hasattr(self, "optional_channels"): | |
self._num_channels += len(self.optional_channels) | |
self.options = options | |
self.matched = None | |
self.samplenum = 0 | |
def _reset(self) -> None: | |
# TODO: handle the input queue | |
self._chunk = None | |
self._sample_offset = 0 | |
self._sample_base = 0 | |
self._prev_pins = self._initial_pins | |
self.matched = None | |
self.samplenum = 0 | |
def _lookup_pin(self, channel: int) -> int: | |
return self._channel_map.get(channel) | |
def _lookup_channel(self, channel:int) -> dict: | |
if hasattr(self, "channels") and channel < len(self.channels): | |
return self.channels[channel] | |
else: | |
return self.optional_channels[channel] | |
def wait(self, conditions:Sequence[dict] = None) -> Tuple[int, ...]: | |
if not conditions: | |
conditions = [{"skip": 1}] | |
# TODO: handle case where we cross chunk boundaries | |
matched = [False for cond in conditions] | |
initial_samplenum = self.samplenum | |
while not any(matched): | |
if self._chunk is None: | |
# Fetch the next chunk | |
self._sample_base = self.samplenum | |
self._sample_offset = 0 | |
next_chunk = self._queue.get() | |
if next_chunk is None: | |
# Received end-of-data signal; terminate this thread | |
self._release_chunk() | |
raise EOFError | |
self._chunk = next_chunk | |
if self.samplenum == 0 and self._initial_pins is None: | |
self._prev_pins = self._chunk[0] | |
# Evaluate conditions | |
current_pins = self._chunk[self._sample_offset] | |
for i,product in enumerate(conditions): | |
matched[i] = False | |
for channel, pin_condition in product.items(): | |
if channel == 'skip': | |
skip_count = pin_condition | |
if self.samplenum + 1 < initial_samplenum + skip_count: | |
# Have not waited for enough samples | |
break | |
elif 0 <= channel < self._num_channels: | |
pin_index = self._lookup_pin(channel) | |
if pin_index is None: | |
raise ValueError("optional channel '{0.id}' is not mapped".format(self._lookup_channel(channel))) | |
bitmask = (1 << pin_index) | |
if pin_condition == "l": | |
if (current_pins & bitmask) != 0: | |
break | |
elif pin_condition == "h": | |
if (current_pins & bitmask) != bitmask: | |
break | |
elif pin_condition == "r": | |
if (self._prev_pins & bitmask) != 0 or (current_pins & bitmask) != bitmask: | |
break | |
elif pin_condition == "f": | |
if (self._prev_pins & bitmask) != bitmask or (current_pins & bitmask) != 0: | |
break | |
elif pin_condition == "e": | |
if ((self._prev_pins ^ current_pins) & bitmask) == 0: | |
break | |
elif pin_condition == "s": | |
if ((self._prev_pins ^ current_pins) & bitmask) == bitmask: | |
break | |
else: | |
raise ValueError("Invalid pin state condition '{}'".format(pin_condition)) | |
else: | |
raise ValueError("Invalid channel index {:d}".format(channel)) | |
else: | |
matched[i] = True | |
# Advance to the next sample | |
self._prev_pins = current_pins | |
self._sample_offset += 1 | |
self.samplenum += 1 | |
if self._sample_offset >= len(self._chunk): | |
# Done with this chunk | |
self._chunk = None | |
self._release_chunk() | |
# Matched at least one condition | |
self.matched = tuple(matched) | |
channel_values = [] | |
for i in range(self._num_channels): | |
pin_index = self._lookup_pin(i) | |
if pin_index is not None: | |
value = 1 if (current_pins & (1 << pin_index)) else 0 | |
else: | |
value = None | |
channel_values.append(value) | |
return tuple(channel_values) | |
def decode(self) -> None: | |
raise NotImplementedError | |
def _feed_chunk(self, data: array) -> None: | |
self._queue.put(data) | |
def _release_chunk(self) -> None: | |
self._queue.task_done() | |
def _signal_end(self) -> None: | |
self._queue.put(None) | |
Decoder = RawDecoder | |
class StackedDecoder(BaseDecoder): | |
def decode(self, startsample:int, endsample:int, data:Any) -> None: | |
raise NotImplementedError | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import array | |
import configparser | |
import importlib | |
import os | |
import sys | |
import threading | |
import zipfile | |
from sigrokdecode.constants import * | |
def parse_samplerate(s): | |
if s.endswith("MHz"): | |
return int(s[:-3]) * 1000000 | |
elif s.endwith("kHz"): | |
return int(s[:-3]) * 1000 | |
else: | |
return int(s) | |
def load_sr(f): | |
z = zipfile.ZipFile(f) | |
sr = {} | |
with z.open("version", "r") as version_file: | |
sr["version"] = int(version_file.read().decode("ascii")) | |
with z.open("metadata", "r") as meta_file: | |
config = configparser.ConfigParser() | |
config.read_string(meta_file.read().decode("ascii"), source=meta_file.name) | |
sr["metadata"] = config | |
sr["devices"] = [] | |
for section in sr["metadata"].sections(): | |
if section.startswith("device"): | |
device = {} | |
device["driver"] = config[section].get("driver") | |
device["base_filename"] = config[section]["capturefile"] | |
device["samplerate_hz"] = parse_samplerate(config[section]["samplerate"]) | |
device["channel_count"] = int(config[section]["total probes"]) | |
device["unit_size"] = int(config[section]["unitsize"]) | |
device["channel_names"] = {} | |
for i in range(1, device["channel_count"]+1): | |
digital_probe_opt_name = "probe{:d}".format(i) | |
if config.has_option(section, digital_probe_opt_name): | |
device["channel_names"][i] = config[section][digital_probe_opt_name] | |
analog_probe_opt_name = "analog{:d}".format(i) | |
if config.has_option(section, analog_probe_opt_name): | |
device["channel_names"][i] = config[section][analog_probe_opt_name] | |
sr["devices"].append(device) | |
typecodes = { 1: "B", 2: "H", 4: "L", 8: "Q" } | |
for device in sr["devices"]: | |
data_filenames = sorted(fname for fname in z.namelist() if fname.startswith(device["base_filename"])) | |
chunks = [] | |
for filename in data_filenames: | |
with z.open(filename, "r") as data_file: | |
chunks.append(data_file.read()) | |
device["data"] = array.array(typecodes[device["unit_size"]], b"".join(chunks)) | |
return sr | |
def load_decoder_class(name, srd_dir): | |
path = os.path.join(srd_dir, name, "pd.py") | |
spec = importlib.util.spec_from_file_location("pd", path) | |
module = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(module) | |
return module.Decoder | |
def run_decoder(decoder, metadata): | |
for key, value in metadata.items(): | |
decoder.metadata(key, value) | |
print("starting decoder") | |
decoder.start() | |
try: | |
decoder.decode() | |
except EOFError: | |
# TODO: cleanup decoder | |
pass | |
print("exiting decoder") | |
def spawn_decoder_thread(decoder_class, metadata, options, channel_map, pin_initializer): | |
decoder = decoder_class() | |
decoder._initialize(options, channel_map, pin_initializer) | |
# TODO: add decoder instance to list of decoders pending input | |
thread = threading.Thread(target=run_decoder, args=(decoder, metadata)) | |
thread.start() | |
return thread, decoder | |
if __name__ == "__main__": | |
import argparse | |
parser = argparse.ArgumentParser() | |
parser.add_argument("input", type=argparse.FileType("rb")) | |
parser.add_argument("decoder") | |
args = parser.parse_args() | |
srd_dir_var = os.environ.get("SIGROKDECODE_DIR", "~/.local/share/libsigrokdecode/decoders") | |
srd_dir = os.path.expanduser(os.path.expandvars(srd_dir_var)) | |
sys.path.append(srd_dir) | |
sr = load_sr(args.input) | |
decoder_class = load_decoder_class(args.decoder, srd_dir) | |
options = {} | |
for option in decoder_class.options: | |
options[option["id"]] = option["default"] | |
metadata = { | |
SRD_CONF_SAMPLERATE: sr["devices"][0]["samplerate_hz"] | |
} | |
pin_initializer = None | |
channel_map = { 0: 0 } | |
thread, decoder = spawn_decoder_thread(decoder_class, metadata, options, channel_map, pin_initializer) | |
print("Feeding data") | |
for device in sr["devices"]: | |
# Feed in artificial 1k chunks | |
i = 0 | |
while (i * 1024) < len(device["data"]): | |
chunk = device["data"][i*1024:(i+1)*1024] | |
decoder._feed_chunk(chunk) | |
i += 1 | |
# Signal the end of input | |
decoder._signal_end() | |
print("Signalled end") | |
# Wait for the thread to finish processing input | |
thread.join() | |
print("Done") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment