Created
September 19, 2013 15:06
-
-
Save mjgp2/6624889 to your computer and use it in GitHub Desktop.
import.io python client library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Created on 19 Sep 2013

@author: dev
'''
# HTTP client (third-party) followed by the standard-library helpers the
# client classes below rely on.
import requests
import threading
import logging
import uuid
import json

# Configure logging at INFO level on import and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Query:
    """Track the progress of a single query and forward its messages.

    Each message passed to ``onMessage`` updates the job counters, refreshes
    the ``finished`` flag and is then handed to ``callback`` unchanged.
    """

    # Message types that end the query immediately, whatever the job counts.
    _TERMINAL_TYPES = (u"ERROR", u"UNAUTH", u"CANCEL")

    def __init__(self, callback):
        """Create a tracker that reports every message to ``callback(data)``."""
        self.jobsSpawned = 0
        self.jobsStarted = 0
        self.jobsCompleted = 0
        self.finished = False
        self.callback = callback

    def onMessage(self, data):
        """Process one message dict for this query.

        ``data["type"]`` selects which counter is incremented (``SPAWN``,
        ``INIT``/``START`` or ``STOP``); other types leave the counters
        alone. A missing ``"type"`` key raises ``KeyError``.
        """
        msgType = data["type"]
        if msgType == u"SPAWN":
            self.jobsSpawned += 1
        elif msgType in (u"INIT", u"START"):
            self.jobsStarted += 1
        elif msgType == u"STOP":
            self.jobsCompleted += 1

        # Finished once every started job has stopped and the number of
        # started jobs equals the spawned jobs plus the initial one.
        # Counters are compared by value (==): identity comparison (`is`)
        # on integers only happens to work for small cached values.
        self.finished = (
            self.jobsStarted == self.jobsCompleted
            and self.jobsSpawned + 1 == self.jobsStarted
            and self.jobsStarted > 0
        )

        # An error, an authorisation failure or a cancellation ends the
        # query regardless of the counters.
        if msgType in self._TERMINAL_TYPES:
            self.finished = True

        self.callback(data)
class IOClient:
    """Long-polling client for a query server reached over HTTP.

    Typical use: ``login`` (stores the session cookies), ``connect``
    (handshake, subscribe and start the background polling thread), then
    ``query`` any number of times. Messages for a query are delivered to
    the callback given to ``query``, on the polling thread.

    NOTE(review): ``msgId`` and ``queries`` are touched both by the polling
    thread and by callers of ``query`` without any locking.
    """

    def __init__(self, host, proxies=None):
        """Prepare a client for ``host``; no network traffic happens here.

        ``proxies`` is passed straight to ``requests``; when omitted each
        client gets its own empty dict rather than a shared default.
        """
        self.host = host
        self.proxies = {} if proxies is None else proxies
        self.headers = {"Content-Type": "application/json;charset=UTF-8"}
        self.cookies = {}
        self.msgId = 1
        self.clientId = None
        self.url = "%s/query/comet/" % host
        self.messagingChannel = u"/messaging"
        self.queries = {}

    @staticmethod
    def _json(response):
        """Return the decoded JSON body of ``response``.

        ``Response.json`` is a method in requests >= 1.0 and was a plain
        attribute before that; both forms are accepted.
        """
        body = response.json
        return body() if callable(body) else body

    def login(self, host, username, password):
        """Log in against ``host`` and keep the returned session cookies.

        Raises ``Exception`` when the server does not answer with HTTP 200.
        """
        post_body = {'username': username, 'password': password}
        r = requests.post("%s/auth/login" % host, data=post_body, proxies=self.proxies)
        if r.status_code != 200:
            raise Exception("Could not log in, code %s" % r.status_code)
        self.cookies.update(r.cookies)

    def request(self, channel, path="", data=None, throw=True):
        """POST one message on ``channel`` and return the HTTP response.

        ``data`` supplies extra message fields; it is copied, so neither the
        caller's dict nor a shared default is modified. ``path`` is appended
        to the client's base URL.

        Raises ``Exception`` on a non-200 status. A reply flagged as
        unsuccessful raises ``Exception`` when ``throw`` is true and is
        logged as a warning otherwise.
        """
        message = dict(data) if data else {}
        # add in the common values
        message["channel"] = channel
        message["connectionType"] = "long-polling"
        message["id"] = self.msgId
        self.msgId += 1
        if self.clientId is not None:
            message["clientId"] = self.clientId

        url = "%s%s" % (self.url, path)
        response = requests.post(
            url,
            data=json.dumps([message]),
            cookies=self.cookies,
            proxies=self.proxies,
            headers=self.headers,
        )
        if response.status_code != 200:
            raise Exception("Connect failed, status %s" % response.status_code)

        for msg in self._json(response):
            if "successful" in msg and msg["successful"] is not True:
                # Format the text here; the single-element tuple keeps the
                # formatting safe whatever type the reply entry has.
                error = "Unsuccessful request: %s" % (msg,)
                if throw:
                    raise Exception(error)
                logger.warning(error)
        return response

    def handshake(self):
        """Perform the handshake and store the assigned client id and cookies."""
        handshake = self.request(
            "/meta/handshake",
            path="handshake",
            data={
                "version": "1.0",
                "minimumVersion": "0.9",
                "supportedConnectionTypes": ["long-polling"],
                "advice": {"timeout": 60000, "interval": 0},
            },
        )
        self.clientId = self._json(handshake)[0]["clientId"]
        self.cookies.update(handshake.cookies)

    def connect(self):
        """Handshake, subscribe to the messaging channel and start polling.

        Polling runs on a daemon thread, so it does not keep the process
        alive on its own.
        """
        self.handshake()
        self.request("/meta/subscribe", data={"subscription": self.messagingChannel})
        t = threading.Thread(target=self.poll, args=())
        t.daemon = True
        t.start()

    def poll(self):
        """Loop forever, dispatching messaging-channel messages.

        An exception raised by ``request`` propagates and ends the loop.
        """
        while True:
            response = self.request("/meta/connect", path="connect")
            for msg in self._json(response):
                if msg["channel"] != self.messagingChannel:
                    continue
                self.processMessage(msg["data"])

    def processMessage(self, data):
        """Route one message to its query and drop the query once finished.

        Failures (unknown ``requestId``, errors raised by the query or its
        callback) are logged with a traceback and not re-raised, so one bad
        message does not stop the polling loop.
        """
        try:
            reqId = data["requestId"]
            query = self.queries[reqId]
            query.onMessage(data)
            if query.finished:
                del self.queries[reqId]
        except Exception:
            logger.exception("Error")

    def query(self, query, callback):
        """Send ``query`` and deliver every resulting message to ``callback``.

        A fresh ``requestId`` is written into the ``query`` dict itself
        before it is sent.
        """
        query["requestId"] = str(uuid.uuid4())
        self.queries[query["requestId"]] = Query(callback)
        self.request("/service/query", data={"data": query})
if __name__ == "__main__":
    # Demo run: log in, connect, send one query and log whatever arrives.
    import time

    def _report(message):
        """Log a received query message at CRITICAL level."""
        logger.critical("Got message! %s" % message)

    demo_proxies = {"http": "192.168.56.1:8888"}
    demo_query = {
        "format": "JSON",
        "input": {"query": "mac mini"},
        "connectorGuids": ["39df3fe4-c716-478b-9b80-bdbee43bfbde"],
    }

    client = IOClient("http://query.import.io", demo_proxies)
    client.login("http://api.import.io", "xxx", "xxx")
    client.connect()
    client.query(demo_query, _report)

    # Keep the main thread alive while messages are delivered.
    time.sleep(2000)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment