Skip to content

Instantly share code, notes, and snippets.

@squeaky-pl
Created October 21, 2015 13:05
Show Gist options
  • Save squeaky-pl/461a8f80912f4367a971 to your computer and use it in GitHub Desktop.
Save squeaky-pl/461a8f80912f4367a971 to your computer and use it in GitHub Desktop.
APNs on gevent
import gevent.socket
import gevent.ssl
import struct
import json
import binascii
from functools import partial
class Frame(object):
    """A sequence of items sent as one length-prefixed frame.

    ``items`` is any iterable of objects exposing ``serialize()`` that
    returns ``bytes``.
    """

    def __init__(self, items):
        self.items = items

    def serialize(self):
        """Return the frame as bytes.

        Layout: one byte with the value 2, a 4-byte big-endian length of
        the body, then the concatenated serialized items.
        """
        body = b''.join(item.serialize() for item in self.items)
        header = struct.pack('!BL', 2, len(body))
        return header + body
class Item(object):
    """One frame item: a numeric id plus a bytes payload."""

    def __init__(self, id_, data):
        self.id_ = id_
        self.data = data

    def serialize(self):
        """Return the item as bytes.

        Layout: a 1-byte id, a 2-byte big-endian length of ``data``,
        then ``data`` itself.
        """
        header = struct.pack('!BH', self.id_, len(self.data))
        return header + self.data
# Factories for the individual item types of a frame. Each one pre-binds the
# item id (1 through 5) on Item, so a call only supplies the item's data
# bytes, e.g. DeviceToken(token_bytes).
DeviceToken = partial(Item, 1)
Payload = partial(Item, 2)
NotificationIdentifier = partial(Item, 3)
ExpirationDate = partial(Item, 4)
Priority = partial(Item, 5)
class JSONPayload(object):
    """Builds the JSON body of a notification.

    Args:
        alert: Value stored under ``aps.alert``.
        badge: Value stored under ``aps.badge``. ``None`` (the default)
            leaves the key out entirely; any other value, including ``0``,
            is sent as given.
        sound: Value stored under ``aps.sound``.
    """

    def __init__(self, alert, badge=None, sound='default'):
        self.alert = alert
        self.badge = badge
        self.sound = sound

    def serialize(self):
        """Return the payload as compact, UTF-8 encoded JSON bytes."""
        aps = {'alert': self.alert, 'sound': self.sound}
        # Compare against None rather than truthiness: a badge of 0 is a
        # legitimate value and must not be silently dropped.
        if self.badge is not None:
            aps['badge'] = self.badge
        # Compact separators keep the payload small; ensure_ascii=False
        # emits non-ASCII text as raw UTF-8 instead of \uXXXX escapes.
        text = json.dumps({'aps': aps}, separators=(',', ':'), ensure_ascii=False)
        return text.encode('utf-8')
def connect(key_file, cert_file, host='gateway.sandbox.push.apple.com', port=2195):
    """Open a TLS connection to the push gateway and return the SSL socket.

    Args:
        key_file: Path to the private key file passed to the TLS wrapper.
        cert_file: Path to the certificate file passed to the TLS wrapper.
        host: Gateway host name; defaults to the sandbox gateway.
        port: Gateway TCP port.

    Returns:
        The gevent SSL socket wrapping the connected TCP socket.

    Raises:
        Whatever the socket connect or the TLS wrapping raises; the
        underlying socket is closed before the exception propagates.
    """
    plain_socket = gevent.socket.socket()
    try:
        plain_socket.connect((host, port))
        return gevent.ssl.wrap_socket(plain_socket, key_file, cert_file)
    except BaseException:
        # Do not leak the file descriptor when connecting or the TLS
        # handshake fails (BaseException also covers gevent timeouts/kills).
        plain_socket.close()
        raise
# Comma-separated hex device tokens, decoded below into a list of raw bytes.
device_ids = '743385a09a0dda88101ae22b050685caabd0a9933f1b1240145cbe48d336cfef,485d41160aabec2dd09c62bd13e398eb2f968bfc199d1b733c6be3620a7db7f7'
device_ids = list(map(binascii.unhexlify, device_ids.split(',')))
def main():
    """Send a test notification to every device, then print any responses.

    Connects with 'key_nopass.pem' / 'cert.pem', sends one frame per entry
    in ``device_ids``, and afterwards reads responses from the gateway in
    (1-byte command, 1-byte status, 4-byte identifier) groups until the
    peer closes the connection. The socket is always closed on exit.
    """
    print('Connecting')
    c = connect('key_nopass.pem', 'cert.pem')
    print('Connected')
    try:
        # The payload is identical for every device, so build it once.
        payload = JSONPayload('Kakita test!', 3).serialize()
        for d in device_ids:
            items = [
                DeviceToken(d),
                Payload(payload),
                NotificationIdentifier(b'asdf'),
                ExpirationDate(bytes(4)),  # four zero bytes
                Priority(bytes([10])),
            ]
            serialized_frame = Frame(items).serialize()
            print('Sending', serialized_frame)
            c.sendall(serialized_frame)
            print('Sent')

        print('Waiting for data')
        while True:
            command = c.read(1)
            if not command:
                # An empty read means the peer closed the connection;
                # without this check the loop would spin forever printing
                # empty status/identifier values.
                print('Connection closed')
                break
            status = c.read(1)
            print('Status', status)
            identifier = c.read(4)
            print('Identifier', identifier)
    finally:
        c.close()
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment