Scripts to assist in deobfuscating the Miitomo/DeNA XOR + LZ4 body obfuscation: a mitmproxy script, a Go reverse proxy, and a standalone Python decoding script. (Note: AI slop)
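To sanity-check the scheme before putting either proxy in the middle, the helpers can be exercised in isolation. Below is a minimal round-trip sketch, assuming the mitmproxy addon further down is saved as dena_xor.py (hypothetical filename) so its helpers can be imported, and using a placeholder session ID; it only demonstrates that the encode and decode paths are inverses.

import lz4.block
from dena_xor import (COMMON_KEY, build_xor_table, conditional_xor,
                      decode_varint, encode_varint)

# The real session ID comes from the player_session_id cookie; this is a placeholder.
xor_table = build_xor_table(COMMON_KEY, "example-session-id")

plaintext = b'{"hello": "world"}'

# Encode: LZ4-compress (dropping the 4-byte size header that lz4.block.compress
# prepends by default), prefix the decompressed size as a varint, then apply the
# conditional XOR.
obfuscated = conditional_xor(
    encode_varint(len(plaintext)) + lz4.block.compress(plaintext)[4:],
    xor_table, encode=True)

# Decode: undo the conditional XOR, read the varint, then LZ4-decompress the rest.
decoded = conditional_xor(obfuscated, xor_table, encode=False)
size, consumed = decode_varint(decoded)
assert lz4.block.decompress(decoded[consumed:], uncompressed_size=size) == plaintext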
mitmproxy script:
import mitmproxy.http
from mitmproxy import ctx
import lz4.block
import binascii

# NOTE: miitomo common key is '9ec1c78fa2cb34e2bed5691c08432f04'
COMMON_KEY = "9ec1c78fa2cb34e2bed5691c08432f04"
SESSION_ID_COOKIE_NAME = "player_session_id"


def transform_common_key(s):
    """
    Transforms the common key by subtracting 0x62 from each character
    and negating the result, similar to FUN_0004a120 in libsakasho.so.
    Args:
        s (str): Input common key string.
    Returns:
        bytes: Processed bytes.
    """
    return bytes([(-0x62 - ord(c)) & 0xFF for c in s])


def build_xor_table(common_key, session_id):
    """
    Build the XOR table by processing the common key and session ID
    and concatenating the results.
    Args:
        common_key (str): The common key string.
        session_id (str): The session ID string.
    Returns:
        bytes: The XOR table.
    """
    transformed_common = transform_common_key(common_key)
    return transformed_common + session_id.encode('ascii')


def conditional_xor(data, xor_table, encode=False):
    """
    Apply the conditional XOR operation to the data using the XOR table.
    For each byte in the data:
    - If (key_byte & 7) == 0, XOR the data byte with the key byte.
    - Else, perform a bit rotation based on (key_byte & 7).
    Args:
        data (bytes): The obfuscated or plain data.
        xor_table (bytes): The XOR table.
        encode (bool): If True, perform encoding; if False, perform decoding.
    Returns:
        bytes: The data after applying the XOR operation.
    """
    output = bytearray(len(data))
    table_len = len(xor_table)
    for i in range(len(data)):
        key_byte = xor_table[(i + 1) % table_len]
        if (key_byte & 7) == 0:
            # Perform XOR
            output[i] = data[i] ^ key_byte
        else:
            # Perform bit rotation
            shift = key_byte & 7
            if encode:
                rotated = ((data[i] << (8 - shift)) | (data[i] >> shift)) & 0xFF
            else:
                rotated = ((data[i] >> (8 - shift)) | (data[i] << shift)) & 0xFF
            output[i] = rotated
    return bytes(output)
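
# Illustrative worked example: for a key byte of 0x33, shift = 0x33 & 7 = 3, so
# encoding rotates the data byte right by 3 and decoding rotates it left by 3,
# undoing it:
#   encode: 0b10110001 -> 0b00110110    decode: 0b00110110 -> 0b10110001
# When (key_byte & 7) == 0 the byte is XORed with the key byte instead, which is
# likewise its own inverse, so decoding always exactly reverses encoding.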

def decode_varint(data):
    """
    Decode a varint from the beginning of the data.
    Args:
        data (bytes): The data containing the varint at the start.
    Returns:
        tuple: (decoded integer, number of bytes consumed)
    Raises:
        ValueError: If the varint is too long or incomplete.
    """
    value = 0
    shift = 0
    for i, byte in enumerate(data):
        value |= (byte & 0x7F) << shift
        if (byte & 0x80) == 0:
            return value, i + 1
        shift += 7
        if shift >= 35:
            raise ValueError("Varint too long")
    raise ValueError("Incomplete varint")


def encode_varint(value):
    """
    Encode an integer into varint format.
    Args:
        value (int): The integer to encode.
    Returns:
        bytes: The varint-encoded bytes.
    """
    parts = []
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            parts.append(byte | 0x80)
        else:
            parts.append(byte)
            break
    return bytes(parts)
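
# Illustrative worked example: a decompressed size of 300 (0b100101100) splits
# into 7-bit groups from the low end, so
#   encode_varint(300) == b'\xac\x02'   (0x2C with the 0x80 continuation bit, then 0x02)
#   decode_varint(b'\xac\x02') == (300, 2)
# This varint (the decompressed size) is what precedes the LZ4 block in every
# obfuscated request/response body.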

class DenaObfuscation:
    def __init__(self):
        # Constant common key
        self.common_key = COMMON_KEY

    def get_session_id(self, flow: mitmproxy.http.HTTPFlow):
        """
        Retrieve the session ID from cookies.
        It first checks the response cookies, then the request cookies.
        If not found, returns None.
        Args:
            flow (mitmproxy.http.HTTPFlow): The HTTP flow.
        Returns:
            str or None: The session ID if found, else None.
        """
        # Check response cookies
        if flow.response:
            cookies = flow.response.cookies.get(SESSION_ID_COOKIE_NAME)
            if cookies:
                return cookies[0] if isinstance(cookies, tuple) else cookies
        # Check request cookies
        if flow.request:
            cookies = flow.request.cookies.get(SESSION_ID_COOKIE_NAME)
            if cookies:
                return cookies[0] if isinstance(cookies, tuple) else cookies
        return None

    def should_process_flow(self, flow: mitmproxy.http.HTTPFlow) -> bool:
        """
        Determine whether to process the flow based on specific criteria.
        This function checks if the User-Agent contains "SakashoClient".
        Args:
            flow (mitmproxy.http.HTTPFlow): The HTTP flow.
        Returns:
            bool: True if the flow should be processed, otherwise False.
        """
        # Check if User-Agent contains "SakashoClient"
        user_agent = flow.request.headers.get("User-Agent", "")
        if "SakashoClient" not in user_agent:
            return False
        # Skip /v1/session specifically
        if flow.request.path == "/v1/session":
            return False
        return True

    def request(self, flow: mitmproxy.http.HTTPFlow):
        """
        Handle the HTTP request.
        Encode the request body if it starts with "{".
        """
        session_id = self.get_session_id(flow)
        if not session_id or not self.should_process_flow(flow):
            return
        # Build XOR table
        xor_table = build_xor_table(self.common_key, session_id)
        if not xor_table:
            ctx.log.error("XOR table is empty. Skipping flow.")
            return
        # Store XOR table in flow metadata
        flow.metadata['xor_table'] = xor_table
        if flow.request.content and flow.request.content.startswith(b'{'):
            # If the request body starts with '{', encode it
            try:
                # Encode the content
                encoded_data = self.encode_content(flow.request.content, xor_table)
                # Replace the request content with encoded data
                flow.request.content = encoded_data
                ctx.log.info(f"Encoded request for flow {flow.id}")
            except Exception as e:
                ctx.log.error(f"Error encoding request for flow {flow.id}: {e}")

    def response(self, flow: mitmproxy.http.HTTPFlow):
        """
        Handle the HTTP response.
        Decode the response body if applicable.
        """
        session_id = self.get_session_id(flow)
        if not session_id or not self.should_process_flow(flow):
            return
        # Build XOR table
        xor_table = build_xor_table(self.common_key, session_id)
        if not xor_table:
            ctx.log.error("XOR table is empty. Skipping flow.")
            return
        # Store XOR table in flow metadata
        flow.metadata['xor_table'] = xor_table
        if flow.response.content:
            try:
                # Decode the response body
                data_after_xor = conditional_xor(flow.response.content, xor_table, encode=False)
                # Decode varint
                varint, varint_length = decode_varint(data_after_xor)
                # Extract compressed data
                compressed_data = data_after_xor[varint_length:]
                #print(data_after_xor)
                # Decompress using LZ4
                decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint)
                # Replace the response content with decompressed data
                flow.response.content = decompressed_data
                flow.response.headers['Content-Type'] = 'application/json; charset=UTF-8'
                # Mark that the response has been decoded
                flow.metadata['decoded_response'] = {
                    "varint": varint,
                    "varint_length": varint_length,
                    "compressed_data": compressed_data,
                    "decompressed_data": decompressed_data
                }
                ctx.log.info(f"Decoded response for flow {flow.request.url} ({flow.id})")
            except Exception as e:
                ctx.log.warn(f"Error decoding response for flow {flow.request.url} ({flow.id}): {e}")
        if flow.request.content:
            # If the request body does not start with '{', decode it after the response is done
            try:
                # Decode the request body
                data_after_xor = conditional_xor(flow.request.content, xor_table, encode=False)
                # Decode varint
                varint, varint_length = decode_varint(data_after_xor)
                # Extract compressed data
                compressed_data = data_after_xor[varint_length:]
                # Decompress using LZ4
                decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint)
                # Replace the request content with decompressed data
                flow.request.content = decompressed_data
                ctx.log.info(f"Decoded request for flow {flow.id} after response done")
            except Exception as e:
                ctx.log.error(f"Error decoding request for flow {flow.id} after response done: {e}")

    def encode_content(self, content: bytes, xor_table: bytes):
        """
        Encode the content using the provided XOR table.
        Args:
            content (bytes): The plain data to encode.
            xor_table (bytes): The XOR table.
        Returns:
            bytes: The encoded data.
        """
        # Compress using LZ4
        compressed_data = lz4.block.compress(content)
        # Encode varint (size of decompressed data)
        varint = encode_varint(len(content))
        # Concatenate varint and compressed data, dropping the 4-byte size header
        # that lz4.block.compress prepends by default
        data_to_encode = varint + compressed_data[4:]
        # Apply conditional XOR encoding
        encoded_data = conditional_xor(data_to_encode, xor_table, encode=True)
        return encoded_data

    def encode_request(self, flow: mitmproxy.http.HTTPFlow):
        """
        Encode the modified request content before sending it to the server.
        """
        if 'decoded_request' in flow.metadata:
            xor_table = flow.metadata['xor_table']
            decoded_data = flow.request.content
            try:
                # Encode the content
                encoded_data = self.encode_content(decoded_data, xor_table)
                # Replace the request content with encoded data
                flow.request.content = encoded_data
                ctx.log.info(f"Encoded modified request for flow {flow.request.url} ({flow.id})")
            except Exception as e:
                ctx.log.error(f"Error encoding modified request for flow {flow.request.url} ({flow.id}): {e}")

    def encode_response(self, flow: mitmproxy.http.HTTPFlow):
        """
        Encode the modified response content before sending it to the client.
        """
        if 'decoded_response' in flow.metadata:
            xor_table = flow.metadata['xor_table']
            decoded_data = flow.response.content
            try:
                # Encode the content
                encoded_data = self.encode_content(decoded_data, xor_table)
                # Replace the response content with encoded data
                flow.response.content = encoded_data
                ctx.log.info(f"Encoded modified response for flow {flow.request.url} ({flow.id})")
            except Exception as e:
                ctx.log.error(f"Error encoding modified response for flow {flow.request.url} ({flow.id}): {e}")


addons = [
    DenaObfuscation()
]
Python decoding script:
import argparse
import sys
import lz4.block

# NOTE: miitomo common key is 9ec1c78fa2cb34e2bed5691c08432f04
# session id is player_session_id cookie


def transform_common_key(s):
    """
    Transforms the common key by subtracting 0x62 from each character
    and negating the result, similar to FUN_0004a120 in libcocos2dcpp.so.
    Args:
        s (str): Input common key string.
    Returns:
        bytes: Processed bytes.
    """
    return bytes([(-0x62 - ord(c)) & 0xFF for c in s])


def build_xor_table(common_key, session_id):
    """
    Build the XOR table by processing the common key and session ID
    and concatenating the results.
    Args:
        common_key (str): The common key string.
        session_id (str): The session ID string.
    Returns:
        bytes: The XOR table.
    """
    transformed_common = transform_common_key(common_key)
    return transformed_common + session_id.encode('ascii')


def conditional_xor(data, xor_table):
    """
    Apply the conditional XOR operation to the data using the XOR table.
    For each byte in the data:
    - If (key_byte & 7) == 0, XOR the data byte with the key byte.
    - Else, perform a bit rotation based on (key_byte & 7).
    Args:
        data (bytes): The obfuscated data.
        xor_table (bytes): The XOR table.
    Returns:
        bytes: The data after applying the XOR operation.
    """
    output = bytearray(len(data))
    table_len = len(xor_table)
    for i in range(len(data)):
        key_byte = xor_table[(i + 1) % table_len]
        if (key_byte & 7) == 0:
            # Perform XOR
            output[i] = data[i] ^ key_byte
        else:
            # Perform bit rotation
            shift = key_byte & 7
            rotated = ((data[i] >> (8 - shift)) | (data[i] << shift)) & 0xFF
            output[i] = rotated
    return bytes(output)


def decode_varint(data):
    """
    Decode a varint from the beginning of the data.
    Args:
        data (bytes): The data containing the varint at the start.
    Returns:
        tuple: (decoded integer, number of bytes consumed)
    Raises:
        ValueError: If the varint is too long or incomplete.
    """
    value = 0
    shift = 0
    for i, byte in enumerate(data):
        value |= (byte & 0x7F) << shift
        if (byte & 0x80) == 0:
            return value, i + 1
        shift += 7
        if shift >= 35:
            raise ValueError("Varint too long")
    raise ValueError("Incomplete varint")


def main():
    # Set up argument parsing
    parser = argparse.ArgumentParser(description='Deobfuscate data using common key and session ID.')
    parser.add_argument('--common-key', required=True, help='Common key string from setCommonKey.')
    parser.add_argument('--session-id', required=True, help='Session ID string from updateSessionId.')
    parser.add_argument('input_file', help='Path to the obfuscated data file.')
    args = parser.parse_args()

    # Build the XOR table
    xor_table = build_xor_table(args.common_key, args.session_id)
    if not xor_table:
        print("Error: XOR table is empty. Check the common key and session ID inputs.")
        sys.exit(1)

    # Read the obfuscated data
    try:
        with open(args.input_file, 'rb') as f:
            obfuscated_data = f.read()
    except Exception as e:
        print(f"Error reading input file: {e}")
        sys.exit(1)

    # Apply the conditional XOR operation
    data_after_xor = conditional_xor(obfuscated_data, xor_table)

    # Decode varint
    try:
        varint, varint_length = decode_varint(data_after_xor)
    except ValueError as e:
        print(f"Error decoding varint: {e}")
        sys.exit(1)
    print("=== Varint (Decompressed Size) ===")
    print(varint)
    print(f"Bytes consumed for varint: {varint_length}")

    # Extract compressed data
    compressed_data = data_after_xor[varint_length:]

    # Decompress using LZ4
    try:
        decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint)
    except lz4.block.LZ4BlockError as e:
        print(f"Error during LZ4 decompression: {e}")
        # Optionally, print the compressed data for inspection
        print("\nCompressed Data (Hex):")
        print(compressed_data.hex())
        sys.exit(1)

    # Print the decompressed data
    print("\n=== Decompressed Data ===")
    # Attempt to decode as UTF-8; if fails, print as hex
    try:
        print(decompressed_data.decode('utf-8'))
    except UnicodeDecodeError:
        print(decompressed_data.hex())


if __name__ == "__main__":
    main()
Go reverse proxy:
package main

import (
	"bytes"
	"crypto/tls"
	"flag"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"strings"
	//"strconv"
	// for access logs
	"fmt"
	"net"
	"os"
	"time"

	"github.com/pierrec/lz4"
)

const commonKey = "9ec1c78fa2cb34e2bed5691c08432f04"

// transformCommonKey transforms the common key by subtracting 0x62 from each character
// and negating the result, similar to FUN_0004a120 in libsakasho.so.
func transformCommonKey(s string) []byte {
	transformed := make([]byte, len(s))
	for i, c := range s {
		transformed[i] = byte((-0x62 - int(c)) & 0xFF)
	}
	return transformed
}

// buildXorTable builds the XOR table by processing the common key and session ID
// and concatenating the results.
func buildXorTable(commonKey string, sessionID string) []byte {
	transformedCommon := transformCommonKey(commonKey)
	return append(transformedCommon, []byte(sessionID)...)
}

// conditionalXOR applies the conditional XOR operation to the data using the XOR table.
// For each byte in the data:
// - If (key_byte & 7) == 0, XOR the data byte with the key byte.
// - Else, perform a bit rotation based on (key_byte & 7).
func conditionalXOR(data []byte, xorTable []byte, encode bool) []byte {
	output := make([]byte, len(data))
	tableLen := len(xorTable)
	for i := 0; i < len(data); i++ {
		keyByte := xorTable[(i+1)%tableLen]
		if keyByte&7 == 0 {
			// Perform XOR
			output[i] = data[i] ^ keyByte
		} else {
			// Perform bit rotation
			shift := keyByte & 7
			if encode {
				rotated := ((data[i] << (8 - shift)) | (data[i] >> shift)) & 0xFF
				output[i] = rotated
			} else {
				rotated := ((data[i] >> (8 - shift)) | (data[i] << shift)) & 0xFF
				output[i] = rotated
			}
		}
	}
	return output
}

// decodeVarint decodes a varint from the beginning of the data.
func decodeVarint(data []byte) (int, int) {
	var value int
	var shift int
	for i, b := range data {
		value |= int(b&0x7F) << shift
		if b&0x80 == 0 {
			return value, i + 1
		}
		shift += 7
		if shift >= 35 {
			panic("Varint too long")
		}
	}
	panic("Incomplete varint")
}

// encodeVarint encodes an integer into varint format.
func encodeVarint(value int) []byte {
	var parts []byte
	for {
		b := byte(value & 0x7F)
		value >>= 7
		if value != 0 {
			parts = append(parts, b|0x80)
		} else {
			parts = append(parts, b)
			break
		}
	}
	return parts
}

// decodeAndDecompress takes the request body, decodes it using XOR,
// extracts the compressed data, decompresses it using LZ4, and returns the result.
func decodeAndDecompress(body io.Reader, xorTable []byte) ([]byte, error) {
	// Stream read
	buf := new(bytes.Buffer)
	_, err := io.Copy(buf, body)
	if err != nil {
		return nil, err
	}
	data := buf.Bytes()
	dataAfterXOR := conditionalXOR(data, xorTable, false)

	// Decode varint
	varint, varintLength := decodeVarint(dataAfterXOR)
	compressedData := dataAfterXOR[varintLength:]

	// Decompress the data using LZ4
	decompressed := make([]byte, varint)
	decompressedLen, err := lz4.UncompressBlock(compressedData, decompressed)
	if err != nil {
		return nil, err
	}
	return decompressed[:decompressedLen], nil
}

// compressAndEncode compresses the given data using LZ4,
// encodes it using XOR, and returns the result.
func compressAndEncode(body []byte, xorTable []byte) ([]byte, error) {
	// Compress using LZ4
	compressed := make([]byte, lz4.CompressBlockBound(len(body)))
	compressedLen, err := lz4.CompressBlock(body, compressed, nil)
	if err != nil {
		return nil, err
	}
	compressed = compressed[:compressedLen]

	// Encode varint (size of decompressed data)
	varint := encodeVarint(len(body))

	// Concatenate varint and compressed data
	dataToEncode := append(varint, compressed...)

	// Apply conditional XOR encoding
	encodedData := conditionalXOR(dataToEncode, xorTable, true)
	return encodedData, nil
}

// proxyHandler handles incoming requests, decodes them, forwards to upstream, and re-encodes responses.
func proxyHandler(upstreamURL *url.URL, certFile, keyFile string) func(w http.ResponseWriter, r *http.Request) {
	upstreamProxy := httputil.NewSingleHostReverseProxy(upstreamURL)
	upstreamProxy.Transport = &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		Proxy:           http.ProxyFromEnvironment, // Allow proxy config from env
	}
	// Override the Director function to ensure we pass the original URL's host header
	upstreamProxy.Director = func(req *http.Request) {
		// Set the scheme and host to the upstream server's;
		// the original path and query are kept as-is
		req.URL.Scheme = upstreamURL.Scheme
		req.URL.Host = upstreamURL.Host
		req.Host = upstreamURL.Host
		// Set the Host header explicitly
		req.Header.Set("Host", upstreamURL.Host)
		//req.Header.Del("Content-Length")
	}
	return func(w http.ResponseWriter, r *http.Request) {
		sessionIDCookie, _ := r.Cookie("player_session_id")
		userAgent := r.Header.Get("User-Agent")
		// Bypass de/obfuscation logic if the session ID is missing, the User-Agent
		// does not contain "SakashoClient", or the path is /v1/session
		if sessionIDCookie == nil || !strings.Contains(userAgent, "SakashoClient") || r.URL.Path == "/v1/session" {
			// Directly proxy without de/obfuscation
			upstreamProxy.ServeHTTP(w, r)
			return
		}
		sessionID := sessionIDCookie.Value
		xorTable := buildXorTable(commonKey, sessionID)

		// Properly handle empty request bodies
		if r.Body != nil {
			buf := new(bytes.Buffer)
			_, err := io.Copy(buf, r.Body)
			if err != nil && err != io.EOF {
				http.Error(w, "Failed to read request body", http.StatusInternalServerError)
				return
			}
			// Only decode if the body is not empty
			if buf.Len() > 0 {
				decodedBody, err := decodeAndDecompress(buf, xorTable)
				if err != nil {
					http.Error(w, "Failed to decode request", http.StatusInternalServerError)
					return
				}
				//r.Header.Set("Content-Length", strconv.Itoa(len(decodedBody)))
				// Replace body with decoded data
				r.Body = io.NopCloser(bytes.NewReader(decodedBody))
				// Remove Content-Length header since the body size has changed
				r.Header.Del("Content-Length")
				r.ContentLength = -1
			}
		}

		// Modify headers for upstream server if necessary
		r.Host = upstreamURL.Host
		r.URL.Scheme = upstreamURL.Scheme
		r.URL.Host = upstreamURL.Host
		// Set the Host header to match the upstream server
		r.Header.Set("Host", upstreamURL.Host)

		// Capture the upstream response
		rec := httptest.NewRecorder()
		upstreamProxy.ServeHTTP(rec, r)

		// Pass through upstream headers to client
		for k, v := range rec.Header() {
			w.Header()[k] = v
		}
		// Re-encode the response body
		if rec.Body != nil && rec.Body.Len() > 0 {
			encodedBody, err := compressAndEncode(rec.Body.Bytes(), xorTable)
			if err != nil {
				http.Error(w, "Failed to encode response", http.StatusInternalServerError)
				return
			}
			// Copy encoded response to original response writer
			w.WriteHeader(rec.Code)
			/*for k, v := range rec.Header() {
				w.Header()[k] = v
			}*/
			w.Write(encodedBody)
		}
	}
}

func main() {
	// Argument parsing for certificate, upstream, and hostname to client
	var upstreamAddr string
	var hostname string
	var certFile string
	var keyFile string
	flag.StringVar(&upstreamAddr, "upstream", "https://upstream.server", "Upstream server URL")
	flag.StringVar(&hostname, "hostname", "localhost:8080", "Hostname and port for the client to connect to")
	flag.StringVar(&certFile, "cert", "cert.pem", "TLS certificate file")
	flag.StringVar(&keyFile, "key", "key.pem", "TLS key file")
	flag.Parse()

	upstreamURL, err := url.Parse(upstreamAddr)
	if err != nil {
		log.Fatalf("Invalid upstream URL: %v", err)
	}

	http.Handle("/", logRequest(http.HandlerFunc(proxyHandler(upstreamURL, certFile, keyFile))))
	log.Printf("Starting proxy server on %s", hostname)
	log.Fatal(http.ListenAndServeTLS(hostname, certFile, keyFile, nil))
}

// fancy access logs
const (
	// ANSI color codes for access logs
	ANSIReset     = "\033[0m"
	ANSIRed       = "\033[31m"
	ANSIGreen     = "\033[32m"
	ANSIYellow    = "\033[33m"
	ANSIPurple    = "\033[35m"
	ANSIFaint     = "\033[2m"
	ANSIBold      = "\033[1m"
	ANSICyan      = "\033[36m"
	ANSIBgRed     = "\033[101m"
	ANSIBgBlue    = "\033[104m"
	ANSIBgMagenta = "\033[105m"
)

func isColorTerminal() bool {
	// NOTE: hack
	return os.Getenv("TERM") == "xterm-256color"
}

// getClientIP retrieves the client IP address from the request's RemoteAddr.
func getClientIP(r *http.Request) string {
	host, _, _ := net.SplitHostPort(r.RemoteAddr)
	return host
}

// responseWriter is a custom http.ResponseWriter that captures the status code
type responseWriter struct {
	http.ResponseWriter
	statusCode int
}

// newResponseWriter creates a new responseWriter
func newResponseWriter(w http.ResponseWriter) *responseWriter {
	return &responseWriter{w, http.StatusOK}
}

// WriteHeader captures the status code
func (rw *responseWriter) WriteHeader(code int) {
	rw.statusCode = code
	rw.ResponseWriter.WriteHeader(code)
}

// logRequest logs each request in Apache/Nginx standard format with ANSI colors
func logRequest(handler http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		rw := newResponseWriter(w)
		handler.ServeHTTP(rw, r)
		status := rw.statusCode
		latency := time.Since(start)
		clientIP := getClientIP(r)
		if isColorTerminal() {
			statusColor := ANSIGreen
			// Determine the status color
			if status >= 400 && status < 500 {
				statusColor = ANSIYellow
			} else if status >= 500 {
				statusColor = ANSIRed
			}
			latencyColor := getLatencyGradientColor(latency)
			clientIPColor := ANSICyan
			if r.Header.Get("X-Forwarded-For") != "" {
				clientIPColor = ANSIBgMagenta
			}
			var query string
			if r.URL.RawQuery != "" {
				query += "?"
			}
			query += r.URL.RawQuery
			queryColored := colorQueryParameters(query)
			// so many colors.....
			fmt.Println(clientIPColor + clientIP + ANSIReset +
				" - - [" + start.Format("02/Jan/2006:15:04:05 -0700") + "] \"" +
				ANSIGreen + r.Method + " " + r.URL.Path + queryColored + " " + ANSIReset +
				ANSIFaint + r.Proto + ANSIReset + "\" " +
				statusColor + fmt.Sprint(status) + ANSIReset + " " +
				fmt.Sprint(r.ContentLength) + " \"" +
				ANSIPurple + r.Referer() + ANSIReset + "\" \"" +
				ANSIFaint + r.UserAgent() + ANSIReset + "\" " +
				latencyColor + fmt.Sprint(latency) + ANSIReset)
		} else {
			// apache/nginx request format with latency at the end
			fmt.Println(clientIP + " - - [" + start.Format("02/Jan/2006:15:04:05 -0700") + "] \"" +
				r.Method + " " + r.RequestURI + " " + r.Proto + "\" " +
				fmt.Sprint(status) + " " + fmt.Sprint(r.ContentLength) + " \"" +
				r.Referer() + "\" \"" + r.UserAgent() + "\" " +
				fmt.Sprint(latency))
		}
	})
}

// Color ranges for latency gradient
var latencyColors = []string{
	"\033[38;5;39m",  // Blue
	"\033[38;5;51m",  // Light blue
	"\033[38;5;27m",  // Added color (Dark blue)
	"\033[38;5;82m",  // Green
	"\033[38;5;34m",  // Added color (Forest green)
	"\033[38;5;154m", // Light green
	"\033[38;5;220m", // Yellow
	"\033[38;5;208m", // Orange
	"\033[38;5;198m", // Light red
}

// getLatencyGradientColor returns a gradient color based on the latency
func getLatencyGradientColor(latency time.Duration) string {
	millis := latency.Milliseconds()
	// Define latency thresholds
	thresholds := []int64{40, 60, 85, 100, 150, 230, 400, 600}
	for i, threshold := range thresholds {
		if millis < threshold {
			return latencyColors[i]
		}
	}
	return latencyColors[len(latencyColors)-1]
}

// colorQueryParameters colors the query parameters
func colorQueryParameters(query string) string {
	if query == "" {
		return ""
	}
	// NOTE: the question mark and first query key are colored the same
	params := strings.Split(query, "&")
	var coloredParams []string
	for _, param := range params {
		keyValue := strings.Split(param, "=")
		if len(keyValue) == 2 {
			coloredParams = append(coloredParams, fmt.Sprintf("%s%s%s=%s%s%s", ANSICyan, keyValue[0], ANSIReset, ANSIYellow, keyValue[1], ANSIReset))
		} else {
			coloredParams = append(coloredParams, param)
		}
	}
	return strings.Join(coloredParams, "&")
}