-
-
Save lostdesign/8a283bc44778ceeb5baa2b653f439af0 to your computer and use it in GitHub Desktop.
Combine MBTiles
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#-------------------------------------------------------------------------------
# Name:          combine_mbtiles.py
# Purpose:       Merges multiple SQLite .mbtiles files into a single master file.
#                (Open question: merge by level or by degree grid instead?)
#
# Author:        joshn. Modified by ache015
#
# Created:       18/11/2014
#
# Last Modified: 10/11/2018
#-------------------------------------------------------------------------------
import os, sys, datetime, time, math, sqlite3, shutil | |
#--------------------------------Config-----------------------------------------
# Directory that holds the .mbtiles databases to merge.
sourcePath = "/path/to/mbtiles/"
# The master file: every other database is appended into this one.
masterFile = "master.mbtiles"
# Only files with this extension are considered for merging.
fileExtension = ".mbtiles"
# Files whose name contains this text are skipped (not required for this purpose).
filesToSkipTxt = "11_15"
# Padding width for log output; not referenced by the functions shown here.
logLineNumCharPadding = 20
#-------------------------------------------------------------------------------
def main():
    """Run the whole merge: find the source databases and fold them into the master.

    Changes the working directory to ``sourcePath`` first, so every file name
    used afterwards is resolved relative to that directory.
    """
    log("Start")
    os.chdir(sourcePath)

    # Collect the sqlite .mbtiles databases that qualify for merging.
    pending = getFilesToProcess()
    summary = "{0} files to merge".format(len(pending))
    log(summary)

    # Fold every collected database into the master file.
    mergeFiles(pending)

    for closingLine in ("", "Complete"):
        log(closingLine)
def getFilesToProcess():
    """Collect the databases below the current directory that should be merged.

    Walks the current working directory recursively and keeps every file that
    * has the extension ``fileExtension`` (compared case-insensitively),
    * does not contain ``filesToSkipTxt`` in its name, and
    * is not named ``masterFile``.

    Returns:
        list of str: paths relative to the current directory. Files at the top
        level come back as bare names; files found in sub-directories keep
        their sub-directory prefix so the caller can actually open them.
    """
    wantedExt = fileExtension.lower()
    filesToProcess = []
    for root, _dirs, names in os.walk("."):
        for name in names:
            ext = os.path.splitext(name)[1]
            if ext.lower() != wantedExt:
                continue
            # An empty skip text would match every name, so treat it as "skip nothing".
            if filesToSkipTxt and filesToSkipTxt in name:
                continue
            if name == masterFile:
                continue
            # normpath drops the leading "./" so top-level files stay bare names.
            filesToProcess.append(os.path.normpath(os.path.join(root, name)))
    return filesToProcess
def mergeFiles(files):
    """Merge the ``map`` and ``images`` tables of every database in *files* into the master.

    The master file is backed up first (see ``backupFile``). Each source
    database is then attached to the master connection and copied over with
    ``REPLACE INTO``, so rows that collide on a unique key are overwritten by
    the source row. A failure on one source is logged and the loop carries on
    with the next one.

    Args:
        files: iterable of paths of the source databases, relative to the
            current working directory.
    """
    # Backup the masterfile
    backupFile(masterFile, "backup")

    # Connect to the master db (the merge destination)
    conDest = mbtilesConnect(masterFile)
    try:
        curDest = conDest.cursor()
        optimizeConnection(curDest)
        log("master{0}:\t\t{1}".format("\t"*3, getFileSize(os.path.getsize(masterFile))))

        for dbFile in files:
            log("{0}\t({1})".format(dbFile[:11], getFileSize(os.path.getsize(dbFile))))

            # The source is read through ATTACH on the master connection; a
            # separate connection to the source file is not needed.
            attached = False
            try:
                # Bind the file name instead of formatting it into the SQL so
                # names containing quotes cannot break the statement.
                curDest.execute("ATTACH DATABASE ? AS source", (dbFile,))
                attached = True
                curDest.executescript(
                    "REPLACE INTO map SELECT * FROM source.map;\n"
                    "REPLACE INTO images SELECT * FROM source.images;"
                )
            except sqlite3.Error as e:
                log(e)
            finally:
                # Always detach, otherwise the next ATTACH ... AS source fails
                # because the schema name is still in use.
                if attached:
                    try:
                        curDest.execute("DETACH DATABASE source")
                    except sqlite3.Error as e:
                        log(e)

            log("master{0}:\t\t{1}\n".format("\t"*3, getFileSize(os.path.getsize(masterFile))))
    finally:
        # Release the master file (and its exclusive lock) even on failure.
        conDest.close()
def mbtilesConnect(mbtilesFile):
    """Open *mbtilesFile* as a SQLite database and return the connection.

    If the connection cannot be created, the error is logged and the process
    exits with status 1.
    """
    try:
        connection = sqlite3.connect(mbtilesFile)
    except Exception as err:
        for line in ("ERROR Could not connect to database", err):
            log(line)
        sys.exit(1)
    else:
        return connection
def optimizeConnection(cur):
    """Apply the bulk-write PRAGMA settings through *cur*.

    *cur* is any object exposing ``execute()``, e.g. a sqlite3 cursor. The
    statements are issued in a fixed order: synchronous off, exclusive locking,
    then DELETE journal mode.
    """
    # http://www.sqlite.org/pragma.html#pragma_synchronous
    # Some operations are as much as 50 or more times faster with synchronous
    # OFF (but more risky if failure)
    pragmaStatements = (
        "PRAGMA synchronous=0",
        "PRAGMA locking_mode=EXCLUSIVE",
        "PRAGMA journal_mode=DELETE",
    )
    for statement in pragmaStatements:
        cur.execute(statement)
def backupFile(fileName, backupAppendName):
    """Copy *fileName* to ``<fileName>_<backupAppendName>`` unless that copy exists.

    An existing backup is never overwritten. A failure while copying is logged
    and otherwise ignored.
    """
    target = "{0}_{1}".format(fileName, backupAppendName)

    # Keep an earlier backup untouched.
    if os.path.exists(target):
        log("Backup '{0}' already exists.\n".format(target))
        return

    log("Creating a backup file '{0}'...".format(target))
    try:
        shutil.copyfile(fileName, target)
        log("Backup created\n")
    except Exception as err:
        log(err)
def getFileSize(size):
    """Format a byte count as a human-readable string such as ``'1.5 KB'``.

    Args:
        size: number of bytes.

    Returns:
        str: the value scaled by powers of 1024, rounded to two decimals and
        followed by its unit, or ``'0B'`` when *size* is zero or negative.
        Sizes beyond the largest unit are expressed in that unit (YB).
    """
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # An empty file has no logarithm; report it directly.
    if size <= 0:
        return '0B'
    # Divide step by step instead of using a float logarithm, which can land
    # just below an exact power of 1024 and pick the wrong unit.
    value = float(size)
    i = 0
    while value >= 1024 and i < len(size_name) - 1:
        value /= 1024
        i += 1
    return '%s %s' % (round(value, 2), size_name[i])
def log(msg):
    """Print *msg* to stdout, prefixed with the current time (HH:MM:SS) and a tab.

    *msg* may be any object; it is rendered with ``str.format``.
    """
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    # A single parenthesised argument prints identically on Python 2 and 3.
    print("{0}\t{1}".format(stamp, msg))
# Run the merge only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment