Skip to content

Instantly share code, notes, and snippets.

@FoamyGuy
Created August 25, 2025 14:53
Show Gist options
  • Save FoamyGuy/6375c15b8e5d67753d23017e63023e51 to your computer and use it in GitHub Desktop.
Save FoamyGuy/6375c15b8e5d67753d23017e63023e51 to your computer and use it in GitHub Desktop.
class PollyHTTPClient:
def __init__(self, requests_instance, access_key, secret_key, region='us-east-1'):
self._requests = requests_instance
self.access_key = access_key
self.secret_key = secret_key
self.region = region
self.service = 'polly'
self.host = f'polly.{region}.amazonaws.com'
self.endpoint = f'https://{self.host}'
def _sign(self, key, msg):
"""Helper function for AWS signature"""
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def _get_signature_key(self, date_stamp):
"""Generate AWS signature key"""
k_date = self._sign(('AWS4' + self.secret_key).encode('utf-8'), date_stamp)
k_region = self._sign(k_date, self.region)
k_service = self._sign(k_region, self.service)
k_signing = self._sign(k_service, 'aws4_request')
return k_signing
def _create_canonical_request(self, method, uri, query_string, headers, payload):
"""Create canonical request for AWS Signature V4"""
canonical_uri = url_encode(uri, safe='/')
canonical_querystring = query_string
# Create canonical headers
canonical_headers = ''
signed_headers = ''
header_names = sorted(headers.keys())
for name in header_names:
canonical_headers += f'{name.lower()}:{headers[name].strip()}\n'
signed_headers += f'{name.lower()};'
signed_headers = signed_headers[:-1] # Remove trailing semicolon
# Create payload hash
payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()
# Create canonical request
canonical_request = f'{method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{payload_hash}'
return canonical_request, signed_headers
def _create_string_to_sign(self, timestamp, credential_scope, canonical_request):
"""Create string to sign for AWS Signature V4"""
algorithm = 'AWS4-HMAC-SHA256'
canonical_request_hash = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
string_to_sign = f'{algorithm}\n{timestamp}\n{credential_scope}\n{canonical_request_hash}'
return string_to_sign
def synthesize_speech(self, text, voice_id='Joanna', output_format='mp3',
engine='standard', text_type='text'):
"""
Synthesize speech using Amazon Polly via direct HTTP request
Args:
text (str): Text to convert to speech
voice_id (str): Voice to use (e.g., 'Joanna', 'Matthew', 'Amy')
output_format (str): Audio format ('mp3', 'ogg_vorbis', 'pcm')
engine (str): Engine type ('standard' or 'neural')
text_type (str): 'text' or 'ssml'
Returns:
bytes: Audio data if successful, None if failed
"""
# Prepare request
method = 'POST'
uri = '/v1/speech'
# Create request body
request_body = {
'Text': text,
'OutputFormat': output_format,
'VoiceId': voice_id,
'Engine': engine,
'TextType': text_type
}
payload = json.dumps(request_body)
# Get current time
now = datetime.now()
# amzdate = now.strftime('%Y%m%dT%H%M%SZ')
amzdate = f'{now.year}{_zero_pad(now.month)}{_zero_pad(now.day)}T{_zero_pad(now.hour)}{_zero_pad(now.minute)}{_zero_pad(now.second)}Z'
# datestamp = now.strftime('%Y%m%d')
datestamp = f'{now.year}{_zero_pad(now.month)}{_zero_pad(now.day)}'
# Create headers
headers = {
'Content-Type': 'application/x-amz-json-1.0',
'Host': self.host,
'X-Amz-Date': amzdate,
'X-Amz-Target': 'AWSPollyService.SynthesizeSpeech'
}
# Create canonical request
canonical_request, signed_headers = self._create_canonical_request(
method, uri, '', headers, payload
)
# Create string to sign
credential_scope = f'{datestamp}/{self.region}/{self.service}/aws4_request'
string_to_sign = self._create_string_to_sign(amzdate, credential_scope, canonical_request)
# Create signature
signing_key = self._get_signature_key(datestamp)
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
# Add authorization header
authorization_header = (
f'AWS4-HMAC-SHA256 '
f'Credential={self.access_key}/{credential_scope}, '
f'SignedHeaders={signed_headers}, '
f'Signature={signature}'
)
headers['Authorization'] = authorization_header
# Make request
try:
url = f'{self.endpoint}{uri}'
response = self._requests.post(url, headers=headers, data=payload)
if response.status_code == 200:
return response.content
else:
print(f"Error: HTTP {response.status_code}")
print(f"Response: {response.text}")
return None
except Exception as e:
print(f"Request failed: {e}")
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment