Created
November 6, 2017 11:29
-
-
Save MagerValp/d6e2d04e3566b6a249d6f1fb4d9f34b6 to your computer and use it in GitHub Desktop.
Helper class for directory service lookups
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Directory Services helper class.""" | |
from __future__ import unicode_literals | |
from __future__ import print_function | |
from __future__ import division | |
from OpenDirectory import ODSession, ODNode, ODQuery, kODRecordTypeUsers, kODAttributeTypeRecordName, kODAttributeTypeStandardOnly, kODMatchEqualTo, kODRecordTypeGroups, kODRecordTypeUsers | |
__all__ = ["DSHelper", "DSHelperError"] | |
class DSHelperError(BaseException): | |
pass | |
class DSHelper(object): | |
"""Wrapper for Directory Services.""" | |
def __init__(self): | |
super(DSHelper, self).__init__() | |
self.odsession = ODSession.defaultSession() | |
def get_node(self, nodename): | |
node, error = ODNode.nodeWithSession_name_error_(self.odsession, nodename, None) | |
if node is None: | |
raise DSHelperError("Couldn't open {} node: {}".format(nodename, | |
error.localizedFailureReason())) | |
return node | |
def get_search_node(self): | |
return self.get_node("Search") | |
def find_groups_named(self, groupname, node=None): | |
"""Look up a group name and return an array of group records.""" | |
if node is None: | |
node = self.get_search_node() | |
odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node, | |
kODRecordTypeGroups, | |
kODAttributeTypeRecordName, | |
kODMatchEqualTo, | |
groupname, | |
kODAttributeTypeStandardOnly, | |
0, | |
None) | |
if odquery is None: | |
raise DSHelperError("Couldn't query {}: {}".format(node.nodeName, | |
error.localizedFailureReason())) | |
result, error = odquery.resultsAllowingPartial_error_(False, None) | |
if result is None: | |
raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason())) | |
return result | |
def find_users_named(self, username, node=None): | |
"""Look up a user name and return an array of user records.""" | |
if node is None: | |
node = self.get_search_node() | |
odquery, error = ODQuery.queryWithNode_forRecordTypes_attribute_matchType_queryValues_returnAttributes_maximumResults_error_( node, | |
kODRecordTypeUsers, | |
kODAttributeTypeRecordName, | |
kODMatchEqualTo, | |
username, | |
kODAttributeTypeStandardOnly, | |
0, | |
None) | |
if odquery is None: | |
raise DSHelperError("Couldn't query {}: {}".format(node.nodeName, | |
error.localizedFailureReason())) | |
result, error = odquery.resultsAllowingPartial_error_(False, None) | |
if result is None: | |
raise DSHelperError("Couldn't retrieve query results: {}".format(error.localizedFailureReason())) | |
return result | |
def add_user_to_group(self, user, group): | |
result, error = group.addMemberRecord_error_(user, None) | |
if not result: | |
if error: | |
error_msg = ": " + error.localizedFailureReason() | |
else: | |
error_msg = "" | |
raise DSHelperError("Couldn't add {} to {}{}".format(user.recordName, | |
group.recordName, | |
error_msg)) | |
def remove_user_from_group(self, user, group): | |
result, error = group.removeMemberRecord_error_(user, None) | |
if not result: | |
if error: | |
error_msg = ": " + error.localizedFailureReason() | |
else: | |
error_msg = "" | |
raise DSHelperError("Couldn't remove {} from {}{}" % (user.recordName, | |
group.recordName, | |
error_msg)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment