Last active
March 13, 2019 23:54
-
-
Save ekimekim/940688f2d4c89a0c5c4cd06bb3509a90 to your computer and use it in GitHub Desktop.
A very hacky way of creating a Python object that lazily looks up keys of a Jsonnet object as needed, without re-evaluating the entire Jsonnet program each time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import json | |
import threading | |
from Queue import Queue | |
import _jsonnet as jsonnet | |
class LazyExecutor(threading.Thread):
    """Runs a jsonnet snippet that produces an object.

    You can subsequently request keys using get_key().
    This will only evaluate and return the requested key, so you can effectively "lazily evaluate"
    your jsonnet as you need each key. The jsonnet execution context is maintained throughout.

    The thread evaluates a small driver program that calls back into Python (see _callback())
    once per requested key; the callback blocks on a queue until another thread asks for a key.

    When you are done, you should stop() the thread to release resources.
    """
    # Intended to mark the thread as a daemon so a forgotten executor does not block exit
    daemon = True

    # Sentinel value that goes in queue to indicate a graceful shutdown
    STOP = object()

    def __init__(self, code):
        """Prepare (but do not start) an executor for the jsonnet source text `code`.

        `code` is passed to jsonnet as a top-level code argument and is expected
        to evaluate to an object.
        """
        super(LazyExecutor, self).__init__()
        self.code = code
        self.lock = threading.Lock()  # governs access to queue, pending and values
        # Ordered queue of keys to be requested. Requesters also check pending to ensure
        # there are no repeats.
        self.queue = Queue()
        # Map from keys to get, to the Event object that threads are waiting on for that key
        self.pending = {}
        # We shouldn't repeat return values from callback, so we cache the value for each
        # key we've already retrieved
        self.values = {}
        # Keeps track of what key we returned for each input, so that we don't break our
        # pure-function guarantee if we're called twice.
        self.return_values = {}
        # Indicates if we initiated shut down cleanly
        self.finished = False
        # Indicates if we've stopped and can no longer service requests
        self.stopped = False
        # If we did not shut down cleanly, contains the error message
        self.error = None

    def run(self):
        """Thread body: evaluate the driver snippet until it raises.

        The driver function f() has no base case: each call hands the previous key and its
        JSON-manifested value to the native callback, and recurses on whatever key the
        callback returns. Evaluation is therefore only expected to end with an exception,
        either the deliberate one raised by _callback() on STOP or a genuine jsonnet error.
        """
        try:
            jsonnet.evaluate_snippet(
                filename="<inline>",
                src="""
                    function(code)
                        local f(key) = f(
                            std.native("callback")(
                                key,
                                if key == null then null
                                else if key == true then std.manifestJson(std.objectFields(code))
                                else std.manifestJson(code[key]),
                            )
                        ) tailstrict;
                        f(null)
                """,
                tla_codes={"code": self.code},
                native_callbacks={"callback": (("key", "value"), self._callback)},
            )
        except Exception as e:
            with self.lock:
                # If this was a graceful stop, no further action needed
                if self.finished:
                    return
                # Otherwise, trigger all remaining waiters and record the error
                self.stopped = True
                self.error = e
                for event in self.pending.values():
                    event.set()

    def _callback(self, key, value):
        """Native callback invoked by the jsonnet driver.

        key: the key whose value is being delivered (None on the very first call,
            True for the list of field names).
        value: JSON text of the value for `key` (unused when key is None).

        Stores the decoded value, wakes the thread waiting for it, then blocks until
        another key is requested and returns that key to jsonnet.

        Raises Exception("done") when the STOP sentinel is dequeued, which aborts
        the jsonnet evaluation.
        """
        # If this is a repeat call, return the same result and do nothing else
        if key in self.return_values:
            return self.return_values[key]

        # Save passed back value (ignoring null case)
        if key is not None:
            value = json.loads(value)
            with self.lock:
                # save the value
                assert key not in self.values
                self.values[key] = value
                # inform waiters
                assert key in self.pending
                event = self.pending.pop(key)
                event.set()

        # Return what key to ask for next, or stop if we're done.
        # This blocks (without holding the lock) until get_key() or stop() enqueues something.
        next_key = self.queue.get()
        if next_key is self.STOP:
            self.finished = True
            raise Exception("done")
        self.return_values[key] = next_key
        return next_key

    def get_key(self, key):
        """Can be called from another thread to retrieve a specific key.

        Returns the cached value if the key was already evaluated; otherwise enqueues
        the key (once, even with several concurrent requesters) and blocks until the
        executor thread delivers it.

        Raises ValueError if the executor is already stopped, or if it fails while
        this request is outstanding. When a failure was recorded, its message is
        included so the root cause is not lost.
        """
        with self.lock:
            if self.stopped:
                if self.error is not None:
                    raise ValueError(
                        "Cannot get key - executor is stopped: {}".format(self.error)
                    )
                raise ValueError("Cannot get key - executor is stopped")
            if key in self.values:
                return self.values[key]
            if key not in self.pending:
                self.pending[key] = threading.Event()
                self.queue.put(key)
            event = self.pending[key]
        # Wait outside the lock so the executor thread can take it to deliver the value
        event.wait()
        if key not in self.values:
            # The only case where this can happen is on ungraceful shutdown
            raise ValueError("Failed to get key: {}".format(self.error))
        return self.values[key]

    def get_keys(self):
        """As per get_key(), but returns the list of available keys"""
        # True is used as a sentinel value that can pass the jsonnet-python barrier safely
        return self.get_key(True)

    def stop(self):
        """Request a graceful shutdown. Safe to call more than once.

        Keys enqueued before this call are still serviced, since the STOP sentinel
        is placed behind them in the queue.
        """
        with self.lock:
            if self.stopped:
                return
            self.stopped = True
            self.queue.put(self.STOP)
class LazyProxy(collections.Mapping):
    """Wraps some jsonnet code that returns an object.

    Acts like a dict, but only evaluates each key of the object the first time it's requested.
    Call close() to release resources, or use the proxy in a `with` block to have
    close() called automatically.
    """

    def __init__(self, code):
        """Start a background executor for the jsonnet source text `code`."""
        self.executor = LazyExecutor(code)
        self.executor.start()

    def __iter__(self):
        """Iterate over the field names reported by the executor."""
        return iter(self.executor.get_keys())

    def __len__(self):
        """Return the number of field names reported by the executor."""
        return len(self.executor.get_keys())

    def __getitem__(self, key):
        """Evaluate (or fetch the cached value of) a single field."""
        return self.executor.get_key(key)

    def __contains__(self, key):
        """Membership test against the field list, without evaluating the field.

        The inherited Mapping implementation calls self[key], which would evaluate
        the value and, for a missing key, make the jsonnet evaluation fail.
        """
        return key in self.executor.get_keys()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def __del__(self):
        self.close()

    def close(self):
        """Stop the executor thread. Safe to call repeatedly."""
        # executor is missing if __init__ raised before assigning it; __del__ still runs then
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.stop()
def test(code, *keys):
    """Print the key list of a jsonnet object, then the value of each requested key.

    code: jsonnet source text that evaluates to an object.
    keys: field names to look up and print, in the order given.
    """
    proxy = LazyProxy(code)
    try:
        # Single-argument print(...) behaves the same as the Python 2 print
        # statement and is also valid Python 3.
        print(proxy.keys())
        for key in keys:
            print(proxy[key])
    finally:
        # Shut the executor thread down even if a lookup fails
        proxy.close()
if __name__ == '__main__':
    # argh is only needed for command-line use, so it is imported here rather
    # than at module import time.
    from argh import dispatch_command

    # Expose test() as the command-line entry point.
    dispatch_command(test)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment