Skip to content

Instantly share code, notes, and snippets.

@sloev
Last active December 30, 2020 02:04
Show Gist options
  • Select an option

  • Save sloev/b0411b579d925857e6759bb774489161 to your computer and use it in GitHub Desktop.

Select an option

Save sloev/b0411b579d925857e6759bb774489161 to your computer and use it in GitHub Desktop.
etag tracking user journey

usage: uvicorn app:app --reload

Tracks your moves through the use of ETags in headers. The user journey is compressed and then encrypted as follows:

etag: 
W/"Fa08xKVTJi1uB-pqhSo93bvSsVE5-bPd3SrwcG7NKPLpTHpZzOrk7-MISWdUy-ZRRPwOJrNaXMjvAAQX7LpjILS2yrS7IS4PalW_jOE6YtEklfX-9l_Je20PG-m3OAFk-sfSlHJzlT4RLp_vLfejm_rfv5sKY-cBtx06xsDj0I7aHC-HaBtTPGQkJjaZIblmeWrHUIb7GY7Fd6csJJxEDqNsfD36LtD_F3NOF5JoHsYCKSffKB09qdfpHtFBiHA-"
           
etag decrypted and decompressed: 
{'id': 'VkMclPoCv_TH7lozaKfy3A7Nuai1uvFAg3yMDZPcCeU', 'j': [{'something': 'hello'}, 'foo', {'something': 'hello'}, {'something': 'hello'}, 'foo', 'http://127.0.0.1:8000/?event=baz', {'something': 'hello'}, {'something': 'hello'}, {'something': 'hello'}, 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'http://127.0.0.1:8000/?event=baz', 'http://127.0.0.1:8000/?event=baz', 'http://127.0.0.1:8000/?event=baz', {'something': 'hello'}, {'something': 'hello'}, {'something': 'hello'}, 'foo', 'foo', 'foo', 'http://127.0.0.1:8000/?event=baz', {'something': 'hello'}, 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', 'foo', {'something': 'hello'}, {'something': 'hello'}, {'something': 'hello'}, {'something': 'hello'}, 'foo', {'something': 'hello'}, 'http://127.0.0.1:8000/?event=baz', 'http://127.0.0.1:8000/?event=baz', 'http://127.0.0.1:8000/?event=baz', {'something': 'hello'}, 'http://127.0.0.1:8000/?event=baz', 'foo', 'foo', 'foo', 'foo', {'something': 'hello'}, 'http://127.0.0.1:8000/?event=baz']}
import html
import json
import os
import secrets
from typing import Optional

import msgpack
from fastapi import FastAPI, Response
from fastapi.responses import HTMLResponse
from starlette.requests import Request

import crypto
# Application object; the route handlers below register themselves on it via
# the @app.get decorators. Served with `uvicorn app:app --reload`.
app = FastAPI()
# Secret from which the AES key protecting the ETag payload is derived.
# Taken from the ETAG_PASSWORD environment variable so a deployment does not
# have to ship the demo value; when the variable is unset or empty the
# original demo password is used, so existing behaviour is unchanged.
password = (os.environ.get("ETAG_PASSWORD") or "foobar").encode()
def gen_respone(journey, content_type):
    """Build the tracking response that carries *journey* in a weak ETag.

    The journey is serialised to JSON, passed through ``crypto.encrypt`` with
    the module-level ``password`` and placed in the ``ETag`` header as
    ``W/"<token>"``.

    Args:
        journey: JSON-serialisable journey object (the handlers in this file
            pass a dict with the keys ``"id"`` and ``"j"``).
        content_type: Value of the request's ``Content-Type`` header, or ``""``.

    Returns:
        An ``HTMLResponse`` showing the journey and the ETag when
        *content_type* contains ``"json"``; otherwise a 1x1 transparent GIF.
        Both responses carry the ``ETag`` header.
    """
    token = crypto.encrypt(json.dumps(journey).encode(), password).decode()
    etag = f'W/"{token}"'

    if "json" in content_type:
        # Journey entries originate from request headers (event / referer),
        # so they must be escaped before being embedded in an HTML document.
        shown_journey = html.escape(str(journey), quote=False)
        body = f"""
{shown_journey}
<br>
{etag}"""
        return HTMLResponse(body, headers={"ETag": etag})

    transparent_gif = b'GIF89a\x01\x00\x01\x00\x80\x00\x00\x00\x00\x00\xff\xff\xff!\xf9\x04\x01\x00\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x01D\x00;'
    return Response(transparent_gif, media_type="image/gif", headers={"ETag": etag})
def _etag_payload(header_value):
    """Return the opaque token inside an ``If-None-Match`` header value.

    Accepts weak (``W/"x"``) and strong (``"x"``) validators; when the header
    holds a comma-separated list, only the first entry is used.
    """
    tag = header_value.split(",", 1)[0].strip()
    if tag.startswith("W/"):
        tag = tag[2:]
    return tag.strip('"')


def _restore_journey(header_value, fallback):
    """Decode the journey carried by *header_value*, or return *fallback*.

    *fallback* is returned when the header is missing or empty, when the
    token cannot be decrypted/parsed, or when the decoded value is not a dict
    holding a list under ``"j"``.
    """
    if not header_value:
        return fallback
    buf = _etag_payload(header_value)
    if not buf:
        return fallback
    print("incoming buf", buf)
    try:
        journey = json.loads(crypto.decrypt(buf, password))
    except Exception as exc:
        # The header is client-controlled: a stale, foreign or tampered ETag
        # must restart the journey instead of failing the request.
        print(f"discarding unreadable etag: {exc!r}")
        return fallback
    if not isinstance(journey, dict) or not isinstance(journey.get("j"), list):
        print("discarding etag with unexpected payload")
        return fallback
    print("incoming decoded array:", journey)
    return journey


@app.get("/track")
def read_root(response: Response, request: Request):
    """Record one event in the caller's journey and hand it back as an ETag.

    The event is read from the ``event`` request header (decoded as JSON when
    possible), falling back to the ``Referer`` header. The journey so far is
    recovered from ``If-None-Match``; a request whose event is ``"clear"``
    starts a new, empty journey instead. Requests whose ``Content-Type`` is
    exactly ``application/json`` read the journey without appending to it.

    NOTE: this function shares its name with the "/" handler defined later in
    the file.

    Returns:
        The response produced by ``gen_respone`` for the resulting journey.
    """
    event = request.headers.get("event")
    if event is not None:
        try:
            event = json.loads(event)
        except ValueError:
            # Not JSON: keep the raw header text as the event.
            pass
    referer = request.headers.get("referer")
    content_type = request.headers.get("content-type") or ""
    # todo use content type to figure out if to return json
    print(content_type)

    fresh_journey = {"id": secrets.token_urlsafe(), "j": []}
    event = event or referer
    if event == "clear":
        return gen_respone(fresh_journey, content_type)

    incoming_header = request.headers.get("If-None-Match") or None
    print(f"incoming header: '{incoming_header}'")
    journey = _restore_journey(incoming_header, fresh_journey)

    if event and content_type != "application/json":
        journey["j"].append(event)
    print("outgoing array:", journey)
    return gen_respone(journey, content_type)
# Markup shared by every variant of the demo page.
_NAV_LINKS = """<a href="/?event=clear">clear</a> . <a href="/?event=foo">foo</a> . <a href="/?event=bar">bar</a> . <a href="/?event=baz">baz</a>"""
_JOURNEY_PROBE = """<img src onerror="fetch('/track', {headers: {'content-type': 'application/json'}}).then(response => response.text()).then((response) => {console.log(response)})" />"""


def _demo_page(tracker_tag):
    """Assemble the demo page: tracker tag, navigation links, journey probe."""
    return "\n" + tracker_tag + "\n" + _NAV_LINKS + "\n" + _JOURNEY_PROBE + "\n"


@app.get("/")
def read_root(response: Response, request: Request, event: Optional[str] = "nothing"):
    """Serve the demo page whose embedded tracker reports *event* to /track.

    Args:
        event: Value of the ``event`` query parameter. ``"baz"`` loads /track
            as a plain image, ``"bar"`` sends a JSON object as the event, and
            any other value is sent verbatim in the ``event`` request header.

    Returns:
        An ``HTMLResponse`` with the tracker tag, the navigation links and a
        script that fetches the current journey and logs it to the console.
    """
    if event == "baz":
        tracker = """<img src="/track" />"""
    elif event == "bar":
        tracker = """<img src onerror="fetch('/track',{headers: {event: JSON.stringify({something:'hello'})}})" />"""
    else:
        # `event` is attacker-controlled and lands inside inline JavaScript in
        # an HTML attribute: encode it as a JS string literal first, then
        # escape that literal for the attribute context.
        js_literal = html.escape(json.dumps(event), quote=True)
        tracker = """<img src onerror="fetch('/track',{headers: {event:""" + js_literal + """}})" />"""
    return HTMLResponse(_demo_page(tracker))
from Crypto.Cipher import AES
from Crypto import Random
import hashlib
import zlib
import base64
# Output encodings accepted by the `mode` argument of encrypt()/decrypt().
MODE_B64 = 0  # result is a base64 encoded string (default)
MODE_BIN = 1  # result is the raw encryption output

# Cipher parameters.
_mode = AES.MODE_CBC
_block_size = 16  # bytes; used for both the padding and the IV length
_key_size = 32  # bytes; not referenced by the functions visible in this file
def encrypt(s, pass_string, mode=MODE_B64):
    """Compress *s* and encrypt it with a key derived from *pass_string*.

    The key is the SHA-256 digest of *pass_string*. The input is
    zlib-compressed, padded to a whole number of blocks and encrypted with
    AES using a freshly generated random IV, which is prepended to the output.

    Args:
        s: Bytes to protect.
        pass_string: Password bytes the key is derived from.
        mode: ``MODE_B64`` to return URL-safe base64 bytes (default); any
            other value returns the raw ``IV + ciphertext`` bytes.

    Returns:
        The encrypted payload as bytes, encoded according to *mode*.
    """
    key = hashlib.sha256(pass_string).digest()
    compressed = zlib.compress(s)

    # Always add between 1 and _block_size bytes, each holding the pad length,
    # so the receiver can read the last byte to know how much to strip.
    fill = _block_size - len(compressed) % _block_size
    padded = compressed + bytes([fill]) * fill

    iv = Random.get_random_bytes(_block_size)
    cipher = AES.new(key, _mode, iv)
    blob = iv + cipher.encrypt(padded)

    return base64.urlsafe_b64encode(blob) if mode == MODE_B64 else blob
def decrypt(s, pass_string, mode=MODE_B64):
    """Reverse ``encrypt``: decrypt *s*, strip the padding and decompress.

    NOTE: the payload carries no integrity tag, so a successful return does
    not prove the data was produced by ``encrypt``; callers must still treat
    the result as untrusted.

    Args:
        s: Payload produced by ``encrypt`` (IV followed by ciphertext),
            URL-safe base64 encoded when *mode* is ``MODE_B64``.
        pass_string: Password bytes the key is derived from.
        mode: ``MODE_B64`` (default) to base64-decode *s* first; any other
            value treats *s* as raw bytes.

    Returns:
        The original, decompressed bytes.

    Raises:
        ValueError: If the payload is too short, not a whole number of
            blocks, or ends in invalid padding (also raised by the base64
            decoder for malformed input).
        zlib.error: If the decrypted data is not a valid zlib stream.
    """
    key = hashlib.sha256(pass_string).digest()
    if mode == MODE_B64:
        s = base64.urlsafe_b64decode(s)

    # A valid payload is the IV plus at least one full cipher block.
    if len(s) < 2 * _block_size or len(s) % _block_size:
        raise ValueError("encrypted payload is truncated or not block-aligned")

    iv, body = s[:_block_size], s[_block_size:]
    plain = AES.new(key, _mode, iv).decrypt(body)

    # Every pad byte must equal the pad length (1.._block_size). Without this
    # check a bogus last byte would strip an arbitrary amount of data, or
    # nothing at all when it is 0.
    pad = plain[-1]
    if not 1 <= pad <= _block_size or plain[-pad:] != bytes([pad]) * pad:
        raise ValueError("encrypted payload has invalid padding")

    return zlib.decompress(plain[:-pad])
if __name__ == "__main__":
    # Round-trip self-test: encrypt a sample, decrypt it, compare with the input.
    secret = b'password'
    sample = b"Some test data for simple adAES encryption 1234567890"

    token = encrypt(sample, secret)
    print("ENC STR[%d:%s]" % (len(token), token))

    recovered = decrypt(token, secret)
    assert sample == recovered
    print("DEC STR[%d:%s]" % (len(recovered), recovered))
pycryptodome
uvicorn
fastapi
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment