Skip to content

Instantly share code, notes, and snippets.

@teeler
Created July 12, 2012 01:13
Show Gist options
  • Select an option

  • Save teeler/3094975 to your computer and use it in GitHub Desktop.

Select an option

Save teeler/3094975 to your computer and use it in GitHub Desktop.
synapse python client
import base64
import hashlib
import hmac
import jsonlib2
import requests
import sys
import time
class SynapseLoginException(Exception):
    """Raised when logging in to Synapse, or fetching the secret key, fails.

    The exception argument carries the raw response body returned by the
    authentication service.
    """
class SynapseNotLoggedInException(Exception):
    """Raised when a signed request is attempted without a secret key."""
class SynapseNoResourceException(Exception):
    """Raised when a resource lookup does not return HTTP 200.

    The exception argument carries the raw response body.
    """
class SynapseClient(object):
    """SynapseClient base library.

    Useful for making a very limited subset of calls to the Synapse API.
    Currently supported is:

    * logging in
    * accessing a resource via a resource token.

    Enjoy.
    """

    # URL template for the auth service; "%s" is filled with the environment.
    BASE = "https://auth-%s.sagebase.org"
    LOGIN = "/auth/v1/session"
    RESOURCE_SESSION = "/auth/v1/resourceSession"
    SECRET_KEY = "/auth/v1/secretKey"

    def __init__(self, username=None, password=None, env="prod"):
        """Collect credentials and log in immediately.

        Credentials are first looked up in netrc for the environment's auth
        URL; explicitly passed ``username`` / ``password`` override them.

        Raises:
            SynapseLoginException: if no credentials are available or the
                service rejects the login.
        """
        self.session_token = None
        self.secretkey = None
        self.env = env
        # Always define these so a missing credential is reported by Login()
        # as a SynapseLoginException rather than an AttributeError.
        self.username = None
        self.password = None

        # Load credentials if I have them.
        netrc = requests.utils.get_netrc_auth(self._synapse_env())
        if netrc:
            self.username = netrc[0]
            self.password = netrc[1]

        # Override if you gave me credentials.
        if username:
            self.username = username
        if password:
            self.password = password

        self.Login()

    @property
    def secret_key(self):
        """Alias of ``secretkey``, kept for callers using the older name."""
        return self.secretkey

    @secret_key.setter
    def secret_key(self, value):
        self.secretkey = value

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` as a byte string, UTF-8 encoding text if needed."""
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    def _synapse_env(self):
        """Return the base auth URL for the configured environment."""
        return SynapseClient.BASE % self.env

    def _synapse_headers(self, url, hdrs, sign):
        """Generate headers for sending to synapse.

        Args:
            url: request path (without the base URL); part of the signed data.
            hdrs: extra headers merged last, so they win on conflicts.
                May be None or empty.
            sign: when true, add the userId / signatureTimestamp / signature
                headers computed with the secret key.

        Raises:
            SynapseNotLoggedInException: if signing is requested but no
                secret key has been obtained yet.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        if sign:
            if not self.secretkey:
                raise SynapseNotLoggedInException()
            sig_timestamp = time.strftime("%Y-%m-%dT%H:%M:%S%z",
                                          time.localtime())
            sig_data = self.username + url + sig_timestamp
            # hmac works on bytes: normalise key and message explicitly so
            # unicode usernames/urls do not blow up inside hmac.
            digest = hmac.new(self._to_bytes(self.secretkey),
                              self._to_bytes(sig_data),
                              hashlib.sha1).digest()
            signature = base64.b64encode(digest)
            if not isinstance(signature, str):
                # base64 output is pure ASCII; hand back text for the header.
                signature = signature.decode("ascii")
            headers.update({
                "userId": self.username,
                "signatureTimestamp": sig_timestamp,
                "signature": signature,
            })
        if hdrs:
            headers.update(hdrs)
        return headers

    def _synapse_post(self, url, data=None, hdrs=None, sign=False):
        """Post some JSON-encoded data to synapse and return the response."""
        headers = self._synapse_headers(url, hdrs, sign)
        post_url = self._synapse_env() + url
        return requests.post(post_url, data=jsonlib2.dumps(data),
                             headers=headers)

    def _synapse_get(self, url, params=None, hdrs=None, sign=False):
        """Get some data from synapse and return the response.

        ``params`` is forwarded as the query string; only ``url`` (the path)
        takes part in the signature.
        """
        headers = self._synapse_headers(url, hdrs, sign)
        get_url = self._synapse_env() + url
        return requests.get(get_url, params=params, headers=headers)

    def GetResource(self, resource_token):
        """Given a resource_token, try and retrieve that resource.

        Returns:
            The ``userData`` entry of the decoded JSON response.

        Raises:
            SynapseNoResourceException: if the response status is not 200.
        """
        url = "%s/%s" % (SynapseClient.RESOURCE_SESSION, resource_token)
        resource = self._synapse_get(url, sign=True)
        if resource.status_code != 200:
            raise SynapseNoResourceException(resource.content)
        resource_data = jsonlib2.loads(resource.content)
        return resource_data['userData']

    def Login(self):
        """Login, get secret key, all that.

        Stores the session token in ``session_token`` and the base64-decoded
        secret key in ``secretkey``.

        Returns:
            True on success.

        Raises:
            SynapseLoginException: if credentials are missing, the login does
                not return 201, or the secret key request does not return 200.
        """
        if not self.username or not self.password:
            raise SynapseLoginException(
                "No Synapse credentials: pass username/password or add a "
                "netrc entry for %s" % self._synapse_env())

        data = {"email": self.username,
                "password": self.password}
        login_response = self._synapse_post(SynapseClient.LOGIN, data=data)
        if login_response.status_code != 201:
            raise SynapseLoginException(login_response.content)
        login_data = jsonlib2.loads(login_response.content)
        self.session_token = login_data['sessionToken']

        gsk_response = self._synapse_get(
            SynapseClient.SECRET_KEY,
            hdrs={"sessionToken": self.session_token})
        if gsk_response.status_code != 200:
            raise SynapseLoginException(gsk_response.content)
        secretkey_data = jsonlib2.loads(gsk_response.content)
        self.secretkey = base64.b64decode(secretkey_data['secretKey'])
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment