Last active
July 8, 2018 12:23
-
-
Save remyj38/d20b8fb0c7a332b7bdaad5ea4e30e9f3 to your computer and use it in GitHub Desktop.
GitLab repositories backup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
'''
Clone and update all GitLab repositories matching the defined filters (currently your own private repositories).
Repositories are cloned over SSH using SSH_KEY (automatically generated if it does not exist).
The SSH key is automatically added as a deploy key on each project.
Requires:
- gitpython
- python-gitlab
- cryptography
Exit codes:
- 1: Unknown error
- 2: Unable to create BACKUP_DIRECTORY
- 3: BACKUP_DIRECTORY is not a directory
- 4: BACKUP_DIRECTORY is not writable
'''
import base64 | |
import git | |
import gitlab | |
import hashlib | |
import os | |
import socket | |
import sys | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from cryptography.hazmat.backends import default_backend | |
# ---------------------------------------------------------------------------
# User configuration
# ---------------------------------------------------------------------------
# Private token handed to the GitLab API client created below.
API_TOKEN = ''
# Root folder that receives one sub-folder per namespace.
BACKUP_DIRECTORY = '/mnt/backup'
# Mode applied to the folders created by this script.
DEFAULT_PERMISSION = 0o775
# Entries are compared against project.path_with_namespace.
EXCLUSION_LIST = []
# Keyword arguments forwarded to gl.projects.list().
FILTERS = {'owned': True, 'visibility': 'private'}
# SSH private key used for cloning repositories; the public key is SSH_KEY + '.pub'.
SSH_KEY = '/home/remy/.gitlab_deploy_key'
# ---------------------------------------------------------------------------
# End of configuration - values below are derived from the settings above
# ---------------------------------------------------------------------------
# One human-readable line per processed project, printed in the final summary.
result = []
# Environment handed to git so that it authenticates with SSH_KEY.
gitEnv = {'GIT_SSH_COMMAND': 'ssh -i %s' % (SSH_KEY,)}
# API client for gitlab.com.
gl = gitlab.Gitlab('https://gitlab.com', private_token=API_TOKEN)
# Generate public key fingerprint
# Source : https://stackoverflow.com/a/6682934
def generate_fingerprint(key):
    """Return the MD5 fingerprint of an OpenSSH-formatted public key.

    ``key`` is the text of a public key line; its second whitespace-separated
    field is taken as the base64-encoded key blob. The result is the MD5 hex
    digest of the decoded blob, written as colon-separated two-character
    groups (for example ``'5e:b6:3b:...'``).
    """
    encoded_blob = key.strip().split()[1]
    raw_blob = base64.b64decode(encoded_blob.encode('ascii'))
    digest = hashlib.md5(raw_blob).hexdigest()
    # Cut the hex digest into consecutive two-character groups.
    groups = [digest[start:start + 2] for start in range(0, len(digest), 2)]
    return ':'.join(groups)
# Generating SSH key if not exists
if not os.path.exists(SSH_KEY):
    key = rsa.generate_private_key(backend=default_backend(), public_exponent=65537, key_size=4096)
    # Unencrypted PEM private key, as text.
    private_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode('utf-8')
    # Matching public key in OpenSSH format, as text.
    public_openssh = key.public_key().public_bytes(
        serialization.Encoding.OpenSSH,
        serialization.PublicFormat.OpenSSH,
    ).decode('utf-8')
    # Create the private key file with mode 0600 from the very first moment.
    # Creating it with the default umask and restricting it afterwards leaves
    # a window in which another process can open the file and keep a readable
    # descriptor on it.
    private_fd = os.open(SSH_KEY, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(private_fd, 'w') as content_file:
        # Still enforce the mode explicitly: the mode given to os.open() is
        # only applied when the file is actually created by that call.
        os.chmod(SSH_KEY, 0o600)
        content_file.write(private_pem)
    with open(SSH_KEY + '.pub', 'w') as content_file:
        content_file.write(public_openssh)
    # Do not keep private key material referenced longer than needed.
    del key, private_pem
# Generating public key fingerprint
with open(SSH_KEY + '.pub', 'r') as sshPublicKeyFile:
    sshPublicKey = sshPublicKeyFile.read()
publicKeyFingerprint = generate_fingerprint(sshPublicKey)
# Verifying target folder: it must exist (it is created when missing), be a
# directory and be writable. Each failure has its own exit code (2, 3, 4).
if not os.path.exists(BACKUP_DIRECTORY):
    try:
        os.mkdir(BACKUP_DIRECTORY, mode=DEFAULT_PERMISSION)
    except OSError:
        # os.mkdir reports its failures as OSError; anything else is a real
        # bug and should not be hidden behind exit code 2.
        print('%s does not exists and cannot be created' % BACKUP_DIRECTORY)
        sys.exit(2)
if not os.path.isdir(BACKUP_DIRECTORY):
    # '%s' (lower case) is required here: '%S' is not a valid conversion and
    # raises ValueError instead of printing the message.
    print('%s is not a directory' % BACKUP_DIRECTORY)
    sys.exit(3)
if not os.access(BACKUP_DIRECTORY, os.W_OK):
    print('%s is not writable' % BACKUP_DIRECTORY)
    sys.exit(4)
backupDirContent = os.listdir(BACKUP_DIRECTORY)
# The GitLab API paginates list results and python-gitlab returns only the
# first page unless told otherwise, so ask for every page explicitly. A value
# for 'all' set in FILTERS still takes precedence.
projectList = gl.projects.list(**dict({'all': True}, **FILTERS))
for project in projectList:
    # Go to next project if this one is on exclusion list
    if project.path_with_namespace in EXCLUSION_LIST:
        print('%s excluded' % project.name_with_namespace)
        result.append('%s excluded' % project.name_with_namespace)
        continue

    # Setting deploy key on project (only when no existing deploy key has the
    # same fingerprint as ours). All pages are inspected for the same
    # pagination reason as above.
    isKeyPresent = any(
        generate_fingerprint(deployKey.key) == publicKeyFingerprint
        for deployKey in project.keys.list(all=True)
    )
    if not isKeyPresent:
        project.keys.create({'title': 'Backup from %s' % socket.gethostname(), 'key': sshPublicKey})
    print('Backup of %s started' % project.name_with_namespace)

    # Creating and checking access rights of namespace folder
    projectNamespacePath = os.path.join(BACKUP_DIRECTORY, project.namespace['path'])
    if not os.path.exists(projectNamespacePath):
        try:
            os.mkdir(projectNamespacePath, mode=DEFAULT_PERMISSION)
        except PermissionError:
            # Retry once after resetting the permissions of the backup root.
            try:
                os.chmod(BACKUP_DIRECTORY, mode=DEFAULT_PERMISSION)
                os.mkdir(projectNamespacePath, mode=DEFAULT_PERMISSION)
            except Exception as ex:
                print(ex)
                sys.exit(1)
        except Exception as ex:
            print(ex)
            sys.exit(1)
    if not os.access(projectNamespacePath, os.W_OK):
        os.chmod(projectNamespacePath, mode=DEFAULT_PERMISSION)

    projectFullPath = '%s.git' % os.path.join(projectNamespacePath, project.path)
    if not os.path.exists(projectFullPath):
        # If project not exists, cloning
        print('Cloning repository from %s' % project.ssh_url_to_repo)
        try:
            git.Repo.clone_from(project.ssh_url_to_repo, projectFullPath, mirror=True, env=gitEnv)
        except Exception as ex:
            # 'except Exception' (not a bare except) so that Ctrl-C and
            # sys.exit() still stop the script; the reason is shown to the
            # operator while the summary entry keeps its original wording.
            print('Failed to clone %s' % project.name_with_namespace)
            print(ex)
            result.append('%s: Failed to clone repo' % project.name_with_namespace)
        else:
            result.append('%s cloned' % project.name_with_namespace)
    else:
        # Else, fetch new commits
        print('Updating repository')
        try:
            # Opening the repository is part of the guarded section so that a
            # damaged mirror is reported as a failed update instead of
            # aborting the backup of all remaining projects.
            repo = git.Repo(projectFullPath)
            repo.remote().fetch(env=gitEnv)
        except Exception as ex:
            print('Failed to update %s' % project.name_with_namespace)
            print(ex)
            result.append('%s: failed to update' % project.name_with_namespace)
        else:
            result.append('%s updated' % project.name_with_namespace)
    print('Backup of %s done' % project.name_with_namespace)
# Print the per-project outcome lines in alphabetical order.
result.sort()
print('-----------------------------------------------------------------------')
print('Summary:')
for line in result:
    print(line)
# Exit with code 1 when any project failed. The comparison is
# case-insensitive because clone failures are recorded as 'Failed to clone
# repo' while update failures are recorded as 'failed to update'; a
# case-sensitive search for 'Failed to' misses the latter.
if any('failed to' in line.lower() for line in result):
    sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment