Last active
May 25, 2021 20:40
-
-
Save sandromello/83fcf780abe4eef53564 to your computer and use it in GitHub Desktop.
SOAP Requests Zimbra Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Build the SOAP 1.2 envelope for a Zimbra GetInfoRequest.
 *
 * @param string $token Auth token placed in the <authToken> header element.
 * @return string The complete XML request document.
 */
function info_request_xml($token){
    // Escape the token so XML metacharacters (&, <, >, quotes) cannot
    // break the envelope or inject extra markup.
    $token = htmlspecialchars($token, ENT_QUOTES | ENT_XML1, 'UTF-8');
    return <<<XML
<?xml version="1.0" encoding="UTF-8"?>
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
    <soap:Header>
        <context xmlns="urn:zimbra">
            <userAgent name="ZimbraConnectorForOutlook" version="8.0.6.1063" />
            <nonotify />
            <noqualify />
            <authToken>$token</authToken>
        </context>
    </soap:Header>
    <soap:Body>
        <GetInfoRequest xmlns="urn:zimbraAccount" sections="" />
    </soap:Body>
</soap:Envelope>
XML;
}
/**
 * POST a SOAP document to the Zimbra endpoint and return the raw reply.
 *
 * @param string $data       SOAP request body to send.
 * @param string $zimbra_url URL of the Zimbra SOAP service.
 * @param bool   $verify_ssl Verify the server certificate and host name.
 *                           Defaults to false, which keeps the historical
 *                           behaviour of accepting any certificate.
 * @return array "status" => HTTP status code (0 when no response was received),
 *               "result" => response body, or false on transport failure,
 *               "error"  => cURL error message on failure, otherwise null.
 */
function getInfo($data, $zimbra_url, $verify_ssl = false){
    //open connection
    $ch = curl_init($zimbra_url);
    if ($ch === false) {
        // Nothing to configure or close when the handle could not be created.
        return array(
            "status" => 0,
            "result" => false,
            "error" => "Unable to initialise cURL"
        );
    }

    $headers = array('Content-Type: application/soap+xml');
    curl_setopt($ch, CURLOPT_POST, true);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // NOTE(security): verification is off by default only for backward
    // compatibility; pass $verify_ssl = true for endpoints with a valid certificate.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, $verify_ssl ? true : false);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, $verify_ssl ? 2 : 0);
    curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
    curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

    $result = curl_exec($ch);
    $http_status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    // The error text must be read before the handle is closed.
    $error = ($result === false) ? curl_error($ch) : null;
    curl_close($ch);

    return array(
        "status" => $http_status,
        "result" => $result,
        "error" => $error
    );
}
?> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools, ldap_func, httplib, ssl, urllib2, socket | |
import xml.etree.ElementTree as ET | |
# Red Hat elliptic curve SSL bug:
#https://eucalyptus.atlassian.net/browse/EUCA-8317 | |
class HTTPSConnectionV3(httplib.HTTPSConnection):
    """ HTTPS connection that tries the SSLv3 handshake before SSLv23

    Workaround for the Red Hat elliptic curve SSL bug described at
    https://eucalyptus.atlassian.net/browse/EUCA-8317
    """

    def __init__(self, *args, **kwargs):
        httplib.HTTPSConnection.__init__(self, *args, **kwargs)

    def _open_socket(self):
        """ Open the TCP connection and, when a tunnel host is set, the tunnel
        :return: The connected plain (not yet SSL wrapped) socket
        """
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if self._tunnel_host:
            self.sock = sock
            self._tunnel()
        return sock

    def connect(self):
        """ Connect and wrap the socket, preferring SSLv3 and falling back to SSLv23

        The fallback uses a fresh TCP connection, because a socket whose
        handshake already failed cannot be reused for a second handshake.
        """
        # Some ssl builds ship without SSLv3; go straight to SSLv23 there.
        sslv3 = getattr(ssl, 'PROTOCOL_SSLv3', None)
        sock = self._open_socket()
        if sslv3 is not None:
            try:
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=sslv3)
                return
            except ssl.SSLError:
                print("SSLv3 handshake failed, retrying with SSLv23.")
                sock.close()
                sock = self._open_socket()
        self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_SSLv23)
class HTTPSHandlerV3(urllib2.HTTPSHandler):
    """ urllib2 HTTPS handler that opens requests through HTTPSConnectionV3 """

    def https_open(self, req):
        """ Open an HTTPS request using the HTTPSConnectionV3 connection class
        :param req: The request to open
        """
        connection_class = HTTPSConnectionV3
        return self.do_open(connection_class, req)
class Provisioning(object):
    """ Client for the Zimbra SOAP API used to provision accounts

    Builds the SOAP envelopes for the supported requests and posts them to
    the configured API URL, using an admin auth token that is fetched when
    the instance is created.
    """

    # Shared SOAP envelope: first slot is the optional <authToken> element,
    # second slot is the request element placed in the body.
    _ENVELOPE = (
        '<?xml version="1.0" ?>'
        '<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">'
        '<soap:Header><context xmlns="urn:zimbra"><format type="xml"/>%s</context></soap:Header>'
        '<soap:Body>%s</soap:Body></soap:Envelope>'
    )

    def __init__(self, config, logger):
        """ Read the settings, install the HTTPS opener and fetch the admin token
        :param config: Object exposing get(section, option); the 'main' section
            must define url_api, admin_user, admin_password and ignore_words
            (comma separated)
        :param logger: Logger used for progress messages
        :raises Exception: If the initial token cannot be obtained
        """
        self.url_api = config.get('main', 'url_api')
        self.admin_user = config.get('main', 'admin_user')
        self.admin_password = config.get('main', 'admin_password')
        self.req_headers = {'Content-Type': 'application/soap+xml'}
        self.ignore_words = config.get('main', 'ignore_words').split(',')
        # Installs the opener globally, so every urllib2.urlopen call uses it.
        urllib2.install_opener(urllib2.build_opener(HTTPSHandlerV3()))
        self.logger = logger
        try:
            self.token = self.get_token()
        except urllib2.HTTPError as e:
            # Only HTTPError carries a status code and a response body.
            raise Exception('%s. Error getting the initial token, check the credentials in config file: %s' % (e.code, e.read()))
        except urllib2.URLError as e:
            # Plain URLError (e.g. connection refused) has a reason but no code/body.
            raise Exception('%s. Error getting the initial token, check url_api in config file' % (e.reason,))

    @staticmethod
    def _xml_escape(value):
        """ Escape a value for use as XML text or inside a double-quoted attribute """
        from xml.sax.saxutils import escape
        return escape('%s' % (value,), {'"': '&quot;'})

    @staticmethod
    def _ldap_escape(value):
        """ Escape the characters that are special in an LDAP search filter (RFC 4515) """
        escaped = '%s' % (value,)
        # The backslash must be replaced first so later escapes are not re-escaped.
        for char, replacement in (('\\', '\\5c'), ('*', '\\2a'), ('(', '\\28'),
                                  (')', '\\29'), ('\x00', '\\00')):
            escaped = escaped.replace(char, replacement)
        return escaped

    def _envelope(self, body, authenticated=True):
        """ Wrap a request element in the SOAP envelope
        :param body: The already escaped request element
        :param authenticated: Include the current auth token in the header
        """
        auth = ''
        if authenticated:
            auth = '<authToken>%s</authToken>' % self._xml_escape(self.token)
        return self._ENVELOPE % (auth, body)

    def _post(self, soap_request):
        """ Post a SOAP request to the API and return the response body
        :param soap_request: The XML document to send
        """
        req = urllib2.Request(self.url_api, soap_request, self.req_headers)
        resp = urllib2.urlopen(req)
        try:
            return resp.read()
        finally:
            resp.close()

    def generate_name(self, name, domain):
        """ Generate all combinations of a given full name
        :param name: A list of a full name. ['Sandro', 'Mello', 'Silva']
        :param domain: The domain appended after the '@' of each suggestion
        :return: A list with one address per ordering of the remaining words;
            empty when no word is left after removing the ignored ones
        """
        ignored = set(word.strip().lower() for word in self.ignore_words)
        # Drop every occurrence of an ignored word, not just the first one.
        parts = [n for n in (part.lower() for part in name) if n not in ignored]
        if not parts:
            return []
        return [str('.'.join(namep)) + '@' + domain
                for namep in itertools.permutations(parts)]

    @classmethod
    def search_account(cls, account):
        """ Search an account in the ldap base
        :param account: The given account to search
        :return: True when an entry with that delivery address exists
        :raises Exception: If no matching entry is found
        """
        query = '(&(objectClass=zimbraAccount)(&(zimbraMailDeliveryAddress=%s)))' % cls._ldap_escape(account)
        for dn, entry in ldap_func.ldapQuery(ldap_func.getLdapCredentials(), query, []):
            if entry['zimbraMailDeliveryAddress'][0] == account:
                return True
        raise Exception('Account does not exist')

    @property
    def token_xml(self):
        """ The AuthRequest document built from the admin credentials """
        body = ('<AuthRequest xmlns="urn:zimbraAdmin"><account by="name">%s</account>'
                '<password>%s</password></AuthRequest>') % (
                    self._xml_escape(self.admin_user),
                    self._xml_escape(self.admin_password))
        return self._envelope(body, authenticated=False)

    def reset_password_xml(self, account, old_password, new_password):
        """ Build the ChangePasswordRequest document
        :param account: The account name
        :param old_password: The current password
        :param new_password: The password to set
        """
        body = ('<ChangePasswordRequest xmlns="urn:zimbraAccount">'
                '<account by="name">%s</account><oldPassword>%s</oldPassword>'
                '<password>%s</password></ChangePasswordRequest>') % (
                    self._xml_escape(account),
                    self._xml_escape(old_password),
                    self._xml_escape(new_password))
        return self._envelope(body)

    def create_account_xml(self, new_account, displayname, password):
        """ Build the CreateAccountRequest document
        :param new_account: The name of the account to create
        :param displayname: The value of the displayName attribute
        :param password: The initial password
        """
        body = ('<CreateAccountRequest name="%s" password="%s" xmlns="urn:zimbraAdmin">'
                '<a n="displayName">%s</a></CreateAccountRequest>') % (
                    self._xml_escape(new_account),
                    self._xml_escape(password),
                    self._xml_escape(displayname))
        return self._envelope(body)

    def get_account_xml(self, account):
        """ Build the GetAccountRequest document
        :param account: The account name to look up
        """
        body = ('<GetAccountRequest xmlns="urn:zimbraAdmin">'
                '<account by="name">%s</account></GetAccountRequest>') % (
                    self._xml_escape(account),)
        return self._envelope(body)

    def set_password_xml(self, zimbraID, new_password):
        """ Build the SetPasswordRequest document
        :param zimbraID: The zimbra ID of the account
        :param new_password: The password to set
        """
        body = ('<SetPasswordRequest id="%s" newPassword="%s" xmlns="urn:zimbraAdmin">'
                '</SetPasswordRequest>') % (
                    self._xml_escape(zimbraID),
                    self._xml_escape(new_password))
        return self._envelope(body)

    def get_token(self):
        """ Authenticate with the admin credentials and return the auth token
        :raises Exception: If the response carries no authToken element
        """
        self.logger.warn('Getting token...')
        root = ET.fromstring(self._post(self.token_xml))
        token_node = root.find('.//{urn:zimbraAdmin}authToken')
        if token_node is None:
            raise Exception('No authToken found in the AuthRequest response')
        # The token is a credential, so success is logged without its value.
        self.logger.warn('SUCCESS. Token received.')
        return token_node.text

    def create_account_request(self, account, displayname, password):
        """ Create an account; returns True when the API sends a non-empty reply """
        soap_request = self.create_account_xml(account, displayname, password)
        return True if self._post(soap_request) else None

    def change_password_request(self, account, old_password, new_password):
        """ Change the account password given the current password (old_password) """
        soap_request = self.reset_password_xml(account, old_password, new_password)
        return True if self._post(soap_request) else None

    def get_account_zimbra_id(self, account):
        """ Return the zimbra ID of the given account
        :raises IndexError: If the response contains no account element
        """
        self.logger.warn('Getting account zimbra id...')
        root = ET.fromstring(self._post(self.get_account_xml(account)))
        account_list = root.findall('.//{urn:zimbraAdmin}account')
        if not account_list:
            raise IndexError('No account element in the GetAccountRequest response for %s' % (account,))
        return account_list[0].attrib['id']

    def set_password_request(self, account, new_password):
        """ Set a new password for the given account """
        zimbraID = self.get_account_zimbra_id(account)
        soap_request = self.set_password_xml(zimbraID, new_password)
        return True if self._post(soap_request) else None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment