Created
December 22, 2011 18:06
-
-
Save saml/1511232 to your computer and use it in GitHub Desktop.
cq api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''CQ HTTP API | |
>>> cq = CQCurl(curl='curl', host='localhost', port=4502, cookie_path='/tmp/tmp.cookie') | |
>>> cq.url('/foo/bar') | |
'http://localhost:4502/foo/bar' | |
#>>> cq.login(dryrun=True) | |
#curl -f -s -F _charset_=utf-8 -c /tmp/tmp.cookie -F j_username=admin -F j_password=admin http://localhost:4502/libs/qa/core/content/login.html/j_security_check | |
''' | |
import json | |
import subprocess | |
import os | |
import itertools | |
def append(l, *args): | |
for x in args: | |
l.append(x) | |
class CQCurl(object): | |
def __init__(self, curl='curl', host='localhost', port=4502, | |
username='admin', password='admin', cwd='/', | |
cookie_path=os.path.join(os.path.expanduser('~'), 'cq.cookie')): | |
self.curl = curl | |
self.host = host | |
self.port = port | |
self.username = username | |
self.password = password | |
self.cwd = cwd #current working directory | |
self.json = None #current json. as a cache. | |
self.cookie_path = cookie_path | |
def url(self, path, protocol='http://', host=None, port=None): | |
'path => url' | |
if host is None: | |
host = self.host | |
if port is None: | |
port = self.port | |
return '%s%s:%d%s' % (protocol, host, port, path) | |
def _cmd(self, method='GET', use_cookie=True): | |
cmd = [self.curl, '-f', '-s'] | |
if use_cookie: | |
append(cmd, '-b', self.cookie_path) | |
if 'POST' == method: | |
append(cmd, '-F', '_charset_=utf-8') | |
return cmd | |
def _cmd_post(self, props): | |
cmd = self._cmd('POST') | |
for k,v in props.iteritems(): | |
append(cmd, '-F', '%s=%s', (k, v)) | |
return cmd | |
def _exec_command(self, cmd, dryrun=False): | |
'executes cmd. returns (returncode, stdout, stderr)' | |
if dryrun: | |
print(' '.join(cmd)) | |
return (None, None, None) | |
else: | |
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
out,err = p.communicate() | |
return (p.returncode, out, err) | |
def get_json(self, path=None, level=1, is_tidy=False, **kwargs): | |
if path is None: | |
path = self.cwd | |
if is_tidy: | |
tidy_selector = '.tidy' | |
else: | |
tidy_selector = '' | |
cmd = self._cmd() | |
append(cmd, self.url('%s%s.%d.json' % (path, tidy_selector, level))) | |
returncode,out,err = self._exec_command(cmd, **kwargs) | |
d = json.loads(out) | |
if path == self.cwd: | |
self.json = d #update cache | |
return d | |
def login(self, **kwargs): | |
if os.path.exists(self.cookie_path): | |
#empty cookie if exists | |
open(self.cookie_path, 'w').close() | |
cmd = self._cmd(method='POST', use_cookie=False) | |
append(cmd, '-c', self.cookie_path) | |
append(cmd, '-F', 'j_username=' + self.username) | |
append(cmd, '-F', 'j_password=' + self.password) | |
append(cmd, self.url('/libs/qa/core/content/login.html/j_security_check')) | |
return self._exec_command(cmd, **kwargs) | |
def remove_page(self, page_path, **kwargs): | |
cmd = self._cmd('POST') | |
append(cmd, '-F', 'cmd=deletePage') | |
append(cmd, '-F', 'force=false') | |
append(cmd, '-F', 'path=' + page_path) | |
append(cmd, self.url('/bin/wcmcommand')) | |
return self._exec_command(cmd, **kwargs) | |
def remove_pages(self, page_paths, **kwargs): | |
cmd = self._cmd('POST') | |
append(cmd, '-F', 'cmd=deletePage') | |
append(cmd, '-F', 'force=false') | |
for x in page_paths: | |
append(cmd, '-F', 'path=' + x) | |
append(cmd, self.url('/bin/wcmcommand')) | |
return self._exec_command(cmd, **kwargs) | |
def upload_image(self, file_path, target_dir_path, target_name=None, **kwargs): | |
file_name = os.path.basename(file_path) | |
if target_name is None: | |
target_name = file_name | |
cmd = self._cmd('POST') | |
append(cmd, '-T', file_path) | |
append(cmd, '-F', '*@TypeHint=nt:file') | |
append(cmd, self.url('%s/%s' % (target_dir_path, target_name))) | |
return self._exec_command(cmd, **kwargs) | |
def pwd(self): | |
return self.cwd | |
def ls(self, path=None): | |
if path is None: | |
path = self.cwd | |
path = os.path.join(self.cwd, path) | |
l = [] | |
d = self.get_json(path) | |
for k,v in d.iteritems(): | |
if isinstance(v, dict): | |
l.append(k + '/') | |
else: | |
l.append(k) | |
return l | |
def cd(self, rel_path=None): | |
if rel_path is None: | |
return | |
self.cwd = os.path.abspath(os.path.join(self.cwd, rel_path)) | |
self.json = None | |
def req(self, path, method='GET', **kwargs): | |
cmd = self._cmd(method) | |
append(cmd, self.url(path)) | |
return self._exec_command(cmd, **kwargs) | |
def create_node(self, path, primary_type='nt:unstructured', props=None, **kwargs): | |
if props is None: | |
props = {} | |
cmd = self._cmd('POST') | |
append(cmd, '-F', 'jcr:primaryType=' + primary_type) | |
for k,v in props.iteritems(): | |
append(cmd, '-F', k + '=' + v) | |
append(cmd, self.url(path)) | |
return self._exec_command(cmd, **kwargs) | |
def create_folder(self, path, **kwargs): | |
return self.create_node(path, primary_type='sling:Folder', **kwargs) | |
def propget(self, prop_name): | |
if self.json is None: | |
self.json = self.get_json() | |
return self.json.get(prop_name) | |
def propset(self, prop_name, value, path=None, **kwargs): | |
if path is None: | |
path = self.cwd | |
props = {} | |
props[prop_name] = value | |
cmd = self._cmd_post(props) | |
append(cmd, self.url(path)) | |
returncode,out,err = self._exec_command(cmd, **kwargs) | |
if path == self.cwd: | |
self.json = None | |
return (returncode,out,err) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment