Created
March 9, 2018 12:53
-
-
Save zopyx/dd94316653b9adb89a34adea7c02df30 to your computer and use it in GitHub Desktop.
Plone 4 -> Plone 5.1 migration via plone.restapi
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import yaml | |
import pprint | |
import base64 | |
import requests | |
import plone.api | |
from requests.auth import HTTPBasicAuth | |
from Testing.makerequest import makerequest | |
from zope.component.hooks import setSite | |
from AccessControl.SecurityManagement import newSecurityManager | |
from Products.CMFCore.WorkflowCore import WorkflowException | |
# Migration configuration; populated from the YAML file in the script entry
# point before any of the functions below are called.
CONFIG = None

# Maps the portal_type of a source container to the portal_type that is
# created for it on the target site.
FOLDER_MAPPING = {
    'Folder': 'richfolder',
    'Rich Document': 'richdocument',
    # The rest of this script identifies rich documents by the portal_type
    # id 'RichDocument' (see find_objects/_migrate_content), so that key must
    # resolve as well.  The spelled-out key above is kept for compatibility.
    'RichDocument': 'richdocument',
    'dynamore.seminarsdx.seminarfolder': 'dynamore.seminarsdx.seminarfolder',
}
def query_yes_no(question, default="yes"):
    """Ask a yes/no question on the console and return the answer.

    "question" is a string that is presented to the user.
    "default" is the presumed answer if the user just hits <Enter>.
    It must be "yes" (the default), "no" or None (meaning an answer is
    required of the user).

    Returns True for "yes" and False for "no".  Raises ValueError when
    "default" is not one of the accepted values.
    """
    valid = {"yes": True, "y": True, "ye": True,
             "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    # raw_input() only exists on Python 2; input() is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    while True:
        sys.stdout.write(question + prompt)
        # Make sure the prompt is visible before blocking on stdin.
        sys.stdout.flush()
        # Surrounding whitespace is not part of the answer.
        choice = read_line().strip().lower()
        if default is not None and choice == '':
            return valid[default]
        if choice in valid:
            return valid[choice]
        sys.stdout.write("Please respond with 'yes' or 'no' "
                         "(or 'y' or 'n').\n")
def get_object_data(obj):
    """Collect the common fields of ``obj`` as keyword data for create_ct().

    The result always contains ``id``, ``title``, ``description``,
    ``teaser_image``, ``teaser_image_caption``, ``image`` and
    ``review_state``; ``text`` is only added when ``obj.getText()`` exists
    and returns a non-empty value.  ``review_state`` is None when looking up
    the workflow state raises WorkflowException.
    """
    try:
        review_state = plone.api.content.get_state(obj=obj)
    except WorkflowException:
        review_state = None

    # Not every content type provides a text accessor.
    try:
        text = obj.getText()
    except AttributeError:
        text = None

    # NOTE: the file payload is deliberately not extracted here - it was
    # never part of the returned data, and building it means reading and
    # base64-encoding the complete file for nothing.
    data = dict(
        id=obj.getId(),
        title=obj.Title(),
        description=obj.Description(),
        teaser_image=extract_lead_image(obj),
        teaser_image_caption=extract_lead_image_caption(obj),
        image=extract_image(obj),
        review_state=review_state)
    if text:
        data['text'] = text
    return data
def find_objects(root):
    """Walk the content tree below ``root`` and group it by portal_type.

    Returns a dict mapping each portal_type found to a list of dicts with
    the keys ``path`` (physical path joined with '/'), ``review_state``
    (None when the workflow lookup raises WorkflowException), ``parents``
    (ancestors below the 'Plone Site', outermost first) and ``portal_type``.
    Only nodes of the container types listed below are descended into.
    """
    container_types = (
        'RichDocument', 'Folder', 'dynamore.seminarsdx.seminarfolder')

    def _review_state(node):
        # Workflow state of ``node`` or None when it cannot be determined.
        try:
            return plone.api.content.get_state(obj=node)
        except WorkflowException:
            return None

    def _find_parents(node):
        # Climb the acquisition chain until the site root is reached.
        parents = []
        current = node.aq_parent
        while current.portal_type != 'Plone Site':
            parents.append(dict(
                portal_type=current.portal_type,
                review_state=_review_state(current),
                path='/'.join(current.getPhysicalPath())))
            current = current.aq_parent
        parents.reverse()
        return parents

    def _find_objects(node, result):
        result.setdefault(node.portal_type, []).append(dict(
            path='/'.join(node.getPhysicalPath()),
            # A plain state string (previously a stray trailing comma turned
            # this into a 1-tuple).
            review_state=_review_state(node),
            parents=_find_parents(node),
            portal_type=node.portal_type))
        if node.portal_type in container_types:
            for child in node.contentValues():
                _find_objects(child, result)

    try:
        print('Finding objects in {}'.format(root.absolute_url(1)))
    except Exception:
        # Interactive debugging hook kept from the original script.
        import pdb
        pdb.set_trace()
    result = dict()
    _find_objects(root, result)
    print('DONE - Finding objects in {}'.format(root.absolute_url(1)))
    return result
def old2new_path(path):
    """Translate a path on the old site into the matching new-site path.

    The first path segment that is exactly equal to CONFIG['old_site_id'] is
    replaced by CONFIG['new_site_id'].  Matching whole segments (instead of
    a plain substring replace) keeps ids that merely contain the site id,
    e.g. '<site-id>-news', untouched.  Paths without such a segment are
    returned unchanged.
    """
    old_id = '{0}'.format(CONFIG['old_site_id'])
    new_id = '{0}'.format(CONFIG['new_site_id'])
    segments = path.split('/')
    for index, segment in enumerate(segments):
        if segment == old_id:
            segments[index] = new_id
            break
    return '/'.join(segments)
def extract_lead_image(obj):
    """Return the 'leadImage' field of ``obj`` as a base64 payload dict.

    An empty dict is returned when ``obj`` has no ``Schema()``, when the
    schema has no 'leadImage' field, or when the field holds no data.
    """
    try:
        schema = obj.Schema()
    except AttributeError:
        return {}
    lead_image_field = schema.getField('leadImage')
    if not lead_image_field:
        # Same guard as extract_lead_image_caption(): not every schema
        # defines the field.
        return {}
    value = lead_image_field.get(obj)
    if value is None:
        # str(None) would otherwise be encoded as the text 'None'.
        return {}
    lead_image = str(value)
    if not lead_image:
        return {}
    return {
        'data': base64.b64encode(lead_image),
        'encoding': 'base64',
        # NOTE(review): the content type is hard-coded; the real type of the
        # stored image is not inspected here.
        'content-type': 'image/png'
    }
def extract_lead_image_caption(obj):
    """Return the value of the 'leadImageCaption' field of ``obj``.

    None is returned when ``obj`` has no ``Schema()`` or when the schema
    does not define the field.
    """
    try:
        field = obj.Schema().getField('leadImageCaption')
    except AttributeError:
        return None
    if not field:
        return None
    return field.get(obj)
def extract_file(obj):
    """Return the 'file' field of ``obj`` as a base64 payload dict.

    The dict carries ``data``, ``encoding``, ``content-type`` and
    ``filename``.  None is returned when ``obj`` has no ``Schema()``, when
    the schema has no 'file' field, or when the file is empty.
    """
    # Guard the schema lookup the same way the lead-image helpers do, so
    # objects without a Schema() are skipped instead of raising.
    try:
        schema = obj.Schema()
    except AttributeError:
        return None
    field = schema.getField('file')
    if not field:
        return None
    file_data = str(obj.getFile())
    if not file_data:
        return None
    stored = field.get(obj)
    return {
        'data': base64.b64encode(file_data),
        'encoding': 'base64',
        'content-type': stored.content_type,
        'filename': stored.filename,
    }
def extract_image(obj):
    """Return the 'image' field of ``obj`` as a base64 payload dict.

    The dict carries ``data``, ``encoding``, ``content-type`` and
    ``filename``.  None is returned when ``obj`` has no ``Schema()``, when
    the schema has no 'image' field, when the stored value has no
    ``content_type`` starting with 'image/', or when the image is empty.
    """
    # Guard the schema lookup the same way the lead-image helpers do.
    try:
        schema = obj.Schema()
    except AttributeError:
        return None
    field = schema.getField('image')
    if not field:
        return None
    stored = field.get(obj)
    # An unset field value has no content_type attribute; treat it like
    # "no image" instead of raising AttributeError.
    content_type = getattr(stored, 'content_type', None)
    if not content_type or not content_type.startswith('image/'):
        return None
    image_data = str(obj.getImage())
    if not image_data:
        return None
    return {
        'data': base64.b64encode(image_data),
        'encoding': 'base64',
        'content-type': content_type,
        'filename': stored.filename,
    }
def extract_image_dx(obj):
    """Extract image from dexterity content.

    Reads ``obj.image`` and returns a dict with ``data`` (base64),
    ``encoding``, ``content-type`` and ``filename``.  None is returned when
    the image data cannot be read or is empty.
    """
    try:
        image_data = str(obj.image.data)
    except Exception:
        # Best effort: a missing or unset image is not an error.  Unlike a
        # bare ``except:`` this no longer swallows KeyboardInterrupt or
        # SystemExit.
        return None
    if not image_data:
        return None
    return {
        'data': base64.b64encode(image_data),
        'encoding': 'base64',
        'content-type': obj.image.contentType,
        'filename': obj.image.filename,
    }
def delete_path(path):
    """Send an HTTP DELETE for ``path`` below the configured endpoint URL.

    Uses basic authentication with the endpoint credentials from CONFIG.
    The response is not inspected.
    """
    endpoint = CONFIG['endpoint']
    url = '{0}/{1}'.format(endpoint['url'], path)
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    print('delete %s' % url)
    requests.delete(
        url,
        auth=credentials,
        headers={'accept': 'application/json'})
def publish_resource(resource_path, review_state):
    """POST the 'publish' workflow transition for ``resource_path``.

    Nothing is sent when ``review_state`` is empty or 'private'.  The
    response of the request is not inspected.
    """
    if review_state and review_state != 'private':
        endpoint = CONFIG['endpoint']
        url = '{0}/{1}/@workflow/publish'.format(
            endpoint['url'], resource_path)
        print('publish: {0}'.format(resource_path))
        credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
        requests.post(
            url,
            auth=credentials,
            headers={'accept': 'application/json'})
def add_user(resource_path, username, password, fullname, email, roles):
    """Create a user by POSTing to ``<endpoint>/<resource_path>/@users``.

    Raises RuntimeError carrying the response text when the status code is
    neither 200 nor 201.
    """
    endpoint = CONFIG['endpoint']
    payload = dict(
        username=username,
        password=password,
        fullname=fullname,
        roles=roles,
        email=email)
    url = '{0}/{1}/@users'.format(endpoint['url'], resource_path)
    print('+user {0}'.format(username))
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    response = requests.post(
        url,
        auth=credentials,
        headers={
            'accept': 'application/json',
            'content-type': 'application/json',
        },
        json=payload)
    if response.status_code in (200, 201):
        return
    raise RuntimeError(response.text)
def create_ct(resource_path, portal_type, **kw):
    """Create a content object through _create_ct().

    Any exception is reported on stdout and followed by an interactive pdb
    session instead of being propagated; in that case None is returned.
    """
    try:
        created = _create_ct(resource_path, portal_type, **kw)
    except Exception as e:
        message = 'ERROR: create_ct({0}, "{1}", {2})'.format(
            resource_path, portal_type, e)
        print(message)
        import pdb
        pdb.set_trace()
    else:
        return created
def _create_ct(resource_path, portal_type, **kw):
    """POST a new object of type ``portal_type`` below ``resource_path``.

    ``kw`` supplies the field values and must contain ``id``.  A missing or
    empty title falls back to the id, and '\\r\\n' sequences are removed
    from the title.  Raises RuntimeError carrying the response text when the
    status code is neither 200 nor 201.
    """
    payload = {'@type': portal_type}
    payload.update(**kw)
    title = payload.get('title') or payload['id']
    payload['title'] = title.replace('\r\n', '')

    endpoint = CONFIG['endpoint']
    url = '{0}/{1}'.format(endpoint['url'], resource_path)
    print('+{0}: {1}/{2}'.format(portal_type, resource_path, payload['id']))
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    response = requests.post(
        url,
        auth=credentials,
        headers={
            'accept': 'application/json',
            'content-type': 'application/json',
        },
        json=payload)
    if response.status_code in (200, 201):
        return
    raise RuntimeError(response.text)
def _remote_exists(folder_path):
    """Return True when the '@@remote-exists' view answers with HTTP 200.

    ``folder_path`` is passed to the view as the ``path`` query parameter.
    """
    endpoint = CONFIG['endpoint']
    url = '{0}/@@remote-exists?path={1}'.format(endpoint['url'], folder_path)
    credentials = HTTPBasicAuth(endpoint['user'], endpoint['password'])
    response = requests.get(url, auth=credentials)
    return response.status_code == 200
def recreate_remote_plone_site():
    """Ask the endpoint's '@@recreate-plone-site' view to rebuild the site.

    Posts CONFIG['new_site_id'] and CONFIG['extension_ids'] as JSON.
    Raises RuntimeError when the response status is not 201.
    """
    url = '{0}/@@recreate-plone-site'.format(CONFIG['endpoint']['url'])
    auth = HTTPBasicAuth(CONFIG['endpoint']['user'],
                         CONFIG['endpoint']['password'])
    data = {
        'site_id': CONFIG['new_site_id'],
        'extension_ids': CONFIG['extension_ids']
    }
    result = requests.post(
        url,
        auth=auth,
        json=data)
    # An explicit check instead of ``assert``: asserts vanish under
    # ``python -O`` and give no hint about what the server answered.
    if result.status_code != 201:
        raise RuntimeError(
            'Recreating the remote Plone site failed with status {0}: {1}'.format(
                result.status_code, result.text))
def _raw_text(obj):
    # Raw markup of the object's rich text field, or None when it is unset.
    if obj.text:
        return obj.text.raw
    return None


def _migrate_parents(parents, paths_created):
    # Create every ancestor that was neither created in this run nor is
    # already present on the remote site.
    for parent in parents:
        parent_path = parent['path']
        if parent_path in paths_created:
            continue
        if _remote_exists(parent_path):
            continue
        parent_obj = SITE.restrictedTraverse(parent_path)
        parent_data = get_object_data(parent_obj)
        # Drop the leading empty segment and the parent's own id.
        resource_path = '/'.join(parent_path.split('/')[1:-1])
        paths_created.add(parent_path)
        print('Creating PARENT')
        pt = FOLDER_MAPPING[parent['portal_type']]
        if parent_data['id'] in ('seminar_teacher', 'seminar-locations', 'seminar_orte'):
            pt = 'dynamore.seminarsdx.seminarfolder'
        create_ct(
            resource_path,
            pt,
            **parent_data)


def _migrate_object(obj, resource_path):
    # Create the remote counterpart of ``obj``; True when a create request
    # was issued, False when the object was skipped.
    portal_type = obj.portal_type
    if portal_type == 'RichDocument':
        create_ct(
            resource_path,
            'richdocument',
            **get_object_data(obj))
    elif portal_type in ('File', 'FileAttachment'):
        file_data = extract_file(obj)
        if not file_data:
            return False
        create_ct(
            resource_path,
            'File',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            file=file_data)
    elif portal_type in ('Image', 'ImageAttachment'):
        image_data = extract_image(obj)
        if not image_data:
            return False
        create_ct(
            resource_path,
            'Image',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            image=image_data)
    elif portal_type == 'Link':
        create_ct(
            resource_path,
            'Link',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            remoteUrl=obj.getRemoteUrl())
    elif portal_type == 'dynamore.seminarsdx.seminarfolder':
        create_ct(
            resource_path,
            'dynamore.seminarsdx.seminarfolder',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            year_min=obj.year_min,
            year_max=obj.year_max)
    elif portal_type == 'dynamore.seminarsdx.person':
        create_ct(
            resource_path,
            'dynamore.seminarsdx.person',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            image=extract_image_dx(obj),
            salutation=obj.salutation,
            firstname=obj.firstname,
            lastname=obj.lastname,
            text=_raw_text(obj),
            role=obj.role,
            year_max=obj.year_max)
    elif portal_type == 'dynamore.seminarsdx.seminar':
        create_ct(
            resource_path,
            'dynamore.seminarsdx.seminar',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            title_long=obj.title_long,
            text=_raw_text(obj),
            seminar_code=obj.seminar_code,
            seminar_type=obj.seminar_type,
            affiliation=obj.affiliation,
            image=extract_image_dx(obj),
            is_seminar_master=obj.is_seminar_master,
            price=obj.price,
            currency=obj.currency,
            sender_email=obj.sender_email,
            notification_email=obj.notification_email,
            available_lecturers=obj.available_lecturers,
            available_locations=obj.available_locations)
    elif portal_type == 'dynamore.seminarsdx.location':
        create_ct(
            resource_path,
            'dynamore.seminarsdx.location',
            id=obj.getId(),
            title=obj.Title(),
            description=obj.Description(),
            text=_raw_text(obj))
    else:
        print('UNHANDLED %s' % portal_type)
        return False
    return True


def _migrate_content(hierarchy_path):
    """Recreate the content found below ``hierarchy_path`` on the remote site.

    The objects are collected with find_objects() and processed grouped by
    portal_type in the order listed below.  For each object its missing
    ancestors are created first, then the object itself, and finally the
    object is published according to its local review state.
    """
    all_objects = find_objects(SITE.restrictedTraverse(hierarchy_path))
    migrated_types = (
        'RichDocument',
        'File',
        'Link',
        'ImageAttachment',
        'FileAttachment',
        'dynamore.seminarsdx.person',
        'dynamore.seminarsdx.location',
        'dynamore.seminarsdx.seminarfolder',
        'dynamore.seminarsdx.seminar',
        'Image')
    all_items = []
    for portal_type in migrated_types:
        all_items.extend(all_objects.get(portal_type, []))

    # Only used for membership tests, hence a set.
    paths_created = set()
    for d in all_items:
        obj_path = d['path']
        if obj_path in paths_created:
            continue
        obj = SITE.restrictedTraverse(obj_path)
        try:
            review_state = plone.api.content.get_state(obj=obj)
        except WorkflowException:
            review_state = None

        _migrate_parents(d['parents'], paths_created)

        if d['parents']:
            # Path of the direct container without the leading empty segment.
            container_path = d['parents'][-1]['path']
            resource_path = '/'.join(container_path.split('/')[1:])
        else:
            # No ancestors were recorded below the site root (previously an
            # IndexError): derive the container from the object's own path.
            resource_path = '/'.join(obj_path.split('/')[1:-1])

        paths_created.add(obj_path)
        print('CREATING: {0} {1}/{2}'.format(obj.portal_type, resource_path, obj.getId()))
        # Publishing something that was never created can only fail.
        if _migrate_object(obj, resource_path):
            publish_resource(resource_path + '/' + obj.getId(), review_state)
def migrate_hierarchy(hierarchy_path):
    """Delete the remote counterpart of ``hierarchy_path``, then migrate it."""
    remote_path = old2new_path(hierarchy_path)
    delete_path(remote_path)
    _migrate_content(hierarchy_path)
def migrate_users():
    """Migrate all users.

    Every user returned by plone.api.user.get_users() is created on the new
    site through add_user() with its email, roles and fullname.
    """
    # NOTE(review): this reads the stored password values of source_users;
    # presumably these are hashes rather than clear text - confirm that the
    # receiving side handles them as intended.
    passwords = SITE.acl_users.source_users._user_passwords
    # The users are created below the new site.  The id is used as-is:
    # running it through old2new_path() corrupts it whenever the old site id
    # is a substring of the new one.
    resource_path = CONFIG['new_site_id']
    for d in plone.api.user.get_users():
        user = plone.api.user.get(d.id)
        if not user:
            continue
        pw = passwords.get(d.id)
        email = user.getProperty('email')
        roles = user.getRoles()
        fullname = user.getProperty('fullname')
        add_user(resource_path, d.id, pw, fullname, email, roles)
def migrate_persons(hierarchy_path):
    """Recreate ``hierarchy_path`` as a 'Persons' folder of 'dynaperson' items.

    The persons come from ``dynamore.contact.config._users``, a mapping of
    organization to a mapping of user id to user record.  A user id that
    shows up under several organizations is only created once, for the
    first organization it is seen in.
    """
    from dynamore.contact import config

    delete_path(hierarchy_path)
    container_path, container_id = hierarchy_path.rsplit('/', 1)
    create_ct(container_path, 'richfolder', id=container_id, title='Persons')

    seen = set()
    for organization, members in config._users.items():
        print(organization)
        for member_id, member in members.items():
            if member_id in seen:
                continue
            seen.add(member_id)

            surname = member.get('surname', '')
            forename = member.get('forename', '')
            numbers = [
                dict(
                    number_type=kind,
                    country_code=member[kind].dict['intnr'],
                    area_code=member[kind].dict['cnr'],
                    number=member[kind].dict['nr'])
                for kind in ('phone', 'mobile', 'fax')
                if kind in member
            ]
            create_ct(
                hierarchy_path,
                'dynaperson',
                id=member_id,
                title=u'{0}, {1}'.format(surname, forename),
                firstname=forename,
                lastname=surname,
                organizations=[organization],
                email=member.get('mail', ''),
                keywords=[kw for kw in member.get('keywords', []) if kw],
                contacts=numbers)
def migrate_locations(hierarchy_path):
    """Recreate ``hierarchy_path`` as a 'Locations' folder of 'dynalocation' items.

    The locations come from ``dynamore.contact.config._establishments``, a
    mapping of location id to location record.  Up to three address lines
    are taken from ``location['address']``; missing lines become ''.
    """
    from dynamore.contact import config

    delete_path(hierarchy_path)
    resource_path, folder_id = hierarchy_path.rsplit('/', 1)
    create_ct(resource_path, 'richfolder', id=folder_id, title='Locations')

    for location_id, location in config._establishments.items():
        # (A person-style title built from an undefined ``user`` used to be
        # computed here; it was never used and is dropped.)
        contacts = []
        for name in ('phone', 'mobile', 'fax'):
            if name in location:
                contacts.append(dict(
                    number_type=name,
                    country_code=location[name].dict['intnr'],
                    area_code=location[name].dict['cnr'],
                    number=location[name].dict['nr']))

        # Pad the address to exactly three lines.
        address = location['address']
        lines = []
        for index in range(3):
            try:
                lines.append(address[index])
            except IndexError:
                lines.append('')
        address1, address2, address3 = lines

        create_ct(
            hierarchy_path,
            'dynalocation',
            id=location_id,
            title=location['title'],
            address1=address1,
            address2=address2,
            address3=address3,
            short_title=location['shorttitle'],
            organizations=[],
            contacts=contacts
        )
if __name__ == '__main__':
    # NOTE(review): ``app`` is not defined in this file; presumably the
    # script is started through a Zope instance runner that injects it -
    # confirm.

    # The migration configuration is the last command line argument.
    yaml_fn = os.path.abspath(sys.argv[-1])
    print('Reading {0}'.format(yaml_fn))
    if not os.path.exists(yaml_fn):
        raise IOError('Migration configuration {0} not found'.format(yaml_fn))
    with open(yaml_fn, 'rb') as fp:
        # safe_load() only builds plain data and never instantiates
        # arbitrary Python objects, which is all a configuration file needs.
        CONFIG = yaml.safe_load(fp)
    pprint.pprint(CONFIG)

    # setup Plone site and security context
    SITE = getattr(app, CONFIG['old_site_id'])
    SITE = makerequest(SITE)
    setSite(SITE)
    user = app.acl_users.getUser(CONFIG['local_admin_user'])
    if not user:
        raise ValueError('No admin account "{0}" found'.format(
            CONFIG['local_admin_user']))
    print('Setting new security context')
    newSecurityManager(None, user.__of__(app.acl_users))

    # pre-check: every folder to migrate must exist locally
    for path in CONFIG['migrate_folders']:
        print('Precheck....{0}'.format(path))
        folder = SITE.restrictedTraverse(path, None)
        if folder is None:
            raise ValueError('Folder {0} does not exist'.format(path))

    if query_yes_no('Clear and recreate remote Plone site?'):
        recreate_remote_plone_site()

    CATALOG = plone.api.portal.get_tool('portal_catalog')

    for remote_folder in CONFIG['initial_remote_remove'] or []:
        print('Removing remote folder: {0}'.format(remote_folder))
        delete_path(old2new_path(remote_folder))

    for remote_folder in CONFIG['initial_create_folders'] or []:
        print('Creating remote folder: {0}'.format(remote_folder))
        resource_path, folder_id = remote_folder.rsplit('/', 1)
        create_ct(old2new_path(resource_path), 'Folder', id=folder_id)

    # Plone accounts
    if CONFIG['migrate_users']:
        migrate_users()

    # `Person` objects
    if CONFIG['dynamore_contacts_migrate']:
        migrate_persons(CONFIG['dynamore_contacts_folder'])

    # `Location` objects
    if CONFIG['dynamore_locations_migrate']:
        migrate_locations(CONFIG['dynamore_locations_folder'])

    for path in CONFIG['migrate_folders']:
        print('#' * 80)
        print('Migrating {0}'.format(path))
        print('#' * 80)
        migrate_hierarchy(path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment