Some syntactic sugar for asn1
I'll push this onto a fork eventually, but for now it's just a self-contained file. To use it, just import whatever you need from it.
License is MIT
asn1
I'll push this onto a fork eventually, but for now it's just a self-contained file. To use it, just import whatever you need from it.
License is MIT
import copy | |
from enum import Enum | |
from numbers import Number | |
from typing import Any, Optional, cast | |
from asn1 import Classes | |
from asn1 import Decoder as _Decoder | |
from asn1 import Error, Numbers | |
from asn1 import Tag as _Tag | |
from asn1 import Types | |
def _coerce_enum(enum_cls, value):
    """Return the ``enum_cls`` member whose value equals ``value``, else ``value`` itself."""
    try:
        return enum_cls(value)
    except ValueError:
        # No member carries this value; keep the raw input unchanged.
        return value


class Tag(_Tag):
    """``asn1.Tag`` subclass that normalises its fields to enum members.

    On construction, raw integers are replaced by the matching ``Numbers``,
    ``Types`` and ``Classes`` members whenever such a member exists; values
    without a matching member are stored unchanged. The boolean properties
    are shortcuts for comparing ``typ`` and ``cls`` against the enum members.
    """

    def __new__(cls, tag_number: int, tag_type: int, tag_class: int):
        """Create a tag from a number, a type and a class.

        Args:
            tag_number: Tag number. Converted to a ``Numbers`` member only when
                ``tag_class`` equals ``Classes.Universal`` and a member with
                that value exists.
            tag_type: Converted to a ``Types`` member when one matches.
            tag_class: Converted to a ``Classes`` member when one matches.
        """
        # Only universal-class numbers are looked up in ``Numbers``; numbers of
        # any other class are passed through as given.
        if tag_class == Classes.Universal:
            number = _coerce_enum(Numbers, tag_number)
        else:
            number = tag_number
        return super().__new__(
            cls,
            number,
            _coerce_enum(Types, tag_type),
            _coerce_enum(Classes, tag_class),
        )

    @property
    def primitive(self):
        """Whether ``typ`` equals ``Types.Primitive``."""
        return self.typ == Types.Primitive

    @property
    def constructed(self):
        """Whether ``typ`` equals ``Types.Constructed``."""
        return self.typ == Types.Constructed

    @property
    def universal(self):
        """Whether ``cls`` equals ``Classes.Universal``."""
        return self.cls == Classes.Universal

    @property
    def application(self):
        """Whether ``cls`` equals ``Classes.Application``."""
        return self.cls == Classes.Application

    @property
    def context(self):
        """Whether ``cls`` equals ``Classes.Context``."""
        return self.cls == Classes.Context

    @property
    def private(self):
        """Whether ``cls`` equals ``Classes.Private``."""
        return self.cls == Classes.Private
class UniversalTags(Tag, Enum):
    """Enumeration of ready-made ``Tag`` values in the universal class.

    Each member is built from the ``Numbers`` entry of the same name together
    with ``Classes.Universal``. ``Sequence`` and ``Set`` use
    ``Types.Constructed``; every other member uses ``Types.Primitive``.
    """

    # Member names mirror the names used in ``Numbers`` rather than following
    # the UPPER_CASE constant convention, hence the lint suppression.
    # pylint: disable=invalid-name
    Boolean = Tag(Numbers.Boolean, Types.Primitive, Classes.Universal)
    Integer = Tag(Numbers.Integer, Types.Primitive, Classes.Universal)
    BitString = Tag(Numbers.BitString, Types.Primitive, Classes.Universal)
    OctetString = Tag(Numbers.OctetString, Types.Primitive, Classes.Universal)
    Null = Tag(Numbers.Null, Types.Primitive, Classes.Universal)
    ObjectIdentifier = Tag(Numbers.ObjectIdentifier, Types.Primitive, Classes.Universal)
    Enumerated = Tag(Numbers.Enumerated, Types.Primitive, Classes.Universal)
    UTF8String = Tag(Numbers.UTF8String, Types.Primitive, Classes.Universal)
    # The two constructed universal types in this table.
    Sequence = Tag(Numbers.Sequence, Types.Constructed, Classes.Universal)
    Set = Tag(Numbers.Set, Types.Constructed, Classes.Universal)
    PrintableString = Tag(Numbers.PrintableString, Types.Primitive, Classes.Universal)
    IA5String = Tag(Numbers.IA5String, Types.Primitive, Classes.Universal)
    UTCTime = Tag(Numbers.UTCTime, Types.Primitive, Classes.Universal)
    GeneralizedTime = Tag(Numbers.GeneralizedTime, Types.Primitive, Classes.Universal)
    UnicodeString = Tag(Numbers.UnicodeString, Types.Primitive, Classes.Universal)
def context_specific(tag_number: int):
    """Build a constructed ``Tag`` in the context class carrying ``tag_number``."""
    kind, klass = Types.Constructed, Classes.Context
    return Tag(tag_number, kind, klass)
class Decoder(_Decoder):
    """``asn1.Decoder`` with some convenience protocols layered on top.

    Additions over the parent class:

    * input can be supplied directly to the constructor;
    * tags are returned as this module's ``Tag``;
    * ``with decoder:`` calls ``enter()`` on entry and ``leave()`` on exit;
    * ``for tag in decoder:`` enters the current element and yields the tag of
      each child;
    * ``len(decoder)`` counts the children of the current element and puts the
      decoder state back afterwards.
    """

    def __init__(self, start: Optional[bytes] = None) -> None:
        """Create the decoder and optionally start it on ``start``.

        Args:
            start: Input handed to ``self.start()``. ``None`` leaves the
                decoder unstarted; any other value, including ``b""``, starts it.
        """
        super().__init__()
        # Compare against None instead of relying on truthiness, so that an
        # explicitly supplied empty input still starts the decoder.
        if start is not None:
            self.start(start)

    def skip(self):
        """Read the next element and discard the result."""
        self.read()

    def _read_tag(self):
        """Return the parent's raw tag re-wrapped as this module's ``Tag``."""
        return Tag(*super()._read_tag())

    def peek(self) -> Optional[Tag]:
        """Return the parent's ``peek()`` result, or ``None`` when it is falsy."""
        # cast() only informs static checkers; there is no runtime conversion.
        return cast(Optional[Tag], super().peek()) or None

    def read(self, tagnr: Optional[Number] = None) -> tuple[Tag, Any]:  # pylint: disable=useless-parent-delegation
        """Delegate to the parent ``read()``; overridden only to narrow the annotated return type."""
        return super().read(tagnr)  # type: ignore

    def __enter__(self):
        """Call ``enter()`` and return the decoder itself."""
        self.enter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Call ``leave()``; returns ``False`` so exceptions are never suppressed."""
        self.leave()
        return False

    def __iter__(self):
        """Enter the current element and yield the tag of each child in turn.

        After every ``yield`` the decoder checks what the consumer did:

        * left the iterated level: ``RuntimeError`` is raised;
        * entered deeper levels without leaving them: they are left again and
          ``read()`` is called;
        * changed nothing on the iterated level: ``read()`` is called to move
          past the yielded tag;
        * otherwise the consumer has already advanced and nothing is done.

        ``leave()`` is called once the children are exhausted or the generator
        is closed early.

        Raises:
            RuntimeError: If the consumer left the level being iterated.
        """
        self.enter()
        # https://www.peterbe.com/plog/generatorexit
        try:
            while tag := self.peek():
                assert self.m_stack
                level = len(self.m_stack) - 1
                # Snapshot of this level's entry, used to detect consumer activity.
                snapshot = self.m_stack[level].copy()
                yield tag
                depth = len(self.m_stack) - 1
                if depth < level:
                    raise RuntimeError("Left current level while iterating")
                if depth > level:
                    # The consumer entered a new level without leaving it:
                    # tolerate that and pop back to the original level.
                    while len(self.m_stack) - 1 > level:
                        self.leave()
                    # NOTE(review): if enter() already consumed the element from
                    # this level, this read() skips the following sibling -
                    # confirm against the asn1 Decoder implementation.
                    self.read()
                elif self.m_stack[level] == snapshot:
                    # The consumer did not touch the decoder; skip over this tag.
                    self.read()
                # Otherwise the consumer has read one or more tags and the
                # decoder is already past the yielded one.
        except GeneratorExit:
            # Closed early by the consumer; fall through to leave the level.
            pass
        self.leave()

    def __len__(self):
        """Count the children of the current element without consuming it.

        Returns:
            ``0`` when there is no further input on the current level,
            otherwise the number of tags yielded by iterating the element.

        Raises:
            Error: If the current tag is not constructed.
        """
        if self._end_of_input():
            return 0
        tag = self.peek()
        if tag is None:
            # Nothing left to inspect, so there is nothing to count.
            return 0
        if not tag.constructed:
            raise Error("Cannot enter a non-constructed tag.")
        saved_stack = copy.deepcopy(self.m_stack)
        saved_tag = self.m_tag
        try:
            return sum(1 for _ in self)
        finally:
            # Put the decoder state back even if counting failed part-way.
            self.m_stack = saved_stack
            self.m_tag = saved_tag