Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created August 11, 2018 12:34
Show Gist options
  • Save Arachnid/7471c4a0e4cc873f6626e61a20d6917e to your computer and use it in GitHub Desktop.
import logging
import json
import sys
import mapreduce
def mapper(key, value):
    """Map one decoded contract event to (name-hash, info) pairs.

    AuctionStarted events emit a {'registered': <registrationDate>} marker;
    BidRevealed events with status 2 or 3 emit the bid in ether and the
    bidder's address. All other events are dropped.
    """
    args = value['args']
    kind = value['event']
    if kind == 'AuctionStarted':
        yield (args['hash'], {'registered': args['registrationDate']})
    elif kind == 'BidRevealed' and args['status'] in (2, 3):
        # args['value'] is in wei; convert to ether.
        yield (args['hash'], {'bid': args['value'] / 1e18, 'bidder': args['owner']})
def reducer(key, values):
    """Reduce all events for one name hash to its final auction result.

    Implements the second-price logic visible in the event stream: when a
    new highest bid arrives, the previous maximum becomes the price paid.
    A 'registered' marker resets all state, so only the most recent auction
    for this hash counts. The price floor is the 0.01 ether minimum bid.

    Returns:
        dict with keys 'price', 'registered', 'owner' (None where unknown).
    """
    MIN_BID = 0.01
    maxBid = MIN_BID
    winPrice = None
    registrationDate = None
    bidder = None
    for value in values:
        if 'registered' in value:
            # New auction started: discard bids from any earlier auction.
            maxBid = MIN_BID
            winPrice = None
            registrationDate = value['registered']
            bidder = None
        elif value['bid'] >= maxBid:
            # New highest bid: the previous maximum becomes the price paid.
            winPrice = maxBid
            maxBid = value['bid']
            bidder = value['bidder']
        elif winPrice is None or value['bid'] >= winPrice:
            # Bug fix: guard the None comparison explicitly. Python 2 let
            # `bid >= None` evaluate (always True); Python 3 raises TypeError.
            winPrice = value['bid']
    return {'price': winPrice, 'registered': registrationDate, 'owner': bidder}
def indexEvent(event):
    """Key an event by (transactionHash, logIndex) so each log entry is unique."""
    unique_key = (event['transactionHash'], event['logIndex'])
    return (unique_key, event)
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # One (key, event) pair per JSON line on stdin.
    indexed = (indexEvent(row) for row in mapreduce.jsonIterator(sys.stdin))
    for entry in mapreduce.mapreduce(mapper, reducer, indexed):
        sys.stdout.write(json.dumps(entry) + "\n")
from eth_utils import force_bytes
from web3.utils.events import get_event_data, event_abi_to_log_topic
import json
import logging
import sys
def decode_event(abi_map, event):
    """Decode a raw log entry into a structured event via its ABI.

    Normalises the '0x' placeholders some RPC responses use for index
    fields, looks up the event ABI by the first topic (the event signature
    hash), then renders any bytes32 arguments as 0x-prefixed hex strings.
    NOTE(review): relies on Python 2's 'hex' string codec.
    """
    if event['logIndex'] == '0x':
        event['logIndex'] = 0
    if event['transactionIndex'] == '0x':
        event['transactionIndex'] = 0
    event['blockHash'] = None
    topic = event['topics'][0][2:].decode('hex')
    abi = abi_map[topic]
    decoded = get_event_data(abi, event)
    bytes32_names = [a['name'] for a in abi['inputs'] if a['type'] == 'bytes32']
    for name in bytes32_names:
        decoded['args'][name] = '0x' + force_bytes(decoded['args'][name]).encode('hex')
    return decoded
def main():
    """Read JSON log entries from stdin, decode each against the contract
    ABI given in argv[1], and write decoded events to stdout as JSON lines.

    Decoding failures are logged and skipped (deliberate best-effort).
    """
    # Fix: close the ABI file (the original leaked the handle) and catch
    # Exception rather than a bare except (which also swallowed
    # SystemExit/KeyboardInterrupt).
    with open(sys.argv[1]) as f:
        abi = json.load(f)
    abi_map = {
        event_abi_to_log_topic(a): a
        for a in abi
        if a['type'] == 'event'
    }
    for line in sys.stdin:
        raw = json.loads(line)
        try:
            event = decode_event(abi_map, raw)
        except Exception:
            logging.exception("Decoding event")
        else:
            sys.stdout.write(json.dumps(event) + "\n")
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()
import logging
import requests
import sys
import time
import json
import web3.utils as utils
from web3 import Web3, RPCProvider
# Etherscan HTTP fallback for fetching logs; unused, superseded by the
# node-filter path in get_events below.
#LOG_URL = "https://api.etherscan.io/api?module=logs&action=getLogs&fromBlock=%(fromblock)s&toBlock=%(toblock)s&address=%(address)s&apikey=%(apikey)s"
# Blocks to request per filter call initially; adapted at runtime from the
# observed event density.
INITIAL_BATCH_SIZE = 1000
# Target number of events per request when resizing the batch.
TARGET_RESULT_SIZE = 2000
# NOTE(review): commented-out API key — secrets should not be committed.
#API_KEY = "DAG3G62PXEPYXBMTEMUVP3TDSB1CQR1U9B"
# Module-level connection to a node via the default RPC endpoint.
web3 = Web3(RPCProvider())
def get_events(fromblock, toblock, address):
    """Fetch all log entries for `address` in [fromblock, toblock] via a
    one-shot eth log filter.

    The server-side filter is always uninstalled, even if the fetch fails
    (the original leaked it on exception). Dead commented-out Etherscan
    fallback code removed.
    """
    logging.info("Fetching events for %s starting at %d for %d blocks",
                 address, fromblock, toblock - fromblock)
    # `log_filter` (not `filter`) to avoid shadowing the builtin.
    log_filter = web3.eth.filter({
        'address': address,
        'fromBlock': fromblock,
        'toBlock': toblock,
    })
    try:
        return log_filter.get(False)
    finally:
        web3.eth.uninstallFilter(log_filter.filter_id)
def get_all_events(fromblock, toblock, address):
    """Yield every event for `address` in [fromblock, toblock), adapting the
    per-request block range toward TARGET_RESULT_SIZE events per request.

    The filter API caps responses (observed at 1000 entries). When a request
    hits the cap, the last block's events may be truncated, so they are
    dropped and the next request resumes from that block.
    """
    batch_size = INITIAL_BATCH_SIZE
    while fromblock < toblock:
        limit = min(fromblock + batch_size, toblock)
        while True:
            try:
                events = get_events(fromblock, limit, address)
                break
            except Exception:  # was py2-only `except Exception, e`
                logging.exception("Error fetching events, retrying")
                time.sleep(5)
        next_from = limit
        if len(events) >= 1000:
            # Possibly truncated at the cap: estimate density and shrink.
            blocknums = [e['blockNumber'] for e in events]  # list, not map():
            # the original iterated a map() several times, which exhausts
            # under Python 3.
            maxblock, minblock = max(blocknums), min(blocknums)
            if maxblock > minblock:
                events_per_block = len(events) / float(maxblock - minblock)
                # max(1, ...) prevents a zero batch size (infinite loop).
                batch_size = max(1, int(TARGET_RESULT_SIZE / events_per_block))
                # Drop the (possibly incomplete) last block's events and
                # resume from that block. Bug fix: the original computed a
                # resume point but then unconditionally advanced past
                # `limit`, silently skipping the dropped block's events.
                events = [event for event in events if event['blockNumber'] < maxblock]
                next_from = maxblock
            # else: every event is in a single block — the range cannot be
            # subdivided further, so keep all events and advance normally.
        elif len(events) > 0:
            events_per_block = len(events) / float(limit - fromblock)
            new_batch_size = int(TARGET_RESULT_SIZE / events_per_block)
            # Move toward the estimate, at most doubling/halving per step
            # (// keeps integer division under Python 3).
            batch_size = min(batch_size * 2, max(batch_size // 2, new_batch_size))
        else:
            # Empty range: probe a larger window next time.
            batch_size *= 2
        logging.info("Outputting %d events", len(events))
        for event in events:
            yield event
        fromblock = next_from
def main():
    """CLI entry: <fromblock> <toblock> <address>; JSON lines to stdout."""
    start_block = int(sys.argv[1])
    end_block = int(sys.argv[2])
    address = sys.argv[3]
    for event in get_all_events(start_block, end_block, address):
        sys.stdout.write(json.dumps(event) + "\n")
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()
import logging
import json
import sys
import mapreduce
def mapper(key, value):
    """Emit each revealed bid's name hash; the value payload is unused."""
    if value['event'] != 'BidRevealed':
        return
    yield (value['args']['hash'], None)
def identityReducer(key, values):
    """Return the key unchanged, ignoring values — deduplicates keys."""
    del values  # deliberately unused
    return key
def indexEvent(event):
    """Pair an event with its (transactionHash, logIndex) identity key."""
    return ((event['transactionHash'], event['logIndex']), event)
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    indexed = (indexEvent(row) for row in mapreduce.jsonIterator(sys.stdin))
    # Each reduced entry is (hash, hash); print just the hash.
    for entry in mapreduce.mapreduce(mapper, identityReducer, indexed):
        sys.stdout.write(entry[0] + "\n")
from collections import defaultdict
import json
import logging
def mapreduce(mapper, reducer, input):
    """
    Runs a simple in-memory mapreduce.

    Arguments:
      - mapper: A function (key, value) -> iterable of (key, value) pairs
      - reducer: A function (key, [value, ...]) -> value
      - input: An iterator of (key, value) pairs

    Returns:
      A generator of (key, reduced_value) pairs, one per distinct mapper
      output key.

    Note: the parameter name `input` shadows the builtin but is kept for
    backward compatibility with keyword callers.
    """
    state = defaultdict(list)
    for i, (k, v) in enumerate(input):
        if i % 10000 == 0:
            logging.info("Mapped %d entries", i)
        for ok, ov in mapper(k, v):
            state[ok].append(ov)
    # .items() instead of the Python 2-only .iteritems(); the dict is not
    # mutated while iterating, so behaviour is unchanged.
    for i, (k, v) in enumerate(state.items()):
        if i % 10000 == 0:
            logging.info("Reduced %d entries", i)
        yield (k, reducer(k, v))
def jsonIterator(stream):
    """Lazily parse one JSON document per line of `stream`."""
    return (json.loads(line) for line in stream)
import csv
import gzip
import bisect
import logging
import json
import sha3
import sys
import mapreduce
def loadPreimages(path):
    """Load newline-separated hash preimages from a gzip file.

    Populates the module-global `preimages` list (consumed by findPreimage)
    and also returns it, which keeps the original global contract while
    making the function testable. Fix: the original never closed the file;
    use a context manager.
    """
    global preimages
    logging.info("Loading preimages...")
    with gzip.open(path) as f:
        # Python 2 yields str, Python 3 yields bytes — strip the matching
        # newline type so either works.
        preimages = [line.strip(b'\n' if isinstance(line, bytes) else '\n')
                     for line in f]
    logging.info("Preimages loaded.")
    return preimages
def findPreimage(hash, off=0, length=None):
    """Binary-search the global `preimages` list for an entry whose
    keccak-256 digest equals `hash`; return it, or None if absent.

    Assumes `preimages` is sorted by keccak-256 digest — TODO confirm the
    preimage file is generated that way. `off`/`length` delimit the current
    search window (internal recursion parameters).
    """
    if length is None:
        length = len(preimages)
    if length <= 1:
        # Final verification guards against near-miss candidates.
        if length == 1 and sha3.keccak_256(preimages[off]).digest() == hash:
            return preimages[off]
        return None
    mid = off + length // 2  # //: integer division under Python 3 too
    h = sha3.keccak_256(preimages[mid]).digest()
    if hash < h:
        return findPreimage(hash, off, length // 2)
    elif hash > h:
        # Bug fix: the right half holds length - length//2 - 1 elements;
        # the original passed length//2, overshooting the window for even
        # lengths (could index past the list or scan unrelated entries).
        return findPreimage(hash, mid + 1, length - length // 2 - 1)
    else:
        return preimages[mid]
# Historical ETH prices as (time, close) pairs loaded at import from the CSV
# in argv[2]; must be sorted ascending by time for the bisect lookup in
# get_ether_price. NOTE(review): times appear to be in milliseconds (see the
# *1000 in get_ether_price) — confirm against the CSV source.
prices = [(int(row['time']), float(row['close'])) for row in csv.DictReader(open(sys.argv[2]))]
def get_ether_price(timestamp, price_table=None):
    """Return the ETH close price in effect at `timestamp` (unix seconds).

    Looks up the first table entry at or after timestamp*1000 (the table's
    times appear to be in milliseconds — hence the *1000; confirm against
    the price CSV). Fix: the index is clamped so timestamps past the end of
    the table return the last known price instead of raising IndexError.

    Args:
      timestamp: unix time in seconds.
      price_table: optional ascending list of (time, close) pairs; defaults
        to the module-global `prices` for backward compatibility.
    """
    table = prices if price_table is None else price_table
    idx = bisect.bisect_left(table, (timestamp * 1000, 0.0))
    if idx >= len(table):
        idx = len(table) - 1
    return table[idx][1]
def mapper(key, value):
    """Resolve a 0x-prefixed name hash back to its preimage and attach a
    'pricegbp' field computed from the registration-time ether price.

    Entries with an unknown preimage, or missing price/registration data,
    are dropped. NOTE(review): uses Python 2's 'hex' str codec on the key.
    """
    raw_hash = key[2:].decode('hex')
    preimage = findPreimage(raw_hash)
    if not preimage:
        return
    if value['price'] is None or value['registered'] is None:
        return
    value['pricegbp'] = get_ether_price(value['registered']) * value['price']
    yield (preimage, value)
def maxReducer(key, values):
    """Keep only the greatest value for each key (the key itself is unused)."""
    largest = max(values)
    return largest
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    loadPreimages(sys.argv[1])
    outputs = mapreduce.mapreduce(
        mapper,
        maxReducer,
        mapreduce.jsonIterator(sys.stdin),
    )
    for entry in outputs:
        sys.stdout.write(json.dumps(entry) + "\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment