-
-
Save datavudeja/09e362b6cd5f4c1799b58989c91292ab to your computer and use it in GitHub Desktop.
Utilities for creating a docker image.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Utilities for creating a docker image. | |
""" | |
import fnmatch | |
import json | |
import pathlib | |
import re | |
import stat | |
import subprocess | |
import tarfile | |
import time | |
def shellCmd(cmd: str, input: str = None) -> (int, str, str): | |
""" | |
/bin/sh `cmd` < `input` | |
:param cmd: shell command | |
:param input: data | |
:return: (exitcode, stdout, stderr) | |
""" | |
with subprocess.Popen(args=cmd, shell=True, text=True, | |
stdin=(input and -1), stdout=-1, stderr=-1) as proc: | |
if input and proc.stdin: | |
proc._stdin_write(input) | |
return (proc.wait(), proc.stdout.read(), proc.stderr.read()) | |
def cmdImageBuild(dockerfile_text: str, workdir: str or pathlib.Path="") -> str: | |
""" | |
docker image build -f - `workdir` < `dockerfile_text` | |
:param dockerfile_text: code | |
:param workdir: workdir for ADD and COPY | |
:return: ID | |
""" | |
workdir = str(workdir or "") | |
if len(workdir): | |
workdir = f"f - {workdir}" | |
(excode, pout, perr) = shellCmd(f"docker image build -{workdir}", dockerfile_text) | |
if excode == 0: | |
return pout.rsplit("Successfully built", 1)[-1].strip() | |
if excode == 1 and "TLS handshake timeout" in perr: | |
raise ConnectionError("TLS handshake timeout") | |
print(f"{pout}\nExitCode={excode}\n{perr}") | |
def cmdImageImport(tar_path, tag: str, change: str or list=None) -> str: | |
""" | |
docker image import [--change `change`, ...] `tar_path` [`tag`] | |
:param tar_path: path of image | |
:param tag: tag | |
:param change: one or list of --change | |
:return: ID | |
""" | |
if tag is None: | |
tag = "" | |
chng = "" | |
if change is not None: | |
if isinstance(change, str): | |
chng = " ".join("--change '" + c.replace("'", "\\'") + "'" for c in change.splitlines(False)) | |
else: | |
chng = " ".join("--change '" + c.replace("'", "\\'") + "'" for c in change) | |
chng = chng.replace("--change ''", "") | |
(excode, pout, perr) = shellCmd(f"docker image import {chng} {tar_path} {tag}") | |
if excode == 0: | |
return pout.split(":", 1)[-1].strip() | |
print(f"{pout}\nExitCode={excode}\n{perr}") | |
def dockerImageSaveAnalizer(path: pathlib.Path or str) -> dict: | |
""" | |
Returns config and information on files in all layers of the exported image. | |
:param path: path to image | |
:return: {"manifest": manifest, "config": config, "layers": layers} | |
""" | |
with tarfile.open(path, "r") as img: | |
tinfo = dict([(_.name, _) for _ in img.getmembers()]) | |
manifest = json.load(img.extractfile(tinfo["manifest.json"]))[0] | |
config = json.load(img.extractfile(tinfo[manifest["Config"]])) | |
layers = [] | |
for lrs in manifest["Layers"]: | |
with tarfile.open(fileobj=img.extractfile(lrs)) as l: | |
offset = tinfo[lrs].offset_data | |
members = l.getmembers() | |
for m in members: | |
m.offset += offset | |
m.offset_data += offset | |
layers.append((json.load(img.extractfile(tinfo[lrs[:-9] + "json"])), members)) | |
return {"manifest": manifest, "config": config, "layers": layers} | |
def mergeLayers(dst: tarfile.TarFile, src: str, include: tuple or list = ("*",), | |
exclude: tuple or list = None): | |
""" | |
Copying files from all layers of the image to the archive. | |
Format for `include` and `exclude` = fnmatch. But "*" is added to the beginning if not startswith("/"). | |
:param dst: open tarball | |
:param src: path to image (for `dockerImageSaveAnalizer(src)`) | |
:param include: list with fnmatch or re.compile() | |
:param exclude: list with fnmatch or re.compile() | |
""" | |
def _convertFnmatch(match: str) -> re.Pattern: | |
if not isinstance(match, str): | |
return match | |
if not match.startswith(("/", "*")): | |
match = f"*{match}" | |
return re.compile(fnmatch.translate(match)) | |
flist = {} | |
if include and not isinstance(include, (tuple, list)): | |
include = (include,) | |
if exclude and not isinstance(exclude, (tuple, list)): | |
exclude = (exclude,) | |
include = (include and tuple(map(_convertFnmatch, include))) or tuple() | |
exclude = (exclude and tuple(map(_convertFnmatch, exclude))) or tuple() | |
name = "" | |
exbreak = False | |
for cfg, lrs in dockerImageSaveAnalizer(src)["layers"]: | |
for l in lrs: | |
name = f"/{l.name}" | |
exbreak = False | |
for mtch in exclude: | |
if mtch.match(name): | |
exbreak = True | |
break | |
if exbreak: | |
continue | |
for mtch in include: | |
if mtch.match(name): | |
flist[l.name] = l | |
break | |
with tarfile.open(src, "r") as tin: | |
for tinfo in flist.values(): | |
dst.addfile(tinfo, | |
fileobj=(tin.extractfile(tinfo) if not (tinfo.islnk() or tinfo.issym()) else None)) | |
def repairTar(tar_path, force=False) -> list: | |
""" | |
Removes duplicates from the tarball that appear after adding to the end of the archive. | |
Docker cannot unpack such tar. | |
:param tar_path: path | |
:param force: repack the archive with sorted paths always. | |
:return: list of duplicates | |
""" | |
duplicate = [] | |
flist = {} | |
with tarfile.open(tar_path, "r") as ftar: | |
for m in ftar.getmembers(): | |
if m.name in flist: | |
duplicate.append(m.name) | |
flist[m.name] = m | |
if len(duplicate) > 0 or force: | |
tmpfile = f"{tar_path}_tempRepairTar" | |
with tarfile.open(tmpfile, "w") as ftmp: | |
with tarfile.open(tar_path, "r") as ftar: | |
for name in sorted(flist.keys()): | |
tinfo = flist[name] | |
ftmp.addfile(tinfo, fileobj=( | |
ftar.extractfile(tinfo) if not (tinfo.islnk() or tinfo.issym()) else None)) | |
pathlib.Path(tmpfile).replace(tar_path) | |
return duplicate | |
def dumpImageInfo(tofile: pathlib.Path or str, imageSaveAnalizer: dict): | |
""" | |
Dump config and list of files (`ls -l`) from all layers of the image. | |
:param tofile: path | |
:param imageSaveAnalizer: return of `dockerImageSaveAnalizer()` | |
""" | |
with open(tofile, "w", encoding="utf-8") as out: | |
out.write("===== MANIFEST =====\n") | |
json.dump(imageSaveAnalizer["manifest"], out, indent=2) | |
out.write("\n===== CONFIG =====\n") | |
json.dump(imageSaveAnalizer["config"], out, indent=2) | |
out.write("\n===== LAYERS =====\n") | |
for cfg, lrs in imageSaveAnalizer["layers"]: | |
out.write(f"LayerID: {cfg['id']}\n") | |
json.dump(cfg, out, indent=2) | |
out.write("\n") | |
lst = [] | |
for l in lrs: | |
tm = time.gmtime(l.mtime) | |
if l.uid == 0 and len(l.uname) == 0: | |
l.uname = "root" | |
if l.gid == 0 and len(l.gname) == 0: | |
l.gname = "root" | |
lst.append(( | |
f"{tm.tm_year}-{tm.tm_mon:0>2d}-{tm.tm_mday:0>2d} " + | |
f"{tm.tm_hour:0>2d}:{tm.tm_min:0>2d}:{tm.tm_sec:0>2d}", | |
stat.filemode(l.mode), | |
f"{l.uname}({l.uid})", | |
f"{l.gname}({l.gid})", | |
str(l.size or ""), | |
"| " + l.name + (f" -> {l.linkname}" if len(l.linkname) else "") | |
)) | |
if len(lst) == 0: | |
continue | |
maxlen = [len(l) for l in lst[0]] | |
for l in lst: | |
maxlen = [max(maxlen[x], len(d)) for x, d in enumerate(l)] | |
for l in lst: | |
out.write(" ".join( | |
[(f"{d: <{maxlen[x]}}" if x != 4 else f"{d: >{maxlen[x]}}") for x, d in enumerate(l)] | |
).strip() + "\n") | |
out.write("\n") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment