Skip to content

Instantly share code, notes, and snippets.

@mjgp2
Created September 19, 2013 15:06
Show Gist options
  • Save mjgp2/6624889 to your computer and use it in GitHub Desktop.
Save mjgp2/6624889 to your computer and use it in GitHub Desktop.
import.io python client library
'''
Created on 19 Sep 2013
@author: dev
'''
import requests, threading, logging, uuid, json
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Query:
def __init__(self, callback):
self.jobsSpawned = 0
self.jobsStarted = 0
self.jobsCompleted = 0
self.finished = False
self.callback = callback
def onMessage(self, data):
msgType = data["type"]
if msgType == u"SPAWN":
self.jobsSpawned+=1
elif msgType == u"INIT" or msgType == u"START":
self.jobsStarted+=1
elif msgType == u"STOP":
self.jobsCompleted+=1
self.finished = self.jobsStarted is self.jobsCompleted and self.jobsSpawned + 1 is self.jobsStarted and self.jobsStarted > 0;
# if there is an error or the user is not authorised correctly then allow isFinished to return true by setting jobs to -1
if msgType == u"ERROR" or msgType == u"UNAUTH" or msgType == u"CANCEL":
self.finished = True;
self.callback(data)
class IOClient:
def __init__(self, host, proxies={}):
self.host = host
self.proxies = proxies
self.headers = {"Content-Type":"application/json;charset=UTF-8"}
self.cookies = {}
self.msgId = 1
self.clientId = None
self.url = "%s/query/comet/" % host
self.messagingChannel = u"/messaging"
self.queries = {}
def login(self, host, username, password):
post_body = {'username': username, 'password': password}
r = requests.post("%s/auth/login" % host, data=post_body, proxies=self.proxies)
if r.status_code is not 200:
raise Exception("Could not log in, code %s" % r.status_code)
self.cookies.update(r.cookies)
def request(self, channel, path="", data={}, throw=True):
# add in the common values
data["channel"] = channel
data["connectionType"] = "long-polling"
data["id"] = self.msgId
self.msgId += 1
if self.clientId is not None:
data["clientId"] = self.clientId
url = "%s%s" % (self.url, path)
response = requests.post(url, data=json.dumps([data]), cookies=self.cookies, proxies=self.proxies, headers=self.headers)
if response.status_code != 200 :
raise Exception("Connect failed, status %s" % response.status_code)
for msg in response.json:
if "successful" in msg and msg["successful"] is not True :
msg = "Unsuccessful request: %s", msg
if throw:
raise Exception(msg)
else:
logger.warn(msg)
return response
def handshake(self):
handshake = self.request("/meta/handshake", path="handshake", data={"version":"1.0","minimumVersion":"0.9","supportedConnectionTypes":["long-polling"],"advice":{"timeout":60000,"interval":0}})
self.clientId = handshake.json[0]["clientId"]
self.cookies.update(handshake.cookies)
def connect(self):
self.handshake()
self.request("/meta/subscribe", data={"subscription":self.messagingChannel})
t = threading.Thread(target=self.poll, args=())
t.daemon = True
t.start()
def poll(self):
while True:
response = self.request("/meta/connect", path="connect")
for msg in response.json:
if msg["channel"] != self.messagingChannel : continue
self.processMessage(msg["data"])
def processMessage(self, data):
try:
reqId = data["requestId"]
query = self.queries[reqId]
query.onMessage(data)
if query.finished: del self.queries[reqId]
except:
logger.error("Error", exc_info=True)
def query(self, query, callback):
query["requestId"] = str(uuid.uuid4())
self.queries[query["requestId"]] = Query(callback)
self.request("/service/query", data={ "data":query })
if __name__ == "__main__":
import time
proxies = { "http":"192.168.56.1:8888"}
client = IOClient("http://query.import.io", proxies)
client.login("http://api.import.io", "xxx", "xxx")
client.connect()
client.query({"format":"JSON","input":{"query":"mac mini"},"connectorGuids":["39df3fe4-c716-478b-9b80-bdbee43bfbde"]}, lambda x: logger.critical("Got message! %s" % x))
time.sleep(2000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment