-
-
Save spiwn/8c3545bb4d48ce8027e2b05facb38e1d to your computer and use it in GitHub Desktop.
Script to simulate mining diamonds in a Minecraft world and determine efficiency of different mining patterns
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import anvil | |
import datetime | |
from collections import defaultdict | |
# Number of worker processes started by Executor.
# Setting this to 1 turns off multi threading functionality
threadCount = 4
# Block name whose tally is reported as "diamond" in the CSV and console output.
diamond = "minecraft:diamond_ore"
# Block names that mine_block follows as a vein when they touch a mined block.
minable = {'coal_ore', 'iron_ore', 'redstone_ore', 'gold_ore', 'lapis_ore', 'emerald_ore'}
minable = set(map(lambda x: "minecraft:" + x, minable))
# NOTE(review): the next line throws away the ore set built above, so only diamond
# ore is followed. Presumably it is a toggle to comment out when other ores should
# be followed too - confirm.
minable = set()
minable.add(diamond)
# Output file name; the timestamp is taken once, when the module is loaded.
csv_filename = "minedata" + str(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")) + ".csv"
# [name, mark] entry used by Blocks.parseSection for sections without block data.
air = ['air', -1]
# Added to negative packed values to reinterpret them as unsigned 64-bit integers.
twoToTheSixtyFourth = 2 ** 64
# Section indices parsed for every chunk when a region is first loaded.
sectionsToParse = [0]
# Initial mark stored in every parsed block (no pattern has mined it yet).
defaultBlockValue = -1
# Chunk version from which parseSection stops carrying leftover bits of one packed
# value over into the next one.
_Version_20w_17a = 2529
class Blocks:
    """Lazily loaded cache of the blocks inside one rectangular mining area.

    Every cached block is a mutable two-item list ``[name, mark]``. ``name`` is
    the palette name of the block and ``mark`` starts as ``defaultBlockValue``;
    callers (see ``mine_block``) overwrite the mark in place to remember that a
    block was already processed.

    The cache is laid out as ``cachedChunks[chunkX][chunkZ][sectionIndex]``.
    A loaded section is a list of 4096 blocks; a section that has not been
    parsed yet is held as the integer placeholder ``0``.

    NOTE(review): the first region file opened is kept in ``self.region`` and
    reused by every later ``loadRegion`` call, so one instance is expected to
    serve a single region file - confirm against callers.
    """

    def __init__(self, constraints):
        """Create an empty cache.

        :param constraints: ``[x1, z1, x2, z2]`` block coordinates of the area;
            both ends are treated as inclusive when chunks are loaded.
        """
        self.chunkconstraints = [(constraints[0] >> 4), (constraints[1] >> 4), (constraints[2] >> 4),
                                 (constraints[3] >> 4)]
        self.blockconstraints = constraints
        self.region = None
        self.cachedChunks = {}

    def parseSection(self, chunk, section):
        """Decode one 16x16x16 section of ``chunk`` into 4096 ``[name, mark]`` lists.

        :param chunk: chunk object providing ``get_section`` and ``version``.
        :param section: index of the section inside the chunk.
        :return: list of 4096 independent ``[name, defaultBlockValue]`` lists in
            the order the packed data stores them.
        """
        section = chunk.get_section(section)
        if section is None or section.get('BlockStates') is None:
            # One fresh list per block: marks are written in place, so sharing a
            # single list would mark every air block as soon as one is mined.
            return [[air[0], defaultBlockValue] for _ in range(4096)]
        states = section['BlockStates'].value
        palette = section['Palette']
        bits = max((len(palette) - 1).bit_length(), 4)
        bits_mask = 2 ** bits - 1
        state = 0
        data = states[0]
        if data < 0:
            # Stored values are signed; work with the unsigned bit pattern.
            data += twoToTheSixtyFourth
        data_len = 64
        version = chunk.version.value
        result = []
        for _ in range(4096):
            if data_len < bits:
                state += 1
                new_data = states[state]
                if new_data < 0:
                    new_data += twoToTheSixtyFourth
                if version >= _Version_20w_17a:
                    # Newer layout: leftover bits are dropped and the next
                    # packed value is read from its start.
                    data = new_data
                    data_len = 64
                else:
                    # Older layout: the leftover bits are joined with the next
                    # packed value.
                    leftover = data_len
                    data_len += 64
                    data = anvil.chunk.bin_append(new_data, data, leftover)
            palette_id = data & bits_mask
            data >>= bits
            data_len -= bits
            result.append([palette[palette_id]['Name'].value, defaultBlockValue])
        return result

    def loadRegion(self, regionX, regionZ, section=None):
        """Make sure the requested sections are cached for every chunk in range.

        The chunks visited are those of region ``(regionX, regionZ)`` that also
        lie inside ``self.chunkconstraints``. Sections that are already parsed
        are left untouched so marks written by callers are never discarded.

        :param regionX: region x index (block x >> 9).
        :param regionZ: region z index (block z >> 9).
        :param section: single section index to load; ``None`` loads every index
            listed in ``sectionsToParse``.
        :raises Exception: whatever loading a chunk raises, after printing the
            chunk coordinates.
        """
        region = self.region
        if region is None:
            region = anvil.Region.from_file(f'server/world/region/r.{regionX}.{regionZ}.mca')
            self.region = region
        constraints = self.chunkconstraints
        firstChunkX = regionX << 5
        firstChunkZ = regionZ << 5
        # A region holds chunks first .. first + 31; the stops are inclusive.
        xStart = max(firstChunkX, constraints[0])
        xStop = min(firstChunkX + 31, constraints[2])
        zStart = max(firstChunkZ, constraints[1])
        zStop = min(firstChunkZ + 31, constraints[3])
        wanted = list(sectionsToParse) if section is None else [section]
        for x in range(xStart, xStop + 1):
            byX = self.cachedChunks.setdefault(x, {})
            for z in range(zStart, zStop + 1):
                try:
                    chunk = anvil.Chunk.from_region(region, x, z)
                except Exception:
                    print("Failed to load chunk: ", x, z)
                    raise
                sectionList = byX.setdefault(z, [])
                for sectionIndex in wanted:
                    missing = sectionIndex + 1 - len(sectionList)
                    if missing > 0:
                        # Pad with placeholders so the list can be indexed by
                        # section number.
                        sectionList.extend([0] * missing)
                    if not isinstance(sectionList[sectionIndex], list):
                        sectionList[sectionIndex] = self.parseSection(chunk, sectionIndex)

    def getBlockCached(self, x, y, z):
        """Return the mutable ``[name, mark]`` entry for the block at ``(x, y, z)``.

        Missing chunks and sections are loaded on demand through ``loadRegion``.

        :raises KeyError: if the chunk is still not cached after loading, i.e.
            the coordinates lie outside the loadable range.

        NOTE(review): negative ``y`` is not validated; it indexes sections from
        the end of the list.
        """
        chunkX = x >> 4
        byX = self.cachedChunks.get(chunkX)
        if byX is None:
            self.loadRegion(x >> 9, z >> 9)
            byX = self.cachedChunks[chunkX]
        chunkZ = z >> 4
        byZ = byX.get(chunkZ)
        if byZ is None:
            self.loadRegion(x >> 9, z >> 9)
            byZ = byX[chunkZ]
        section = y >> 4
        # A section may be absent or only a placeholder left behind when a
        # higher section was loaded first.
        if len(byZ) <= section or not isinstance(byZ[section], list):
            self.loadRegion(x >> 9, z >> 9, section)
        return byZ[section][(y % 16) * 256 + (z % 16) * 16 + (x % 16)]
class Backpack:
    """Tally of mined blocks, keyed by block name."""

    def __init__(self, contents=None):
        """Wrap ``contents`` (a name -> count mapping) or start empty.

        The mapping is stored as given, not copied.
        """
        if contents is None:
            self.contents = defaultdict(int)
        else:
            self.contents = contents

    def addBlock(self, block):
        """Count one more block named ``block``."""
        # .get keeps this working when contents is a plain dict.
        self.contents[block] = self.contents.get(block, 0) + 1

    def getContents(self):
        """Return the underlying name -> count mapping (not a copy)."""
        return self.contents

    def getTotalCount(self):
        """Return the number of blocks of all kinds."""
        return sum(self.contents.values())

    def __getitem__(self, key):
        """Return the count for ``key``; 0 when it was never added."""
        return self.contents.get(key, 0)

    def __iadd__(self, other):
        """Add the counts of ``other`` to this backpack in place (``a += b``)."""
        if not isinstance(other, Backpack):
            return NotImplemented
        contents = self.contents
        for name, count in other.contents.items():
            contents[name] = contents.get(name, 0) + count
        return self

    def __add__(self, other):
        """Return a new backpack holding the combined counts (``a + b``).

        Neither operand is modified; use ``+=`` to merge in place.
        """
        if not isinstance(other, Backpack):
            return NotImplemented
        merged = Backpack(defaultdict(int, self.contents))
        merged += other
        return merged
class MinePattern:
    """A named tunnel layout together with the backpack it fills.

    Holds the callable that produces tunnel positions, a display name, the
    mask value written into mined blocks, and the y level the miner stands on.
    """

    def __init__(self, generator, name, mask, standingOnY):
        """Store the layout description and start with an empty backpack."""
        self.name = name
        self.mask = mask
        self.standingOnY = standingOnY
        self.generatorFunction = generator
        self.backpack = Backpack()

    def generator(self, x, y, max_x):
        """Delegate to the wrapped generator function with the same arguments."""
        produce = self.generatorFunction
        return produce(x, y, max_x)

    def getName(self):
        """Return the display name of the pattern."""
        return self.name

    def getMask(self):
        """Return the mask value given at construction."""
        return self.mask

    def getBackpack(self):
        """Return the backpack currently attached to the pattern."""
        return self.backpack

    def setBackpack(self, backpack):
        """Replace the attached backpack."""
        self.backpack = backpack

    def getStandingOnY(self):
        """Return the y level given at construction."""
        return self.standingOnY
# The six face-adjacent neighbours of a block as (dx, dy, dz).
offsets = [(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1)]


def mine_block(x, y, z, backpack, blocks, mask, check_type=False):
    """Mine the block at ``(x, y, z)`` and every minable vein connected to it.

    A block is skipped when its mark already equals ``mask``. Mined blocks get
    their mark set to ``mask`` and their name added to ``backpack``. Neighbours
    are followed only when their name is in ``minable`` and their x/z lie inside
    ``blocks.blockconstraints``.

    :param x: block x coordinate.
    :param y: block y coordinate.
    :param z: block z coordinate.
    :param backpack: object with ``addBlock(name)`` collecting mined blocks.
    :param blocks: object with ``getBlockCached(x, y, z)`` and
        ``blockconstraints`` (``[x1, z1, x2, z2]``, inclusive on both ends).
    :param mask: mark identifying the pattern doing the mining.
    :param check_type: when true, the starting block is mined only if its name
        is in ``minable``.
    """
    constraints = blocks.blockconstraints
    # Explicit stack instead of recursion, so a large vein cannot exhaust the
    # interpreter's recursion limit.
    pending = [(x, y, z, check_type)]
    while pending:
        bx, by, bz, must_be_minable = pending.pop()
        block = blocks.getBlockCached(bx, by, bz)
        if block[1] == mask:
            continue
        if must_be_minable and block[0] not in minable:
            continue
        block[1] = mask
        backpack.addBlock(block[0])
        for dx, dy, dz in offsets:
            nx = bx + dx
            nz = bz + dz
            # The area is inclusive of its last column and row, matching how
            # tunnels are mined and how chunks are loaded.
            if constraints[0] <= nx <= constraints[2] and constraints[1] <= nz <= constraints[3]:
                pending.append((nx, by + dy, nz, True))
def mine_rectangular_prism(x_min, x_max, y_min, y_max, z_min, z_max, backpack, blocks, mask):
    """Mine every block of an axis-aligned box; all six bounds are inclusive.

    Blocks are visited x-major, then y, then z, and each one is handed to
    ``mine_block`` with the given backpack, block cache and mask.
    """
    levels = range(y_min, y_max + 1)
    depths = range(z_min, z_max + 1)
    for column in range(x_min, x_max + 1):
        for level in levels:
            for depth in depths:
                mine_block(column, level, depth, backpack, blocks, mask)
def mine1x2(mining_area, pattern, blocks):
    """Dig a one-wide, two-high tunnel for every position the pattern yields.

    Each tunnel runs along z from ``mining_area[1]`` to ``mining_area[3]`` and
    occupies the two levels directly above the yielded standing level.
    """
    x_first, z_first, x_last, z_last = (mining_area[0], mining_area[1],
                                        mining_area[2], mining_area[3])
    tunnels = pattern.generator(x_first, pattern.getStandingOnY(), x_last)
    for tunnel_x, floor_y in tunnels:
        mine_rectangular_prism(tunnel_x, tunnel_x, floor_y + 1, floor_y + 2, z_first, z_last,
                               pattern.getBackpack(), blocks, pattern.getMask())
def singleTunnelPattern(space):
    """Build a generator of tunnel positions on one level.

    The returned ``generator(x, y, max_x)`` yields ``(column, y)`` starting at
    ``x`` and leaving ``space`` untouched columns between tunnels, for as long
    as the column does not exceed ``max_x``.
    """
    def generator(x, y, max_x):
        column = x
        while column <= max_x:
            yield (column, y)
            column = column + 1 + space
    return generator
def twoLevelTunnelPattern(space):
    """Build a generator of tunnel positions alternating between two levels.

    The returned ``generator(x, y, max_x)`` yields, for each step, a lower
    tunnel at ``(column, y)`` and an upper tunnel two levels higher, shifted by
    ``(space + 1) // 2`` columns. Lower tunnels stop at ``max_x - 2``.

    NOTE(review): for ``space`` of 5 or more the shift exceeds 2, so the upper
    tunnel can land beyond ``max_x`` - confirm this is acceptable for callers.
    """
    def generator(x, y, max_x):
        lower = x
        while lower <= max_x - 2:
            yield (lower, y)
            upper = lower + (space + 1) // 2
            yield (upper, y + 2)
            lower = lower + 1 + space
    return generator
def split_area_into_regions(mining_rectangle):
    """Cut ``[x1, z1, x2, z2]`` into pieces that each lie inside one 512x512 region.

    Returns a list of ``[x1, z1, x2, z2]`` rectangles with inclusive bounds,
    ordered by region x and then region z.
    """
    x_low, z_low = mining_rectangle[0], mining_rectangle[1]
    x_high, z_high = mining_rectangle[2], mining_rectangle[3]
    pieces = []
    for region_x in range(x_low >> 9, (x_high >> 9) + 1):
        # Clip the region's x extent to the requested rectangle.
        west = max(region_x << 9, x_low)
        east = min(((region_x + 1) << 9) - 1, x_high)
        for region_z in range(z_low >> 9, (z_high >> 9) + 1):
            north = max(region_z << 9, z_low)
            south = min(((region_z + 1) << 9) - 1, z_high)
            pieces.append([west, north, east, south])
    return pieces
def prepare_mining_patterns():
    """Return the list of MinePattern objects to simulate.

    For standing level 12 this creates single-level patterns with gaps 0..12
    followed by two-level patterns with gaps 3, 5 and 7. Masks are numbered
    from 1 in creation order.
    """
    # Create and configure object representing a mining pattern
    # Whether the script should make a tunnel every block or every other block and so on.
    # Visual example ('W's represent blocks):
    # W WW WW W | W WWW WWW W | W WWWW WWWW W
    # W WW WW W | W WWW WWW W | W WWWW WWWW W
    #
    # But also patterns on multiple levels:
    # W WWW WWW W | W W W W W W
    # W WWW WWW W | W W W W W W
    # WWW WWW WWW | W W W W W W
    # WWW WWW WWW | W W W W W W
    # Not fully implemented - supports only some patterns
    patterns = []

    def register(layout, prefix, gap, level):
        # The mask is one more than the number of patterns created so far.
        label = prefix + str(gap) + " blocks between standing on " + str(level) + " level"
        patterns.append(MinePattern(layout, label, len(patterns) + 1, level))

    for y in range(12, 13):
        for gap in range(13):
            register(singleTunnelPattern(gap), "One level tunnels with ", gap, y)
        for gap in range(3, 8, 2):
            register(twoLevelTunnelPattern(gap), "Two level tunnels with ", gap, y)
    return patterns
def simulateForRegion(mining_area, patterns):
    """Run every pattern over one mining area and report what each collected.

    All patterns share one block cache for the area. Returns a dict mapping
    each pattern name to the contents of that pattern's backpack.
    """
    blocks = Blocks(mining_area)
    print("Running simulation for area", mining_area, "for", len(patterns), "patterns")
    for pattern in patterns:
        mine1x2(mining_area, pattern, blocks)
    collected = {}
    for pattern in patterns:
        collected[pattern.getName()] = pattern.getBackpack().getContents()
    return collected
def process_work(todo, resultsQueue):
    """Worker loop: take areas from ``todo`` and put one result per area.

    A failed area is reported on stdout/stderr and answered with an empty dict
    so the consumer still receives one result per task. The loop never returns.
    """
    patterns = prepare_mining_patterns()
    while True:
        area = todo.get()
        try:
            outcome = simulateForRegion(area, patterns)
            resultsQueue.put(outcome)
        except Exception as error:
            import traceback
            import sys
            print(type(error), error)
            print(area)
            traceback.print_exc(file=sys.stderr)
            resultsQueue.put({})
        # Start the next area with empty backpacks so results are per area.
        for pattern in patterns:
            pattern.setBackpack(Backpack())
class Executor:
    """Runs region simulations, in worker processes or inline, and merges results.

    With ``threadCount > 1`` tasks are queued for worker processes running
    ``process_work``; otherwise each task is simulated immediately in the
    calling process.
    """

    def __init__(self):
        """Start the worker processes, or prepare patterns for inline use."""
        self.processes = []
        self.workCount = 0
        self.results = []
        self.aggregatedResult = None
        if threadCount > 1:
            from multiprocessing import Process, Queue
            self.toDo = Queue()
            self.resultsQueue = Queue()
            for _ in range(threadCount):
                # Workers loop forever; daemon=True lets the interpreter exit
                # even if getResults is never reached (e.g. after an error).
                p = Process(target=process_work, args=(self.toDo, self.resultsQueue), daemon=True)
                self.processes.append(p)
                p.start()
        else:
            self.patterns = prepare_mining_patterns()

    def execute(self, task):
        """Queue ``task`` for a worker, or simulate it right away when inline."""
        if threadCount > 1:
            self.workCount += 1
            self.toDo.put(task)
        else:
            simulateForRegion(task, self.patterns)

    def getResults(self, start):
        """Return a dict of pattern name -> Backpack with the merged totals.

        In worker mode this waits for one result per queued task, printing
        progress, then stops the workers. The merged result is cached, so later
        calls return it immediately.

        :param start: datetime the run started at, used for the progress output.
        """
        if self.aggregatedResult is None:
            endResult = {}
            if threadCount > 1:
                for i in range(1, self.workCount + 1):
                    current = self.resultsQueue.get()
                    for k in current:
                        current[k] = Backpack(current[k])
                    self.results.append(current)
                    elapsed = datetime.datetime.now() - start
                    progress = i / self.workCount
                    print(str(i), "of", str(self.workCount), "regions done.", 100 * (progress), "%",
                          "Elapsed time:", elapsed, "Remaining:", (elapsed * (1 - progress) / progress))
                for process in self.processes:
                    process.terminate()
                for process in self.processes:
                    # Reap the terminated workers instead of leaving them
                    # behind until interpreter shutdown.
                    process.join()
                for result in self.results:
                    for key, value in result.items():
                        backpack = endResult.get(key)
                        if backpack is None:
                            backpack = Backpack()
                            endResult[key] = backpack
                        # Merges into the Backpack stored in endResult.
                        backpack += value
                self.aggregatedResult = endResult
            else:
                for pattern in self.patterns:
                    endResult[pattern.getName()] = pattern.getBackpack()
                self.aggregatedResult = endResult
        return self.aggregatedResult
def simulate(mining_rectangle, start):
    """Simulate every pattern over ``mining_rectangle`` and report the totals.

    The rectangle is split per region, each piece is handed to an Executor, and
    the merged results are written to ``csv_filename`` and printed.

    :param mining_rectangle: ``[x1, z1, x2, z2]`` block coordinates.
    :param start: datetime the run started at, forwarded for progress output.
    """
    mining_regions = split_area_into_regions(mining_rectangle)
    executor = Executor()
    for mining_area in mining_regions:
        executor.execute(mining_area)
    results = executor.getResults(start)
    with open(csv_filename, "w") as f:
        f.write("Pattern,Diamond Ore Mined,Total Blocks Mined,Diamond Efficiency" + "\n")
        for key, backpack in results.items():
            diamonds = backpack[diamond]
            totalCount = backpack.getTotalCount()
            # Efficiency is undefined when the pattern mined nothing at all.
            if totalCount == 0:
                efficiency = None
                csv_efficiency = "N/A"
            else:
                efficiency = str(100 * diamonds / totalCount)
                csv_efficiency = efficiency + "%"
            f.write(",".join((key, str(diamonds), str(totalCount), csv_efficiency)) + "\n")
            print(key, end='\t')
            print("Total Diamonds Found:", str(diamonds), end='\t', sep=" ")
            print("Total Blocks Mined:", str(totalCount), end='\t', sep=" ")
            if efficiency is None:
                print("Diamond Efficiency: N/A")
            else:
                print("Diamond Efficiency: ", efficiency, "%", sep='')
def timed_simulate():
    """Run the simulation over the default area and print how long it took."""
    print("Starting simulation")
    # (x1, z1, x2, z2); a single-region alternative is [-2048, -2048, -1537, -1537]
    area = [-2048, -2048, 2047, 2047]
    began = datetime.datetime.now()
    simulate(area, began)
    finished = datetime.datetime.now()
    print("Finished simulation in ", finished - began)
def profile():
    """Run ``timed_simulate`` under cProfile and print the profile report."""
    from cProfile import run as run_profiled
    run_profiled('timed_simulate()')
if __name__ == "__main__":
    # Entry point: run the timed simulation. Call profile() here instead to
    # run the same simulation under cProfile.
    timed_simulate()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment