Skip to content

Instantly share code, notes, and snippets.

@elprup
Last active December 15, 2015 00:49
Show Gist options
  • Save elprup/5175504 to your computer and use it in GitHub Desktop.
Save elprup/5175504 to your computer and use it in GitHub Desktop.
simpler interface for boto iam
import boto.iam
import time
import urllib2
import logging
# Connect to IAM; `conn` is the module-wide connection every helper below uses.
# Default boto config file location: /etc/boto.cfg
conn = boto.connect_iam()
# Set up logging: attach a stream handler that emits only the bare message
# text to the root logger, and let records of level INFO and above through.
h = logging.StreamHandler()
h.setFormatter(logging.Formatter('%(message)s'))
logging.getLogger().addHandler(h)
logging.getLogger().setLevel(logging.INFO)
def exists(name):
    ''' Return True if the user `name` can be fetched, False otherwise.

    Any error raised by the lookup is reported as False, so a False result
    does not distinguish a missing user from a failed request.
    '''
    try:
        conn.get_user(name)
    except Exception:
        # Broad on purpose (every lookup failure means "no"), but unlike a
        # bare except this lets KeyboardInterrupt/SystemExit propagate.
        return False
    return True
def adduser(name, password=None):
    ''' Create user `name` and, if `password` is given, a login profile.

    name     -- user name to create.
    password -- optional password; when empty or None no login profile
                is created.

    Always returns None; the outcome is reported through logging only.
    If creating the user fails, a warning is logged and the password step
    is skipped.
    '''
    try:
        conn.create_user(name)
    except Exception:
        logging.warning('error when create user')
        return
    logging.info('create user succeed')

    if not password:
        return
    try:
        conn.create_login_profile(name, password)
    except Exception:
        logging.warning('password set error')
    else:
        logging.info('password set succeed')
def deluser(name):
    ''' Delete user `name` together with its user policies and login profile.

    The user's policies are removed first, then the login profile (if any),
    then the user itself. If the policies cannot be listed or the user
    cannot be deleted, a warning is logged instead of raising.
    '''
    try:
        resp = conn.get_all_user_policies(name)
        policy_names = resp['list_user_policies_response']['list_user_policies_result']['policy_names']
    except Exception:
        # Listing policies is the first call that touches the user, so a
        # failure here is reported the same way as a failed delete below
        # instead of escaping as an unhandled exception.
        logging.warning('user does not exist.')
        return

    for policy_name in policy_names:
        conn.delete_user_policy(name, policy_name)

    try:
        conn.delete_login_profile(name)
    except Exception:
        # Best effort: the user may simply have no login profile.
        pass

    try:
        conn.delete_user(name)
    except Exception:
        logging.warning('user does not exist.')
    else:
        logging.info('delete user succeed.')
def changepassword(name, password):
    ''' Set the login password of user `name` to `password`.

    First tries to update the existing login profile; if that fails, tries
    to create a new login profile instead. Success or failure is reported
    through logging; nothing is returned.
    '''
    try:
        conn.update_login_profile(name, password)
    except Exception:
        # Updating failed (e.g. no login profile yet): fall back to creating.
        try:
            conn.create_login_profile(name, password)
        except Exception:
            logging.warning('change password failed. user exist?')
            return
    logging.info('change password succeed')
def userinfo(name):
    ''' Print every user policy of `name` as "<policy name> <document>".

    A "userinfo <name>" header line is printed first. Each policy document
    is URL-unquoted before printing. If anything fails while fetching or
    printing the policies, a warning is logged and the function returns.
    '''
    # Single-argument print(...) emits the same text under Python 2 and
    # also parses under Python 3.
    print('userinfo %s' % name)
    try:
        resp = conn.get_all_user_policies(name)
        policy_names = resp['list_user_policies_response']['list_user_policies_result']['policy_names']
        for policy_name in policy_names:
            r = conn.get_user_policy(name, policy_name)
            result = r['get_user_policy_response']['get_user_policy_result']
            content = urllib2.unquote(result['policy_document'].encode("utf8"))
            # Encode the name like the document so both are byte strings
            # when joined (avoids an implicit ASCII decode of `content`).
            pname = result['policy_name'].encode("utf8")
            print('%s %s' % (pname, content))
    except Exception:
        logging.warning('user info get error')
def _as_json_fragment(value):
    ''' Return `value` as JSON text: lists/tuples are encoded, anything else is passed through unchanged.'''
    import json  # local import: the module's import block is left untouched
    if isinstance(value, (list, tuple)):
        # Compact separators reproduce the original '["a","b"]' layout while
        # escaping quotes/backslashes inside the items correctly.
        return json.dumps(list(value), separators=(',', ':'))
    return value


def grant(username, policyname, actions, effect, resources):
    ''' Attach a single-statement policy named `policyname` to `username`.

    please refer to http://awspolicygen.s3.amazonaws.com/policygen.html

    actions   -- list/tuple of action names, or a string that already is a
                 JSON fragment (e.g. '"s3:*"').
    effect    -- inserted as the statement's "Effect" value.
    resources -- list/tuple of resource names, or a ready JSON fragment.

    The statement id ("Sid") is the current time in milliseconds. The policy
    document is printed, then sent; success or failure is logged. Returns
    None.
    '''
    policy_template = '''{
"Statement":[{
"Sid":"%(sid)s",
"Action":%(actions)s,
"Effect":"%(effect)s",
"Resource":%(resources)s
}]
}'''
    policy_json = policy_template % {
        'actions': _as_json_fragment(actions),
        'effect': effect,
        'resources': _as_json_fragment(resources),
        'sid': str(int(time.time()*1000)),
    }
    print(policy_json)
    try:
        conn.put_user_policy(username, policyname, policy_json)
    except Exception:
        logging.warning('set policy failed')
    else:
        logging.info('set policy succeed')
def ungrant(username, policyname):
    ''' Remove the policy `policyname` from user `username`.

    Success or failure is reported through logging; returns None.
    '''
    try:
        conn.delete_user_policy(username, policyname)
    except Exception:
        # The original called the misspelled `logging.waring`, which raised
        # AttributeError from inside this handler.
        logging.warning('delete policy failed')
    else:
        logging.info('delete policy succeed')
# shortcuts
def newuploader(name, password, bucket='papaya-download'):
    ''' Create user `name` with `password` and grant it upload access to `bucket`.

    Two policies are attached, each named "<name>_<current time in ms>":
    one allowing every S3 action ("s3:*") on the bucket and on the objects
    in it, and one allowing "s3:ListAllMyBuckets" on all S3 resources.
    Finally the user's policies are printed via userinfo().

    bucket -- bucket name; defaults to the previously hard-coded
              'papaya-download'.
    '''
    adduser(name, password)

    bucket_arn = 'arn:aws:s3:::%s' % bucket
    bucket_policy = '_'.join([name, str(int(time.time()*1000))])
    grant(name, bucket_policy, '"s3:*"', 'Allow', [bucket_arn, bucket_arn + '/*'])

    # Timestamp taken again after the first grant, as in the original code.
    listing_policy = '_'.join([name, str(int(time.time()*1000))])
    grant(name, listing_policy, ["s3:ListAllMyBuckets"], 'Allow', ['arn:aws:s3:::*'])

    userinfo(name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment