Skip to content

Instantly share code, notes, and snippets.

@giuseppe
Created May 7, 2017 17:26
Show Gist options
  • Save giuseppe/43cdb1e86f3042c4bcfd25cb68a249af to your computer and use it in GitHub Desktop.
Save giuseppe/43cdb1e86f3042c4bcfd25cb68a249af to your computer and use it in GitHub Desktop.
build rpm
# Released under the terms of the GPLv2
import os
import sys
import struct
import subprocess
import select
import StringIO
import gzip
import hashlib
import tempfile
import shutil
class RpmWriter(object):
MAGIC = [0xed, 0xab, 0xee, 0xdb]
MAJOR = [0x3]
MINOR = [0x0]
BINARY_TYPE = [0x0, 0x0]
NOARCH = [0xFF, 0xFF]
OS = [0x0, 0x1]
SIGNATURE_TYPE = [0x0, 0x5]
RESERVED = [0x0] * 16
HEADER_MAGIC = [0x8e, 0xad, 0xe8]
HEADER_VERSION = [0x01]
HEADER_RESERVED = [0x0] * 4
SIGNATURE_MAGIC = [0x8e, 0xad, 0xe8]
SIGNATURE_VERSION = [0x01]
SIGNATURE_RESERVED = [0x0] * 4
RPMTAG_NAME = 1000
RPMTAG_VERSION = 1001
RPMTAG_RELEASE = 1002
RPMTAG_OS = 1021
RPMTAG_ARCH = 1022
RPMTAG_FILESIZES = 1028
RPMTAG_DIRINDEXES = 1116
RPMTAG_BASENAMES = 1117
RPMTAG_DIRNAMES = 1118
RPMTAG_FILEMODES = 1030
RPMTAG_FILEDIGESTS = 1035
RPMTAG_FILEFLAGS = 1037
RPMTAG_FILEUSERNAME = 1039
RPMTAG_FILEGROUPNAME = 1040
RPMTAG_PAYLOADFORMAT = 1124
RPMTAG_PAYLOADCOMPRESSOR = 1125
RPMTAG_FILEDIGESTALGO = 5011
RPM_SPEC_FILEMODE = (1 << 8)
RPM_SPEC_DIRMODE = (1 << 9)
RPMFILE_CONFIG = (1 << 0)
RPMTAG_PAYLOADDIGEST = 5092
RPMTAG_PAYLOADDIGESTALGO = 5093
PGPHASHALGO_SHA1 = 2
def get_sha1(self, path):
m = hashlib.sha1()
with open(path, 'r') as f:
while True:
data = os.read(f.fileno(), 4096)
if len(data) == 0:
break
m.update(data)
return m.hexdigest()
def __init__(self, out, root, name, version, release):
self.out = out
self.name = name
self.version = version
self.release = release
self.headers = []
self.written = 0
self.root = root
self.all_files = []
def add_header(self, tag, typ, count, value, pad=1):
self.headers.append([tag, typ, count, value, pad])
def _make_uint16(self, val):
return bytearray(struct.pack(">H", val))
def _make_uint32(self, val):
return bytearray(struct.pack(">I", val))
def _writebytearray(self, data):
self.written += len(data)
self.out.write(bytearray(data))
def _rpmlead(self):
def get_name(name, version, release):
name = "%s-%s-%s" % (name, version, release)
if len(name) > 65:
name = name[:65]
return bytearray(name) + bytearray([0] * (66 - len(name)))
self._writebytearray(RpmWriter.MAGIC)
self._writebytearray(RpmWriter.MAJOR)
self._writebytearray(RpmWriter.MINOR)
self._writebytearray(RpmWriter.BINARY_TYPE)
self._writebytearray(RpmWriter.NOARCH)
self._writebytearray(get_name(self.name, self.version, self.release))
self._writebytearray(RpmWriter.OS)
self._writebytearray(RpmWriter.SIGNATURE_TYPE)
self._writebytearray(RpmWriter.RESERVED)
def pad(self, size):
while self.written % size != 0:
self._writebytearray([0])
def _signature(self):
self._writebytearray(RpmWriter.SIGNATURE_MAGIC)
self._writebytearray(RpmWriter.SIGNATURE_VERSION)
self._writebytearray(RpmWriter.SIGNATURE_RESERVED)
self._writebytearray(self._make_uint32(1))
self._writebytearray(self._make_uint32(4))
self._writebytearray(self._make_uint32(1000)) # sigtag_size
self._writebytearray(self._make_uint32(4)) # int32
self._writebytearray(self._make_uint32(0)) # offset
self._writebytearray(self._make_uint32(1)) # count
# payload
self._writebytearray(self._make_uint32(0))
self.pad(8)
def _header(self):
header_section = bytearray()
store = bytearray()
self._writebytearray(RpmWriter.HEADER_MAGIC)
self._writebytearray(RpmWriter.HEADER_VERSION)
self._writebytearray(RpmWriter.HEADER_RESERVED)
for i, v in enumerate(self.headers):
while (len(store) % v[4]) != 0:
store.append(0x0)
header_section += self._make_uint32(v[0]) # tag
header_section += self._make_uint32(v[1]) # type
header_section += self._make_uint32(len(store)) # offset
header_section += self._make_uint32(v[2]) # count
store += v[3] # value
self._writebytearray(self._make_uint32(len(self.headers)))
self._writebytearray(self._make_uint32(len(store)))
self._writebytearray(header_section)
self._writebytearray(store)
def _payload(self, out):
def try_read(stdout, gzip_out, out):
while True:
readable, _, _ = select.select([stdout], [], [], 0)
if len(readable) == 0:
break
data = os.read(stdout.fileno(), 4096)
gzip_out.write(data)
with open(os.devnull) as DEVNULL, gzip.GzipFile(fileobj=out, mode="w") as gzip_out:
cpio_process = subprocess.Popen(["cpio", "-H", "crc", "-no"], stderr=DEVNULL, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
for f in self.all_files:
cpio_process.stdin.write(f + "\n")
try_read(cpio_process.stdout, gzip_out, out)
cpio_process.stdin.close()
shutil.copyfileobj(cpio_process.stdout, gzip_out)
def _make_array_uint32(self, ints):
ret = bytearray()
for i in ints:
ret += self._make_uint32(i)
return ret
def _make_array_uint16(self, ints):
ret = bytearray()
for i in ints:
ret += self._make_uint16(i)
return ret
def _make_array_strings(self, strings):
return "\0".join(strings) + "\0"
def generate(self):
self.all_files = []
for root, _, files in os.walk(self.root):
for f in files:
path = os.path.join(root, f)
self.all_files.append(path)
self.all_files.sort()
dirs = set()
for i in self.all_files:
dirs.add(os.path.dirname(i))
dirs = list(dirs)
dir_index = {}
for i, v in enumerate(dirs):
dir_index[v] = i
basenames = []
dirindexes = []
for i in self.all_files:
dirindexes.append(dir_index.get(os.path.dirname(i)))
basenames.append(os.path.basename(i))
all_stats = [os.stat(x) for x in self.all_files]
self.add_header(RpmWriter.RPMTAG_DIRNAMES, 8, len(dirs), self._make_array_strings(["%s/" % x for x in dirs]))
self.add_header(RpmWriter.RPMTAG_BASENAMES, 8, len(basenames), self._make_array_strings(basenames))
self.add_header(RpmWriter.RPMTAG_DIRINDEXES, 4, len(dirindexes), self._make_array_uint32(dirindexes), pad=4)
self.add_header(RpmWriter.RPMTAG_FILEUSERNAME, 8, len(basenames), self._make_array_strings(["root"] * len(basenames)))
self.add_header(RpmWriter.RPMTAG_FILEGROUPNAME, 8, len(basenames), self._make_array_strings(["root"] * len(basenames)))
fileflags = [RPMFILE_CONFIG if x.startswith("/etc/") else 0 for x in self.all_files]
self.add_header(RpmWriter.RPMTAG_FILEFLAGS, 4, len(basenames), self._make_array_uint32(fileflags), pad=4)
self.add_header(RpmWriter.RPMTAG_FILESIZES, 4, len(basenames), self._make_array_uint32([x.st_size for x in all_stats]), pad=4)
filemodes = [x.st_mode for x in all_stats]
self.add_header(RpmWriter.RPMTAG_FILEMODES, 3, len(basenames), self._make_array_uint16(filemodes), pad=2)
self.add_header(RpmWriter.RPMTAG_NAME, 6, 1, "%s\0" % self.name)
self.add_header(RpmWriter.RPMTAG_VERSION, 6, 1, "%s\0" % self.version)
self.add_header(RpmWriter.RPMTAG_RELEASE, 6, 1, "%s\0" % self.release)
self.add_header(RpmWriter.RPMTAG_OS, 6, 1, "linux\0")
self.add_header(RpmWriter.RPMTAG_ARCH, 6, 1, "noarch\0")
self.add_header(RpmWriter.RPMTAG_PAYLOADFORMAT, 6, 1, "cpio\0")
self.add_header(RpmWriter.RPMTAG_PAYLOADCOMPRESSOR, 6, 1, "gzip\0")
with tempfile.NamedTemporaryFile() as payload:
self._payload(payload)
filedigests = [self.get_sha1(payload.name)]
filedigests = [self.get_sha1(x) for x in self.all_files]
self.add_header(RpmWriter.RPMTAG_FILEDIGESTALGO, 4, 1, self._make_uint32(RpmWriter.PGPHASHALGO_SHA1), pad=4)
self.add_header(RpmWriter.RPMTAG_FILEDIGESTS, 8, len(filedigests), self._make_array_strings(filedigests))
self.add_header(RpmWriter.RPMTAG_PAYLOADDIGESTALGO, 4, 1, self._make_uint32(RpmWriter.PGPHASHALGO_SHA1), pad=4)
self.add_header(RpmWriter.RPMTAG_PAYLOADDIGEST, 8, 1, self._make_array_strings([self.get_sha1(payload.name)]))
self._rpmlead()
self._signature()
self._header()
payload.seek(0)
shutil.copyfileobj(payload, self.out)
if __name__ == "__main__":
if len(sys.argv) < 3:
sys.stderr.write("Usage minirpmbuild.py TREE RPM-FILE NAME\n")
sys.exit(1)
tree, rpm_file, name = sys.argv[1:4]
with open(rpm_file, "w") as f:
writer = RpmWriter(f, tree, name, "1", "1")
writer.generate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment