Created
October 18, 2015 07:04
-
-
Save gulan/a0993369935c3048baf9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| Exercises with abstract state machines. | |
| My remote goal is to learn how to specify systems using TLA+. I begin | |
| by writing some simple programs in the style of those kinds of | |
| specifications. | |
| Any basic operator maps a before-state to an after-state. The operator | |
| may be specified with a predicate relating the before and after | |
| states. A running process generates a sequence of states: | |
| s0,s1,s2,... Specifications can be written as predicates on the set of | |
| all such sequences. | |
| The machines defined here write out such histories as traces. I have | |
| not yet written any specification predicates. | |
| """ | |
def state(names, sid=''):
    """Create a state: a fixed set of named slots, all initialised to None.

    names -- whitespace-separated string of slot names.
    sid   -- optional label shown at the front of the state's repr.

    Returns a dict subclass instance whose slots can also be read and
    written as attributes.
    """
    # A state is a set of name-value pairs. Think of a state as a C
    # struct: the values may be mutable, but the member names are fixed.
    names = names.split()
    allowed = frozenset(names)  # membership test; `names` keeps display order

    class _state(dict):
        '''Like a dict, except I can write d.x instead of d['x']. No
        new keys may be added by attribute assignment after the state
        is created (plain item assignment is not guarded). Keys
        beginning with _ are not shown as part of the state, usually
        because the values are constant.'''

        def __init__(self):
            dict.__init__(self)
            # Because the keys are fixed from the beginning, the
            # machine's actions cannot create them.
            for n in names:
                self[n] = None

        def __getattr__(self, name):
            # Only a missing key means "no such attribute"; any other
            # error should propagate instead of being masked.
            try:
                return self[name]
            except KeyError:
                raise AttributeError(name)

        def __setattr__(self, name, value):
            if name not in allowed:
                raise AttributeError(
                    'Cannot make a new state entry %r' % name)
            self[name] = value

        def __repr__(self):
            shown = ['%s -> %s' % (k, self[k])
                     for k in names if not k.startswith('_')]
            return '[%s|' % sid + ', '.join(shown) + '|]'

    return _state()
def run(st, trace=False):
    """Execute the machine held in st until its pc becomes 'EXIT'.

    On every cycle the action stored under the key named by st.pc is
    looked up and called with st; the action is expected to update
    st.pc. When trace is true the state is printed before each step
    (the final state, where pc is 'EXIT', is not printed).
    """
    # 'CPU' cycle
    proc_name = st.pc
    while proc_name != 'EXIT':
        if trace:
            print(st)  # single-argument form works on Python 2 and 3
        (st[proc_name])(st)
        proc_name = st.pc
def demo1():
    """Assign both slots of a two-slot state and print it."""
    s = state('x y')
    s.x = 13
    s.y = 47.1
    print(s)  # single-argument form works on Python 2 and 3
def demo2():
    """Check that an unassigned slot reads as None, then print the state."""
    s = state('x y')
    assert s.y is None
    s.x = 13
    print(s)  # single-argument form works on Python 2 and 3
def demo3():
    """Show that assigning an undeclared slot is rejected.

    The AttributeError raised for s.z is deliberately ignored; the state
    printed afterwards still holds only x and y.
    """
    s = state('x y')
    s.x = 13
    s.y = 47.1
    try:
        s.z = 'BAD'
    except AttributeError:
        pass
    print(s)  # single-argument form works on Python 2 and 3
def demo4():
    """Print a state labelled with the Unicode character U+03F4."""
    theta = u'\u03f4'
    if str is bytes:
        # Python 2: the state's repr is assembled as a byte string, so
        # the label has to be UTF-8 encoded first.
        theta = theta.encode('utf-8')
    s = state('x y', sid=theta)
    s.x = 13
    s.y = 47.1
    print(s)  # single-argument form works on Python 2 and 3
def program_1():
    """Increment x 3 times, starting from 0.

    Builds a five-instruction machine, runs it with tracing enabled and
    then prints the final value of x.
    """
    # Define instruction set. Each factory returns an action that does
    # its update and then points pc at the label of the next instruction.
    def zero(nxt):
        def _zero(st):
            st.x = 0
            st.pc = nxt
        return _zero

    def inc(nxt):
        def _inc(st):
            st.x = st.x + 1
            st.pc = nxt
        return _inc

    def halt(st):  # not called `exit`, to avoid shadowing the builtin
        st.pc = 'EXIT'

    # Define state
    s = state('pc x _zero _inc_a _inc_b _inc_c _exit')

    # Assemble program
    s._zero = zero('_inc_a')
    s._inc_a = inc('_inc_b')
    s._inc_b = inc('_inc_c')
    s._inc_c = inc('_exit')
    s._exit = halt

    # Execute
    s.pc = '_zero'  # entry point
    run(s, True)
    print("x == %s" % s.x)
# Demo driver, switched off by its constant-false guard.
# NOTE(review): the original indentation was lost; this assumes every
# call below belongs to the guarded suite -- confirm before enabling.
if False:
    demo1()
    demo2()
    demo3()
    demo4()
    program_1()
| # * * * Exchange Sort * * * | |
| # Why exchange sort? No real reason. It was just the first thing that | |
| # came to mind. | |
| # I implement it in a traditional style first, and follow it with a state | |
| # machine version. | |
def find_smallest(cs):
    """Return the index of the first smallest element of the non-empty cs."""
    count = len(cs)
    assert count > 0
    best = 0
    for idx in range(1, count):
        # Strict comparison keeps the earliest index among equal elements.
        if cs[idx] < cs[best]:
            best = idx
    return best
def exchange_sort(d):
    """Sort the list d in place into ascending order and return it.

    For each position, the smallest element of the remaining tail is
    located with find_smallest and swapped into that position.
    """
    for pos in range(len(d) - 1):
        # find_smallest indexes into the tail slice, so shift by pos.
        smallest = pos + find_smallest(d[pos:])
        d[pos], d[smallest] = d[smallest], d[pos]
    return d
def check_find_smallest():
    """Assert find_smallest on a fixed table of inputs and expected indexes."""
    cases = (
        ([3, 1, 4], 1),
        ([3, 4, 1], 2),
        ([1, 4, 3], 0),
        ([1, 3, 4], 0),
        ([1, 1, 1], 0),
        ([99999], 0),
    )
    for values, expected in cases:
        assert find_smallest(values) == expected
def check_sort(before, after):
    """Assert that after is a correctly sorted rearrangement of before.

    Raises AssertionError when the lengths differ, when the two sequences
    do not contain the same elements with the same multiplicities, or
    when after is not in non-decreasing order.
    """
    from collections import Counter

    assert len(before) == len(after)
    # Compare as multisets: a plain set() comparison would accept a result
    # that dropped one duplicate and repeated another element instead.
    assert Counter(before) == Counter(after)
    for (x, y) in zip(after, after[1:]):
        assert x <= y
# Candidate inputs for the self-check below; all are left commented out.
# before = list("abc")
# before = list("bac")
# before = list("aaa")
# before = list("complicated")

# Disabled self-check of the traditional implementation. Uncomment one of
# the `before` lines above before switching this on.
if False:
    check_find_smallest()
    after = exchange_sort(list(before))
    check_sort(before, after)
| ### Transliterate to pseudo-assembly: | |
| # exchange_sort: | |
| # s.N = len(s.d) | |
| # s.i = 0 | |
| # es_while_guard: | |
| # goto es_while_exit, unless s.i < s.N-1 | |
| # s.cs = s.d[s.i:] | |
| # find_smallest: | |
| # s.M = len(s.cs) | |
| # assert s.M > 0 | |
| # s.m,s.j = 0,1 | |
| # fs_while_guard: | |
| # goto fs_while_exit, unless s.j < s.M | |
| # s.m = s.j if s.cs[s.j] < s.cs[s.m] else s.m | |
| # s.j += 1 | |
| # goto fs_while_guard | |
| # fs_while_exit: | |
| # s.m += s.i | |
| # s.d[s.i],s.d[s.m] = s.d[s.m],s.d[s.i] | |
| # s.i += 1 | |
| # goto es_while_guard | |
| # es_while_exit: | |
| # print s.d | |
| ## Convert to state machine actions: | |
def exchange_sort(s):
    """Entry action: record len(s.d) in N, reset i, go to the outer guard."""
    item_count = len(s.d)
    s.N = item_count
    s.i = 0
    s.pc = '_es_while_guard'
def es_while_guard(s):
    """Outer loop guard, standing in for `while s.i < s.N-1`.

    While positions remain, copy the tail d[i:] into cs and jump to the
    smallest-element search; otherwise jump to the outer loop exit.
    """
    if not s.i < s.N - 1:
        s.pc = '_es_while_exit'
        return
    s.cs = s.d[s.i:]
    s.pc = '_find_smallest'
def find_smallest(s):
    """Start the search over s.cs: set M, reset m and j, go to the inner guard.

    Asserts that s.cs is not empty.
    """
    length = len(s.cs)
    s.M = length
    assert length > 0
    s.m = 0
    s.j = 1
    s.pc = '_fs_while_guard'
def fs_while_guard(s):
    """Inner loop guard and body, standing in for `while s.j < s.M`.

    While j is in range, remember j in m if cs[j] is smaller than cs[m],
    advance j and loop back here; otherwise jump to the inner loop exit.
    """
    if not s.j < s.M:
        s.pc = '_fs_while_exit'
        return
    if s.cs[s.j] < s.cs[s.m]:
        s.m = s.j
    s.j += 1
    s.pc = '_fs_while_guard'
def fs_while_exit(s):
    """Finish one outer iteration: swap the found minimum into position i.

    m is first shifted by i because it was an index into the tail cs;
    then d[i] and d[m] are exchanged, i advances, and control returns to
    the outer guard.
    """
    pos = s.i
    smallest = s.m + pos
    s.m = smallest
    items = s.d
    items[pos], items[smallest] = items[smallest], items[pos]
    s.i = pos + 1
    s.pc = '_es_while_guard'
def es_while_exit(s):
    """Final action: print the list s.d and halt the machine."""
    print(s.d)  # single-argument form works on Python 2 and 3
    s.pc = 'EXIT'
# Assemble the exchange-sort state machine and run it with tracing on.
if True:
    # Visible variables first, then the hidden (underscore) action labels.
    varx = 'pc i j m N M cs d '
    labels = ' '.join((
        '_exchange_sort',
        '_es_while_guard',
        '_find_smallest',
        '_fs_while_guard',
        '_fs_while_exit',
        '_es_while_exit',
    ))
    s = state(varx + labels)

    # Bind each label to the action it names.
    s._exchange_sort = exchange_sort
    s._es_while_guard = es_while_guard
    s._find_smallest = find_smallest
    s._fs_while_guard = fs_while_guard
    s._fs_while_exit = fs_while_exit
    s._es_while_exit = es_while_exit

    s.pc = '_exchange_sort'  # entry point

    # Other inputs to try: "abc", "aaa", "cbacccba", "cb", "c", "".
    before = list("baacb")
    s.d = list(before)  # sort a copy so `before` stays untouched
    run(s, True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment