Some syntactic sugar for asn1
I'll push this onto a fork eventually but for now it's just a self-contained file. To use it just import the stuff as needed
License is MIT
asn1I'll push this onto a fork eventually but for now it's just a self-contained file. To use it just import the stuff as needed
License is MIT
| import copy | |
| from enum import Enum | |
| from numbers import Number | |
| from typing import Any, Optional, cast | |
| from asn1 import Classes | |
| from asn1 import Decoder as _Decoder | |
| from asn1 import Error, Numbers | |
| from asn1 import Tag as _Tag | |
| from asn1 import Types | |
| class Tag(_Tag): | |
| def __new__(cls, tag_number: int, tag_type: int, tag_class: int): | |
| return super().__new__( | |
| cls, | |
| Numbers(tag_number) if tag_class == Classes.Universal and tag_number in list(Numbers) else tag_number, | |
| Types(tag_type) if tag_type in list(Types) else tag_type, | |
| Classes(tag_class) if tag_class in list(Classes) else tag_class, | |
| ) | |
| @property | |
| def primitive(self): | |
| return self.typ == Types.Primitive | |
| @property | |
| def constructed(self): | |
| return self.typ == Types.Constructed | |
| @property | |
| def universal(self): | |
| return self.cls == Classes.Universal | |
| @property | |
| def application(self): | |
| return self.cls == Classes.Application | |
| @property | |
| def context(self): | |
| return self.cls == Classes.Context | |
| @property | |
| def private(self): | |
| return self.cls == Classes.Private | |
| class UniversalTags(Tag, Enum): | |
| # pylint: disable=invalid-name | |
| Boolean = Tag(Numbers.Boolean, Types.Primitive, Classes.Universal) | |
| Integer = Tag(Numbers.Integer, Types.Primitive, Classes.Universal) | |
| BitString = Tag(Numbers.BitString, Types.Primitive, Classes.Universal) | |
| OctetString = Tag(Numbers.OctetString, Types.Primitive, Classes.Universal) | |
| Null = Tag(Numbers.Null, Types.Primitive, Classes.Universal) | |
| ObjectIdentifier = Tag(Numbers.ObjectIdentifier, Types.Primitive, Classes.Universal) | |
| Enumerated = Tag(Numbers.Enumerated, Types.Primitive, Classes.Universal) | |
| UTF8String = Tag(Numbers.UTF8String, Types.Primitive, Classes.Universal) | |
| Sequence = Tag(Numbers.Sequence, Types.Constructed, Classes.Universal) | |
| Set = Tag(Numbers.Set, Types.Constructed, Classes.Universal) | |
| PrintableString = Tag(Numbers.PrintableString, Types.Primitive, Classes.Universal) | |
| IA5String = Tag(Numbers.IA5String, Types.Primitive, Classes.Universal) | |
| UTCTime = Tag(Numbers.UTCTime, Types.Primitive, Classes.Universal) | |
| GeneralizedTime = Tag(Numbers.GeneralizedTime, Types.Primitive, Classes.Universal) | |
| UnicodeString = Tag(Numbers.UnicodeString, Types.Primitive, Classes.Universal) | |
| def context_specific(tag_number: int): | |
| return Tag(tag_number, Types.Constructed, Classes.Context) | |
| class Decoder(_Decoder): | |
| def __init__(self, start: Optional[bytes] = None) -> None: | |
| super().__init__() | |
| if start: | |
| self.start(start) | |
| def skip(self): | |
| self.read() | |
| def _read_tag(self): | |
| return Tag(*super()._read_tag()) | |
| def peek(self) -> Tag: | |
| # Pylint is utterly stupid | |
| result = super().peek() | |
| return cast(Tag, result) or None # type: ignore | |
| def read(self, tagnr: Optional[Number] = None) -> tuple[Tag, Any]: # pylint: disable=useless-parent-delegation | |
| return super().read(tagnr) # type: ignore | |
| def __enter__(self): | |
| self.enter() | |
| return self | |
| def __exit__(self, exc_type, exc_value, traceback): | |
| self.leave() | |
| return False | |
| def __iter__(self): | |
| self.enter() | |
| # https://www.peterbe.com/plog/generatorexit | |
| try: | |
| while self.peek(): | |
| tag = self.peek() | |
| assert self.m_stack | |
| current_stack_pos = len(self.m_stack) - 1 | |
| current_layer = self.m_stack[current_stack_pos].copy() | |
| yield tag | |
| if len(self.m_stack) - 1 < current_stack_pos: | |
| # What? | |
| raise RuntimeError("Left current level while iterating") | |
| elif len(self.m_stack) - 1 > current_stack_pos: | |
| # We entered a new level | |
| if False: | |
| # Strict mode | |
| raise RuntimeError("Entered new level without leaving it while iterating") | |
| else: | |
| # Ignore and pop to the original level | |
| while len(self.m_stack) - 1 > current_stack_pos: | |
| self.leave() | |
| self.read() | |
| elif self.m_stack[current_stack_pos] == current_layer: | |
| # Function did not do anything, skip over this tag | |
| self.read() | |
| else: | |
| # Function has read a tag (or multiple) and we have already advanced past it | |
| if False: | |
| # Strict mode: go back to the original position, and read() so that we will always be at | |
| # current + 1 | |
| self.m_stack[current_stack_pos] = current_layer | |
| self.read() | |
| else: | |
| # Ignore and do nothing | |
| pass | |
| except GeneratorExit: | |
| pass | |
| self.leave() | |
| def __len__(self): | |
| if self._end_of_input(): | |
| return 0 | |
| if not self.peek() or not self.peek().constructed: | |
| if self.peek().typ != Types.Constructed: | |
| raise Error("Cannot enter a non-constructed tag.") | |
| original_stack = copy.deepcopy(self.m_stack) | |
| original_tag = self.m_tag | |
| count = 0 | |
| for _ in self: | |
| count += 1 | |
| self.m_stack = original_stack | |
| self.m_tag = original_tag | |
| return count |