Use these files to interact with arcgis server rest api. Lots of functionality exposed via REST. Good for automating tasks in python.
# -*- coding: iso-8859-1 -*-
ArcGIS Server Module
__author__='Ed Briggler, Jason Tipton'
import requests, sys, os, logging
from conf import settings
class Endpoint(object):
Endpoints: ArcGIS for Server
def __init__(self, base_url):
''' init all our endpoints '''
self.BASE_URL = base_url
self.ADMIN = '{}/arcgis/admin'.format(base_url)
self.VALIDATE_ITEM_DATA = '{}/data/validateDataItem'.format(self.ADMIN)
self.REGISTER_DATA_ITEM = '{}/data/registerItem'.format(self.ADMIN)
self.TOKEN = '{}/arcgis/admin/generateToken'.format(base_url)
self._USER_URL = '{}/arcgis/admin/security/users'.format(base_url)
self._ROLE_URL = '{}/arcgis/admin/security/roles'.format(base_url)
self._SERVICE_URL = '{}/arcgis/admin/services'.format(base_url)
self._REST_SRVCS_URL = '{}/arcgis/rest/services'.format(base_url)
self.ADD_USER = '{}/add'.format(self._USER_URL)
self.GET_USERS = '{}/getUsers'.format(self._USER_URL)
self.SEARCH_USER = '{}/search'.format(self._USER_URL)
self.REMOVE_USER = '{}/remove'.format(self._USER_URL)
self.UPDATE_USER = '{}/update'.format(self._USER_URL)
self.ADD_ROLE = '{}/assignRoles'.format(self._USER_URL)
self.GET_ROLE = '{}/getRoles'.format(self._USER_URL)
self.REMOVE_ROLE = '{}/removeRoles'.format(self._USER_URL)
self.GET_PRIVILEGE = '{}/getPrivilege'.format(self._USER_URL)
self.GET_ROLES_FOR_USER = '{}/getRolesForUser'.format(self._ROLE_URL)
self.ADD_USER_TO_ROLE = '{}/addUsersToRole'.format(self._ROLE_URL)
self.ASSIGN_PRIVILEGE = '{}/assignPrivilege'.format(self._ROLE_URL)
self.COUNT_ERROR = '{}/arcgis/arcgis/admin/logs/countErrorReports'.format(base_url)
self.SYSTEM = '{}/System'.format(self._REST_SRVCS_URL)
class Manager(object):
inherit from this class to communicate with an arcgis server
resource docs for ESRI:
def __init__(self,url=None, proxies=False, username=None, password=None):
self.username = username
self.password = password
self.cookies = None
init method
disable urllib3 warnings because we know what we are doing
setup proxies and get our token so we can call the api's
self.token = None
if url is not None:
self._endpoints = Endpoint(url) # create endpoints
if proxies:
self.proxies = settings.config['proxies']
elif not proxies:
self.proxies = {
self.proxies = proxies
if username:
self._get_token(username, password)
def _get_token(self,username, password):
''' gets a token from arcgis server '''
# payload
params = {'username': username, 'password': password, 'client': 'requestip', 'f': 'json'}
# Read response
response =, data=params, proxies=self.proxies, verify=False )
self.cookies = response.cookies
data = self.checkResponse(response)
self.token = data['token']
def checkResponse(self, req):
''' using this to check for errors, will raise, otherwise, will return json '''
return req.json()
def _post(self, r_url, r_params={}, files={}):
''' do post with payload, always append format and token '''
if r_params.has_key('f') == False:
r_params['f'] = 'json'
r_params['token'] = self.token
response =, data=r_params, proxies=self.proxies, verify=False, files=files) #, cookies=self.cookies)
response = self.checkResponse(response)
if response.has_key('status'):
if response.has_key('code'):
if response['code'] == 498:
from pprint import pprint
#print('Invalid Token: Requesting a new token')
## Invalid Token, request a new and try again
self._get_token(self.username, self.password)
r_params['token'] = self.token
response = self._post(r_url, r_params, files)
#response =, data=r_params, proxies=self.proxies, verify=False, files=files)
#response = self.checkResponse(response)
return response
class GP(Manager):
Interacts with the Geoprocessors on the server
def __init__(self, url = None, proxies = False, username = None, password = None):
return super(GP, self).__init__(url, proxies, username, password)
def submit_job(self,workspace,service_name, task, params):
url = '{}/{}/{}/GPServer/{}/submitJob'.format(self._endpoints._REST_SRVCS_URL,workspace,service_name,task, params)
#print url
response = self._post(url,params)
from pprint import pprint
return response
def get_result(self,workspace,service_name, task, job_obj, param_name):
url = '{}/{}/{}/GPServer/{}/jobs/{}/{}'.format(self._endpoints._REST_SRVCS_URL,workspace,service_name,task, job_obj['jobId'], job_obj['results'][param_name].values()[0] )
response = self._post(url)
##print response
return response
def is_job_complete(self,workspace,service_name, task, id):
js = self.job_status(workspace,service_name, task, id)
##print js
if js.has_key('error') == False:
if js['jobStatus'] == "esriJobExecuting" or js['jobStatus'] == "esriJobSubmitted":
return False
else: return True
def wait_for_job_completion(self,workspace,service_name, task, id, wait_time=1):
from time import sleep
js = self.job_status(workspace,service_name, task, id)
#print js
if js.has_key('error') == False:
##print js['jobStatus']
while js['jobStatus'] == "esriJobExecuting" or js['jobStatus'] == "esriJobSubmitted":
# Need to find a better way
js = self.job_status(workspace,service_name, task, id)
if isinstance(js,dict) == False:
js = self.job_status(workspace,service_name, task, id)
elif js.has_key('jobStatus') == False:
js = self.job_status(workspace,service_name, task, id)
##print js['jobStatus']
#print sys.exc_info()[0]
#print js['jobStatus']
return js
def job_status(self,workspace,service_name, task, id):
url = '{}/{}/{}/GPServer/{}/jobs/{}'.format(self._endpoints._REST_SRVCS_URL,workspace,service_name,task, id)
##print url
response = self._post(url)
return response
# System/PublishingTools/GPServer/Get%20Database%20Connection%20String/submitJob
def submit_report(self,task, svc, folder_name=None, svc_type='MapServer'):
This was created for getting a report on caching status. Not sure what else it can be used for
report_url = '{}/ReportingTools/GPServer'.format(self._endpoints.SYSTEM)
svc_url = ''
if folder_name != None:
svc_url = folder_name + '/'
svc_url += '{}:{}'.format(svc,svc_type)
url = '{report}/{task}/execute'.format(report=report_url, task=task)
params = {'service_url': svc_url}
response = self._post(url, params)
return response
class Admin(Manager):
def move_site(self, path_from, path_to):
Moves site to new location. Moves configstore and directories that match path_from.
Also copies cache directories
self.move_configstore(path_from, path_to)
self.move_directories(path_from, path_to)
def get_directories(self):
""" Gets the server directories. Ex: Cache, Jobs, Output... """
url = '{}/arcgis/admin/system/directories'.format(self._endpoints.BASE_URL)
response = self._post(url)
return response
def copy_directory_contents(self, path_from, path_to, async=True, silent=True):
""" Copies contents from "from" folder to "to" folder. Useful for Caches"""
import subprocess
# /e Copy all subdirectories
# /XD ".*" Exclude directories that being with ".". Ex: ".site"
silent_str = ''
if silent == True:
silent_str = '/NFL /NDL /NJH /NJS /nc /ns /np'
cmd = 'robocopy {path_from} {path_to} {silent_str} /e /XD ".*" '.format(path_from=path_from, path_to=path_to, silent_str=silent_str)
result = ''
if async == True:
result = subprocess.Popen(cmd)
result =
return result
def move_directories(self, path_from, path_to, copy_cache=True):
""" Moves all directories that match the from path. If copy_cache=True, copies the cache data as well """
responses = []
# For directory in server directories
for d in self.get_directories()['directories']:
if path_from in d['physicalPath']: # If the path_from matches
print d['name']
new_path = d['physicalPath'].replace(path_from, path_to)
print new_path
responses.append(self.edit_directory(d['name'], new_path)) # Change the directory
# Copy cache contents if set to copy_cahce=True
if d['directoryType'] == 'CACHE' and copy_cache == True:
self.copy_directory_contents(d['physicalPath'], d['physicalPath'].replace(path_from, path_to))
return responses
def get_directory(self,directory):
url = url = '{}/arcgis/admin/system/directories/{}'.format(self._endpoints.BASE_URL, directory)
response = self._post(url)
return response
def edit_directory(self, directory, new_path, cleanup_mode=None, max_file_age=None, description=None, name=None, directory_type=None):
The server directory's edit operation allows you to change the path and clean up properties of the directory.
This operation updates the GIS service configurations (and points them to the new path) that are using this directory, causing them to restart.
It is therefore recommended that any edit to the server directories be performed when the server is not under load.
This operation is mostly used when growing a single machine site to a multiple machine site configuration,
which requires that the server directories and configuration store be put on a network-accessible file share.
Keyword arguments:
cleanup_mode -- how to cleanup files ('TIME_ELAPSED_SINCE_LAST_MODIFIED'/ default 'NONE')
max_file_age -- How long to keep file before deletion in minutes
if directory in ['arcgisindex', 'arcgisinput', 'arcgisjobregistry', 'arcgisuploads', 'kml']:
return {'warning':"Can't edit this direcotry", 'directory':directory}
orig_directory = self.get_directory(directory)
# Maps python params to web params
param_map = {'cleanup_mode':'cleanupMode', 'max_file_age':'maxFileAge',
'description':'description','name':'name', 'new_path':'physicalPath', 'directory_type':'directoryType'}
params = {}
for k,v in param_map.items():
if eval(k) is not None:
params[v] = eval(k)
# Get values from original (You actually have to do this)
params[v] = orig_directory[v]
url = url = '{}/arcgis/admin/system/directories/{}/edit'.format(self._endpoints.BASE_URL, directory)
print params
response = self._post(url, params)
print response
return response
def get_configstore(self):
The configuration store maintains all of server's configurations.
Typical configurations include all the resources such as clusters, machines, GIS services, and security rules that are required to power a Site.
In a way, the configuration store is a physical representation of a site.
url = '{}/arcgis/admin/system/configstore'.format(self._endpoints.BASE_URL)
response = self._post(url)
return response
def move_configstore(self, folder_in, folder_out):
""" Moves the config store """
current_configstore = self.get_configstore()['connectionString']
if folder_in in current_configstore:
folder_out = current_configstore.replace(folder_in, folder_out)
return self.edit_configstore(folder_out, move=True, run_async=True)
return {"Message": "Config store not at the old location"}
def edit_configstore(self, folder_out, move=True, run_async=True, keep_trying=True):
You can use this operation to update the configuration store. Typically, this operation is used to change the location of the store.
When ArcGIS Server is installed, the default configuration store uses local paths. As the site grows (more server machines are added),
the location of the store must be updated to use a shared file system path. On the other hand,
if you know at the onset that your site will have two or more server machines, you can start from a shared path while creating a site and skip this step altogether.
import time
url = '{}/arcgis/admin/system/configstore/edit'.format(self._endpoints.BASE_URL)
params = {'type': 'FILESYSTEM', 'connectionString': folder_out,
'move': move, 'runAsync': run_async}
response = {}
while True:
# If site is busy, keep trying...
response = self._post(url, params)
if response['status'] == 'error':
if keep_trying == False:
elif any(['Please try again later' in x for x in response['messages']]):
# break if actual error
# break if no error
print('sleeping for a minute')
return response
def export_site(self, folder_out):
Exports the site configuration to the folder path given
url = '{}/arcgis/admin/exportSite'.format(self._endpoints.BASE_URL)
params = {'location':folder_out}
response = self._post(url, params)
return response
def import_site(self, file_in):
Exports the site configuration from the file given
url = '{}/arcgis/admin/importSite'.format(self._endpoints.BASE_URL)
params = {'location': file_in}
response = self._post(url, params)
return response
def delete_site(self):
Deletes site and removes all clusters/machines from it
url = '{}/deleteSite'.format(self._endpoints.ADMIN)
#print url
response = self._post(url)
return response
def create_site(self, username, password, dir_config, dir_output, dir_output_obj = None, cluster='default', logsSettings = '', runAsync='true' ):
import os, json
config_store = {'connectionString':dir_config}
if dir_output_obj == None:
directories_obj = {
"directories": [
"name": "arcgiscache",
"physicalPath": os.path.join(dir_output,"arcgiscache"),
"directoryType": "CACHE",
"cleanupMode": "NONE",
"maxFileAge": 0,
"description": "Stores tile caches used by map, globe, and image services for rapid performance.",
"name": "arcgisjobs",
"physicalPath": os.path.join(dir_output,"arcgisjobs"),
"directoryType": "JOBS",
"maxFileAge": 360,
"description": "Stores results and other information from geoprocessing services.",
"name": "arcgisoutput",
"physicalPath": os.path.join(dir_output,"arcgisoutput"),
"directoryType": "OUTPUT",
"maxFileAge": 10,
"description": "Stores various information generated by services, such as map images.",
"name": "arcgissystem",
"physicalPath": os.path.join(dir_output,"arcgissystem"),
"directoryType": "SYSTEM",
"cleanupMode": "NONE",
"maxFileAge": 0,
"description": "Stores directories and files used internally by ArcGIS Server.",
params = {
'username': username,
'password': password,
'configStoreConnection': json.dumps(config_store),
'directories': json.dumps(directories_obj)
url = '{}/createNewSite'.format(self._endpoints.ADMIN, params)
response = self._post(url)
## cluster stuff
def get_clusters(self):
url = "{}/clusters".format(self._endpoints.ADMIN)
response = self._post(url)
return response
def get_cluster(self, cluster_name='default'):
url = "{}/clusters/{}".format(self._endpoints.ADMIN, cluster_name)
response = self._post(url)
return response
def get_machines_in_cluster(self, cluster_name='default'):
url = "{}/clusters/{}/machines".format(self._endpoints.ADMIN, cluster_name)
response = self._post(url)
return response
def remove_from_cluster(self, machine_names, cluster_name='default'):
url = "{}/clusters/{}/machines/remove".format(self._endpoints.ADMIN, cluster_name)
params = {"machineNames": machine_names}
response = self._post(url, params)
return response
def remove_all_other_machines_from_cluster(self, cluster_name='default'):
import urlparse
current_server = urlparse.urlparse(self._endpoints.BASE_URL).hostname.upper()
l_machines = []
for serv in self.get_machines_in_cluster(cluster_name)['machines']:
if serv['machineName'] <> current_server:
machines = ", ".join(l_machines)
#print("Removing machine from cluster: " + machines)
#print(self.remove_from_cluster(machines, cluster_name))
def add_machines_to_cluster(self, machine_names, cluster_name='default'):
url = "{}/clusters/{}/machines/add".format(self._endpoints.ADMIN, cluster_name)
params = {"machineNames": machine_names}
response = self._post(url, params)
return response
def get_available_machines(self):
url = "{}/clusters/getAvailableMachines".format(self._endpoints.ADMIN)
response = self._post(url)
return response
def add_available_machines_to_cluster(self, cluster_name='default'):
l_machines = []
for m in self.get_available_machines()['machines']:
if len(l_machines) > 0:
machines = ", ".join(l_machines)
#print("Adding machines to cluster: " + machines)
return self.add_machines_to_cluster(machines, cluster_name)
else: return "No servers to add"
def unregister_machine(self, machine_name):
url = "{}/machines/{}/unregister".format(self._endpoints.ADMIN,machine_name)
response = self._post(url)
return response
def register_machine(self, machine_name, machine_admin_url=None):
machine_admin_url is the url where the admin is running on the server to be added. Ex:
If left blank, ArcGIS Server will attempt to reach it at <machineName>:6080/arcgis/admin
params = {"machineName": machine_name}
if machine_admin_url <> None:
params['machineName'] = machine_name
url = "{}/machines/register".format(self._endpoints.ADMIN,machine_name)
response = self._post(url)
return response
def register_machines(self, l_machine_names, admin_rel_path=r":6080/arcgis/admin"):
for m in l_machine_names:
if admin_rel_path == r":6080/arcgis/admin":
self.register_machine(m, '{}/{}'.format(m,admin_rel_path))
def unregister_machines(self, l_machine_names):
for m in l_machine_names:
def unregister_available_machines(self):
Unregisters all machines not currently participating in a site
l_machines = self.get_available_machines()['machines']
#print l_machines
def validate_data_item(self, item, item_name, type):
import json
url = self._endpoints.VALIDATE_ITEM_DATA
p1 = {}
if type == 'gdb':
p1 = { 'info': {
'connectionString': item['value'],
'dataStoreConnectionType': 'shared',
'isManaged': 'false'},
'path': '/enterpriseDatabases/' + item_name,
'type': 'egdb'
params = {'item':json.dumps(p1)}
response = self._post(url, params)
#print response
return response
def get_all_egdb(self):
Gets all SDE Connections
url = '{}/{}'.format(self._endpoints.ADMIN,'data/findItems')
params = {'types': 'egdb'}
return self._post(url, params)
def unregister_all_egdb(self):
Unregisters all SDE Connections
for x in self.get_all_egdb()['items']:
gdb = x['path'].replace(r'/enterpriseDatabases/','')
def unregister_gdb(self, cxn_name):
#print('Unregistering GDB {}'.format(cxn_name))
def unregister_data_item(self, item_path):
url = '{}/data/unregisterItem'.format(self._endpoints.ADMIN)
params = {'itempath':item_path}
response = self._post(url, params)
#print response
return response
def register_data_item(self,item, item_name, type):
import json
url = self._endpoints.REGISTER_DATA_ITEM
p1 = {}
if type == 'gdb':
p1 = { 'info': {
'connectionString': item['value'],
'dataStoreConnectionType': 'shared'
'path': '/enterpriseDatabases/' + item_name,
'type': 'egdb'
#print p1
params = {'item':json.dumps(p1)}
if self.validate_data_item(item, item_name, type)['status'] == 'success':
#print('Registering item')
response = self._post(url, params)
#print response
return response
def upload(self, fpath, desc=''):
""" Function to upload a file to the ArcGIS REST Admin
params = {
"file": 'fffffffile',
files = {"itemFile": open(fpath, 'rb')}
response = self._post(url, params,files=files)
from pprint import pprint
return response
def registerDB(self, cxn_name, cxn_file, server_cxn='', pt_workspace='System', pt_svc_name='PublishingTools'):
gp = GP(self._endpoints.BASE_URL,username = self.username, password=self.password)
uploaded = self.upload(cxn_file, cxn_name)
task = 'Get Database Connection String'
params = {
'in_inputData': uploaded['item']['itemID'],
job_status = gp.submit_job(pt_workspace,pt_svc_name,task,params)
job_status = gp.wait_for_job_completion(pt_workspace,pt_svc_name,task,job_status['jobId'])
result = gp.get_result(pt_workspace,pt_svc_name,task,job_status, 'out_connectionString')
if self.validate_data_item(result, cxn_name,'gdb'):
'registering item'
return self.register_data_item(result, cxn_name,'gdb')
class User(Manager):
this inherits from Manager and contains methods pertaining to user management
def __init__(self,url=None, proxies=False, username=None, password=None):
super(User, self).__init__(url, proxies, username, password)
def addUser(self, username, password, params=None):
''' adds a user to arcgis server '''
if params == None:
params = {
params['username'] = username
params['password'] = password
return self._post(self._endpoints.ADD_USER, params)
def getUsers(self,startIndex=0, pageSize=10):
gets users from arcgis server
Max of pageSize max is 1,000
requestURL = self._endpoints.GET_USERS
params = {'startIndex':startIndex,'pageSize':pageSize}
return self._post(requestURL, params)
def getAllUsers(self):
Iterates through getUsers() using the
startIndex and pageSize to return all users
users = []
i = 0
step = 1000
while True:
response = self.getUsers(startIndex=i,pageSize=step)
[users.append(u) for u in response['users']]
if response.has_key('hasMore') == False:
response['users'] = users
elif response['hasMore'] == True:
# Run again
i += step
response['users'] = users
return response
def getRolesForUser(self,user):
''' gets all of the roles a user '''
params ={
'username': user
return self._post(self._endpoints.GET_ROLES_FOR_USER, params)
def doesUserHaveRole(self,user,role):
''' see if user belongs to a role '''
return role in self.getRolesForUser(user)["roles"]
def returnAvailableUsername(self,username):
''' check if username is available '''
i = 1
origUserName = username
while len(self.searchUsers(username)['users']) <> 0:
username = origUserName + '_' + str(i)
i+= 1
return username
def getUser(self,username):
''' return a user '''
users = self.searchUsers(username, maxcount=50) ['users']
for u in users:
if u['username'] == username:
return u
return {}
def searchUsers(self, searchfilter, maxcount=10):
''' search for users based on a search filter '''
params = {
return self._post(self._endpoints.SEARCH_USER, params)
def removeUser(self,username):
''' removes a user from arcgis server '''
#print('{}: removing user'.format(username))
params = {
return self._post(self._endpoints.REMOVE_USER, params)
def updateUser(self, username, password=None, fullname=None, description=None, email=None):
''' update a user on arcgis server '''
params = {
if password:
params['password'] = password
if fullname:
params['fullname'] = fullname
if description:
params['description'] = description
if email:
params['email'] = email
return self._post(self._endpoints.UPDATE_USER, params)
def createKeyIfNotExists(self, dict, key):
''' if a key for a dictionary doesn't exist, create it. this is a helper method '''
if dict.has_key(key) == False:
dict[key] = ''
return dict
def updateUserPWD(self, username, password):
''' update a users password '''
u = self.getUser(username)
if len(u) <> 0:
#print u
return self.updateUser(username, password, u['fullname'], u['description'], u['email'])
#print('User not found')
return 'error'
def addRoles(self, username, roles):
''' add a roles to a user. roles should be a list ['role1','role2','role3'] '''
params = {
return self._post(self._endpoints.ADD_ROLE, params)
def removeRoles(self, username, roles):
''' remove roles from a user. roles should be a list ['role1','role2','role3'] '''
params = {
return self._post(self._endpoints.REMOVE_ROLE, params)
def getPrivileges(self, username):
''' get a users privileges ADMINISTRATOR etc... '''
params = {
return self._post(self._endpoints.GET_PRIVILEGE, params)
def addUsersToRole(self, usernames, role):
''' adds users to a role'''
params = {
'users': usernames,
'rolename' : role
return self._post(self._endpoints.ADD_USER_TO_ROLE, params)
class Service(Manager):
def __init__(self,url=None, proxies=False, username=None, password=None):
super(Service, self).__init__(url, proxies, username, password)
def _collectServices(self, d):
''' this method packages up our services that reside on arcgis server and returns a dictionary of services '''
folders = [i for i in d['folders']]
data = []
for folder in folders:
requestURL = '{}/{}'.format(self._endpoints._SERVICE_URL,folder)
resp = self._post(requestURL)
for service in resp['services']:
name = service['serviceName']
folder = service['folderName']
type = service['type']
status = self.getServiceStatus(folder, name, type)['realTimeState']
'folder': folder,
'type': type,
'status': status,
return data
def getAllServices(self):
''' returns all services for our arcgis server '''
data = self._post(self._endpoints._SERVICE_URL)
services = self._collectServices(data)
return services
def getServiceStatus(self,folder, servicename, servicetype):
''' gets the status of a server services '''
requestURL = '{}/{}/{}.{}/status'.format(self._endpoints._SERVICE_URL, folder,servicename, servicetype)
return self._post(requestURL)
def startService(self,folder,servicename, servicetype):
''' starts a service '''
requestURL = '{}/{}/{}.{}/start'.format(self._endpoints._SERVICE_URL, folder,servicename, servicetype)
return self._post(requestURL)
def stopService(self,folder,servicename, servicetype):
''' stops a service '''
requestURL = '{}/{}/{}.{}/stop'.format(self._endpoints._SERVICE_URL, folder, servicename, servicetype)
return self._post(requestURL)
def getServiceScales(self, folder, servicename, servicetype):
requestURL = '{}/{}/{}/{}'.format(self._endpoints._REST_SRVCS_URL, folder, servicename, servicetype)
# #print requestURL
data = self._post(requestURL)
return data['tileInfo']['lods']
def getServiceScalesAsString(self, folder, servicename, servicetype):
json_data = self.getServiceScales(folder, servicename, servicetype)
# #print json_data
return ",".join([str(s["scale"]) for s in json_data])
def _format_extent(self, xmin, ymin, xmax, ymax, wkid=26915):
return {"xmin":xmin,
"spatialReference": {"wkid":wkid}
def exportTiles(self, folder, servicename, servicetype, extent, wkid=26915):
params = {
'exportExtent': str(self._format_extent(extent[0], extent[1], extent[2], extent[3], wkid)),
'exportBy' : 'scale',
'levels' : self.getServiceScalesAsString(folder,servicename,servicetype),
requestURL = '{}/{}/{}/{}/exportTiles'.format(self._endpoints._REST_SRVCS_URL,folder, servicename, servicetype)
data = self._post(requestURL, params)
return data
def createReplica(self, folder, servicename, servicetype, extent, wkid=26915):
url = '{}/{}/{}/{}'.format(self._endpoints._REST_SRVCS_URL,folder, servicename, servicetype)
service_details = self._post(url)
extent = {str(k) : v for k,v in service_details['fullExtent'].iteritems() }
extent = [str(v) for k, v in extent.iteritems()]
params = {
"replicaName" : servicename,
"layers" :",".join([str(i['id']) for i in service_details['layers']]),
"geometry" : ",".join(extent),
"geometryType" : "esriGeometryEnvelope",
#"inSR" : 4326, #wkid,
"transportType": "esriTransportTypeUrl",
"returnAttachments" : "false",
"returnAttachmentDataByURL" : "false",
"syncModel" : "perLayer",
"async" : "true",
"dataFormat" : "sqlite",
"replicaOptions": ""
print params
requestURL = '{}/{}/{}/{}/createReplica'.format(self._endpoints._REST_SRVCS_URL,folder, servicename, servicetype)
print requestURL
data = self._post(requestURL, params)
return data
def checkServiceJobStatus(self, folder, servicename, servicetype, task, jobid):
requestURL = '{}/{}/{}/{}/{}/jobs/{}'.format(self._endpoints._REST_SRVCS_URL,folder, servicename, servicetype, task, jobid)
data = self._post(requestURL)
return data
def getJobResultsOutputUrl(self, folder, servicename, servicetype, task, jobid):
requestURL = '{}/{}/{}/{}/{}/jobs/{}/results/out_service_url'.format(self._endpoints._REST_SRVCS_URL,folder, servicename, servicetype, task, jobid)
data = self._post(requestURL)
return data
def getJobResults(self, folder, servicename, servicetype, task, jobid):
data = self.getJobResultsOutputUrl(folder, servicename, servicetype, task, jobid)
requestURL = data['value']
results = self._post(requestURL)
return requestURL, results
class Status(Manager):
def __init__(self, url=None, proxies=False, username=None, password=None):
super(Status, self).__init__(url, proxies, username, password)
def countErrorReport(self,servername):
''' count errors report for server'''
params = {'machine': servername}
return self._post(self._endpoints.COUNT_ERRORS)
import os,json
class settings(object):
These will be settings that all other modules may use.
Ex: Databases, environments, paths.
Anything that may need to be reused across multiple modules
- - - - - - - -SET THE GLOBAL ENVIRONMENT HERE- - - - - - - - -
_config_fpath = configs_fpath = os.path.join(os.path.dirname(os.path.realpath(__file__)),'config.json')
_envir_fpath = configs_fpath = os.path.join(os.path.dirname(os.path.realpath(__file__)),'envir.json')
config = {}
envir = ''
with open(_config_fpath) as _config_file:
config = json.load(_config_file)
with open(_envir_fpath) as _config_file:
envir = json.load(_config_file)['envir']
config['envir'] = envir
env_vars = config['env_vars']
dbo = env_vars[envir]['dbs']
"proxies": {
"http": "http://<username>:<pword>@proxy.<>.com:80",
"https": "https://<username>:<pword>@proxy.<>.com:80"
"email_address": "",
"smtp_server": "",
"email_group": "",
"nas_storage": "",
"env_vars": {
"dev": {
"dbs": {
"db1": {
"name": "exampledb",
"users": {
"omdba": { "user": "test", "password": "test" },
"conn_string": ""
"db2": {
"name": "exampledb2",
"users": {
"sde": { "user": "test", "password": "test" },
"conn_string": ""
"esri_storage": "",
"arcgis_server": {
"url": "",
"servers": [
"arcgis_cache": { }
{"envir": "dev"}
