Last active
November 14, 2019 07:41
-
-
Save miraculixx/114abd17cf7438dce68d to your computer and use it in GitHub Desktop.
A simple stack VM and assembler/compiler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The MIT License (MIT) | |
Copyright (c) 2015 miraculixx | |
Permission is hereby granted, free of charge, to any person obtaining a copy of | |
this software and associated documentation files (the "Software"), to deal in | |
the Software without restriction, including without limitation the rights to | |
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of | |
the Software, and to permit persons to whom the Software is furnished to do so, | |
subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS | |
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR | |
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER | |
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN | |
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a quick experiment in implementing a stack machine that | |
executes a set of given op codes. Just for fun. | |
""" | |
class VMError(Exception): | |
def __init__(self, kind): | |
self.kind = kind | |
self.message = '' | |
def __str__(self): | |
print "MICROVM", self.kind, self.message | |
def __call__(self, message): | |
self.message = message | |
return self | |
StackError = VMError('StackError') | |
class Stack(list): | |
def __init__(self, parent=None): | |
self.parent = parent | |
def pop(self): | |
try: | |
return super(Stack, self).pop() | |
except: | |
raise StackError('stack is empty') | |
class MicroOp(object): | |
""" MicroOp is the implementation of an OpCode """ | |
def __init__(self, bytecode, mnemonic): | |
self.bytecode = bytecode | |
self.mnemonic = mnemonic | |
def execute(self, *args, **kwargs): | |
pass | |
class ADD(MicroOp): | |
def execute(self, *args, **kwargs): | |
stack = kwargs.get('stack') | |
opnum = args[0] | |
opargs = [stack.pop() for i in range(opnum)] | |
result = sum(opargs) | |
stack.append(result) | |
return result | |
class PUSH(MicroOp): | |
def execute(self, *args, **kwargs): | |
stack = kwargs.get('stack') | |
stack.extend(args) | |
class PRINT(MicroOp): | |
def execute(self, *args, **kwargs): | |
stack = kwargs.get('stack') | |
opnum = args[0] if args else 1 | |
opargs = [str(stack.pop()).replace('"', '').replace("'", '') for i in range(opnum)] | |
print ' '.join(opargs) | |
OPCODES = { | |
0x01 : PUSH(0x01, 'PUSH'), | |
0x10 : ADD(0x10, 'ADD'), | |
0x20 : PRINT(0x20, 'PRINT'), | |
} | |
MNEMONICS = { microop.mnemonic : opcode for opcode, microop in OPCODES.iteritems() } | |
class VM(object): | |
""" A VM provides the execution context to run a byte code program """ | |
def __init__(self): | |
self.stack = Stack() | |
def run(self, code): | |
for op, args in code: | |
OPCODES[op].execute(*args, stack=self.stack) | |
class Compiler(object): | |
""" The compiler takes source code and produces byte code """ | |
def _native_value(self, value): | |
tries = [int, float, str] | |
for t in tries: | |
try: | |
return t(value) | |
except: | |
pass | |
return value | |
def compile(self, source): | |
code = [] | |
for loc, line in enumerate(source.split('\n')): | |
if not line: | |
continue | |
parsed = line.split(' ', 1) | |
mnemonic, args = parsed[0], parsed[1:] | |
args = [self._native_value(arg) for arg in args] | |
opcode = MNEMONICS.get(mnemonic) | |
if opcode: | |
code.append((opcode, args)) | |
else: | |
raise SyntaxError("%d: invalid statement >%s<" % (loc, line)) | |
return code | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Enhanced version of microvm. | |
This now supports | |
* JUMP and JUMP_TRUE supporting absolute and relative jumps | |
* labeled sections as JUMP targets | |
* associative RAM to store variables | |
* STORE,LOAD to transfer to/from RAM/stack | |
* better exceptions including pointing to the current code location | |
* smarter micro ops | |
* comments in code (# as first character in line) | |
""" | |
class VMError(Exception): | |
def __init__(self, kind, context=None): | |
self.kind = kind | |
self.message = '' | |
def __str__(self): | |
s = ["MICROVM {kind} {message}".format(**self.__dict__)] | |
if self.vm: | |
s.append("==> {loc}: {code}".format(**dict(loc=self.vm.ic, code=self.vm.get_current_code()))) | |
return '\n'.join(s) | |
def __call__(self, message=None, vm=None): | |
self.message = message or self.message or '' | |
self.vm = vm | |
return self | |
class MicroOpResult(object): | |
""" | |
the result of a micro op | |
MicroOps pop and push | |
:param result: the result of the operator, if any | |
:param target: the target of the operation (code memory, if any) | |
:param relative: is the target a relative or absolute address, defaults to False | |
""" | |
def __init__(self, result=None, target=None, relative=False): | |
self._update(result=result, target=target, relative=relative) | |
def __call__(self, result=None, target=None, relative=False): | |
self._update(result=result, target=target, relative=relative) | |
return self | |
def _update(self, result=None, target=None, relative=False): | |
self.target = target | |
self.result = result | |
self.relative = relative | |
StackError = VMError('StackError') | |
class Stack(list): | |
def __init__(self, parent=None): | |
self.parent = parent | |
def pop(self): | |
try: | |
return super(Stack, self).pop() | |
except: | |
raise StackError('stack is empty') | |
def is_empty(self): | |
return len(self) == 0 | |
class ARAM(dict): | |
""" Associative Random Access Memory """ | |
pass | |
class MicroOp(object): | |
""" MicroOp is the implementation of an OpCode """ | |
def __init__(self, bytecode, mnemonic): | |
self.bytecode = bytecode | |
self.mnemonic = mnemonic | |
self.autopop = True | |
def _parseargs(self, *args, **kwargs): | |
self._args = args | |
self._kwargs = kwargs | |
self.vm = kwargs.get('vm') | |
return self | |
@property | |
def heap(self): | |
return self.vm.heap | |
@property | |
def stack(self): | |
return self.vm.stack | |
def _prepare(self, *args, **kwargs): | |
self._parseargs(*args, **kwargs) | |
stack = self.vm.stack | |
heap = self.vm.heap | |
opnum = self.opnum = args[0] if args else 0 | |
if self.autopop: | |
opargs = self.opargs = [stack.pop() for i in xrange(opnum)] | |
else: | |
opargs = self.opargs = args | |
self.result = MicroOpResult() | |
return stack, opnum, opargs | |
def execute(self, *args, **kwargs): | |
self.stack, self.opnum, self.opargs = self._prepare(*args, **kwargs) | |
pass | |
class ADD(MicroOp): | |
def execute(self, *args, **kwargs): | |
result = sum(self.opargs) | |
self.stack.append(result) | |
return self.result(result=result) | |
class JUMP(MicroOp): | |
def __init__(self, *args, **kwargs): | |
super(JUMP, self).__init__(*args, **kwargs) | |
self.autopop = False | |
def get_target(self): | |
target = self.opargs[0] | |
if isinstance(target, basestring): | |
target = self.vm.labels[target] | |
return target | |
def execute(self, *args, **kwargs): | |
target = self.get_target() | |
relative = self.opargs[1] == "r" if len(self.opargs) > 1 else False | |
return self.result(target=target, relative=relative) | |
class JUMP_IF_TRUE(JUMP): | |
def execute(self, *args, **kwargs): | |
relative = self.opargs[1] == "r" if len(self.opargs) > 1 else False | |
target = self.get_target() | |
value = self.stack.pop() | |
if value: | |
return self.result(result=True, target=target, relative=relative) | |
else: | |
return self.result(result=False) | |
class CMP(MicroOp): | |
def __init__(self, *args, **kwargs): | |
super(CMP, self).__init__(*args, **kwargs) | |
self.autopop = False | |
def execute(self): | |
left = self.opargs[0] | |
right = self.opargs[1] if len(self.opargs) > 1 else 1 | |
if isinstance(left, basestring): | |
left = self.heap[left] | |
else: | |
left = self.stack.pop() | |
if isinstance(right, basestring): | |
right = self.heap[right] | |
else: | |
right = self.stack.pop() | |
result = left == right | |
self.stack.append(result) | |
return self.result(result=result) | |
class PUSH(MicroOp): | |
def __init__(self, *args, **kwargs): | |
super(PUSH, self).__init__(*args, **kwargs) | |
self.autopop = False | |
def execute(self): | |
self.stack.extend(self.opargs) | |
return self.result | |
class STORE(MicroOp): | |
def __init__(self, *args, **kwargs): | |
super(STORE, self).__init__(*args, **kwargs) | |
self.autopop = False | |
def execute(self): | |
name = self.opargs[0] | |
value = self.opargs[1] if len(self._args) > 1 else self.stack.pop() | |
self.heap[name] = value | |
return self.result | |
class LOAD(MicroOp): | |
def __init__(self, *args, **kwargs): | |
super(LOAD, self).__init__(*args, **kwargs) | |
self.autopop = False | |
def execute(self): | |
name = self.opargs[0] | |
value = self.heap[name] | |
self.stack.append(value) | |
return self.result(result=value) | |
class PRINT(MicroOp): | |
def _prepare(self, *args, **kwargs): | |
self._parseargs(*args, **kwargs) | |
stack = self.stack | |
opnum = self.opnum = self._args[0] if self._args else 1 | |
self.opargs = [str(stack.pop()).replace('"', '').replace("'", '') for i in xrange(opnum)] | |
self.result = MicroOpResult() | |
return self.stack, self.opnum, self.opargs | |
def execute(self): | |
print ' '.join(self.opargs).strip() | |
return self.result | |
# VM opcodes | |
OPCODES = { | |
0x01 : PUSH(0x01, 'PUSH'), | |
0x02 : STORE(0x02, 'STORE'), | |
0x03 : LOAD(0x03, 'LOAD'), | |
0x10 : JUMP(0x10, "JUMP"), | |
0x11 : JUMP_IF_TRUE(0x11, "JUMP_TRUE"), | |
0x12 : CMP(0x12, "CMP"), | |
0x20 : ADD(0x20, 'ADD'), | |
0x30 : PRINT(0x30, 'PRINT'), | |
} | |
# compiler mnemonics to opcode mapping | |
MNEMONICS = { microop.mnemonic : opcode for opcode, microop in OPCODES.iteritems() } | |
class VM(object): | |
""" A VM provides the execution context to run a byte code program """ | |
def __init__(self): | |
self.stack = Stack() | |
self.heap = ARAM() | |
self.ic = 0 | |
self.code = None | |
self.labels = None | |
def run(self, program): | |
code = self.code = program['bytecode'] | |
labels = self.labels = program['labels'] | |
while True: | |
ic = self.ic | |
if ic < len(code): | |
op, args = code[ic] | |
try: | |
OPCODES[op]._prepare(*args, vm=self) | |
result = OPCODES[op].execute() | |
except VMError as e: | |
raise e(vm=self) | |
if result.target is not None: | |
if result.relative: | |
self.ic = ic + result.target | |
else: | |
self.ic = result.target | |
else: | |
self.ic += 1 | |
else: | |
break | |
def get_current_code(self): | |
mnemonic, args = self.code[self.ic] | |
return "%s %s" % (OPCODES[mnemonic].mnemonic, args) | |
class Compiler(object): | |
""" The compiler takes source code and produces byte code """ | |
def _native_value(self, value): | |
tries = [int, float, str] | |
for t in tries: | |
try: | |
return t(value) | |
except: | |
pass | |
return value | |
def tokenize(self, source): | |
tokenized = {} | |
code = tokenized['code'] = [] | |
labels = tokenized['labels'] = {} | |
label = None | |
for loc, line in enumerate(source.split('\n')): | |
if not line or line.startswith('#'): | |
continue | |
if line.endswith(':'): | |
label = line.split(':')[0] | |
continue | |
parsed = line.strip().split(' ', 1) | |
mnemonic, args = parsed[0], parsed[1].split(',') | |
code.append((mnemonic, args)) | |
if label: | |
labels[label] = len(code) - 1 | |
label = None | |
return tokenized | |
def compile(self, source): | |
tokenized = self.tokenized = self.tokenize(source) | |
tokens = tokenized['code'] | |
program = {} | |
bytecode = program['bytecode'] = [] | |
labels = program['labels'] = tokenized['labels'] | |
for mnemonic, args in tokens: | |
args = [self._native_value(arg) for arg in args] | |
opcode = MNEMONICS.get(mnemonic) | |
if opcode: | |
bytecode.append((opcode, args)) | |
else: | |
raise SyntaxError("%d: invalid statement >%s<" % (loc, line)) | |
return program |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
source = """ | |
PUSH 10 | |
PUSH 20 | |
PUSH 20 | |
ADD 3 | |
PUSH "result is" | |
PRINT 2 | |
""" | |
code = Compiler().compile(source) | |
vm = VM() | |
vm.run(code) | |
print "stack at the end", vm.stack |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# the following assembler code is the equivalent of the following python program | |
python_source = """ | |
foo = 0 | |
sum = 0 | |
while foo < 2: | |
sum += 10 + 20 + 20 | |
print "result is", sum | |
foo += 1 | |
if foo < 2: | |
print "repeat" | |
else: | |
print "stop" | |
print "finished" | |
""" | |
# assembler. this is hand-written for maximum performance :-) | |
source = """ | |
STORE foo,0 | |
STORE sum,0 | |
loop: | |
# cumulative sum | |
LOAD sum | |
PUSH 10 | |
PUSH 20 | |
PUSH 20 | |
ADD 4 | |
STORE sum | |
LOAD sum | |
# print result | |
PUSH "result is" | |
PRINT 2 | |
# increment counter | |
LOAD foo | |
PUSH 1 | |
ADD 2 | |
STORE foo | |
# compare counter to condition | |
PUSH 2 | |
CMP foo | |
# repeat or stop | |
JUMP_TRUE stop_it | |
PUSH "repeat" | |
PRINT 1 | |
JUMP loop | |
stop_it: | |
PUSH "stop" | |
PRINT 1 | |
JUMP end | |
end: | |
PUSH "finished" | |
PRINT 1 | |
""" | |
compiler = Compiler() | |
code = compiler.compile(source) | |
print "tokens" | |
pprint(compiler.tokenized) | |
print "bytecode" | |
pprint(code) | |
vm = VM() | |
vm.run(code) |
the sample assembler program in sample2.py will print
result is 50
repeat
result is 100
stop
finished
bytecode for sample2
{'bytecode': [(2, ['foo', 0]),
(2, ['sum', 0]),
(3, ['sum']),
(1, [10]),
(1, [20]),
(1, [20]),
(32, [4]),
(2, ['sum']),
(3, ['sum']),
(1, ['"result is"']),
(48, [2]),
(3, ['foo']),
(1, [1]),
(32, [2]),
(2, ['foo']),
(1, [2]),
(18, ['foo']),
(17, ['stop_it']),
(1, ['"repeat"']),
(48, [1]),
(16, ['loop']),
(1, ['"stop"']),
(48, [1]),
(16, ['end']),
(1, ['"finished"']),
(48, [1])],
'labels': {'end': 24, 'loop': 2, 'stop_it': 21}}
reproduce in sample2.py as
from pprint import pprint
compiler = Compiler()
code = compiler.compile(source)
print "bytecode"
pprint(code)
this is the tokenized source code representation. tokenized means that all comments have been removed and labelled sections are indexed to the first code object they represent. currently the tokenizer has very limited support for parsing arguments, it is intentionally kept very simple. note that the compiler transforms this tokenized program into it's byte code equivalent by looking up the respective MicroOp, then outputting the MicroOp's bytecode.
tokens
{'code': [('STORE', ['foo', '0']),
('STORE', ['sum', '0']),
('LOAD', ['sum']),
('PUSH', ['10']),
('PUSH', ['20']),
('PUSH', ['20']),
('ADD', ['4']),
('STORE', ['sum']),
('LOAD', ['sum']),
('PUSH', ['"result is"']),
('PRINT', ['2']),
('LOAD', ['foo']),
('PUSH', ['1']),
('ADD', ['2']),
('STORE', ['foo']),
('PUSH', ['2']),
('CMP', ['foo']),
('JUMP_TRUE', ['stop_it']),
('PUSH', ['"repeat"']),
('PRINT', ['1']),
('JUMP', ['loop']),
('PUSH', ['"stop"']),
('PRINT', ['1']),
('JUMP', ['end']),
('PUSH', ['"finished"']),
('PRINT', ['1'])],
'labels': {'end': 24, 'loop': 2, 'stop_it': 21}}
reproduce in sample2.py as follows
from pprint import pprint
compiler = Compiler()
code = compiler.compile(source)
print "tokens"
pprint(compiler.tokenized)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
the sample assembler program in sample.py will print