Skip to content

Instantly share code, notes, and snippets.

@saml
Created December 22, 2011 18:06
Show Gist options
  • Save saml/1511232 to your computer and use it in GitHub Desktop.
Save saml/1511232 to your computer and use it in GitHub Desktop.
cq api
'''CQ HTTP API
>>> cq = CQCurl(curl='curl', host='localhost', port=4502, cookie_path='/tmp/tmp.cookie')
>>> cq.url('/foo/bar')
'http://localhost:4502/foo/bar'
#>>> cq.login(dryrun=True)
#curl -f -s -F _charset_=utf-8 -c /tmp/tmp.cookie -F j_username=admin -F j_password=admin http://localhost:4502/libs/qa/core/content/login.html/j_security_check
'''
import json
import subprocess
import os
import itertools
def append(l, *args):
for x in args:
l.append(x)
class CQCurl(object):
def __init__(self, curl='curl', host='localhost', port=4502,
username='admin', password='admin', cwd='/',
cookie_path=os.path.join(os.path.expanduser('~'), 'cq.cookie')):
self.curl = curl
self.host = host
self.port = port
self.username = username
self.password = password
self.cwd = cwd #current working directory
self.json = None #current json. as a cache.
self.cookie_path = cookie_path
def url(self, path, protocol='http://', host=None, port=None):
'path => url'
if host is None:
host = self.host
if port is None:
port = self.port
return '%s%s:%d%s' % (protocol, host, port, path)
def _cmd(self, method='GET', use_cookie=True):
cmd = [self.curl, '-f', '-s']
if use_cookie:
append(cmd, '-b', self.cookie_path)
if 'POST' == method:
append(cmd, '-F', '_charset_=utf-8')
return cmd
def _cmd_post(self, props):
cmd = self._cmd('POST')
for k,v in props.iteritems():
append(cmd, '-F', '%s=%s', (k, v))
return cmd
def _exec_command(self, cmd, dryrun=False):
'executes cmd. returns (returncode, stdout, stderr)'
if dryrun:
print(' '.join(cmd))
return (None, None, None)
else:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out,err = p.communicate()
return (p.returncode, out, err)
def get_json(self, path=None, level=1, is_tidy=False, **kwargs):
if path is None:
path = self.cwd
if is_tidy:
tidy_selector = '.tidy'
else:
tidy_selector = ''
cmd = self._cmd()
append(cmd, self.url('%s%s.%d.json' % (path, tidy_selector, level)))
returncode,out,err = self._exec_command(cmd, **kwargs)
d = json.loads(out)
if path == self.cwd:
self.json = d #update cache
return d
def login(self, **kwargs):
if os.path.exists(self.cookie_path):
#empty cookie if exists
open(self.cookie_path, 'w').close()
cmd = self._cmd(method='POST', use_cookie=False)
append(cmd, '-c', self.cookie_path)
append(cmd, '-F', 'j_username=' + self.username)
append(cmd, '-F', 'j_password=' + self.password)
append(cmd, self.url('/libs/qa/core/content/login.html/j_security_check'))
return self._exec_command(cmd, **kwargs)
def remove_page(self, page_path, **kwargs):
cmd = self._cmd('POST')
append(cmd, '-F', 'cmd=deletePage')
append(cmd, '-F', 'force=false')
append(cmd, '-F', 'path=' + page_path)
append(cmd, self.url('/bin/wcmcommand'))
return self._exec_command(cmd, **kwargs)
def remove_pages(self, page_paths, **kwargs):
cmd = self._cmd('POST')
append(cmd, '-F', 'cmd=deletePage')
append(cmd, '-F', 'force=false')
for x in page_paths:
append(cmd, '-F', 'path=' + x)
append(cmd, self.url('/bin/wcmcommand'))
return self._exec_command(cmd, **kwargs)
def upload_image(self, file_path, target_dir_path, target_name=None, **kwargs):
file_name = os.path.basename(file_path)
if target_name is None:
target_name = file_name
cmd = self._cmd('POST')
append(cmd, '-T', file_path)
append(cmd, '-F', '*@TypeHint=nt:file')
append(cmd, self.url('%s/%s' % (target_dir_path, target_name)))
return self._exec_command(cmd, **kwargs)
def pwd(self):
return self.cwd
def ls(self, path=None):
if path is None:
path = self.cwd
path = os.path.join(self.cwd, path)
l = []
d = self.get_json(path)
for k,v in d.iteritems():
if isinstance(v, dict):
l.append(k + '/')
else:
l.append(k)
return l
def cd(self, rel_path=None):
if rel_path is None:
return
self.cwd = os.path.abspath(os.path.join(self.cwd, rel_path))
self.json = None
def req(self, path, method='GET', **kwargs):
cmd = self._cmd(method)
append(cmd, self.url(path))
return self._exec_command(cmd, **kwargs)
def create_node(self, path, primary_type='nt:unstructured', props=None, **kwargs):
if props is None:
props = {}
cmd = self._cmd('POST')
append(cmd, '-F', 'jcr:primaryType=' + primary_type)
for k,v in props.iteritems():
append(cmd, '-F', k + '=' + v)
append(cmd, self.url(path))
return self._exec_command(cmd, **kwargs)
def create_folder(self, path, **kwargs):
return self.create_node(path, primary_type='sling:Folder', **kwargs)
def propget(self, prop_name):
if self.json is None:
self.json = self.get_json()
return self.json.get(prop_name)
def propset(self, prop_name, value, path=None, **kwargs):
if path is None:
path = self.cwd
props = {}
props[prop_name] = value
cmd = self._cmd_post(props)
append(cmd, self.url(path))
returncode,out,err = self._exec_command(cmd, **kwargs)
if path == self.cwd:
self.json = None
return (returncode,out,err)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment