Last active
May 16, 2019 02:24
-
-
Save hackprime/047320bbd5fef2408bd8b959010b1cae to your computer and use it in GitHub Desktop.
Simple wrapper class over FreeIPA client implementation from https://github.com/nordnet/python-freeipa-json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple wrapper class over FreeIPA client implementation from
https://github.com/nordnet/python-freeipa-json

Usage

* Create user:

    >>> manager = LDAPClient('ldap.example.com', 'admin', 'qw3rty12')
    >>> user = manager.create_user('user1', 'systemadmins', 'asd1234567')
    >>> print(user['uid'])
    'user1'

* Update user's password:

    >>> manager.update_user_password('user1', '1234wfggggh')

* Delete user:

    >>> manager.delete_user('user1')
""" | |
from json import JSONDecodeError | |
import logging | |
import ipahttp | |
class LDAPError(Exception):
    """Error reported by the server inside the ``error`` member of a response.

    Attributes:
        message: value of the ``message`` key of the payload, or ``None``.
        code: value of the ``code`` key of the payload; ``0`` when the key
            is missing.
        name: value of the ``name`` key of the payload, or ``None``.
    """

    def __init__(self, error_data):
        """Copy the interesting fields out of the ``error_data`` mapping."""
        self.message = error_data.get('message')
        self.code = error_data.get('code', 0)
        self.name = error_data.get('name')

    def __str__(self):
        # '%s' instead of '%d': the code comes straight from the payload and
        # may be None or a string; formatting must never raise and hide the
        # original error. Integer codes render exactly as before.
        return '%s: %s' % (self.code, self.message)
class LDAPAccessDenied(Exception):
    """Raised when the server reports that credentials cannot be verified."""

    # Fixed human-readable text; also used as the string form of the error.
    message = 'Unable to log in into LDAP server'

    def __str__(self):
        """Return the fixed ``message`` text."""
        return self.message
class LDAPClient:
    """Convenience wrapper around an ``ipahttp.ipa`` session.

    The session is created and logged in lazily on first use and is then
    reused for every following request.
    """

    _connection = None
    # Far-future timestamp written to ``krbpasswordexpiration`` so that a
    # freshly set password does not expire on first login.
    _never_expire_datetime = '20500101000000Z'
    # Error code this client treats as "entry does not exist".
    _not_found_code = 4001
    # NOTE(review): the domain of the generated ``mail`` attribute was lost
    # in the published copy of this file (the address was obfuscated, leaving
    # a format string that raises ValueError). ``example.com`` is only a
    # placeholder - set the real domain before using ``create_user``.
    _mail_domain = 'example.com'

    def __init__(self, host, username, password):
        """Store the server host and credentials; no network I/O happens here."""
        self._host = host
        self._username = username
        self._password = password

    @property
    def connection(self):
        """Return the cached session, creating and logging it in on first use."""
        if self._connection is None:
            connection = ipahttp.ipa(self._host, sslverify=True)
            connection.login(self._username, self._password)
            # Cache only after login() returned, so an exception raised during
            # login does not leave a never-authenticated session cached.
            self._connection = connection
        return self._connection

    def request(self, method, *args, **kwargs):
        """Call ``method`` on the session and unwrap the response.

        Args:
            method: name of the session method to call.
            *args, **kwargs: forwarded to that method unchanged.

        Returns:
            The inner ``result`` value of the response, or ``None`` when the
            response carries no result.

        Raises:
            LDAPAccessDenied: the response was not JSON and contained the
                "Unable to verify your Kerberos credentials" text.
            JSONDecodeError: any other non-JSON response (its text is logged).
            LDAPError: the response carried a non-null ``error`` member.
        """
        try:
            data = getattr(self.connection, method)(*args, **kwargs)
        except JSONDecodeError as e:
            if 'Unable to verify your Kerberos credentials' in e.doc:
                # Forget the rejected session so the next request logs in
                # again instead of reusing it forever.
                self._connection = None
                raise LDAPAccessDenied() from e
            logging.error(e.doc)
            raise
        error = data.get('error')
        if error is not None:
            raise LDAPError(error)
        # ``result`` may be absent or null; both yield None.
        return (data.get('result') or {}).get('result')

    def get_group(self, group_name):
        """Return the ``group_show`` result for ``group_name``."""
        return self.request('group_show', group_name)

    def get_user(self, user_name):
        """Return the ``user_show`` result for ``user_name``."""
        return self.request('user_show', user_name)

    def _exists(self, getter, name):
        """Return False if ``getter(name)`` fails with the not-found code."""
        try:
            getter(name)
        except LDAPError as e:
            if e.code == self._not_found_code:
                return False
            raise
        return True

    def is_group_exist(self, group_name):
        """Return True if the group can be fetched, False if it is not found.

        Raises:
            LDAPError: for any error other than the not-found code.
        """
        return self._exists(self.get_group, group_name)

    def is_user_exist(self, user_name):
        """Return True if the user can be fetched, False if it is not found.

        Raises:
            LDAPError: for any error other than the not-found code.
        """
        return self._exists(self.get_user, user_name)

    def create_group(self, group_name):
        """Create ``group_name`` with a generated description."""
        return self.request(
            'group_add',
            group_name,
            description=(
                'Group "%s", created by XTREME-STARGATE API' % group_name))

    def _prolong_password_expiration(self, user_name):
        """Set the never-expire timestamp and return the stored value."""
        data = self.request(
            'user_mod',
            user_name,
            setattrs=[
                'krbpasswordexpiration=%s' % self._never_expire_datetime])
        return data.get('krbpasswordexpiration')

    def create_user(self, user_name, group_name, password, public_keys=None):
        """Create a user, add it to ``group_name`` and return the user record.

        Args:
            user_name: login of the new user.
            group_name: group the user is added to; created first if missing.
            password: initial password.
            public_keys: optional value for the ``ipasshpubkey`` attribute.

        Returns:
            The ``user_add`` result, extended with the group membership and
            the updated ``krbpasswordexpiration`` value.

        Raises:
            LDAPError: any of the underlying requests reported an error.
        """
        # Make sure the group exists *before* the user is created; otherwise
        # adding the membership fails and leaves a half-configured user.
        if not self.is_group_exist(group_name):
            self.create_group(group_name)
        opts = {
            'givenname': user_name,
            'sn': group_name,
            'cn': '%s %s' % (group_name, user_name),
            'userpassword': password,
            'mail': '%s@%s' % (user_name, self._mail_domain),
            'homedirectory': '/home/%s/%s' % (group_name, user_name),
        }
        if public_keys is not None:
            opts['ipasshpubkey'] = public_keys
        user_data = self.request('user_add', user_name, opts=opts)
        self.request('group_add_member', group_name, user_name, 'user')
        # The record returned by user_add predates the membership change and
        # may not list any groups at all.
        user_data.setdefault('memberof_group', []).append(group_name)
        # NOTE: For some reason, ipahttp cannot set password expiration time
        #       during the user creation operation and password will expire
        #       instantly on the first login. To prevent that -
        #       updating password expiration time on a separate request.
        user_data['krbpasswordexpiration'] = (
            self._prolong_password_expiration(user_name))
        return user_data

    def update_user_public_keys(self, user_name, public_keys):
        """Set the user's ``ipasshpubkey`` values to ``public_keys``.

        Returns:
            The ``user_mod`` result, or ``{'error': exc}`` when the server
            answered with the tolerated code 4202.

        Raises:
            LDAPError: for any other error code.
        """
        try:
            return self.request(
                'user_mod',
                user_name,
                setattrs=['ipasshpubkey=%s' % key for key in public_keys])
        except LDAPError as e:
            # Skip non-critical error "no modifications to be performed"
            if e.code == 4202:
                return {'error': e}
            raise

    def delete_user_public_keys(self, user_name, public_keys):
        """Remove the given ``ipasshpubkey`` values from the user.

        Returns:
            The ``user_mod`` result, or ``{'error': exc}`` when the server
            answered with one of the tolerated codes (4026, 3009).

        Raises:
            LDAPError: for any other error code.
        """
        try:
            return self.request(
                'user_mod',
                user_name,
                delattrs=['ipasshpubkey=%s' % key for key in public_keys])
        except LDAPError as e:
            # Skip non-critical errors:
            # 4026: "ipasshpubkey does not contain {value}"
            # 3009: "invalid 'ipasshpubkey': No such attribute on this entry"
            if e.code in (4026, 3009):
                return {'error': e}
            raise

    def update_user_password(self, user_name, password):
        """Set a new password and push its expiration to the far future.

        Returns:
            The ``user_mod`` result of the password change, extended with the
            updated ``krbpasswordexpiration`` value.
        """
        user_data = self.request(
            'user_mod', user_name, setattrs=['userpassword=%s' % password])
        user_data['krbpasswordexpiration'] = (
            self._prolong_password_expiration(user_name))
        # A successful update is not an error; keep the record for debugging.
        logging.debug(user_data)
        return user_data

    def delete_user(self, user_name):
        """Delete ``user_name`` and return the ``user_del`` result."""
        return self.request('user_del', user_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment