Created
July 3, 2016 04:42
-
-
Save syakesaba/a024eae8ae43817eb9ef19b84efb50d0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import json
import logging

try:  # Python 3
    from urllib.parse import urlparse
except ImportError:  # Python 2
    from urlparse import urlparse

import requests  # pip install requests
# Discovery URL templates. The "{0}" placeholder is filled in with the SIP
# domain through str.format(): first the public host, then the internal one.
discover_prefix = "https://lyncdiscover.{0}/"
discover_prefix_internal = "https://lyncdiscoverinternal.{0}/"
class SkypeForBussinessSession(requests.Session):
    """HTTP session that signs in to a Lync / Skype for Business server.

    Constructing an instance immediately runs :meth:`initialize`, which
    performs the whole sign-in sequence over the network (discovery,
    password-grant token request, application creation), then prints the
    contact names and posts a start-messaging request.

    The code sticks to syntax that is valid on both Python 2.7 and Python 3.

    Attributes set during :meth:`initialize`:
        rootURL: URL returned by :meth:`lyncdiscover`.
        authURL: Token endpoint returned by :meth:`getAuthURL`.
        auth_data: Form fields sent to the token endpoint.
        access_token: Decoded JSON answer of the token endpoint.
        auth_headers: ``Authorization`` header built from ``access_token``.
        applications_url: URL returned by :meth:`getApplicationURL`.
        application_data: JSON body used to create the application.
        apps: Decoded JSON answer of the application creation request.
        application_base: ``scheme://host`` part of ``applications_url``.
    """

    def __init__(self, sip_domain, login_domain, login_username,
                 login_password, proxy_url=None, ssl_check=False):
        """Store the credentials and sign in immediately.

        Args:
            sip_domain: Domain substituted into the discovery URL templates.
            login_domain: Domain part of the sign-in name
                (``login_username@login_domain``).
            login_username: User part of the sign-in name.
            login_password: Password sent in the token request.
            proxy_url: Optional proxy URL, used for both http and https.
            ssl_check: Truthy to verify TLS certificates. Verification is
                switched off by default.
        """
        super(SkypeForBussinessSession, self).__init__()
        self.sip_domain = sip_domain
        self.login_domain = login_domain
        self.login_username = login_username
        self.login_password = login_password
        if proxy_url:
            self.proxies = {"http": proxy_url, "https": proxy_url}
        else:
            self.proxies = None
        self.verify = bool(ssl_check)
        self.initialize()

    def initialize(self):
        """Run the sign-in phases in order, then list contacts and invite.

        Every intermediate result is stored on the instance and echoed to
        stdout as a debugging aid.
        """
        self.rootURL = self.lyncdiscover()
        print(self.rootURL)
        self.authURL = self.getAuthURL()
        print(self.authURL)
        self.auth_data = {
            'grant_type': 'password',
            'username': self.login_username + "@" + self.login_domain,
            'password': self.login_password,
        }
        # Echo a masked copy so the clear-text password never hits stdout.
        print(self._redacted(self.auth_data))
        self.access_token = self.getAuthToken()
        # NOTE(review): the token and the Authorization header are still
        # echoed below (and logged in phases 4 and 5); mask them as well if
        # this output can reach shared logs.
        print(self.access_token)
        self.auth_headers = {
            'Authorization': "{0} {1}".format(
                self.access_token['token_type'],
                self.access_token['access_token']),
        }
        print(self.auth_headers)
        self.applications_url = self.getApplicationURL()
        print(self.applications_url)
        self.application_data = {
            'culture': 'en-us',
            'endpointId': '1235637',
            'userAgent': 'pythonApp/1.0 (CritterOS)',
        }
        print(self.application_data)
        self.apps, self.application_base = self.createApplicationEndPoint()
        print(self.apps)
        print(self.application_base)
        self.displayContacts()
        self.messagingInvite()

    def lyncdiscover(self):
        """Phase1: find the server root URL through the discovery hosts.

        The external ``lyncdiscover`` host is tried first. When that request
        fails, answers with a non-200 status, or returns a body without the
        expected link, the ``lyncdiscoverinternal`` host is tried.

        Returns:
            The ``_links.user.href`` value of the discovery document.

        Raises:
            Exception: if neither host yields a root URL. Errors raised by
                the request to the internal host propagate unchanged.
        """
        external_url = discover_prefix.format(self.sip_domain)
        internal_url = discover_prefix_internal.format(self.sip_domain)

        try:
            lync_root = self._discover(
                "Phase1: Discover Lync URL", external_url)
        except (requests.RequestException, ValueError, KeyError, TypeError):
            # Transport error, non-JSON body or unexpected document shape:
            # fall back to the internal host instead of giving up.
            logging.info(
                "LyncDiscover failed! trying LyncDiscoverInternal URL...")
            lync_root = None
        if lync_root is not None:
            return lync_root

        lync_root = self._discover(
            "Phase1: Discover Lync URL Internal", internal_url)
        if lync_root is None:
            # Fail here with a clear message rather than handing None to the
            # next phase, where it would surface as an invalid-URL error.
            raise Exception(
                "cannot discover Lync root URL for {0}".format(
                    self.sip_domain))
        return lync_root

    def _discover(self, label, url):
        """GET one discovery URL; return its user link, or None if not 200."""
        logging.info("%s # GET %s", label, url)
        response = self.get(url, proxies=self.proxies, verify=self.verify)
        logging.info("Phase1: RESPONSE STATUS # %s", response.status_code)
        if response.status_code != 200:
            return None
        return response.json()['_links']['user']['href']

    def getAuthURL(self):
        """Phase2: read the token endpoint from the root URL's challenge.

        Sends a GET to ``rootURL`` without credentials and parses the
        ``www-authenticate`` response header.

        Returns:
            The URL extracted by :meth:`_extractAuthURL`.

        Raises:
            KeyError: if the response has no ``www-authenticate`` header.
            Exception: if the header holds no quoted ``MsRtcOAuth`` value.
        """
        logging.info("Phase2: Get Authentication URL # GET %s", self.rootURL)
        response = self.get(
            self.rootURL, proxies=self.proxies, verify=self.verify)
        logging.info("Phase2: RESPONSE STATUS # %s", response.status_code)
        return self._extractAuthURL(response.headers['www-authenticate'])

    def getAuthToken(self):
        """Phase3: post ``auth_data`` to ``authURL`` and return the JSON answer.

        The request goes through this session (not the module-level
        ``requests.post``) so it shares the session's connection pool and
        cookies with the other phases.
        """
        logging.info("Phase3: Get Auth Token # POST %s", self.authURL)
        # Log a masked copy: the real dict carries the clear-text password.
        logging.info("Phase3: AuthDATA: %s", self._redacted(self.auth_data))
        response = self.post(
            self.authURL, data=self.auth_data,
            proxies=self.proxies, verify=self.verify)
        logging.info("Phase3: RESPONSE STATUS # %s", response.status_code)
        return response.json()

    def getApplicationURL(self):
        """Phase4: fetch ``rootURL`` with the token; return the applications link.

        Returns:
            The ``_links.applications.href`` value of the JSON answer.
        """
        logging.info("Phase4: Get Application URL # GET %s", self.rootURL)
        logging.info("Phase4: AuthHeader: %s", self.auth_headers)
        response = self.get(
            self.rootURL, headers=self.auth_headers,
            proxies=self.proxies, verify=self.verify)
        logging.info("Phase4: RESPONSE STATUS # %s", response.status_code)
        return response.json()['_links']['applications']['href']

    def createApplicationEndPoint(self):
        """Phase5: post ``application_data`` as JSON to ``applications_url``.

        Returns:
            A ``(apps, application_base)`` tuple: the decoded JSON answer and
            the ``scheme://host`` prefix of ``applications_url``, which the
            later calls prepend to the hrefs found in ``apps``.
        """
        logging.info(
            "Phase5: Create Application # POST %s", self.applications_url)
        logging.info("Phase5: AuthHeader: %s", self.auth_headers)
        logging.info("Phase5: ApplicationData: %s", self.application_data)
        headers = {'content-type': 'application/json'}
        logging.info("Phase5: AdditioanlHeader: %s", headers)
        headers.update(self.auth_headers)
        response = self.post(
            self.applications_url, headers=headers,
            data=json.dumps(self.application_data),
            proxies=self.proxies, verify=self.verify)
        logging.info("Phase5: RESPONSE STATUS # %s", response.status_code)
        apps = response.json()
        parts = urlparse(self.applications_url)
        application_base = "{0}://{1}".format(parts.scheme, parts.netloc)
        return (apps, application_base)

    def displayContacts(self):
        """Fetch the ``myContacts`` link and print every contact's name."""
        contacts_url = (
            self.application_base
            + self.apps['_embedded']['people']['_links']['myContacts']['href'])
        response = self.get(
            contacts_url, headers=self.auth_headers,
            proxies=self.proxies, verify=self.verify)
        for contact in response.json()['_embedded']['contact']:
            print("Name " + contact['name'])

    def messagingInvite(self):
        """Fetch the events link, then post a start-messaging request.

        The status code and body of the answers are printed to stdout.
        """
        print(self.apps.keys())
        event_url = self.application_base + self.apps["_links"]["events"]["href"]
        response = self.get(
            event_url, headers=self.auth_headers,
            proxies=self.proxies, verify=self.verify)
        print(response.text)

        im_url = (
            self.application_base
            + self.apps["_embedded"]["communication"]["_links"]
            ["startMessaging"]["href"])
        data = {
            "importance": "Normal",
            "sessionContext": "testSession",
            "subject": "test script",
            "telemetryId": None,
            "to": "sip:***********",
            "operationId": "testOp",
        }
        # The body is JSON, so declare it, exactly as
        # createApplicationEndPoint does for its own JSON body.
        headers = {'content-type': 'application/json'}
        headers.update(self.auth_headers)
        response = self.post(
            im_url, headers=headers, data=json.dumps(data),
            proxies=self.proxies, verify=self.verify)
        print(response.status_code)
        print(response.text)

    def _extractAuthURL(self, s):
        """Return the first double-quoted value following ``MsRtcOAuth`` in *s*.

        Raises:
            Exception: if the marker or a complete quoted value is missing.
        """
        start = s.find('MsRtcOAuth')
        if start == -1:
            # Without this check find() would be handed -1 and scan from the
            # last character of the header instead of from the marker.
            raise Exception("cannot find auth string")
        q1 = s.find('"', start)
        q2 = s.find('"', q1 + 1) if q1 != -1 else -1
        if q1 == -1 or q2 == -1:
            raise Exception("cannot find auth string")
        return s[q1 + 1:q2]

    @staticmethod
    def _redacted(data):
        """Return a copy of *data* with the ``password`` value masked."""
        return dict(
            (key, "********" if key == 'password' else value)
            for key, value in data.items())
# Run the demo only when executed as a script, so that importing this module
# neither contacts the server nor opens an interactive prompt.
if __name__ == "__main__":
    import code

    # The constructor signs in right away, so the whole network exchange
    # happens on this line, before the prompt appears.
    session = SkypeForBussinessSession(
        sip_domain="*************", login_domain="****************",
        login_username="************", login_password="**********",
        proxy_url=None, ssl_check=False)
    # Hand over to an interactive interpreter with `session` in scope; at
    # module level locals() is the module namespace.
    code.interact(local=locals())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment