Created
September 18, 2015 17:53
-
-
Save shubhadeep/d9ebf425839d9b911237 to your computer and use it in GitHub Desktop.
Python WebDAV Client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys, requests | |
from lxml import objectify, etree | |
# Symbolic names for the HTTP status codes this WebDAV client checks for.
# See http://tools.ietf.org/html/rfc4918
WEBDAV_STATUS_CODE = dict(
    OK=200,
    CREATED=201,
    NO_CONTENT=204,
    MULTI_STATUS=207,
    NOT_FOUND=404,
    METHOD_NOT_ALLOWED=405,
    PRECONDITION_FAILED=412,
    REQUEST_URI_TOO_LONG=414,
    UNPROCESSABLE_ENTITY=422,
    LOCKED=423,
    FAILED_DEPENDENCY=424,
    INSUFFICIENT_STORAGE=507,
)
class Client(object):
    """Minimal WebDAV client built on a ``requests`` session.

    The client is bound to a base ``url`` and a default remote directory
    (``path``).  Every file-level method accepts an optional ``path`` that
    overrides the default remote directory for that single call.
    """

    def __init__(self, url, username, password, path='/'):
        """Create a session that sends ``username``/``password`` on each request.

        Args:
            url: Base URL of the server, e.g. ``'https://example.com'``.
            username: User name for the session's auth tuple.
            password: Password for the session's auth tuple.
            path: Default remote directory, appended to ``url``.
        """
        self._url = url
        self._remotedir = path
        self._session = requests.Session()
        self._session.auth = (username, password)
        self._authenticated = False

    def authenticate(self):
        """GET the base URL and remember whether the server answered 200.

        Returns:
            True once a GET on the base URL has returned HTTP 200, else False.
        """
        response = self._session.get(self._url)
        if response.status_code == WEBDAV_STATUS_CODE['OK']:
            self._authenticated = True
        return self._authenticated

    def send_request(self, verb, path, body='', headers=None, properties=None):
        """Send ``verb`` to the absolute URL ``path`` and return the response.

        ``properties`` is accepted for interface compatibility only; it is
        not used when building the request.
        """
        if not headers:
            headers = {}
        return self._session.request(verb, path, data=body, headers=headers)

    def get_request_path(self, filename, path=None):
        """Build the absolute URL for ``filename``.

        Args:
            filename: Resource name to append (may be ``''`` for the
                directory itself).
            path: Remote directory to use instead of the client's default.

        Returns:
            ``<url><directory>/<filename>`` with no doubled slash between
            the directory and the file name.
        """
        directory = path if path else self._remotedir
        # Strip trailing slashes so the default '/' directory does not
        # produce 'https://host//name'.
        return '{0}{1}/{2}'.format(self._url, directory.rstrip('/'), filename)

    def propfind(self, path, properties=None):
        """Run a Depth-1 PROPFIND on the remote directory ``path``.

        Args:
            path: Remote directory to query.
            properties: Optional list of DAV property names to request;
                all properties are requested when empty or None.

        Returns:
            The parsed multi-status document (an ``lxml.objectify`` tree) on
            HTTP 207; otherwise prints the status code and returns None.
        """
        if not self._authenticated:
            self.authenticate()
        headers = {
            'Depth': '1',
            'Content-Type': 'application/xml; charset="utf-8"',
        }
        response = self.send_request(
            'PROPFIND',
            self.get_request_path('', path),
            body=self.get_propfind_body(properties),
            headers=headers)
        if response.status_code == WEBDAV_STATUS_CODE['MULTI_STATUS']:
            # Parse the raw bytes: lxml rejects unicode strings that carry an
            # encoding declaration, and bytes let it honour that declaration
            # without us having to chop off the first line of the reply.
            return objectify.fromstring(response.content)
        print(response.status_code)
        return None

    def get(self, filename, path=None):
        """Download ``filename``.

        Returns:
            The response body as text on HTTP 200, otherwise False.
        """
        response = self.send_request('GET', self.get_request_path(filename, path))
        if response.status_code == WEBDAV_STATUS_CODE['OK']:
            return response.text
        return False

    def put(self, filename, path=None):
        """Upload the local file ``filename`` under the same remote name.

        Returns:
            True when the server answers 201 Created or 204 No Content,
            otherwise False.
        """
        requestpath = self.get_request_path(filename, path)
        # Binary mode keeps the payload byte-exact; the context manager
        # guarantees the handle is closed even if the request raises.
        with open(filename, 'rb') as source:
            response = self.send_request('PUT', requestpath, source.read())
        return response.status_code in (
            WEBDAV_STATUS_CODE['CREATED'],
            WEBDAV_STATUS_CODE['NO_CONTENT'])

    def delete(self, filename, path=None):
        """Delete the remote resource ``filename``.

        Returns:
            True on 204 No Content; otherwise prints the status code and
            returns False.
        """
        response = self.send_request('DELETE', self.get_request_path(filename, path))
        if response.status_code == WEBDAV_STATUS_CODE['NO_CONTENT']:
            return True
        print(response.status_code)
        return False

    def mkcol(self, filename, path=None):
        """Create the remote collection (directory) ``filename``.

        Returns:
            True on 201 Created; otherwise prints the status code and
            returns False.
        """
        # Forward the caller's ``path`` instead of discarding it.
        response = self.send_request('MKCOL', self.get_request_path(filename, path))
        if response.status_code == WEBDAV_STATUS_CODE['CREATED']:
            return True
        print(response.status_code)
        return False

    def get_propfind_body(self, properties=None):
        """Return the XML request body for a PROPFIND.

        Args:
            properties: Optional list of property names in the ``DAV:``
                namespace.  Names are inserted verbatim (no XML escaping).

        Returns:
            A ``<D:propfind>`` document requesting the named properties, or
            ``<D:allprop/>`` when no properties are given.
        """
        parts = [
            '<?xml version="1.0" encoding="utf-8" ?>',
            '<D:propfind xmlns:D="DAV:">',
        ]
        if properties:
            parts.append('<D:prop>')
            parts.extend('<D:' + prop + '/>' for prop in properties)
            parts.append('</D:prop>')
        else:
            parts.append('<D:allprop/>')
        parts.append('</D:propfind>')
        return ''.join(parts)
# Placeholder connection settings used by the command-line entry point.
URL = 'https://example.com'
PATH = '/webdavpath'
USER = 'user'
PASS = 'password'


def main(argv=None):
    """Upload the file named on the command line to the WebDAV server.

    Args:
        argv: Argument vector (program name first); defaults to ``sys.argv``.

    Returns:
        Process exit status: 0 when the upload succeeded, 1 when no file
        path was given or the upload failed.
    """
    if argv is None:
        argv = sys.argv

    # Validate arguments before touching the network.
    if len(argv) < 2:
        program = os.path.basename(argv[0]) if argv else ''
        print("Please provide file path to be uploaded")
        print("Example:\n{0} {1}".format(program, "'path/to/file'"))
        return 1

    client = Client(URL, USER, PASS, PATH)
    client.authenticate()

    filepath = argv[1]
    # Write and flush the progress prefix so it is visible while uploading.
    sys.stdout.write("Uploading {0} ... ".format(filepath))
    sys.stdout.flush()
    uploaded = client.put(filepath)
    sys.stdout.write('DONE\n' if uploaded else 'FAIL\n')
    return 0 if uploaded else 1

    # EXAMPLES
    #
    # print(client.propfind(PATH))
    # print('Creating directory testdir')
    # print('OK' if client.mkcol('testdir') else 'FAIL')
    # print('Creating directory testdir again')
    # print('OK' if client.mkcol('testdir') else 'FAIL')
    # print('Removing directory testdir')
    # print('OK' if client.delete('testdir') else 'FAIL')
    # print('Removing directory testdir again')
    # print('OK' if client.delete('testdir') else 'FAIL')
    # print('Putting file test.py')
    # print('OK' if client.put('test.py') else 'FAIL')
    # print('getting test.py')
    # resp = client.get('test.py')
    # print(resp if resp else 'FAIL')
    # print('Deleting file test.py')
    # print('OK' if client.delete('test.py') else 'FAIL')
    # print('Deleting file test.py again')
    # print('OK' if client.delete('test.py') else 'FAIL')


if __name__ == '__main__':
    if sys.version_info[0] == 2:
        # Python 2 only: keep the original default-encoding workaround.
        # ``reload`` and ``setdefaultencoding`` do not exist on Python 3.
        reload(sys)
        sys.setdefaultencoding('utf8')  # http://tinyurl.com/why-we-need-sys-setdefaultenco
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment