Created
April 11, 2018 19:28
-
-
Save ndmanvar/782075420e88c9305e872a9beb8fa203 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import base64
import binascii
import fileinput
import json
import re
import sys
import zlib
from gzip import GzipFile
from io import BytesIO

import six
# Used to find the end of the headers section: the CRLF that terminates the
# last header line immediately followed by the CRLF of the blank line.
EMPTY_LINE = b'\r\n\r\n'
def log(msg):
    """
    Write a message to STDERR, followed by a newline.

    STDOUT and STDIN are used for data transfer, so diagnostics go to STDERR.

    @type msg: str or byte string
    @param msg: Message to log to STDERR
    """
    if isinstance(msg, (bytes, bytearray)):
        # Log the text itself rather than the "b'...'" repr; undecodable
        # bytes are replaced instead of raising from inside the logger.
        text = bytes(msg).decode('utf-8', errors='replace')
    else:
        try:
            text = str(msg)
        except Exception:
            # A broken __str__ must not take the logger down with it.
            text = object.__repr__(msg)
    sys.stderr.write(text + '\n')
    sys.stderr.flush()
def find_end_of_headers(byte_data):
    """
    Finds where the header portion ends and the content portion begins.

    @type byte_data: byte string
    @param byte_data: Hex decoded req or resp string
    @rtype: int
    @return: Index of the first byte after the EMPTY_LINE separator
    @raise ValueError: If byte_data does not contain EMPTY_LINE
    """
    # Step over the separator itself; deriving the offset from the constant
    # keeps the two from drifting apart.
    return byte_data.index(EMPTY_LINE) + len(EMPTY_LINE)
def decompress_gzip(encoded_data):
    """
    Decompress a gzip byte string and decode the result as UTF-8.

    @type encoded_data: byte string
    @param encoded_data: gzip-compressed payload
    @rtype: str
    @return: The decompressed payload as text
    @raise APIError: If the data cannot be decompressed or decoded
    """
    try:
        # The context manager closes the gzip reader on every path. The
        # previous try/finally referenced the reader before it was
        # guaranteed to be bound, masking the real error when opening failed.
        with GzipFile(fileobj=BytesIO(encoded_data)) as gz:
            return gz.read().decode('utf-8')
    except Exception as e:
        # This error should be caught as it suggests that there's a
        # bug somewhere in the client's code.
        # NOTE(review): APIError is not defined or imported in the visible
        # part of this file; it must be in scope for this path to work.
        raise APIError('Bad data decoding request (%s, %s)' %
                       (type(e).__name__, e)) from e
def decompress_deflate(encoded_data):
    """
    Decompress a zlib byte string and decode the result as UTF-8.

    @type encoded_data: byte string
    @param encoded_data: Payload accepted by zlib.decompress()
    @rtype: str
    @return: The decompressed payload as text
    @raise APIError: If the data cannot be decompressed or decoded
    """
    try:
        inflated = zlib.decompress(encoded_data)
        return inflated.decode('utf-8')
    except Exception as e:
        # This error should be caught as it suggests that there's a
        # bug somewhere in the client's code.
        # NOTE(review): APIError is not defined or imported in the visible
        # part of this file; it must be in scope for this path to work.
        raise APIError('Bad data decoding request (%s, %s)' %
                       (type(e).__name__, e)) from e
def decode_and_decompress_data(encoded_data):
    """
    Base64-decode a payload, inflating it with zlib when it is compressed.

    @type encoded_data: str or byte string
    @param encoded_data: Base64 text, optionally wrapping zlib-compressed data
    @rtype: str
    @return: The decoded (and, if applicable, decompressed) payload as text
    @raise APIError: If the data is not valid base64 or not valid UTF-8
    """
    try:
        # Decode once and reuse the result for both attempts.
        raw = base64.b64decode(encoded_data)
        try:
            raw = zlib.decompress(raw)
        except zlib.error:
            # Not zlib-compressed: treat the decoded bytes as the payload.
            pass
        return raw.decode('utf-8')
    except Exception as e:
        # This error should be caught as it suggests that there's a
        # bug somewhere in the client's code.
        # NOTE(review): APIError is not defined or imported in the visible
        # part of this file; it must be in scope for this path to work.
        raise APIError('Bad data decoding request (%s, %s)' %
                       (type(e).__name__, e)) from e
def decode_data(encoded_data):
    """
    Decode a byte string as UTF-8 text.

    @type encoded_data: byte string
    @param encoded_data: Raw payload bytes
    @rtype: str
    @return: The payload as text
    @raise APIError: If the bytes are not valid UTF-8
    """
    try:
        return encoded_data.decode('utf-8')
    except UnicodeDecodeError as e:
        # This error should be caught as it suggests that there's a
        # bug somewhere in the client's code.
        # NOTE(review): APIError is not defined or imported in the visible
        # part of this file; it must be in scope for this path to work.
        raise APIError('Bad data decoding request (%s, %s)' %
                       (type(e).__name__, e)) from e
def safely_load_json_string(json_string):
    """
    Parse a JSON document and require its top-level value to be an object.

    @type json_string: str or byte string
    @param json_string: JSON text; byte strings are decoded as UTF-8 first
    @rtype: dict
    @return: The parsed JSON object
    @raise APIError: If decoding or parsing fails, or if the top-level
        value is not a JSON object
    """
    try:
        # This script is Python 3 only, where six.binary_type is bytes.
        if isinstance(json_string, bytes):
            json_string = json_string.decode('utf-8')
        obj = json.loads(json_string)
        # Explicit check instead of assert, which is stripped under -O.
        if not isinstance(obj, dict):
            raise TypeError('expected a JSON object, got %s' %
                            type(obj).__name__)
    except Exception as e:
        # NOTE(review): APIError is not defined or imported in the visible
        # part of this file; it must be in scope for this path to work.
        raise APIError('Bad data reconstructing object (%s, %s)' %
                       (type(e).__name__, e)) from e
    return obj
# Values substituted into each record when process_stdin() is called
# without arguments.
_DEFAULT_PROJECT_ID = '2'
_DEFAULT_SENTRY_KEY = '79d5b2ca3792415da40d03780fd50115'

# Compiled once at import time instead of once per input line.
_PROJECT_PATH_RE = re.compile(r'POST /api/([0-9]+)/')
_SENTRY_KEY_RE = re.compile(r'&sentry_key=([0-9a-z]+)')
# HTTP header names are case-insensitive.
_CONTENT_LENGTH_RE = re.compile(r'Content-Length: [0-9]+', re.IGNORECASE)


def _rewrite_record(decoded, project_id, sentry_key):
    """Return one decoded record with its project id and key replaced."""
    # Split into metadata and payload; the payload is headers + body.
    raw_metadata, payload = decoded.split(b'\n', 1)

    headers_end = find_end_of_headers(payload)
    headers = payload[:headers_end].decode('utf-8')
    body = payload[headers_end:]

    # Callables are used as replacements so the substituted values are
    # inserted literally, without backslash/group-reference processing.
    headers = _PROJECT_PATH_RE.sub(
        lambda _m: 'POST /api/%s/' % project_id, headers)
    headers = _SENTRY_KEY_RE.sub(
        lambda _m: '&sentry_key=%s' % sentry_key, headers)

    if body:
        # TODO: the body is assumed to be plain JSON. Bodies that are gzip,
        # deflate or base64 encoded are not handled here yet, although
        # decompress_gzip / decompress_deflate / decode_and_decompress_data
        # exist in this file for that purpose.
        event = safely_load_json_string(body)
        event["project"] = str(project_id)
        body = json.dumps(event).encode('utf-8')
        # Re-serialising can change the body size, so keep the header in sync.
        new_length = 'Content-Length: %s' % len(body)
        headers = _CONTENT_LENGTH_RE.sub(lambda _m: new_length, headers)

    return raw_metadata + b'\n' + headers.encode('utf-8') + body


def process_stdin(project_id=_DEFAULT_PROJECT_ID,
                  sentry_key=_DEFAULT_SENTRY_KEY):
    """
    Process STDIN and output to STDOUT

    Each non-blank input line is a hex-encoded record made of a metadata
    line followed by headers and an optional body. The project id in the
    'POST /api/<id>/' request line, the '&sentry_key=' value and the
    "project" field of a JSON body are replaced, Content-Length is updated
    to match, and the record is written back out hex-encoded. Records
    without a body are forwarded with only the header rewrite applied.

    @type project_id: str
    @param project_id: Project id to write into each record
    @type sentry_key: str
    @param sentry_key: Key to write into each record
    """
    for raw_line in fileinput.input():
        line = raw_line.rstrip()
        if not line:
            # A blank line carries no record; skipping it avoids a crash
            # when splitting metadata from payload.
            continue

        # Decode hex encoded line
        decoded = bytes.fromhex(line)
        rewritten = _rewrite_record(decoded, project_id, sentry_key)

        encoded = binascii.hexlify(rewritten).decode('ascii')
        sys.stdout.write(encoded + '\n')
        # Flush per record so a consumer reading our STDOUT through a pipe
        # is not left waiting on a partially filled buffer.
        sys.stdout.flush()
# Run the stdin -> stdout rewrite loop only when executed as a script.
if __name__ == '__main__':
    process_stdin()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment