Last active
December 20, 2019 16:23
-
-
Save jtyr/611637d9dc7619340e31dbd3ecfc6fcc to your computer and use it in GitHub Desktop.
Script that allows to search, add or remove hostnames in phpIPAM.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import logging | |
import requests | |
import sys | |
# Disable SSL warnings | |
try: | |
from requests.packages.urllib3.exceptions import InsecureRequestWarning | |
except Exception: | |
from urllib3.exceptions import InsecureRequestWarning | |
class ApiError(Exception): | |
pass | |
class PhpIpam(): | |
def __init__(self, url, user, pwd, token, verify, log): | |
self.url = url | |
self.user = user | |
self.pwd = pwd | |
self.token = token | |
self.verify = verify | |
self.log = log | |
def _error(self, msg, code=1): | |
self.log.error(msg) | |
sys.exit(code) | |
def login(self): | |
self.log.debug("Logging in...") | |
if self.token is None: | |
try: | |
response = requests.post( | |
"%s/user/" % self.url, | |
verify=self.verify, | |
auth=requests.auth.HTTPBasicAuth(self.user, self.pwd)) | |
except Exception as e: | |
self._error("Cannot authenticate: %s" % str(e)) | |
data = response.json() | |
if ( | |
'data' not in data or | |
'token' not in data['data']): | |
self._error("No token found", 127) | |
self.token = data['data']['token'] | |
self.headers = { | |
'phpipam-token': self.token, | |
} | |
def get_subnet_id(self, subnet): | |
self.log.debug("Getting subnet ID...") | |
try: | |
response = requests.get( | |
"%s/subnets/search/%s/" % (self.url, subnet), | |
verify=self.verify, | |
headers=self.headers) | |
except Exception as e: | |
self._error("Cannot find subnet %s: %s" % (subnet, str(e)), 127) | |
data = response.json() | |
if ( | |
'data' not in data or | |
len(data['data']) == 0): | |
self._error("No subnet ID found", 127) | |
self.log.debug("Subnet ID: %s" % data['data'][0]['id']) | |
return data['data'][0]['id'] | |
def get_subnet_name(self, subnet_id): | |
self.log.debug("Getting subnet name...") | |
try: | |
response = requests.get( | |
"%s/subnets/%s/" % (self.url, subnet_id), | |
verify=self.verify, | |
headers=self.headers) | |
except Exception as e: | |
self._error( | |
"Cannot find subnet ID %s: %s" % (subnet_id, str(e)), 127) | |
data = response.json() | |
if 'data' not in data: | |
self._error("No subnet name found", 127) | |
subnet = "%s/%s" % ( | |
data['data']['subnet'], | |
data['data']['mask']) | |
self.log.debug('Subnet name for %s: %s' % (subnet_id, subnet)) | |
return subnet | |
def get_hostname_ip(self, hostname, subnet_id): | |
self.log.debug("Getting hostname IP...") | |
try: | |
response = requests.get( | |
"%s/addresses/search_hostname/%s/" % (self.url, hostname), | |
verify=self.verify, | |
headers=self.headers) | |
except Exception as e: | |
self._error( | |
"Cannot find hostname %s (subnet_id=%s): %s" % ( | |
hostname, subnet_id, str(e)), 127) | |
data = response.json() | |
ret = None | |
if 'data' in data and len(data['data']) > 0: | |
for h in data['data']: | |
if ( | |
'subnetId' in h and | |
h['subnetId'] == subnet_id and | |
'ip' in h): | |
ret = h['ip'] | |
break | |
self.log.debug("Hostname IP: %s" % ret) | |
return ret | |
def add_hostname(self, hostname, subnet_id): | |
self.log.debug("Adding hostname into the subnet...") | |
try: | |
response = requests.post( | |
url="%s/addresses/first_free/%s/" % (self.url, subnet_id), | |
data={ | |
'hostname': hostname, | |
}, | |
verify=self.verify, | |
headers=self.headers) | |
except Exception as e: | |
self._error( | |
"Cannot add new hostname %s (subnet_id=%s): %s" % ( | |
hostname, subnet_id, str(e))) | |
data = response.json() | |
if 'data' not in data: | |
self._error( | |
'Invalid response while creating record for %s' % hostname) | |
self.log.debug("New hostname IP: %s" % data['data']) | |
return data['data'] | |
def search_hostname(self, hostname, subnet_id=None): | |
self.log.debug("Searching for hostname...") | |
try: | |
response = requests.get( | |
url="%s/addresses/search_hostname/%s/" % (self.url, hostname), | |
verify=self.verify, | |
headers=self.headers) | |
except Exception as e: | |
self._error( | |
"Cannot find hostname %s: %s" % (hostname, str(e)), 127) | |
data = response.json() | |
if 'data' not in data: | |
self._error("Cannot find hostname %s" % hostname, 127) | |
ret = [] | |
i = 0 | |
for h in data['data']: | |
process_me = False | |
if subnet_id is not None: | |
# Filter only hostnames from given subnet | |
if h['subnetId'] == subnet_id: | |
ret.append(h['id']) | |
process_me = True | |
else: | |
ret.append(h['id']) | |
process_me = True | |
if process_me: | |
i += 1 | |
subnet = self.get_subnet_name(h['subnetId']) | |
print("%d: Hostname IP: %s" % (i, h['ip'])) | |
print(" Subnet: %s" % subnet) | |
print(" Subnet ID: %s" % h['subnetId']) | |
return ret | |
def remove_hostname(self, hostname_id): | |
self.log.debug("Removing hostname...") | |
try: | |
requests.delete( | |
url="%s/addresses/%s/" % (self.url, hostname_id), | |
verify=self.verify, | |
headers=self.headers) | |
except Exception as e: | |
self._error( | |
"Cannot remove hostname ID %s: %s" % (hostname_id, str(e))) | |
def parse_args(): | |
# Main parser | |
parser = argparse.ArgumentParser( | |
description='Search, add or remove hostname from phpIPAM.') | |
parser.add_argument( | |
'-U', '--url', | |
required=True, | |
help='URL for the API application (e.g. https://server/api/app_id).') | |
parser.add_argument( | |
'-u', '--user', | |
help='user name used to login to the API') | |
parser.add_argument( | |
'-p', '--password', | |
help='password used to login to the API') | |
parser.add_argument( | |
'-t', '--token', | |
help='token used to login to the API') | |
parser.add_argument( | |
'-s', '--skipssl', | |
action='store_true', | |
help='skip SSL cert verification') | |
parser.add_argument( | |
'-d', '--debug', | |
action='store_true', | |
help='show debug messages') | |
subparsers = parser.add_subparsers(metavar='TYPE') | |
subparsers.required = True | |
# Hostname parser | |
parser_hostname = subparsers.add_parser( | |
'hostname', | |
help='Hostname actions.', | |
) | |
parser_hostname.set_defaults(type='hostname') | |
subparsers_hostname = parser_hostname.add_subparsers( | |
metavar='HOSTNAME_ACTION') | |
subparsers_hostname.required = True | |
# Hostname search parser | |
parser_hostname_search = subparsers_hostname.add_parser( | |
'search', | |
help='Search for hostname.') | |
parser_hostname_search.add_argument( | |
'hostname', | |
help='Hostname to search for.') | |
parser_hostname_search.add_argument( | |
'subnet', | |
nargs='?', | |
help='Subnet in which to search.') | |
parser_hostname_search.set_defaults(action='search') | |
# Hostname add parser | |
parser_hostname_add = subparsers_hostname.add_parser( | |
'add', | |
help='Add hostname into subnet.') | |
parser_hostname_add.add_argument( | |
'hostname', | |
help='Hostname to add.') | |
parser_hostname_add.add_argument( | |
'subnet', | |
help='Subnet into which to add.') | |
parser_hostname_add.set_defaults(action='add') | |
# Hostname remove parser | |
parser_hostname_remove = subparsers_hostname.add_parser( | |
'remove', | |
help='Remove hostname.') | |
parser_hostname_remove.add_argument( | |
'hostname', | |
help='Hostname to remove.') | |
parser_hostname_remove.add_argument( | |
'subnet', | |
nargs='?', | |
help='Subnet from which to remove.') | |
parser_hostname_remove.set_defaults(action='remove') | |
return parser, parser.parse_args() | |
def main(): | |
# Read command line arguments | |
parser, args = parse_args() | |
# Setup logger | |
format = '%(levelname)s: %(message)s' | |
log_level = logging.ERROR | |
if args.debug: | |
log_level = logging.DEBUG | |
logging.getLogger('urllib3').setLevel(logging.WARNING) | |
logging.basicConfig(level=log_level, format=format) | |
log = logging.getLogger(__name__) | |
# Disable SSL warnings | |
requests.packages.urllib3.disable_warnings() | |
# Check input parameters | |
if 'action' not in args: | |
log.error('No action specified!') | |
parser.print_help() | |
sys.exit(1) | |
# Check if we have required credentials | |
if ( | |
args.token is None and ( | |
args.user is None or | |
args.password is None)): | |
log.error("Either token OR user and password is required.") | |
parser.print_help() | |
sys.exit(1) | |
# Login to the API | |
ipam = PhpIpam( | |
args.url, | |
args.user, | |
args.password, | |
args.token, | |
not args.skipssl, | |
log) | |
ipam.login() | |
if args.type == 'hostname': | |
# Search, add or remove hostname | |
if args.action == 'search': | |
subnet_id = None | |
if args.subnet is not None: | |
subnet_id = ipam.get_subnet_id(args.subnet) | |
ipam.search_hostname(args.hostname, subnet_id) | |
elif args.action == 'add': | |
subnet_id = ipam.get_subnet_id(args.subnet) | |
hostname_ip = ipam.get_hostname_ip(args.hostname, subnet_id) | |
if hostname_ip is None: | |
hostname_ip = ipam.add_hostname(args.hostname, subnet_id) | |
print(hostname_ip) | |
elif args.action == 'remove': | |
subnet_id = None | |
if args.subnet is not None: | |
subnet_id = ipam.get_subnet_id(args.subnet) | |
hostname_ids = ipam.search_hostname(args.hostname, subnet_id) | |
for hostname_id in hostname_ids: | |
ipam.remove_hostname(hostname_id) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment