Last active
December 21, 2017 22:59
-
-
Save JensTimmerman/c123d5f6291e4cd542473241ce7bf4c9 to your computer and use it in GitHub Desktop.
Freeipa client with kerberos authentication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Copyright 2017-2017 Ghent University
#
# This file is part of vsc-freeipa,
# originally created by the HPC team of Ghent University (http://ugent.be/hpc/en),
# with support of Ghent University (http://ugent.be/hpc),
# the Flemish Supercomputer Centre (VSC) (https://www.vscentrum.be),
# the Flemish Research Foundation (FWO) (http://www.fwo.be/en)
# and the Department of Economy, Science and Innovation (EWI) (http://www.ewi-vlaanderen.be/en).
#
# https://github.ugent.be/hpcugent/vsc-freeipa
#
# vsc-freeipa is free software: you can redistribute it and/or modify
# it under the terms of the GNU Library General Public License as
# published by the Free Software Foundation, either version 2 of
# the License, or (at your option) any later version.
#
# vsc-freeipa is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with vsc-freeipa. If not, see <http://www.gnu.org/licenses/>.
#
"""
A simple client for FreeIPA.

Uses Kerberos authentication.
"""
import requests | |
import json | |
import logging | |
from requests_kerberos import HTTPKerberosAuth, REQUIRED | |
class FreeIPAClientException(Exception):
    """Exception thrown on a FreeIPA error."""
class FreeIPAClientEmptyModlistException(FreeIPAClientException):
    """Exception thrown when an empty modification list was requested."""
class FreeIPAClient(object):
    """
    A Python client for the FreeIPA JSON API.

    Not a full client; methods get added as they are needed.

    Set the KRB5_KTNAME environment variable to point to your keytab to make
    the kerberos authentication work. Authentication with kerberos should be
    done before trying to use this client.
    Provide a valid CA certificate if you are using your own CA.
    """

    def __init__(self, url='https://myipaserver.example.com/ipa', user='sync_user', realm='MYREALM.EXAMPLE.COM',
                 ca=None, dry_run=False):
        """
        Initialize the client.

        Supply a user with correct permissions (default sync_user).

        params:
        - url: base url of the ipa server; '/json' is appended for requests
          and the url itself is sent as the referer header
        - user, realm: combined into the kerberos principal 'user@realm'
        - ca: handed to requests as the `verify` argument of every post
        - dry_run: stored on the instance as `self.dry_run`
          NOTE: no method of this class consults it; requests are always sent.
        """
        principal = '%s@%s' % (user, realm)
        # request counter, sent as the 'id' of every request
        self.pid = 0
        self.url = url + '/json'
        # api version sent along with every request
        self.version = '4.4.0'
        self.kerberos_auth = HTTPKerberosAuth(mutual_authentication=REQUIRED, principal=principal)
        self.session = requests.session()
        self.session.headers.update({
            'referer': url,
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        })
        self.ca = ca
        self.dry_run = dry_run

    def post(self, method, jsondata):
        """
        Post json data to a certain ipa method and return its result.

        method: method to post to
        jsondata: a list of a list of arguments and a dictionary of options.
        - example:
          method: user-show
          arguments: admin
          options: empty
          you need to specify jsondata:
          [ [ "admin" ], { "all": false, "no_members": false, "raw": false, "rights": false } ]

        Returns the inner 'result' entry of the response.

        Raises FreeIPAClientEmptyModlistException when the server reports an
        'EmptyModlist' error, and FreeIPAClientException for any other
        reported error or when the server marks the results as truncated.

        caveat: doesn't work with truncated results yet
        """
        data = json.dumps({
            'method': method,
            'params': jsondata,
            'id': self.pid,
            'version': self.version,
        })
        logging.debug('performing post request to ipa: %s', data)
        response = self.session.post(self.url, auth=self.kerberos_auth, data=data, verify=self.ca)

        # Handle the error and warning messages in a response
        response = response.json()
        logging.debug(response)

        error = response.get('error')
        if error:
            error_message = error['message']
            logging.error(error_message)
            if error['name'] == 'EmptyModlist':
                raise FreeIPAClientEmptyModlistException(error_message)
            raise FreeIPAClientException(error_message)

        result = response['result']

        # the key can be present without a value; don't log a literal "None"
        summary = result.get('summary')
        if summary:
            logging.info(summary)

        for message in result.get('messages') or ():
            if message['type'] == 'warning':
                logging.warning(message['message'])
            else:
                logging.info(message['message'])
            logging.debug(message)

        # TODO: repeat truncated results?
        # Raise explicitly instead of using assert: asserts are stripped when
        # python runs with -O, which would silently return partial results.
        if result.get('truncated'):
            raise FreeIPAClientException('%s returned truncated results, which are not supported' % method)

        self.pid += 1
        return result['result']

    @staticmethod
    def _join_users(users):
        """Turn a list, tuple or set of users into a comma separated string; other values pass through."""
        if isinstance(users, (list, tuple, set, frozenset)):
            return ",".join([str(user) for user in users])
        return users

    def user_find(self, username=""):
        """
        Find users; with the default empty username this finds all users.
        """
        return self.post('user_find', [[username], {}])

    def user_add(self, username, uid, givenname, surname, guidnumber, pubkey):
        """
        Add a user by given username
        params:
        - uid integer (sent as 'uidnumber')
        - givenname string
        - surname string (sent as 'sn')
        - guidnumber integer (sent as 'gidnumber')
        - pubkey comma separated string of pubkeys (sent as 'ipasshpubkey')
        """
        return self.post('user_add', [[username], {
            'uidnumber': uid,  # documentation would have you believe this is uid, but it expects uidnumber
            'givenname': givenname,
            'sn': surname,
            'gidnumber': guidnumber,
            'ipasshpubkey': pubkey,
        }])

    def user_mod(self, username, pubkey):
        """
        Update the pubkeys for a given user.

        Returns None (and logs at info level) when the server reports that
        nothing was modified.
        """
        try:
            return self.post('user_mod', [[username], {
                'ipasshpubkey': pubkey,
            }])
        except FreeIPAClientEmptyModlistException:
            logging.info('user_mod: nothing modified for user %s', username)
            return None

    def user_del(self, username):
        """Delete user with given username"""
        return self.post('user_del', [[username], {}])

    def group_show(self, groupname):
        """Get info for a given group"""
        return self.post('group_show', [[groupname], {}])

    def group_add(self, groupname, gid):
        """Add a group with given groupname and gid to ipa"""
        return self.post('group_add', [[groupname], {'gidnumber': gid}])

    def group_add_member(self, groupname, users):
        """
        Add a member or a list of members to the group with given groupname.

        A list, tuple or set of users is sent as one comma separated string;
        any other value is sent as given.
        """
        return self.post('group_add_member', [[groupname], {'user': self._join_users(users)}])

    def group_remove_member(self, groupname, users):
        """
        Remove a member or a list of members from the group with given groupname.

        A list, tuple or set of users is sent as one comma separated string;
        any other value is sent as given.
        """
        return self.post('group_remove_member', [[groupname], {'user': self._join_users(users)}])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment