Skip to content

Instantly share code, notes, and snippets.

@remyj38
Last active July 8, 2018 12:23
Show Gist options
  • Save remyj38/d20b8fb0c7a332b7bdaad5ea4e30e9f3 to your computer and use it in GitHub Desktop.
Save remyj38/d20b8fb0c7a332b7bdaad5ea4e30e9f3 to your computer and use it in GitHub Desktop.
GitLab repositories backup
#!/usr/bin/python3
# -*- coding: utf-8 -*-
'''
Clone and update all GitLab repositories matching the defined filters (by default, your own private repositories).
Repositories are cloned over SSH using SSH_KEY (automatically generated if it does not exist).
The SSH key is automatically added as a deploy key on each project.
Requires:
- cryptography
- gitpython
- python-gitlab
Exit codes:
- 1: Unknown error
- 2: Unable to create BACKUP_DIRECTORY
- 3: BACKUP_DIRECTORY is not a directory
- 4: BACKUP_DIRECTORY is not writable
'''
import base64
import git
import gitlab
import hashlib
import os
import socket
import sys
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
# Configuration
# Private token handed to the GitLab API client
API_TOKEN = ''
# Root folder that receives one sub-folder per namespace
BACKUP_DIRECTORY = '/mnt/backup'
# Folders permissions
DEFAULT_PERMISSION = 0o775
# Entries are compared against each project's path_with_namespace
EXCLUSION_LIST = []
# Keyword arguments forwarded to gl.projects.list()
FILTERS = {'owned': True, 'visibility': 'private'}
# SSH private key used for cloning repositories. Public key is SSH_KEY+'.pub'
SSH_KEY = '/home/remy/.gitlab_deploy_key'
# End of configuration

# Defining variables
# API client shared by the rest of the script
gl = gitlab.Gitlab('https://gitlab.com', private_token=API_TOKEN)
# Summary lines, one per processed project
result = []
# Environment passed to git so that ssh uses the dedicated key
gitEnv = {'GIT_SSH_COMMAND': 'ssh -i {}'.format(SSH_KEY)}
# Generate public key fingerprint
# Source : https://stackoverflow.com/a/6682934
def generate_fingerprint(key):
    '''
    Return the MD5 fingerprint of an OpenSSH public key line.

    key is a string shaped like "<type> <base64 blob> [comment]". The second
    whitespace-separated field is base64-decoded, hashed with MD5 and the
    digest is returned as lowercase hexadecimal byte pairs joined by colons
    (e.g. "90:01:50:...").

    Raises ValueError when the line has no blob field, when the blob is not
    ASCII, or when it is not valid base64.
    '''
    fields = key.split()
    if len(fields) < 2:
        # Fail with an explicit error instead of an opaque IndexError
        raise ValueError('Malformed public key: expected "<type> <base64 blob> [comment]"')
    blob = base64.b64decode(fields[1].encode('ascii'))
    fp_plain = hashlib.md5(blob).hexdigest()
    # Pair up consecutive hex digits: "9001..." -> "90:01:..."
    return ':'.join(a+b for a,b in zip(fp_plain[::2], fp_plain[1::2]))
# Generating SSH key if not exists
if not os.path.exists(SSH_KEY):
    # 4096-bit RSA key pair, serialized without a passphrase
    key = rsa.generate_private_key(backend=default_backend(), public_exponent=65537, key_size=4096)
    privateKeyText = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode('utf-8')
    publicKeyText = key.public_key().public_bytes(
        serialization.Encoding.OpenSSH,
        serialization.PublicFormat.OpenSSH,
    ).decode('utf-8')
    # Create the private key file with restrictive permissions from the very
    # first moment it exists (a plain open() followed by chmod leaves a window
    # where the file has the default umask permissions). O_EXCL makes the
    # creation fail instead of overwriting a file that appeared in the meantime.
    keyFd = os.open(SSH_KEY, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(keyFd, 'w') as content_file:
        # The creation mode is filtered by the umask; force the exact mode
        os.chmod(SSH_KEY, 0o600)
        content_file.write(privateKeyText)
    with open(SSH_KEY+'.pub', 'w') as content_file:
        content_file.write(publicKeyText)
    # Drop the references to the private key material
    del key, privateKeyText, publicKeyText
# Reading the public key stored next to SSH_KEY and computing its fingerprint
publicKeyPath = '{}.pub'.format(SSH_KEY)
with open(publicKeyPath) as sshPublicKeyFile:
    sshPublicKey = sshPublicKeyFile.read()
publicKeyFingerprint = generate_fingerprint(sshPublicKey)
# Verifying target folder
# Each failed check prints a message and stops the script with its own exit
# code: 2 (cannot create), 3 (not a directory), 4 (not writable).
if not os.path.exists(BACKUP_DIRECTORY):
    try:
        os.mkdir(BACKUP_DIRECTORY, mode=DEFAULT_PERMISSION)
    except OSError:
        print('%s does not exists and cannot be created'%BACKUP_DIRECTORY)
        sys.exit(2)
if not os.path.isdir(BACKUP_DIRECTORY):
    # Lowercase %s: an uppercase %S is not a valid conversion and raises
    # ValueError instead of printing the message
    print('%s is not a directory'%BACKUP_DIRECTORY)
    sys.exit(3)
if not os.access(BACKUP_DIRECTORY, os.W_OK):
    print('%s is not writable'%BACKUP_DIRECTORY)
    sys.exit(4)
# Listing the projects to back up. The GitLab API is paginated and
# python-gitlab returns only the first page unless all=True is given;
# FILTERS is applied last so that the configuration can still override it.
listOptions = dict(all=True)
listOptions.update(FILTERS)
projectList = gl.projects.list(**listOptions)
for project in projectList:
    # Go to next project if this one is on exclusion list
    if project.path_with_namespace in EXCLUSION_LIST:
        print('%s excluded'%project.name_with_namespace)
        result.append('%s excluded'%project.name_with_namespace)
        continue
    # Setting deploy key on project (only when no existing deploy key has the
    # same fingerprint as ours)
    isKeyPresent = any(
        generate_fingerprint(deployKey.key) == publicKeyFingerprint
        for deployKey in project.keys.list(all=True)
    )
    if not isKeyPresent:
        project.keys.create({'title': 'Backup from %s'%socket.gethostname(), 'key': sshPublicKey})
    print('Backup of %s started'%project.name_with_namespace)
    # Creating and checking access rights of namespace folder
    projectNamespacePath = os.path.join(BACKUP_DIRECTORY, project.namespace['path'])
    if not os.path.exists(projectNamespacePath):
        try:
            try:
                os.mkdir(projectNamespacePath, mode=DEFAULT_PERMISSION)
            except PermissionError:
                # Reset the permissions of the backup root, then retry once
                os.chmod(BACKUP_DIRECTORY, mode=DEFAULT_PERMISSION)
                os.mkdir(projectNamespacePath, mode=DEFAULT_PERMISSION)
        except Exception as ex:
            # Nothing can be stored for this namespace: stop the whole run
            print(ex)
            sys.exit(1)
    if not os.access(projectNamespacePath, os.W_OK):
        os.chmod(projectNamespacePath, mode=DEFAULT_PERMISSION)
    projectFullPath = '%s.git'%os.path.join(projectNamespacePath, project.path)
    # If project not exists, cloning
    if not os.path.exists(projectFullPath):
        print('Cloning repository from %s'%project.ssh_url_to_repo)
        try:
            git.Repo.clone_from(project.ssh_url_to_repo, projectFullPath, mirror=True, env=gitEnv)
        except Exception as ex:
            # Record the failure and carry on with the remaining projects
            print('Failed to clone %s'%project.name_with_namespace)
            print(ex)
            result.append('%s: Failed to clone repo'%project.name_with_namespace)
        else:
            result.append('%s cloned'%project.name_with_namespace)
    # Else, fetch new commits
    else:
        print('Updating repository')
        try:
            # Opening the repository is inside the try so that an unusable
            # local mirror is reported as a failed update instead of aborting
            # the backup of every remaining project
            git.Repo(projectFullPath).remote().fetch(env=gitEnv)
        except Exception as ex:
            print('Failed to update %s'%project.name_with_namespace)
            print(ex)
            result.append('%s: failed to update'%project.name_with_namespace)
        else:
            result.append('%s updated'%project.name_with_namespace)
    print('Backup of %s done'%project.name_with_namespace)
# Printing the summary: one line per project, sorted alphabetically
result.sort()
print('-----------------------------------------------------------------------')
print('Summary:')
for line in result:
    print(line)
# Failure lines are recorded with inconsistent capitalisation
# ("Failed to clone repo" / "failed to update"), so the check is done
# case-insensitively; a case-sensitive search for "Failed to" would let a
# failed update finish with exit status 0.
if any('failed to' in line.lower() for line in result):
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment