Last active
August 29, 2015 13:57
-
-
Save rjeschke/868a8cc071bb9f31c29b to your computer and use it in GitHub Desktop.
WIP of finally working git-for-hg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim:fileencoding=utf-8:et:ts=4:sts=4 | |
# | |
# Copyright 2014 René Jeschke <[email protected]> | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
""" | |
hgit.py - A script for using git to work with mercurial repositories | |
TODOs: | |
- remove empty folders in working dir (on both sides) | |
- maybe copy subrepos instead of cloning them twice | |
- think about migrating git tags back to hg | |
@author: René Jeschke <[email protected]> | |
""" | |
import shutil | |
import os | |
import errno | |
import re | |
import sys | |
import subprocess | |
import posixpath | |
import ConfigParser | |
import argparse | |
from tempfile import NamedTemporaryFile | |
# Default entries written to the local hgignore of the '.hgit.repo' clone
# (see hg_install_local_hgignore).
DEFAULT_HG_IGNORES = [
    ".gitignore",
    ".hgit.tracking",
    ".hgit.config",
    ".hgit.repo/",
    ".git/",
]

# Default entries written to '.git/info/exclude'
# (see git_write_local_gitignore).
DEFAULT_GIT_IGNORES = [
    ".gitignore",
    ".hgit.tracking",
    ".hgit.config",
    ".hgit.repo/",
    ".hg/",
]
# | |
# | |
# Utilities | |
# | |
# | |
def remove_dir_or_file(path):
    """Removes a symlink, a directory (via rmdir) or a plain file."""
    # Check for links first so a link pointing at a directory is unlinked
    # instead of being handed to rmdir.
    if os.path.islink(path):
        remover = os.unlink
    elif os.path.isdir(path):
        remover = os.rmdir
    else:
        remover = os.remove
    remover(path)
def read_lines(filename):
    """Reads a file line-by-line without CR/LF.

    Returns an empty list when the file does not exist.
    """
    if not os.path.exists(filename):
        return []
    # Use a context manager so the handle is closed deterministically.
    with open(filename, "r") as fd:
        return [line.rstrip("\n\r") for line in fd]
def read_user_mapping():
    """Reads a user mapping file for Mercurial->Git conversion.

    The file '~/.hgit-user-mapping' holds 'hg user = git user' lines; blank
    lines and lines starting with '#' are ignored. Returns a dict mapping
    the left-hand side to the right-hand side.
    """
    ret = {}
    for line in read_lines(os.path.expanduser("~/.hgit-user-mapping")):
        # The stripped value has to be kept, otherwise whitespace-only lines
        # and indented comments are treated as mapping entries.
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only so the mapped value may contain '='.
        key, value = [t.strip() for t in line.split("=", 1)]
        ret[key] = value
    return ret
def copy_mod(src, dst):
    """Copies the permission bits of src onto dst (no-op if either is missing)."""
    if not (os.path.exists(src) and os.path.exists(dst)):
        return
    permissions = os.stat(src).st_mode & 0o777
    os.chmod(dst, permissions)
def query_yes_no(question, default="yes"):
    """Asks a yes/no question on stdout and reads the answer via raw_input().

    "question" is the text presented to the user.
    "default" is the answer assumed when the user just hits <Enter>; it must
    be "yes", "no" or None (None means an explicit answer is required).
    Any other value raises ValueError.

    Returns True for a "yes" answer and False for a "no" answer.
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    prompts = {None: " [y/n] ", "yes": " [Y/n] ", "no": " [y/N] "}
    if default not in prompts:
        raise ValueError("invalid default answer: '%s'" % default)
    prompt = prompts[default]
    # Keep asking until the reply is recognised.
    while True:
        sys.stdout.write(question + prompt)
        reply = raw_input().lower()
        if reply == '' and default is not None:
            return answers[default]
        if reply in answers:
            return answers[reply]
        sys.stdout.write(
            "Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
def cmd(args):
    """Runs the given command line, raises IOError on a non-zero exit code."""
    exit_code = subprocess.call(args)
    if exit_code:
        raise IOError("Failed on: " + " ".join(args))
def cmd_in(args):
    """Runs the given command line and returns what it wrote to stdout."""
    captured = subprocess.check_output(args)
    return captured
def maybe_hgit_folder():
    """Checks if the current folder contains '.git' or '.hgit.repo'"""
    here = os.path.abspath(os.curdir)
    return any(os.path.exists(os.path.join(here, marker))
               for marker in (".git", ".hgit.repo"))
def is_hgit_folder():
    """Checks if the current folder contains both '.git' and '.hgit.repo'"""
    here = os.path.abspath(os.curdir)
    return all(os.path.exists(os.path.join(here, marker))
               for marker in (".git", ".hgit.repo"))
def find_repo_base_path():
    """Walks upwards from the current folder to the hgit repository root.

    The root is the first folder containing both '.git' and '.hgit.repo'.
    Raises IOError when the filesystem root is reached without a match.
    """
    candidate = os.path.abspath(os.curdir)
    while not (os.path.exists(os.path.join(candidate, ".git"))
               and os.path.exists(os.path.join(candidate, ".hgit.repo"))):
        parent = os.path.dirname(candidate)
        # dirname() stops changing once the top of the tree is reached.
        if len(parent) == 0 or parent == candidate:
            raise IOError("Can't find githg repository")
        candidate = parent
    return candidate
def get_git_meta_path():
    """Returns the path to '.git' inside the repository root."""
    base = find_repo_base_path()
    return os.path.join(base, ".git")
def get_hg_repo_path():
    """Returns the path to '.hgit.repo' inside the repository root."""
    base = find_repo_base_path()
    return os.path.join(base, ".hgit.repo")
def get_hg_tracking_path():
    """Returns the path to '.hgit.tracking' inside the repository root."""
    base = find_repo_base_path()
    return os.path.join(base, ".hgit.tracking")
def get_config_path():
    """Returns the path to '.hgit.config' inside the repository root."""
    base = find_repo_base_path()
    return os.path.join(base, ".hgit.config")
def read_change_map():
    """Reads the mapping from hg to git revisions as a list of tuples.

    Each 'hg=git' line of '.hgit.tracking' becomes one tuple of stripped
    parts. Blank lines are skipped so they cannot inflate the entry count
    that hg_update() uses to find the changesets still to be migrated.
    """
    mapping = []
    # Context manager: the handle used to be left open until garbage collection.
    with open(get_hg_tracking_path(), "r") as fd:
        for line in fd:
            if not line.strip():
                continue
            mapping.append(tuple(part.strip() for part in line.split("=")))
    return mapping
def write_change_map(mapping):
    """Writes (hg, git) revision pairs to '.hgit.tracking', one 'hg=git' line each."""
    with open(get_hg_tracking_path(), "w") as fd:
        fd.writelines(
            hg_rev + "=" + git_rev + "\n" for (hg_rev, git_rev) in mapping)
def read_config():
    """Loads '.hgit.config' into a RawConfigParser and returns it."""
    parser = ConfigParser.RawConfigParser()
    parser.read(get_config_path())
    return parser
def write_config(config):
    """Stores the given config parser as '.hgit.config'."""
    target = get_config_path()
    with open(target, "wb") as fd:
        config.write(fd)
def mkdirs(path):
    """mkdir -p: creates path including parents, tolerating an existing directory."""
    try:
        os.makedirs(path)
    except OSError as err:
        # Only "already exists, and it is a directory" is acceptable.
        already_there = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_there:
            raise
def map_hg_userid(userid):
    """Splits a Mercurial user id into a git name/email pair.

    Accepts 'Name <email>', a bare email address or a bare name and returns
    a dict with the keys "name" and "email". A missing name is derived from
    the local part of the email; anything still missing is replaced by a
    placeholder.
    """
    userid = userid.strip()
    name = ""
    email = ""
    start = userid.find("<")
    if start != -1:
        name = userid[:start].strip()
        # Locate the closing bracket instead of assuming it is the last
        # character, so trailing text or a missing '>' cannot corrupt the
        # address.
        end = userid.find(">", start + 1)
        if end == -1:
            end = len(userid)
        email = userid[start + 1:end].strip()
    elif "@" in userid:
        email = userid
    else:
        name = userid
    if not name and email:
        at = email.find("@")
        if at != -1:
            name = email[:at].strip()
    if not name:
        name = "John Doe"
    if not email:
        # NOTE(review): this placeholder looks like an obfuscated address;
        # confirm the intended default.
        email = "[email protected]"
    return {"name": name, "email": email}
def normurl(url):
    """Normalizes a repository web url (resolving relative path components).

    Handles 'scheme://host/path' style urls as well as 'host:path' style
    ones; raises IOError when the url contains neither '//' nor ':'.
    """
    # URLs always use forward slashes, so normalize with posixpath rather
    # than the platform dependent os.path.
    idx = url.find("//")
    if idx != -1:
        return url[:idx + 1] + posixpath.normpath(url[idx + 1:])
    idx = url.find(":")
    if idx == -1:
        raise IOError("Unsupported url: " + url)
    # Anchor the path at '/' so leading '..' components cannot escape, then
    # drop the artificial slash again.
    path = posixpath.normpath("/" + url[idx + 1:])
    return url[:idx + 1] + path[1:]
def make_subrepo_path(basePath, repoPath):
    """Creates the mercurial clone URL for a sub repository.

    A repoPath that contains '@' or starts with a URL scheme is returned
    unchanged; anything else is treated as relative to basePath and
    normalized.
    """
    # The scheme pattern allows digits, '+', '.' and '-' (e.g. 'svn+ssh').
    # The former '[a-zA-z]' range also matched punctuation characters.
    if "@" in repoPath or re.match(r"[a-zA-Z][a-zA-Z0-9+.\-]*:/+", repoPath):
        return repoPath
    return normurl(posixpath.join(basePath, repoPath))
def get_repo_name(url):
    """Gets a repository name from an URL.

    All trailing '/' and '\\' characters are ignored; an empty url yields an
    empty name instead of raising IndexError.
    """
    return os.path.basename(url.rstrip("\\/"))
# | |
# | |
# Mercurial stuff | |
# | |
# | |
def hg_install_local_hgignore(ignores=DEFAULT_HG_IGNORES):
    """Installs a local hgignore file.

    Points 'ui.ignore' in the hgrc of '.hgit.repo' at '.hg/hgignore' and
    writes the given glob patterns into that file.
    """
    hg_meta = os.path.join(get_hg_repo_path(), ".hg")
    hgrc = os.path.join(hg_meta, "hgrc")
    hgignore = os.path.join(hg_meta, "hgignore")

    # Update the existing hgrc in place so other settings survive.
    config = ConfigParser.RawConfigParser()
    config.read(hgrc)
    if not config.has_section("ui"):
        config.add_section("ui")
    config.set("ui", "ignore", hgignore)
    with open(hgrc, "wb") as fd:
        config.write(fd)

    with open(hgignore, "wb") as fd:
        fd.write("syntax: glob\n\n")
        fd.writelines(pattern + "\n" for pattern in ignores)
def hg_get_revisions():
    """Returns the changeset ids listed by 'hg log', in the order hg prints them."""
    output = subprocess.check_output(["hg", "log"])
    stripped = (line.strip() for line in output.split("\n"))
    # 'changeset:' is 10 characters long; the rest of the line is the id.
    return [line[10:].strip() for line in stripped
            if line.startswith("changeset:")]
def hg_get_log_for(rev):
    """Returns a verbose log about a given hg changeset.

    The 'key: value' header lines of 'hg log -v' become dict entries. The
    header ends at the 'description:' line or at the first line without a
    usable 'key:' prefix; everything after that line is stored under
    "description" (a placeholder is used when it is empty).
    """
    output = subprocess.check_output(["hg", "log", "-v", "-r", rev])
    lines = [line.rstrip() for line in output.split("\n")]
    info = dict()
    body_start = len(lines)
    for pos, line in enumerate(lines):
        colon = line.find(":")
        # A missing 'description:' marker is tolerated: any line that is not
        # a header also ends the header section (and is itself dropped).
        if line.startswith("description:") or colon <= 0:
            body_start = pos + 1
            break
        info[line[:colon]] = line[colon + 1:].strip()
    message = "".join(line + "\n" for line in lines[body_start:]).rstrip()
    if len(message) == 0:
        message = "***empty-commit-message***"
    info["description"] = message
    return info
def hg_get_diff_stats(prevRev, nextRev):
    """Returns the stripped 'hg status' lines between the given revisions."""
    status = cmd_in(["hg", "status", "--rev", prevRev, "--rev", nextRev])
    return [entry.strip() for entry in status.split("\n")]
def hg_get_subrepositories():
    """Gets all listed hg subrepositories.

    Reads '.hgsub' and '.hgsubstate' from the current directory and returns
    a dict keyed by subrepository path. Each value is a dict with "os-path",
    "hg-path" and, when listed in '.hgsubstate', "revision".

    Raises IOError on malformed lines or when '.hgsubstate' names a
    subrepository that is missing from '.hgsub'.
    """
    subs = dict()
    for repo in read_lines(".hgsub"):
        repo = repo.strip()
        # Blank lines and comments used to crash the parser.
        if not repo or repo.startswith("#"):
            continue
        # Split on the first '=' only: the source URL may contain '='.
        parts = [v.strip() for v in repo.split("=", 1)]
        if len(parts) != 2:
            raise IOError("Corrupt .hgsub, malformed line: " + repo)
        subs[parts[0]] = {"os-path": parts[0], "hg-path": parts[1]}
    if len(subs) == 0:
        return dict()
    for state in read_lines(".hgsubstate"):
        state = state.strip()
        if not state:
            continue
        # '<revision> <path>': the path itself may contain spaces.
        parts = state.split(" ", 1)
        if len(parts) != 2:
            raise IOError("Corrupt .hgsubstate, malformed line: " + state)
        revision = parts[0]
        path = parts[1].strip()
        if path not in subs:
            raise IOError("Corrupt .hgsubstate, unmatched repo: " + path)
        subs[path]["revision"] = revision
    return subs
def hg_get_default_repo_path():
    """Gets the hg default repo base path.

    Scans the 'name = url' lines of 'hg paths' for the entry named exactly
    'default' and returns its url; raises IOError when there is none.
    """
    for line in cmd_in(["hg", "paths"]).split("\n"):
        name, sep, url = line.partition("=")
        # Compare the full name: a prefix match would also accept entries
        # such as 'default-push'.
        if sep and name.strip() == "default":
            return url.strip()
    raise IOError("No default path found")
def hg_replay(prevChange, changeset, userMap=None):
    """Plays back a single changeset from mercurial.

    Updates '.hgit.repo' to "changeset", copies every path that 'hg status'
    reports as added or modified since "prevChange" into the git working
    directory, removes the ones reported as removed and commits the result
    with the author, date and message taken from the hg log.

    "userMap" optionally maps hg user ids to replacement ids.

    Returns a tuple (hg changeset, git commit hash) when a commit was
    created, otherwise None. Raises IOError on an unknown status code or a
    failing command.
    """
    if userMap is None:
        userMap = {}
    gitRepo = find_repo_base_path()
    hgRepo = get_hg_repo_path()
    curDir = os.path.abspath(os.curdir)
    ret = None
    try:
        os.chdir(hgRepo)
        log = hg_get_log_for(changeset)
        diff = hg_get_diff_stats(prevChange, changeset)
        cmd(["hg", "update", "-r", changeset])
        changed = False
        for line in diff:
            if not line:
                continue
            changed = True
            # Status lines look like '<code> <path>'.
            f = line[2:]
            of = os.path.join(gitRepo, f)
            if line[0] in "AM":
                if os.path.islink(f):
                    lnk = os.readlink(f)
                    mkdirs(os.path.dirname(of))
                    # os.symlink() cannot overwrite, so a modified link has
                    # to be removed first.
                    if os.path.lexists(of):
                        remove_dir_or_file(of)
                    os.symlink(lnk, of)
                elif os.path.isfile(f):
                    mkdirs(os.path.dirname(of))
                    # Do not write through a stale link when a path turned
                    # from a symlink into a regular file.
                    if os.path.islink(of):
                        os.unlink(of)
                    shutil.copyfile(f, of)
                else:
                    mkdirs(of)
                copy_mod(f, of)
            elif line[0] in "R":
                remove_dir_or_file(of)
            else:
                raise IOError("Unknown mode: " + line)
        os.chdir(gitRepo)
        if changed and git_is_dirty():
            user = log.get("user")
            usr = map_hg_userid(userMap.get(user, user))
            tmp = NamedTemporaryFile(delete=False)
            try:
                tmp.write(log.get("description"))
                tmp.close()
                os.environ["GIT_COMMITTER_DATE"] = log.get("date")
                os.environ["GIT_COMMITTER_NAME"] = usr["name"]
                os.environ["GIT_COMMITTER_EMAIL"] = usr["email"]
                cmd(["git", "add", "-A", "."])
                # The arguments bypass the shell, so no quoting is needed
                # (literal quotes would become part of the values).
                cmd(["git", "commit", "-F", tmp.name,
                     "--author=" + usr["name"] + " <" + usr["email"] + ">",
                     "--date=" + log.get("date")])
                ret = (log.get("changeset"), git_get_last_commit())
                if "tag" in log and log.get("tag") != "tip":
                    cmd(["git", "tag", log.get("tag")])
            finally:
                # Clean up even when a git command failed.
                tmp.close()
                os.remove(tmp.name)
                for key in ("GIT_COMMITTER_DATE", "GIT_COMMITTER_NAME",
                            "GIT_COMMITTER_EMAIL"):
                    os.environ.pop(key, None)
    finally:
        os.chdir(curDir)
    return ret
def hg_update_subrepos():
    """Updates all mercurial subrepositories.

    Missing subrepositories are cloned (and added to the local git
    ignores), existing ones are pulled; afterwards each one is updated to
    the revision recorded for it, if any.
    """
    gitRepo = find_repo_base_path()
    os.chdir(gitRepo)
    baseurl = read_config().get("hg", "repo-url")
    for path, sub in hg_get_subrepositories().iteritems():
        if os.path.exists(path):
            os.chdir(path)
            cmd(["hg", "update", "-r", "tip"])
            cmd(["hg", "pull", "-u"])
        else:
            repoUrl = make_subrepo_path(baseurl, sub["hg-path"])
            mkdirs(path)
            git_add_entry_to_local_gitignore(path)
            os.chdir(path)
            cmd(["hg", "clone", repoUrl, "."])
        if "revision" in sub:
            cmd(["hg", "update", "-r", sub["revision"]])
        # Subrepository paths are relative to the repository root.
        os.chdir(gitRepo)
def hg_update_subrepo_revisions():
    """Updates every existing subrepository to its recorded revision."""
    gitRepo = find_repo_base_path()
    os.chdir(gitRepo)
    for path, sub in hg_get_subrepositories().iteritems():
        if not os.path.exists(path) or "revision" not in sub:
            continue
        os.chdir(path)
        cmd(["hg", "update", "-r", sub["revision"]])
        os.chdir(gitRepo)
def hg_update():
    """Fetches changes from the remote mercurial repository.

    Pulls into '.hgit.repo' and replays every changeset that is not yet
    listed in '.hgit.tracking' onto the '_hg_downstream' git branch, then
    switches back to the branch that was checked out before.
    """
    gitRepo = find_repo_base_path()
    hgRepo = get_hg_repo_path()
    change_map = read_change_map()
    os.chdir(gitRepo)
    branch = git_get_current_branch()
    os.chdir(hgRepo)
    cmd(["hg", "purge", "--all", "--config", "extensions.purge="])
    cmd(["hg", "pull", "-u"])
    changes = hg_get_revisions()
    changes.reverse()
    # The tracking file holds one entry per processed hg changeset, so its
    # length tells how many changesets can be skipped.
    todo = changes[len(change_map):]
    userMap = read_user_mapping()
    os.chdir(gitRepo)
    if len(todo) == 0:
        return
    cmd(["git", "checkout", "_hg_downstream"])
    last_rev = "null:0"
    if len(change_map) > 0:
        last_rev = change_map[-1][0]
    try:
        for change in todo:
            print("> Migrating " + change)
            chg = hg_replay(last_rev, change, userMap)
            if chg:
                change_map.append(chg)
                last_rev = change
            else:
                # Record changesets without a git commit as well (as
                # migrate_hg_repo does), otherwise the tracking length
                # drifts and they are replayed again on the next fetch.
                change_map.append((change, "none"))
    finally:
        # Persist what was migrated even if a later changeset failed.
        write_change_map(change_map)
    os.chdir(hgRepo)
    cmd(["hg", "update", "-r", "tip"])
    os.chdir(gitRepo)
    cmd(["git", "checkout", branch])
# | |
# | |
# Git stuff | |
# | |
# | |
def migrate_hgignore():
    """Migrates the .hgignore file to a .gitignore.

    Only glob patterns are migrated: lines in a 'syntax: glob' section and
    lines with a 'glob:' prefix. Regular expression patterns are dropped.
    Blank lines and comments are kept. No '.gitignore' is written when
    nothing was collected.
    """
    curDir = os.path.abspath(os.curdir)
    try:
        ignores = []
        os.chdir(get_hg_repo_path())
        # Until a 'syntax:' line says otherwise, patterns are treated as regex.
        isRegex = True
        for line in read_lines(".hgignore"):
            line = line.strip()
            if len(line) == 0 or line[0] == "#":
                ignores.append(line)
            elif line.startswith("syntax:"):
                isRegex = line[7:].strip() != "glob"
            elif line.startswith("glob:"):
                ignores.append(line[5:].strip())
            elif line.startswith(("re:", "regexp:", "relre:")):
                # Per-line regex override: not a glob even in a glob section.
                continue
            elif not isRegex:
                ignores.append(line)
        os.chdir(find_repo_base_path())
        if len(ignores) > 0:
            with open(".gitignore", "w") as fd:
                for ign in ignores:
                    fd.write(ign + "\n")
    finally:
        # Restore the working directory even when reading or writing failed.
        os.chdir(curDir)
def git_is_dirty():
    """Returns True when 'git status --porcelain' reports anything."""
    status = subprocess.check_output(["git", "status", "--porcelain"])
    return bool(status.strip())
def git_get_revisions():
    """Returns a list of all changesets in a git repository.

    The hashes are returned in the order 'git log' prints them.
    """
    # Ask git for bare hashes instead of parsing the human readable log,
    # where a commit message line starting with 'commit ' would be mistaken
    # for a revision.
    output = subprocess.check_output(["git", "log", "--format=%H"])
    return [line.strip() for line in output.split("\n") if line.strip()]
def git_get_last_commit():
    """Returns the hash of the most recent commit on the current HEAD."""
    output = subprocess.check_output(["git", "log", "-1"])
    first_line = output.split("\n")[0].strip()
    # Drop the leading 'commit ' (7 characters).
    return first_line[7:].strip()
def git_get_current_branch():
    """Returns the name git reports for HEAD ('git rev-parse --abbrev-ref')."""
    head = subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
    return head.strip()
def git_get_log_for(rev):
    """Returns a verbose log about a given git revision.

    The result holds "changeset", "user", "date", "description" and, for
    merge commits, "merge" (the abbreviated parent hashes printed by git).
    Raises IOError on an unknown header line.
    """
    output = subprocess.check_output(["git", "log", "-1", rev])
    ret = dict()
    cmt = ""
    inDesc = False
    for line in (raw.rstrip() for raw in output.split("\n")):
        if inDesc:
            # git indents the message; strip that indentation again.
            if line.startswith(" "):
                cmt += line[4:] + "\n"
            else:
                cmt += line + "\n"
        elif len(line) == 0:
            # The first blank line separates the headers from the message.
            inDesc = True
        elif line.startswith("commit "):
            ret["changeset"] = line[6:].strip()
        elif line.startswith("Author:"):
            ret["user"] = line[7:].strip()
        elif line.startswith("Date:"):
            ret["date"] = line[5:].strip()
        elif line.startswith("Merge:"):
            # Merge commits carry an extra header that used to be rejected.
            ret["merge"] = line[6:].strip()
        else:
            raise IOError("Unsupported git log output: " + line)
    ret["description"] = cmt.rstrip()
    return ret
def git_read_local_gitignore():
    """Reads the local .gitignore file.

    Returns the set of non-empty, non-comment entries of
    '.git/info/exclude' (relative to the current directory).
    """
    ignores = set()
    for line in read_lines(".git/info/exclude"):
        # Keep the stripped value; the result used to be thrown away, so
        # whitespace-only lines ended up as entries.
        line = line.strip()
        if line and not line.startswith("#"):
            ignores.add(line)
    return ignores
def git_write_local_gitignore(ignores=DEFAULT_GIT_IGNORES):
    """Writes the distinct entries of 'ignores' to '.git/info/exclude'."""
    unique_entries = set(ignores)
    with open(".git/info/exclude", "wb") as fd:
        fd.write("# Automagically generated\n")
        fd.writelines(entry + "\n" for entry in unique_entries)
def git_add_entry_to_local_gitignore(entry):
    """Adds one entry to '.git/info/exclude', keeping the existing ones."""
    git_write_local_gitignore(git_read_local_gitignore() | set([entry]))
def git_create_repository(name):
    """Creates the folder 'name', enters it and initialises a git repository.

    The default local ignores are written afterwards. The current working
    directory stays inside the new repository.
    """
    os.mkdir(name)
    os.chdir(name)
    init_command = ["git", "init"]
    cmd(init_command)
    git_write_local_gitignore()
def git_replay(branch):
    """Replays the changes in the current branch into the hg repository.

    Every path that differs between '_hg_downstream' and "branch" is either
    removed from the hg working copy (deletions) or rewritten there with
    the content stored in "branch". Raises IOError on an unsupported status
    code or when a file cannot be extracted.
    """
    gitRepo = find_repo_base_path()
    hgRepo = get_hg_repo_path()
    os.chdir(gitRepo)
    # Rename/copy detection is switched off: it would produce 'R<score>'
    # lines with two paths, which this loop cannot handle, and recent git
    # versions enable it by default.
    diff = subprocess.check_output(
        ["git", "diff", "--no-renames", "--name-status",
         "_hg_downstream.." + branch])
    for line in diff.split("\n"):
        line = line.strip()
        if len(line) == 0:
            continue
        if line[0] not in "ACDMT":
            raise IOError("Unsupported operation: " + line)
        f = line[1:].strip()
        if line[0] == "D":
            os.chdir(hgRepo)
            try:
                cmd(["hg", "remove", f])
            finally:
                os.chdir(gitRepo)
        else:
            of = os.path.join(hgRepo, f)
            mkdirs(os.path.dirname(of))
            # Take the content from the branch, not from the working tree.
            with open(of, "wb") as fd:
                if subprocess.call(["git", "show", branch + ":" + f], stdout=fd):
                    raise IOError("Failed to extract file: " + f)
            copy_mod(f, of)
def migrate_hg_repo(cloneSubs=True, limit=-1):
    """Migrates a mercurial repository into a git repository, also checking out submodules.

    Expects the current directory to be the fresh git repository with the
    mercurial clone in '.hgit.repo'.

    "cloneSubs" controls whether hg subrepositories are cloned as well.
    "limit", when positive, restricts the migration to the last "limit"
    changesets; older ones are recorded as skipped.
    """
    gitRepo = os.path.abspath(os.curdir)
    hgRepo = os.path.abspath(".hgit.repo")
    os.chdir(hgRepo)
    hg_install_local_hgignore()
    changes = hg_get_revisions()
    changes.reverse()
    last_rev = "null:0"
    print("Migrating " + str(len(changes)) + " changesets ...")
    mapping = []
    userMap = read_user_mapping()
    if limit > 0:
        to_skip = len(changes) - min(len(changes), limit)
    else:
        to_skip = 0
    for pos, change in enumerate(changes):
        if pos < to_skip:
            print("> Skipping " + change)
            mapping.append((change, "none"))
            continue
        print("> Migrating " + change)
        chg = hg_replay(last_rev, change, userMap)
        if chg:
            mapping.append(chg)
            last_rev = change
        else:
            # Keep one tracking entry per hg changeset.
            mapping.append((change, "none"))
    cmd(["hg", "update", "-r", "tip"])
    write_change_map(mapping)
    migrate_hgignore()
    os.chdir(hgRepo)
    basePath = hg_get_default_repo_path()
    if cloneSubs:
        print("Checking for hg subrepositories ...")
        subs = hg_get_subrepositories()
        if len(subs) > 0:
            print("> Found " + str(len(subs)) + " subrepositories, cloning ...")
            for path, sub in subs.iteritems():
                os.chdir(gitRepo)
                git_add_entry_to_local_gitignore(path)
                mkdirs(path)
                os.chdir(path)
                repoUrl = make_subrepo_path(basePath, sub["hg-path"])
                print("> Fetching " + repoUrl)
                if "revision" in sub:
                    cmd(["hg", "clone", "-r", sub["revision"], repoUrl, "."])
                else:
                    cmd(["hg", "clone", repoUrl, "."])
                os.chdir(gitRepo)
        else:
            print("> None found")
    os.chdir(gitRepo)
    config = read_config()
    if not config.has_section("hg"):
        config.add_section("hg")
    config.set("hg", "repo-url", basePath)
    write_config(config)
    # Remember the branch the commits were created on rather than assuming
    # 'master': git's initial branch name is configurable.
    branch = git_get_current_branch()
    cmd(["git", "checkout", "-b", "_hg_downstream"])
    cmd(["git", "checkout", branch])
# | |
# | |
# Main/commands | |
# | |
# | |
def cmd_clone(url, name=None, limit=-1):
    """Clones a mercurial repository as hgit repository.

    The target folder is "name" or, when that is empty, the name derived
    from "url". "limit" is passed on to migrate_hg_repo(). Refuses to run
    inside a folder that already contains '.git' or '.hgit.repo'.
    """
    if maybe_hgit_folder():
        raise IOError(
            "The current directory contains either a '.git' or a '.hgit.repo' folder, aborting")
    repoName = name if name else get_repo_name(url)
    git_create_repository(repoName)
    # git_create_repository() leaves us inside the new folder.
    cmd(["hg", "clone", url, ".hgit.repo"])
    migrate_hg_repo(True, limit)
def cmd_update_subrepos(revsOnly=False):
    """Updates mercurial subrepositories.

    With "revsOnly" the subrepositories are only moved to their recorded
    revisions; otherwise they are cloned or pulled first.
    """
    if git_get_current_branch() == "_hg_downstream":
        raise IOError("Can't work on downstream branch")
    updater = hg_update_subrepo_revisions if revsOnly else hg_update_subrepos
    updater()
def cmd_switch_branch(branch):
    """Checks out the given git branch and updates the subrepository revisions."""
    os.chdir(find_repo_base_path())
    # The downstream branch is managed by hgit and must not be checked out.
    if branch == "_hg_downstream":
        raise IOError("Can't switch to downstream branch")
    cmd(["git", "checkout", branch])
    hg_update_subrepo_revisions()
def cmd_fetch():
    """Fetches changes from the remote mercurial repository via hg_update()."""
    current = git_get_current_branch()
    if current == "_hg_downstream":
        raise IOError("Can't work on downstream branch")
    hg_update()
def cmd_merge(fetchUpdates=False):
    """Merges '_hg_downstream' into the current branch.

    With "fetchUpdates" the downstream branch is refreshed from mercurial
    first.
    """
    current = git_get_current_branch()
    if current == "_hg_downstream":
        raise IOError("Can't merge into downstream branch")
    if fetchUpdates:
        hg_update()
    cmd(["git", "merge", "_hg_downstream"])
def cmd_push(message="", fetchUpdates=False, preview=False):
    """Pushes the current changes to mercurial.

    "message" is the hg commit message; when empty, 'hg commit' is run
    without '-m'.
    "fetchUpdates" fetches and merges the downstream branch first.
    "preview" shows 'hg status' and asks for confirmation before committing.

    On failure the hg working copy is reverted ('hg update -C'), a commit
    that was not pushed is rolled back, and the error is re-raised.
    """
    # Resolve the repository root like the other commands do, so pushing
    # also works from a subdirectory of the working tree.
    gitRepo = find_repo_base_path()
    hgRepo = get_hg_repo_path()
    os.chdir(gitRepo)
    # 0: nothing committed, 1: committed but not pushed, 2: pushed.
    state = 0
    branch = git_get_current_branch()
    if branch == "_hg_downstream":
        raise IOError("Can't push from downstream branch")
    if fetchUpdates:
        hg_update()
        cmd(["git", "merge", "_hg_downstream"])
    try:
        git_replay(branch)
        os.chdir(hgRepo)
        cmd(["hg", "addremove"])
        status = subprocess.check_output(["hg", "status"]).split("\n")
        changed = any(line.strip()[:1] in ("A", "M", "R") for line in status)
        if changed:
            if preview:
                cmd(["hg", "status"])
                if not query_yes_no("Do you want to push these changes to " + hg_get_default_repo_path(), "no"):
                    cmd(["hg", "update", "-C"])
                    return
            if len(message) > 0:
                cmd(["hg", "commit", "-m", message])
            else:
                cmd(["hg", "commit"])
            state = 1
            cmd(["hg", "push"])
            state = 2
            hg_update()
        else:
            os.chdir(hgRepo)
            cmd(["hg", "update", "-C"])
    except BaseException:
        # Deliberately broad (includes Ctrl-C): always undo the local hg
        # changes, then re-raise.
        os.chdir(hgRepo)
        if state == 1:
            cmd(["hg", "rollback"])
        cmd(["hg", "update", "-C"])
        raise
def create_argparser():
    """Builds the command line parser with one sub-parser per hgit command.

    The selected command is stored in the 'sub_commands' attribute of the
    parsed namespace.
    """
    root = argparse.ArgumentParser()
    root.add_argument(
        "-d", "--debug", action="store_true", help="enables debug mode")
    commands = root.add_subparsers(dest="sub_commands")

    # clone
    clone = commands.add_parser(
        "clone", description="clones a (mercurial) repository")
    clone.add_argument(
        "-n", "--limit", type=int, default=-1,
        help="only migrate last n commits to git")
    clone.add_argument("url", type=str, help="the repository URL")
    clone.add_argument(
        "name", type=str, nargs="?", default=None,
        help="name of the directory to clone into")

    # push
    push = commands.add_parser(
        "push",
        description="commit and push changes from the current branch to the hg repository")
    push.add_argument(
        "-m", "--message", type=str, default="", help="the hg commit message")
    push.add_argument(
        "-u", "--update", action="store_true",
        help="fetch and merge updates first")
    push.add_argument(
        "-p", "--preview", action="store_true",
        help="shows a preview of the changes before committing and asks to continue")

    # subrepos
    subrepos = commands.add_parser(
        "subrepos", description="updates mercurial subrepositories")
    subrepos.add_argument(
        "-r", "--revisions-only", action="store_true",
        help="only update to linked changeset without checking for updates")

    # switch
    switch = commands.add_parser(
        "switch",
        description="switches to the given git branch and updates submodules")
    switch.add_argument("branch", type=str, help="the git branch name")

    # fetch
    commands.add_parser(
        "fetch",
        description="fetches changes from remote mercurial repository, make sure your git working directory is clean")

    # merge
    merge = commands.add_parser(
        "merge", description="merges the downstream branch into the current one")
    merge.add_argument(
        "-u", "--update", action="store_true",
        help="fetch updates first, make sure your git working directory is clean")

    # help
    help_cmd = commands.add_parser("help")
    help_cmd.add_argument("command", nargs="?", default=None)
    return root
def main():
    """Parses the command line and dispatches to the matching cmd_* function.

    Without arguments the general help is shown. Errors are printed as a
    single line and the process exits with status 1, unless '--debug' is
    given, in which case the exception is re-raised.
    """
    parser = create_argparser()
    if len(sys.argv) < 2:
        sys.argv.append("help")
    parsed = parser.parse_args()
    try:
        if parsed.sub_commands == "help":
            # Re-parsing with '--help' makes argparse print the help text.
            if not parsed.command:
                parser.parse_args(["--help"])
            else:
                parser.parse_args([parsed.command, "--help"])
        elif parsed.sub_commands == "clone":
            cmd_clone(parsed.url, parsed.name, parsed.limit)
        elif parsed.sub_commands == "subrepos":
            cmd_update_subrepos(parsed.revisions_only)
        elif parsed.sub_commands == "switch":
            cmd_switch_branch(parsed.branch)
        elif parsed.sub_commands == "fetch":
            cmd_fetch()
        elif parsed.sub_commands == "merge":
            cmd_merge(parsed.update)
        elif parsed.sub_commands == "push":
            cmd_push(parsed.message, parsed.update, parsed.preview)
    except Exception as e:
        if parsed.debug:
            raise
        print("*** Error: %s" % e)
        # sys.exit() rather than the interactive exit() helper, which is
        # only installed by the site module.
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment