Skip to content

Instantly share code, notes, and snippets.

@junftnt
Forked from Millnert/convert.py
Created February 19, 2023 08:34
Show Gist options
  • Select an option

  • Save junftnt/ee481eb6992bc6213c54b408403b2e4f to your computer and use it in GitHub Desktop.

Select an option

Save junftnt/ee481eb6992bc6213c54b408403b2e4f to your computer and use it in GitHub Desktop.
simple glance qcow2->raw conversion
#!/usr/bin/python
""" Glance client to convert QCOW2 images in Glance to RAW """
# from pprint import pprint
from os import environ as env
import collections
import subprocess
# import sys
from glanceclient.v2 import client as glclient
from glanceclient.v2 import shell as glshell
from glanceclient.common import utils
from keystoneauth1.identity import v3 as keystone_v3
from keystoneauth1 import session as keystone_session
from keystoneclient.v3 import client as ksclient
def get_keystone_session():
    """ Build a keystoneauth session from the OS_* environment variables.

    Reads OS_AUTH_URL, OS_CACERT, OS_USERNAME, OS_PASSWORD, OS_TENANT_ID,
    OS_TENANT_NAME, OS_PROJECT_DOMAIN_ID and OS_USER_DOMAIN_ID.  The tenant
    and domain variables fall back to the literal string 'default' when
    unset; the other four fall back to None.

    Returns:
        A keystoneauth1 Session that authenticates with the v3 Password
        plugin and receives OS_CACERT as its ``verify`` argument.
    """
    auth_url = env.get('OS_AUTH_URL')
    cacert = env.get('OS_CACERT')
    username = env.get('OS_USERNAME')
    password = env.get('OS_PASSWORD')
    project_id = env.get('OS_TENANT_ID', 'default')
    project_name = env.get('OS_TENANT_NAME', 'default')
    project_domain_id = env.get('OS_PROJECT_DOMAIN_ID', 'default')
    user_domain_id = env.get('OS_USER_DOMAIN_ID', 'default')
    # http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.auth.identity.v3.html#module-keystoneclient.auth.identity.v3.password
    # Example of the parameters this results in:
    # {'username': 'admin',
    #  'project_id': '4b1903a745484428a816c905ec00a970',
    #  'user_domain_id': 'default',
    #  'auth_url': 'https://keystone-beta.cloud.ipnett.se/v3',
    #  'password': '***', 'project_domain_id': 'default'}
    # NOTE(review): both project_id and project_name are always passed, each
    # defaulting to 'default' -- confirm this is what the deployment expects.
    auth = keystone_v3.Password(auth_url=auth_url, username=username,
                                password=password, project_id=project_id,
                                project_name=project_name,
                                project_domain_id=project_domain_id,
                                user_domain_id=user_domain_id)
    # keystoneclient sessions
    # (http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.html#module-keystoneclient.session)
    # are deprecated in favour of keystoneauth sessions:
    # http://docs.openstack.org/developer/keystoneauth/using-sessions.html#sessions-for-users
    return keystone_session.Session(auth=auth, verify=cacert)
def get_endpoints_generic(session, region, servicename, interface):
    """ Return the keystone endpoints that match every given filter.

    The three filters are AND:ed; passing None for a filter disables it.

    Args:
        session: session handed to the keystone v3 client.
        region: value ``endpoint.region`` must equal, or None.
        servicename: value the owning service's ``name`` must equal, or None.
        interface: value ``endpoint.interface`` must equal, or None.

    Returns:
        A list of matching endpoint objects, in the order keystone listed
        them.  Endpoints whose ``service_id`` does not appear in the service
        listing are never returned, whatever the filters are.
    """
    # http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.v3.html#module-keystoneclient.v3.client
    keystone = ksclient.Client(session=session)
    # http://docs.openstack.org/developer/python-keystoneclient/api/keystoneclient.v3.html#keystoneclient.v3.endpoints.EndpointManager
    endpoints = keystone.endpoints.list()
    services_by_id = {}
    for service in keystone.services.list():
        services_by_id[service.id] = service

    # A single filtering pass keeps the listing order (callers pick element
    # [0]) and does not require endpoint objects to be hashable.
    matching_endpoints = []
    for endpoint in endpoints:
        service = services_by_id.get(endpoint.service_id)
        if service is None:
            continue
        if servicename is not None and service.name != servicename:
            continue
        if region is not None and endpoint.region != region:
            continue
        if interface is not None and endpoint.interface != interface:
            continue
        matching_endpoints.append(endpoint)
    return matching_endpoints
def get_glance_endpoints(keystone, region, interface='public'):
    """ Look up the endpoints of the service named 'glance'.

    ``keystone`` is forwarded unchanged as the first argument of
    get_endpoints_generic (which names that argument ``session``);
    ``region`` and ``interface`` are forwarded as the matching filters.
    """
    return get_endpoints_generic(keystone, region, servicename='glance',
                                 interface=interface)
def get_glanceclient():
    """ Return a glance v2 client bound to the first matching endpoint.

    The region filter is taken from OS_REGION_NAME (no region filtering when
    it is unset) and the session comes from get_keystone_session().

    Raises:
        Exception: if keystone lists no glance endpoint for that region,
            instead of failing later on an unbound or None client.
    """
    region = env.get('OS_REGION_NAME')
    session = get_keystone_session()
    glance_endpoints = get_glance_endpoints(session, region)
    if not glance_endpoints:
        raise Exception("No glance endpoint found in keystone "
                        "(OS_REGION_NAME=%s)" % region)
    return glclient.Client(glance_endpoints[0].url, session=session)
# A glance image object looks like this
# {u'architecture': u'x86_64',
# u'checksum': u'38d62e2e1909c89f72ba4d5f5c0005d5',
# u'container_format': u'bare',
# u'created_at': u'2016-04-06T19:54:20Z',
# u'direct_url': u'rbd://8ea29b52-232a-4b77-ad71-21e5de70960f/glance-images/
# 66a32ab4-aca9-4a58-b512-249a4ecef9e8/snap',
# u'disk_format': u'qcow2',
# u'file': u'/v2/images/66a32ab4-aca9-4a58-b512-249a4ecef9e8/file',
# u'id': u'66a32ab4-aca9-4a58-b512-249a4ecef9e8',
# u'min_disk': 0,
# u'min_ram': 0,
# u'name': u'fedora-23-cloud-qcow2',
# u'owner': u'4b1903a745484428a816c905ec00a970',
# u'protected': True,
# u'schema': u'/v2/schemas/image',
# u'size': 234363392,
# u'status': u'active',
# u'tags': [],
# u'updated_at': u'2016-04-07T09:10:03Z',
# u'virtual_size': None,
# u'visibility': u'public'},
def get_images(glance):
    """ Return everything ``glance.images.list()`` yields, as a list """
    return list(glance.images.list())
def filter_images_by_diskformat(images, diskformat):
    """ Return the images whose 'disk_format' equals diskformat, in order """
    return [image for image in images if image['disk_format'] == diskformat]
def download_image(glance, image, filename, progress=False, verbose=False):
    """ Download one glance image to a local file.

    Args:
        glance: glance client passed to the glanceclient shell helper.
        image: mapping describing the image; only image['id'] is read.
        filename: destination path handed over as the ``file`` argument.
        progress: forwarded as the ``progress`` argument.
        verbose: when true, print the destination before downloading.
    """
    if verbose:
        # Parenthesised single-argument form: same output on Python 2 and 3.
        print("Downloading to %s" % filename)
    # The shell helper takes its options as attributes of one object
    # (presumably the CLI's parsed arguments), so a namedtuple stands in.
    argscol = collections.namedtuple('gcargs', 'id file progress')
    args = argscol(id=image['id'], file=filename, progress=progress)
    glshell.do_image_download(glance, args)
def convert_image(file_path_name):
    """ Run ``qemu-img convert`` on a .qcow2 file, writing a sibling .raw file.

    The output path is the input path with its '.qcow2' ending replaced by
    '.raw'.  Returns that output path.

    Raises:
        Exception: if the path does not end in '.qcow2', or if qemu-img
            exits with a non-zero status.
    """
    if not file_path_name.endswith('.qcow2'):
        raise Exception("File given does not have qcow2 ending")

    raw_file_path_name = file_path_name[:-len('.qcow2')] + '.raw'
    command = ['qemu-img', 'convert', file_path_name, raw_file_path_name]
    if subprocess.call(command) != 0:
        raise Exception("Conversion did not work. "
                        "Try it manually: qemu-img convert %s %s" %
                        (file_path_name, raw_file_path_name))
    return raw_file_path_name
def create_clone_image(glance, source_image):
    """ Create a new glance image record mirroring source_image, as raw.

    Copies the listed properties that are present in source_image, forces
    'disk_format' to 'raw', and returns whatever glance.images.create()
    returns for the new image.
    """
    cloned_keys = (
        "architecture",
        "container_format",
        "name",
        "min_disk",
        "min_ram",
        "protected",
        "tags",
        "visibility",
    )
    properties = {key: source_image[key]
                  for key in cloned_keys if key in source_image}
    properties["disk_format"] = "raw"
    return glance.images.create(**properties)
def upload_image(glance, image, filename, progress=False, verbose=False):
    """ Upload a local file as the data of an existing glance image.

    Args:
        glance: glance client passed to the glanceclient shell helper.
        image: mapping describing the target image; image['id'] is always
            read, image['name'] only for the verbose message.
        filename: source path handed over as the ``file`` argument.
        progress: forwarded as the ``progress`` argument.
        verbose: when true, print what is being uploaded and where.
    """
    if verbose:
        # Parenthesised single-argument form: same output on Python 2 and 3.
        print("Uploading %s to %s (%s)" % (filename, image['name'],
                                           image['id']))
    # The shell helper takes its options as attributes of one object
    # (presumably the CLI's parsed arguments); size is passed as None.
    argscol = collections.namedtuple('gcargs', 'id file size progress')
    args = argscol(id=image['id'], file=filename, size=None, progress=progress)
    glshell.do_image_upload(glance, args)
def process_image(glance, image, workingpath=".", progress=False, verbose=True):
    """ Download, convert, clone-create and upload one image.

    Args:
        glance: glance client used for every step.
        image: mapping describing the source image.
        workingpath: directory that receives "<id>.qcow2" and, after
            conversion, "<id>.raw".
        progress: forwarded to the download and upload steps.
        verbose: when true, print a line before each step.

    NOTE: neither local file is removed afterwards, and the source image in
    glance is left untouched.
    """
    # if "converted-to-raw" in image["tags"]:
    #     return
    if verbose:
        # Parenthesised single-argument form: same output on Python 2 and 3.
        print("Processing %s (%s) [%s]" % (
            image['name'], image['id'],
            utils.make_size_human_readable(image['size'])))
    temp_filename = "%s/%s.qcow2" % (workingpath, image.get("id"))
    download_image(glance, image, temp_filename, progress=progress,
                   verbose=verbose)
    if verbose:
        print("Converting image")
    converted_filename = convert_image(temp_filename)
    clone_image = create_clone_image(glance, image)
    upload_image(glance, clone_image, converted_filename, progress=progress,
                 verbose=verbose)
    # image['tags'] = image['tags'].append("converted-to-raw")
    # doesn't work. probably looking for other property feature, but
    # "property" doesn't seem to work either
def main():
    """ Convert every qcow2 image listed by glance, using the current dir """
    workingdir = "."
    glance = get_glanceclient()
    all_images = get_images(glance)
    for image in filter_images_by_diskformat(all_images, 'qcow2'):
        process_image(glance, image, workingpath=workingdir, progress=True,
                      verbose=True)


# Run the conversion only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment