Created
July 10, 2023 07:01
-
-
Save ensean/a814ac2cdbe0eaa0af536b7d3883f21e to your computer and use it in GitHub Desktop.
cloudfront function for request auth
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var crypto = require('crypto'); | |
// Shared secret mixed into every token hash. Replace it with your own value;
// the party that signs the URLs must use the same string.
// NOTE(review): the secret is hard-coded in source - confirm that is acceptable
// for your deployment.
var magic_key = "Set_at_your_wish";

// How long a token stays valid after its `ts`, in seconds (default: 12 hours).
// Update it when necessary.
var time_delta = 12 * 3600;

// Response returned when a request is rejected (missing, mismatched or expired token).
var response403 = {
    statusCode: 403,
    // "Forbidden" is the standard reason phrase for 403; "Unauthorized" belongs to 401.
    statusDescription: 'Forbidden',
    headers: {
        // 1296000 seconds = 15 days.
        'cache-control': {
            'value': 'max-age=1296000'
        }
    }
};
/**
 * Check a token/timestamp pair taken from the query string.
 *
 * The expected token is the hex MD5 digest of `magic_key + uri + ts`. The
 * request is accepted only when the supplied token equals that digest and the
 * current time is no later than `ts + time_delta` (both in seconds).
 *
 * @param {string} token - token supplied by the client
 * @param {string} ts - timestamp supplied by the client; compared as epoch seconds
 * @param {string} uri - request URI that was signed
 * @returns {boolean} true when the token matches and has not expired
 */
function is_valid_qs(token, ts, uri) {
    var expected_token = crypto.createHash('md5').update(magic_key + uri + ts).digest('hex');
    if (expected_token !== token) {
        return false;
    }

    // Explicit radix so the timestamp is always read as a decimal number.
    var issued_at = parseInt(ts, 10);

    // A ts without leading digits parses to NaN, and every comparison against
    // NaN is false - without this guard such a token would never expire.
    if (isNaN(issued_at)) {
        return false;
    }

    return Date.now() / 1000 <= issued_at + time_delta;
}
/**
 * Entry point of the function: decides whether a request may proceed.
 *
 * Returns the request object unchanged when it is allowed, otherwise returns
 * the shared `response403` object.
 *
 * @param {object} event - event carrying `viewer.ip` and `request`
 * @returns {object} the original request, or `response403`
 */
function handler(event) {
    var viewer_ip = event.viewer.ip;
    var req = event.request;
    var qs = req.querystring;

    // Requests from 127.0.0.1 are treated as prefetching and skip the token
    // check; one that still carries a token (a previously cached prefetch URL)
    // is blocked.
    if (viewer_ip === '127.0.0.1') {
        return qs.token ? response403 : req;
    }

    // Both query parameters must be present.
    if (!(qs.token && qs.ts)) {
        console.log("Error: No token or ts in the querystring");
        return response403;
    }

    // Both parameters must also carry a non-empty value.
    var token_value = qs.token.value;
    var ts_value = qs.ts.value;
    if (!(token_value && ts_value)) {
        return response403;
    }

    // Forward the request only when the token is valid for this URI.
    return is_valid_qs(token_value, ts_value, req.uri) ? req : response403;
}
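Python 样例后端 (Python sample backend: generates the MD5 token and epoch timestamp that the CloudFront function validates)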
#!/usr/bin/env python3
import hashlib
import time
import sys
import statistics
def hash_uri_with_salt_and_time(uri, salt_string, epoch_time=None):
    """
    Create an MD5 hash from salt string + URI + epoch time.

    The hashed text is the plain concatenation ``salt_string + uri + epoch_time``
    (in that order, no separators), UTF-8 encoded.

    Args:
        uri: The URI to hash
        salt_string: A salt string to add to the hash
        epoch_time: Epoch time (seconds) to embed in the hash. Defaults to
            the current time, truncated to whole seconds.

    Returns:
        Tuple of (hash_string, epoch_time_used)
    """
    if epoch_time is None:
        epoch_time = int(time.time())

    # Combine the components
    combined_string = f"{salt_string}{uri}{epoch_time}"

    # Create MD5 hash
    hash_string = hashlib.md5(combined_string.encode()).hexdigest()

    return hash_string, epoch_time
def benchmark_hash(uri, salt_string, num_iterations=1000):
    """
    Benchmark the time cost for creating multiple hash values.

    Args:
        uri: The URI to hash
        salt_string: A salt string to add to the hash
        num_iterations: Number of hash values to create (must be >= 1)

    Returns:
        Dict with benchmark results: total_time, avg_time, min_time,
        max_time, median_time (all in seconds), iterations and
        hashes_per_second.

    Raises:
        ValueError: If num_iterations is less than 1.
    """
    if num_iterations < 1:
        # With no samples the statistics below are undefined.
        raise ValueError("num_iterations must be at least 1")

    times = []
    # perf_counter() is the clock intended for measuring short durations;
    # time.time() can jump and may have coarse resolution.
    start_total = time.perf_counter()
    for _ in range(num_iterations):
        start = time.perf_counter()
        hash_uri_with_salt_and_time(uri, salt_string)
        times.append(time.perf_counter() - start)
    total_time = time.perf_counter() - start_total

    return {
        "total_time": total_time,
        "avg_time": statistics.mean(times),
        "min_time": min(times),
        "max_time": max(times),
        "median_time": statistics.median(times),
        "iterations": num_iterations,
        # Guard against a zero elapsed time on very coarse clocks.
        "hashes_per_second": num_iterations / total_time if total_time > 0 else float("inf"),
    }
if __name__ == "__main__":
    # Command-line entry point:
    #   python hash_uri.py <uri> <salt_string>                   -> print one hash
    #   python hash_uri.py <uri> <salt_string> <num_iterations>  -> run the benchmark
    if len(sys.argv) < 3:
        print("Usage: python hash_uri.py <uri> <salt_string> [num_iterations]")
        sys.exit(1)

    uri = sys.argv[1]
    salt_string = sys.argv[2]

    # Check if benchmark mode is requested
    if len(sys.argv) > 3:
        # Guard only the conversion, so that a ValueError raised while
        # benchmarking is not misreported as a non-integer argument.
        try:
            num_iterations = int(sys.argv[3])
        except ValueError:
            print("Error: num_iterations must be an integer")
            sys.exit(1)

        # Zero or negative counts produce no samples to summarize.
        if num_iterations < 1:
            print("Error: num_iterations must be at least 1")
            sys.exit(1)

        print(f"Running benchmark with {num_iterations} iterations...")
        results = benchmark_hash(uri, salt_string, num_iterations)
        print("\nBenchmark Results:")
        print(f"Total time: {results['total_time']:.4f} seconds")
        print(f"Average time per hash: {results['avg_time']*1000:.4f} ms")
        print(f"Min time: {results['min_time']*1000:.4f} ms")
        print(f"Max time: {results['max_time']*1000:.4f} ms")
        print(f"Median time: {results['median_time']*1000:.4f} ms")
        print(f"Hashes per second: {results['hashes_per_second']:.2f}")
    else:
        # Regular single hash mode
        hash_result, epoch_time = hash_uri_with_salt_and_time(uri, salt_string)
        print(f"Input URI: {uri}")
        print(f"Salt string: {salt_string}")
        print(f"Epoch time: {epoch_time}")
        print(f"Hash: {hash_result}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
加盐+文件名+时间戳 取 md5实现CloudFront url防盗链。