Skip to content

Instantly share code, notes, and snippets.

Forked from Roffild/
Created February 5, 2025 20:11
Show Gist options
  • Save datavudeja/09e362b6cd5f4c1799b58989c91292ab to your computer and use it in GitHub Desktop.
Utilities for creating a docker image.
import fnmatch
import json
import pathlib
import re
import stat
import subprocess
import tarfile
import time
def shellCmd(cmd: str, input: str = None) -> "tuple[int, str, str]":
    """
    Run a command through the shell: /bin/sh `cmd` < `input`

    :param cmd: shell command
    :param input: text fed to the command's stdin; when empty or None the
        command inherits the parent's stdin instead of getting a pipe
    :return: (exitcode, stdout, stderr)
    """
    with subprocess.Popen(args=cmd, shell=True, text=True,
                          stdin=(subprocess.PIPE if input else None),
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
        # communicate() writes stdin and drains both pipes concurrently, so a
        # command producing more output than the pipe buffer cannot deadlock
        # the way wait()-then-read() can.
        pout, perr = proc.communicate(input or None)
        return (proc.returncode, pout, perr)
def cmdImageBuild(dockerfile_text: str, workdir: "str | pathlib.Path" = "") -> str:
    """
    docker image build -f - `workdir` < `dockerfile_text`

    :param dockerfile_text: code
    :param workdir: workdir for ADD and COPY; when empty the command is
        `docker image build -` (Dockerfile on stdin, no workdir argument)
    :return: ID parsed from the text after the last "Successfully built" in
        stdout, or None when the build fails for a reason other than a TLS
        handshake timeout
    :raises ConnectionError: exit code 1 with "TLS handshake timeout" in stderr
    """
    import shlex

    workdir = str(workdir or "")
    # Quote the directory so spaces or shell metacharacters in the path cannot
    # split or alter the command line.
    target = f"-f - {shlex.quote(workdir)}" if workdir else "-"
    excode, pout, perr = shellCmd(f"docker image build {target}", dockerfile_text)
    if excode == 0:
        return pout.rsplit("Successfully built", 1)[-1].strip()
    if excode == 1 and "TLS handshake timeout" in perr:
        raise ConnectionError("TLS handshake timeout")
    # NOTE(review): every other failure yields None; stderr is discarded here.
    return None
def cmdImageImport(tar_path, tag: str, change: "str | list" = None) -> str:
    """
    docker image import [--change `change`, ...] `tar_path` [`tag`]

    :param tar_path: path of image
    :param tag: tag; None or empty means no tag argument is passed
    :param change: one or list of --change; a string is split on line breaks
        into one --change per line, and empty entries are skipped
    :return: ID (the text after the first ":" in stdout), or None when the
        command exits with a non-zero code
    """
    import shlex

    if change is None:
        changes = []
    elif isinstance(change, str):
        changes = change.splitlines()
    else:
        changes = list(change)

    args = ["docker", "image", "import"]
    for item in changes:
        if item:
            args += ["--change", item]
    args.append(str(tar_path))
    if tag:
        args.append(tag)

    # shlex.quote gives correct sh quoting; a backslash-escaped quote inside
    # single quotes is not an escape in sh and would break the command.
    excode, pout, perr = shellCmd(" ".join(shlex.quote(a) for a in args))
    if excode == 0:
        return pout.split(":", 1)[-1].strip()
    # NOTE(review): failures yield None; stderr is discarded here.
    return None
def dockerImageSaveAnalizer(path: "pathlib.Path | str") -> dict:
    """
    Returns config and information on files in all layers of the exported image.

    Each entry of "layers" is a tuple ``(layer_json, members)``. ``layer_json``
    is loaded from the archive member whose name is the layer path with its
    last 9 characters replaced by "json" (e.g. ``<id>/layer.tar`` ->
    ``<id>/json``). ``members`` are the layer's TarInfo objects with ``offset``
    and ``offset_data`` shifted by the layer's position inside the outer
    archive, so they address the outer file directly.

    :param path: path to image
    :return: {"manifest": manifest, "config": config, "layers": layers}
    """
    with tarfile.open(path, "r") as img:
        by_name = {member.name: member for member in img.getmembers()}
        manifest = json.load(img.extractfile(by_name["manifest.json"]))[0]
        config = json.load(img.extractfile(by_name[manifest["Config"]]))
        layers = []
        for layer_name in manifest["Layers"]:
            layer_info = by_name[layer_name]
            with tarfile.open(fileobj=img.extractfile(layer_info), mode="r") as layer:
                members = layer.getmembers()
            # Rebase offsets from "relative to the layer tar" to "relative to
            # the outer image tar".
            # NOTE(review): this is only meaningful when neither archive is
            # compressed - confirm for the images this is used with.
            base = layer_info.offset_data
            for member in members:
                member.offset += base
                member.offset_data += base
            layer_json = json.load(img.extractfile(by_name[layer_name[:-9] + "json"]))
            layers.append((layer_json, members))
    return {"manifest": manifest, "config": config, "layers": layers}
def mergeLayers(dst: tarfile.TarFile, src: str, include: "tuple | list" = ("*",),
                exclude: "tuple | list" = None):
    """
    Copying files from all layers of the image to the archive.

    Format for `include` and `exclude` = fnmatch. But "*" is added to the
    beginning if not startswith("/") (or "*"). Patterns are matched against
    the member name prefixed with "/". A member matching any `exclude`
    pattern is skipped; otherwise it is kept when it matches any `include`
    pattern. When the same name occurs in several layers, the entry from the
    later layer replaces the earlier one.

    :param dst: open tarball
    :param src: path to image (for `dockerImageSaveAnalizer(src)`)
    :param include: list with fnmatch or re.compile()
    :param exclude: list with fnmatch or re.compile()
    """
    def _convertFnmatch(match):
        # Anything that is not a string is passed through unchanged
        # (presumably an already compiled pattern).
        if not isinstance(match, str):
            return match
        if not match.startswith(("/", "*")):
            match = f"*{match}"
        return re.compile(fnmatch.translate(match))

    # Accept a single pattern as well as a tuple/list of patterns.
    if include and not isinstance(include, (tuple, list)):
        include = (include,)
    if exclude and not isinstance(exclude, (tuple, list)):
        exclude = (exclude,)
    include = tuple(map(_convertFnmatch, include or ()))
    exclude = tuple(map(_convertFnmatch, exclude or ()))

    selected = {}
    for _cfg, members in dockerImageSaveAnalizer(src)["layers"]:
        for member in members:
            name = f"/{member.name}"
            if any(pattern.match(name) for pattern in exclude):
                continue
            if any(pattern.match(name) for pattern in include):
                selected[member.name] = member

    # The members carry offsets into the outer image archive, so their data is
    # read straight from `src`.
    with tarfile.open(src, "r") as tin:
        for tinfo in selected.values():
            # Links carry no data; extractfile() would try to resolve their
            # target inside `tin`, so pass no file object for them.
            is_link = tinfo.islnk() or tinfo.issym()
            dst.addfile(tinfo, fileobj=(None if is_link else tin.extractfile(tinfo)))
def repairTar(tar_path, force=False) -> list:
    """
    Removes duplicates from the tarball that appear after adding to the end of the archive.
    Docker cannot unpack such tar.

    For every duplicated name only the last entry in the archive is kept.
    When a rewrite happens, members are written sorted by name to
    `{tar_path}_tempRepairTar`, which then replaces `tar_path`.

    :param tar_path: path
    :param force: repack the archive with sorted paths always.
    :return: list of duplicates
    """
    duplicate = []
    latest = {}
    with tarfile.open(tar_path, "r") as ftar:
        for member in ftar.getmembers():
            if member.name in latest:
                duplicate.append(member.name)
            latest[member.name] = member

        if not duplicate and not force:
            return duplicate

        tmpfile = f"{tar_path}_tempRepairTar"
        with tarfile.open(tmpfile, "w") as ftmp:
            for name in sorted(latest):
                tinfo = latest[name]
                # Links have no data of their own to copy.
                is_link = tinfo.islnk() or tinfo.issym()
                ftmp.addfile(tinfo, fileobj=(None if is_link else ftar.extractfile(tinfo)))

    # Swap the repaired archive into place only after both files are closed.
    pathlib.Path(tmpfile).replace(tar_path)
    return duplicate
def _tarinfoFilemode(member) -> str:
    """Render a TarInfo's type and permission bits as an `ls -l` style mode string."""
    if member.isdir():
        kind = stat.S_IFDIR
    elif member.issym():
        kind = stat.S_IFLNK
    elif member.ischr():
        kind = stat.S_IFCHR
    elif member.isblk():
        kind = stat.S_IFBLK
    elif member.isfifo():
        kind = stat.S_IFIFO
    else:
        kind = stat.S_IFREG
    return stat.filemode(kind | stat.S_IMODE(member.mode))


def dumpImageInfo(tofile: "pathlib.Path | str", imageSaveAnalizer: dict):
    """
    Dump config and list of files (`ls -l`) from all layers of the image.

    The file contains the manifest and config as indented JSON, then for each
    layer its "id", its JSON and one aligned row per member.

    NOTE(review): the row layout is mode, owner, group, mtime (UTC), size,
    "| name [-> linkname]" - confirm the column set against the intended
    output format.

    :param tofile: path
    :param imageSaveAnalizer: return of `dockerImageSaveAnalizer()`
    """
    with open(tofile, "w", encoding="utf-8") as out:
        out.write("===== MANIFEST =====\n")
        json.dump(imageSaveAnalizer["manifest"], out, indent=2)
        out.write("\n===== CONFIG =====\n")
        json.dump(imageSaveAnalizer["config"], out, indent=2)
        out.write("\n===== LAYERS =====\n")
        for cfg, members in imageSaveAnalizer["layers"]:
            out.write(f"LayerID: {cfg['id']}\n")
            json.dump(cfg, out, indent=2)
            # json.dump() writes no trailing newline; keep the listing (or the
            # next layer header) off the closing brace's line.
            out.write("\n")

            rows = []
            for member in members:
                # uid/gid 0 without a stored name is shown as "root"; any other
                # unnamed owner falls back to the numeric id.
                owner = member.uname or ("root" if member.uid == 0 else str(member.uid))
                group = member.gname or ("root" if member.gid == 0 else str(member.gid))
                link = f" -> {member.linkname}" if member.linkname else ""
                rows.append((
                    _tarinfoFilemode(member),
                    owner,
                    group,
                    time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(member.mtime)),
                    str(member.size or ""),
                    "| " + member.name + link,
                ))
            if not rows:
                continue

            widths = [max(len(row[col]) for row in rows) for col in range(len(rows[0]))]
            for row in rows:
                # Column 4 (size) is right-aligned, everything else left-aligned.
                out.write(" ".join(
                    [(f"{d: <{widths[x]}}" if x != 4 else f"{d: >{widths[x]}}") for x, d in enumerate(row)]
                ).strip() + "\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment