Last active
August 3, 2018 07:16
-
-
Save chiller/bcc59179d9c7221776113cb802a261cf to your computer and use it in GitHub Desktop.
find event in Eventstore stream by timestamp; find category stream index for event
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eventstore | |
import requests | |
import sys | |
class RequestBuilder():
    """Issue HTTP GET requests against the atom feed of one stream.

    Every URL has the form ``http://{host}:{port}/streams/{stream}/...``.
    The URL of the most recent request is kept in ``last_request`` so that
    callers can print it for diagnostics.
    """

    def __init__(self, host, stream, port=2113, timeout=None):
        """Remember the target server and stream.

        Args:
            host: Host name or address of the server.
            stream: Name of the stream whose feed is read.
            port: HTTP port of the server (defaults to 2113, the value that
                was previously hard-coded into every URL).
            timeout: Optional timeout in seconds handed to ``requests.get``;
                ``None`` keeps the previous behaviour of waiting forever.
        """
        self.headers = {
            "Accept": "application/vnd.eventstore.atom+json"
        }
        self.host = host
        self.stream = stream
        self.port = port
        self.timeout = timeout
        self.last_request = ""

    def _stream_url(self, suffix):
        """Return the URL for *suffix* below this stream's base URL."""
        return "http://{host}:{port}/streams/{stream}/{suffix}".format(
            host=self.host, port=self.port, stream=self.stream, suffix=suffix
        )

    def request(self, url, headers=None):
        """GET *url* and return the decoded JSON body.

        Args:
            url: Absolute URL to fetch.
            headers: Headers to send. When omitted, ``self.headers`` is
                used. (Previously this argument was accepted but ignored.)

        Returns:
            Whatever ``Response.json()`` yields for the response body.
        """
        self.last_request = url
        if headers is None:
            headers = self.headers
        r = requests.get(url, headers=headers, timeout=self.timeout)
        return r.json()

    def last_event(self):
        """Fetch the feed page ``head/backward/1`` of the stream."""
        return self.request(
            self._stream_url("head/backward/1"), headers=self.headers
        )

    def forward(self, from_counter, page_size):
        """Fetch *page_size* entries reading forward from *from_counter*."""
        suffix = "{from_counter}/forward/{page_size}".format(
            from_counter=from_counter, page_size=page_size
        )
        return self.request(self._stream_url(suffix), headers=self.headers)

    def forward_tryharder(self, from_counter, page_size):
        """Same as :meth:`forward`, with ``?embed=tryharder`` appended."""
        suffix = "{from_counter}/forward/{page_size}?embed=tryharder".format(
            from_counter=from_counter, page_size=page_size
        )
        return self.request(self._stream_url(suffix), headers=self.headers)

    def backward_tryharder(self, from_counter, page_size):
        """Fetch a page reading backward, with ``?embed=tryharder``."""
        suffix = "{from_counter}/backward/{page_size}?embed=tryharder".format(
            from_counter=from_counter, page_size=page_size
        )
        return self.request(self._stream_url(suffix), headers=self.headers)

    def get(self, index):
        """Fetch the single event at *index*, with ``?embed=tryharder``."""
        suffix = "{index}?embed=tryharder".format(index=index)
        return self.request(self._stream_url(suffix), headers=self.headers)
class Finder():
    """Find the index of an event in a stream by its timestamp.

    The search is a binary search over event indexes. It compares the
    ``updated`` field of each probed event with the wanted timestamp as
    plain strings, so both must use the same textual format.
    """

    def __init__(self, host, stream):
        """Create the request helper for *stream* on *host*."""
        self.builder = RequestBuilder(host, stream)

    def find_number_of_last_event(self):
        """Return the integer before the first ``;`` of the head page's eTag.

        NOTE(review): this relies on the eTag starting with the number of
        the newest event; confirm against the server's feed format.
        """
        resp_json = self.builder.last_event()
        event_number = resp_json['eTag'].split(';')[0]
        return int(event_number)

    def find_number_of_first_event(self):
        """Return ``positionEventNumber`` of the last entry on the first
        non-empty forward page.

        Pages are requested from index 0 upwards, doubling the page size
        until it reaches 2000 or more, until a page has entries. That same
        page is then fetched again with ``embed=tryharder``, because
        ``positionEventNumber`` is read from that response.

        NOTE(review): the loop has no upper bound, so a stream that never
        returns entries keeps it running; callers here query the last event
        first.
        """
        page_size = 10
        from_counter = 0
        entries = []
        while not entries:
            # Remember the page just requested so it can be re-read below.
            args = [from_counter, page_size]
            resp_json = self.builder.forward(*args)
            entries = resp_json['entries']
            from_counter = from_counter + page_size
            if page_size < 2000:
                page_size = page_size * 2
        resp_json = self.builder.forward_tryharder(*args)
        entries = resp_json['entries']
        return entries[-1]['positionEventNumber']

    def get_event_time_for_index(self, index):
        """Return the ``updated`` field of the event at *index*."""
        resp_json = self.builder.get(index)
        return resp_json['updated']

    def _bisect(self, from_index, to_index, timestamp):
        """Binary-search the inclusive range for an event matching *timestamp*.

        An event matches when its ``updated`` value starts with *timestamp*.

        The previous recursive version kept the probed index inside the next
        range, so it called itself with unchanged bounds (until hitting the
        recursion limit) once the bounds were adjacent and the probe did not
        match. It also never probed a range of a single index. Here the
        probed index is always excluded from the next range, which
        guarantees termination and covers both boundaries.

        Args:
            from_index: Lowest candidate index (inclusive).
            to_index: Highest candidate index (inclusive).
            timestamp: Timestamp prefix to look for.

        Returns:
            The index of a matching event, or ``None`` when none is found.
        """
        low, high = int(from_index), int(to_index)
        while low <= high:
            mid = (low + high) // 2
            mid_timestamp = self.get_event_time_for_index(mid)
            if mid_timestamp.startswith(timestamp):
                print("found " + str(mid))
                # Turn the API URL of the probed event into its web UI URL.
                print(
                    self.builder.last_request.replace(
                        '/streams/', '/web/index.html#/streams/'
                    )
                )
                return mid
            if mid_timestamp < timestamp:
                low = mid + 1
            else:
                high = mid - 1
        print("giving up at " + str(low))
        print(self.builder.last_request)
        return None

    def bisect(self, timestamp=''):
        """Search the whole stream for an event matching *timestamp*.

        Spaces in *timestamp* are replaced with ``T`` before comparing, so
        ``'2017-05-12 16:49:01'`` is searched as ``'2017-05-12T16:49:01'``.

        Returns:
            The index of a matching event, or ``None`` when none is found.
        """
        to_index = self.find_number_of_last_event()
        from_index = self.find_number_of_first_event()
        return self._bisect(from_index, to_index, timestamp.replace(' ', 'T'))
if __name__ == "__main__":
    # Exactly three positional arguments are required. Report a usage line
    # instead of failing with an unpacking error when the count is wrong.
    if len(sys.argv) != 4:
        sys.exit(
            "usage: python find_by_time.py HOST STREAM 'YYYY-MM-DD HH:MM:SS'"
        )
    host, stream, timestamp = sys.argv[1:]
    Finder(host, stream).bisect(timestamp=timestamp)
# example usage: python find_by_time.py localhost mystream '2017-05-12 16:49:01' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import find_by_time | |
import sys | |
class Finder():
    """Print where an event of a stream sits in its ``$ce-`` category stream.

    All work happens in the constructor: it fetches the event, derives the
    category stream name and prints the result of a timestamp search.
    """

    def __init__(self, host, stream, index):
        """Look up event *index* of *stream* and print the search result.

        Args:
            host: Host name or address of the server.
            stream: Stream name; the text before its first ``-`` is used as
                the category.
            index: Index of the event within *stream*.
        """
        self.builder = find_by_time.RequestBuilder(host, stream)
        # Only the first 19 characters of 'updated' are used for the search;
        # presumably that is the timestamp down to whole seconds.
        timestamp = self.builder.get(index)['updated'][:19]
        category, _, _ = stream.partition('-')
        category_stream = '$ce-' + category
        print(find_by_time.Finder(host, category_stream).bisect(timestamp))
if __name__ == "__main__":
    # Exactly three positional arguments are required. Report a usage line
    # instead of failing with an unpacking error when the count is wrong.
    if len(sys.argv) != 4:
        sys.exit("usage: python find_category_index.py HOST STREAM INDEX")
    host, stream, index = sys.argv[1:]
    Finder(host, stream, index)
# example usage: python find_category_index.py localhost 'user-1001' 5 | |
# This finds the position of /streams/user-1001/5 within /streams/$ce-user.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment