Skip to content

Instantly share code, notes, and snippets.

@spiwn
Forked from Sevadus/diamonds.py
Last active June 4, 2020 22:28
Show Gist options
  • Save spiwn/8c3545bb4d48ce8027e2b05facb38e1d to your computer and use it in GitHub Desktop.
Save spiwn/8c3545bb4d48ce8027e2b05facb38e1d to your computer and use it in GitHub Desktop.
Script to simulate mining diamonds in a Minecraft world and determine efficiency of different mining patterns
import anvil
import datetime
from collections import defaultdict
# Number of worker processes; a value of 1 turns the multiprocessing path off.
threadCount = 4
diamond = "minecraft:diamond_ore"
# Namespaced ids of the other ores that vein-following could also collect ...
minable = {"minecraft:" + ore for ore in
           ('coal_ore', 'iron_ore', 'redstone_ore', 'gold_ore', 'lapis_ore', 'emerald_ore')}
# ... but that set is replaced right away, so only diamond ore is followed.
minable = {diamond}
csv_filename = f"minedata{datetime.datetime.now():%Y%m%d-%H%M%S}.csv"
# Block entry ([name, marker]) used for sections that carry no block states.
air = ['air', -1]
# Added to negative longs to reinterpret them as unsigned 64-bit values.
twoToTheSixtyFourth = 1 << 64
# Section indices parsed as soon as a chunk is first loaded.
sectionsToParse = [0]
# Initial marker stored in the second slot of every parsed block entry.
defaultBlockValue = -1
# Chunk data version from which leftover bits of a long are no longer
# carried over into the next long when unpacking block states.
_Version_20w_17a = 2529
class Blocks:
    """Lazily loaded cache of the blocks inside one rectangular mining area.

    Every block is stored as a mutable two element list ``[name, marker]``.
    The marker starts out as ``defaultBlockValue`` and is overwritten by the
    mining code, so each block must be a distinct list object.

    ``cachedChunks`` maps chunk x -> chunk z -> list of parsed sections, where
    a section is a list of 4096 block entries (or ``None`` while it has not
    been parsed yet).
    """

    def __init__(self, constraints):
        """Remember the area limits.

        :param constraints: ``[x_min, z_min, x_max, z_max]`` in block
            coordinates; the chunk limits are derived by shifting right by 4.
        """
        self.chunkconstraints = [(constraints[0] >> 4), (constraints[1] >> 4), (constraints[2] >> 4),
                                 (constraints[3] >> 4)]
        self.blockconstraints = constraints
        self.region = None
        # (regionX, regionZ) of the file currently held in self.region.
        self.regionCoords = None
        self.cachedChunks = {}

    def parseSection(self, chunk, section):
        """Unpack one 16x16x16 section of ``chunk`` into 4096 block entries.

        :param chunk: chunk object providing ``get_section`` and ``version``.
        :param section: index of the section inside the chunk.
        :return: list of 4096 ``[name, defaultBlockValue]`` lists, addressed
            as ``(y % 16) * 256 + (z % 16) * 16 + (x % 16)`` by
            ``getBlockCached``.
        """
        section = chunk.get_section(section)
        if section is None or section.get('BlockStates') is None:
            # One fresh list per block: sharing the global ``air`` list would
            # make a marker written to one air block show up on all of them.
            return [list(air) for _ in range(4096)]
        states = section['BlockStates'].value
        palette = section['Palette']
        # Resolve the palette names once instead of once per block.
        names = [palette[k]['Name'].value for k in range(len(palette))]
        bits = max((len(palette) - 1).bit_length(), 4)
        bits_mask = 2 ** bits - 1
        # Older chunk versions let an entry span two longs, so the unread
        # bits have to be carried over; newer versions start afresh.
        carries_over = chunk.version.value < _Version_20w_17a
        state = 0
        data = states[0]
        if data < 0:
            data += twoToTheSixtyFourth
        data_len = 64
        result = [None] * 4096
        for i in range(4096):
            if data_len < bits:
                state += 1
                new_data = states[state]
                if new_data < 0:
                    new_data += twoToTheSixtyFourth
                if carries_over:
                    data = anvil.chunk.bin_append(new_data, data, data_len)
                    data_len += 64
                else:
                    data = new_data
                    data_len = 64
            result[i] = [names[data & bits_mask], defaultBlockValue]
            data >>= bits
            data_len -= bits
        return result

    def loadRegion(self, regionX, regionZ, section=None):
        """Parse sections for every chunk of a region that lies in the area.

        :param regionX: region x coordinate (block x >> 9).
        :param regionZ: region z coordinate (block z >> 9).
        :param section: section index to parse for each chunk. With ``None``
            the sections listed in ``sectionsToParse`` are parsed for chunks
            that have nothing cached yet.

        Sections that are already parsed are left untouched so markers
        written into their block entries survive repeated calls.
        """
        if self.region is None or self.regionCoords != (regionX, regionZ):
            # Reload when another region is requested instead of silently
            # reading chunks from whatever file was opened first.
            self.region = anvil.Region.from_file(f'server/world/region/r.{regionX}.{regionZ}.mca')
            self.regionCoords = (regionX, regionZ)
        region = self.region
        constraints = self.chunkconstraints
        firstChunkX = regionX << 5
        firstChunkZ = regionZ << 5
        # A region spans 32 chunks, so its last chunk index is first + 31.
        xStart = max(firstChunkX, constraints[0])
        xStop = min(firstChunkX + 31, constraints[2])
        zStart = max(firstChunkZ, constraints[1])
        zStop = min(firstChunkZ + 31, constraints[3])
        for x in range(xStart, xStop + 1):
            byX = self.cachedChunks.setdefault(x, {})
            for z in range(zStart, zStop + 1):
                sectionList = byX.get(z)
                if section is not None:
                    wanted = (section,)
                elif not sectionList:
                    wanted = tuple(sectionsToParse)
                else:
                    wanted = ()
                missing = [index for index in wanted
                           if sectionList is None or len(sectionList) <= index
                           or sectionList[index] is None]
                if not missing:
                    continue
                try:
                    chunk = anvil.Chunk.from_region(region, x, z)
                except Exception:
                    print("Failed to load chunk: ", x, z)
                    raise
                sectionList = byX.setdefault(z, [])
                for index in missing:
                    if len(sectionList) <= index:
                        # Pad with None so skipped sections are recognisable
                        # as "not parsed yet" later on.
                        sectionList.extend([None] * (index + 1 - len(sectionList)))
                    sectionList[index] = self.parseSection(chunk, index)

    def getBlockCached(self, x, y, z):
        """Return the mutable ``[name, marker]`` entry of the block at x, y, z.

        Chunks and sections are loaded on first access. A chunk outside the
        area limits is never cached and results in a ``KeyError``.

        NOTE: ``y`` is expected to be non-negative; a negative ``y`` yields a
        negative section index, which Python resolves from the end of the
        section list.
        """
        chunkX = x >> 4
        chunkZ = z >> 4
        byX = self.cachedChunks.get(chunkX)
        if byX is None:
            self.loadRegion(x >> 9, z >> 9)
            byX = self.cachedChunks[chunkX]
        byZ = byX.get(chunkZ)
        if byZ is None:
            self.loadRegion(x >> 9, z >> 9)
            byZ = byX[chunkZ]
        section = y >> 4
        if len(byZ) <= section or byZ[section] is None:
            self.loadRegion(x >> 9, z >> 9, section)
        return byZ[section][(y % 16) * 256 + (z % 16) * 16 + (x % 16)]
class Backpack:
    """Tally of mined blocks, keyed by block name."""

    def __init__(self, contents=None):
        """Wrap ``contents`` (a name -> count mapping) or start out empty.

        A mapping passed in is used as-is, not copied; it may be a plain
        ``dict`` as well as a ``defaultdict``.
        """
        if contents is None:
            self.contents = defaultdict(int)
        else:
            self.contents = contents

    def addBlock(self, block):
        """Count one more block of the given name."""
        # .get() keeps this working when contents is a plain dict.
        self.contents[block] = self.contents.get(block, 0) + 1

    def getContents(self):
        """Return the underlying name -> count mapping."""
        return self.contents

    def getTotalCount(self):
        """Return the number of blocks of all kinds."""
        return sum(self.contents.values())

    def __getitem__(self, key):
        """Return the count for ``key``; unknown names count as 0."""
        return self.contents.get(key, 0)

    def __add__(self, other):
        """Return a new Backpack holding the counts of both operands.

        :raises TypeError: if ``other`` is not a Backpack.
        """
        self._require_backpack(other)
        combined = Backpack()
        combined += self
        combined += other
        return combined

    def __iadd__(self, other):
        """Merge the counts of ``other`` into this backpack in place.

        :raises TypeError: if ``other`` is not a Backpack.
        """
        self._require_backpack(other)
        for name, count in other.contents.items():
            self.contents[name] = self.contents.get(name, 0) + count
        # Returning self keeps the name bound after ``backpack += other``.
        return self

    @staticmethod
    def _require_backpack(other):
        """Raise TypeError unless ``other`` is a Backpack."""
        if not isinstance(other, Backpack):
            raise TypeError("unsupported operand type(s) for +: 'Backpack' and '"
                            + type(other).__name__ + "'")
class MinePattern:
    """A named tunnel layout together with the backpack it fills.

    Bundles the generator that produces tunnel positions, the marker value
    written into blocks mined for this pattern and the level the miner
    stands on.
    """

    def __init__(self, generator, name, mask, standingOnY):
        self.name = name
        self.mask = mask
        self.standingOnY = standingOnY
        self.generatorFunction = generator
        self.backpack = Backpack()

    def generator(self, x, y, max_x):
        """Delegate to the wrapped generator function and return its result."""
        produce = self.generatorFunction
        return produce(x, y, max_x)

    def getName(self):
        """Return the human readable pattern name."""
        return self.name

    def getMask(self):
        """Return the marker value used for blocks mined by this pattern."""
        return self.mask

    def getBackpack(self):
        """Return the backpack currently collecting this pattern's blocks."""
        return self.backpack

    def setBackpack(self, backpack):
        """Replace the backpack that collects this pattern's blocks."""
        self.backpack = backpack

    def getStandingOnY(self):
        """Return the y level the miner stands on."""
        return self.standingOnY
# The six face-adjacent neighbours of a block as (dx, dy, dz).
offsets = [(1, 0, 0), (-1, 0, 0), (0, 1, 0), (0, -1, 0), (0, 0, 1), (0, 0, -1)]


def mine_block(x, y, z, backpack, blocks, mask, check_type=False, min_y=0, max_y=255):
    """Mine one block and every connected block whose name is in ``minable``.

    A block is mined by writing ``mask`` into its marker slot and adding its
    name to ``backpack``; blocks already carrying ``mask`` are skipped, so
    each block is counted at most once per mask.

    :param x, y, z: block coordinates of the starting block.
    :param backpack: object with ``addBlock(name)`` receiving mined blocks.
    :param blocks: object with ``blockconstraints`` (``[x_min, z_min, x_max,
        z_max]``, both ends inclusive) and ``getBlockCached(x, y, z)``
        returning a mutable ``[name, marker]`` list.
    :param mask: marker value identifying the current mining pattern.
    :param check_type: when true the starting block is only mined if its
        name is in ``minable``; neighbours are always checked that way.
    :param min_y, max_y: inclusive vertical limits for neighbour probing.
        The defaults assume 16 sections of 16 blocks each - TODO confirm for
        worlds with a different height.
    """
    constraints = blocks.blockconstraints
    min_x, min_z, max_x, max_z = constraints[0], constraints[1], constraints[2], constraints[3]
    # Explicit stack instead of recursion so large connected veins cannot
    # exhaust the interpreter's recursion limit.
    pending = [(x, y, z, check_type)]
    while pending:
        cx, cy, cz, only_minable = pending.pop()
        block = blocks.getBlockCached(cx, cy, cz)
        if block[1] == mask:
            continue
        if only_minable and block[0] not in minable:
            continue
        block[1] = mask
        backpack.addBlock(block[0])
        for dx, dy, dz in offsets:
            nx = cx + dx
            ny = cy + dy
            nz = cz + dz
            # The area maxima are inclusive, and y must stay inside the world
            # so a lookup below 0 cannot wrap around to another section.
            if min_x <= nx <= max_x and min_z <= nz <= max_z and min_y <= ny <= max_y:
                pending.append((nx, ny, nz, True))
def mine_rectangular_prism(x_min, x_max, y_min, y_max, z_min, z_max, backpack, blocks, mask):
    """Mine every block of an axis-aligned box, all limits inclusive.

    Blocks are visited with x as the outermost and z as the innermost axis
    and handed to ``mine_block`` one by one.
    """
    columns = range(x_min, x_max + 1)
    layers = range(y_min, y_max + 1)
    rows = range(z_min, z_max + 1)
    positions = ((bx, by, bz) for bx in columns for by in layers for bz in rows)
    for bx, by, bz in positions:
        mine_block(bx, by, bz, backpack, blocks, mask)
def mine1x2(mining_area, pattern, blocks):
    """Dig a one wide, two high tunnel along z for every pattern position.

    :param mining_area: ``[x_min, z_min, x_max, z_max]``; the x limits feed
        the pattern generator, the z limits give the tunnel length.
    :param pattern: pattern object supplying positions, backpack and mask.
    :param blocks: block cache handed through to the mining routine.
    """
    x_first, z_first, x_last, z_last = mining_area[0], mining_area[1], mining_area[2], mining_area[3]
    floor_level = pattern.getStandingOnY()
    for tunnel_x, stand_y in pattern.generator(x_first, floor_level, x_last):
        # The two blocks directly above the level the miner stands on.
        feet, head = stand_y + 1, stand_y + 2
        mine_rectangular_prism(tunnel_x, tunnel_x, feet, head, z_first, z_last,
                               pattern.getBackpack(), blocks, pattern.getMask())
def singleTunnelPattern(space):
    """Build a generator of tunnel positions on a single level.

    The returned generator yields ``(x, y)`` for every tunnel, starting at
    ``x`` and leaving ``space`` untouched columns between two tunnels, for
    as long as the position does not exceed ``max_x``.
    """
    def generator(x, y, max_x):
        column = x
        while True:
            if column > max_x:
                return
            yield (column, y)
            column = column + 1 + space
    return generator
def twoLevelTunnelPattern(space):
    """Build a generator of tunnel positions staggered over two levels.

    For every lower tunnel at ``(x, y)`` the generator also yields an upper
    tunnel two levels higher, shifted sideways by ``(space + 1) // 2``.
    Lower tunnels are ``space`` columns apart.

    A pair is only produced when the lower tunnel is at most ``max_x - 2``
    and the shifted upper tunnel still lies within ``max_x``, so no position
    beyond the area is ever yielded, whatever the gap.
    """
    def generator(x, y, max_x):
        upper_shift = (space + 1) // 2
        # max(2, ...) keeps the historic "- 2" margin for small gaps while
        # also bounding the upper tunnel for larger ones.
        last_start = max_x - max(2, upper_shift)
        while x <= last_start:
            yield x, y
            yield x + upper_shift, y + 2
            x += 1 + space
    return generator
def split_area_into_regions(mining_rectangle):
    """Cut a rectangle into pieces that each lie inside one 512x512 region.

    :param mining_rectangle: ``[x1, z1, x2, z2]`` in block coordinates.
    :return: list of ``[x1, z1, x2, z2]`` lists, one per touched region,
        ordered by region x first and region z second.
    """
    x_low, z_low, x_high, z_high = (mining_rectangle[0], mining_rectangle[1],
                                    mining_rectangle[2], mining_rectangle[3])
    region_xs = range(x_low >> 9, (x_high >> 9) + 1)
    region_zs = range(z_low >> 9, (z_high >> 9) + 1)
    return [
        [
            max(rx << 9, x_low),
            max(rz << 9, z_low),
            min(((rx + 1) << 9) - 1, x_high),
            min(((rz + 1) << 9) - 1, z_high),
        ]
        for rx in region_xs
        for rz in region_zs
    ]
def prepare_mining_patterns():
    """Create the list of mining patterns to simulate.

    Each pattern gets its own marker value, numbered from 1 in creation
    order.
    """
    # The patterns differ in whether a tunnel is made every block, every
    # other block and so on.
    # Visual example ('W's represent blocks):
    # W WW WW W | W WWW WWW W | W WWWW WWWW W
    # W WW WW W | W WWW WWW W | W WWWW WWWW W
    #
    # But also patterns on multiple levels:
    # W WWW WWW W | W W W W W W
    # W WWW WWW W | W W W W W W
    # WWW WWW WWW | W W W W W W
    # WWW WWW WWW | W W W W W W
    # Not fully implemented - supports only some patterns
    patterns = []

    def register(generator, label, level):
        # The marker of a pattern is its 1-based position in the list.
        patterns.append(MinePattern(generator, label, len(patterns) + 1, level))

    for y in range(12, 13):
        for gap in range(13):
            register(singleTunnelPattern(gap),
                     f"One level tunnels with {gap} blocks between standing on {y} level", y)
        for gap in range(3, 8, 2):
            register(twoLevelTunnelPattern(gap),
                     f"Two level tunnels with {gap} blocks between standing on {y} level", y)
    return patterns
def simulateForRegion(mining_area, patterns):
    """Run every pattern over one area, sharing a single block cache.

    :param mining_area: ``[x1, z1, x2, z2]`` handed to ``Blocks``.
    :param patterns: pattern objects; their backpacks are filled in place.
    :return: dict mapping each pattern name to its backpack contents.
    """
    blocks = Blocks(mining_area)
    print("Running simulation for area", mining_area, "for", len(patterns), "patterns")
    for pattern in patterns:
        mine1x2(mining_area, pattern, blocks)
    summary = {}
    for pattern in patterns:
        summary[pattern.getName()] = pattern.getBackpack().getContents()
    return summary
def process_work(todo, resultsQueue):
    """Worker loop: take areas from ``todo`` and publish their results.

    Runs forever. Every task produces exactly one item on ``resultsQueue``;
    a task that fails is reported on the console and answered with an empty
    dict. The patterns get fresh backpacks after each task.
    """
    import sys
    import traceback

    patterns = prepare_mining_patterns()
    while True:
        task = todo.get()
        try:
            outcome = simulateForRegion(task, patterns)
            resultsQueue.put(outcome)
        except Exception as error:
            print(type(error), error)
            print(task)
            traceback.print_exc(file=sys.stderr)
            resultsQueue.put({})
        # Start the next task with empty backpacks, whether this one
        # succeeded or not.
        for pattern in patterns:
            pattern.setBackpack(Backpack())
class Executor:
    """Runs region simulations in worker processes or in the caller.

    With ``threadCount > 1`` tasks are queued for ``threadCount`` worker
    processes started in the constructor; otherwise each task is simulated
    immediately in the calling process.
    """

    def __init__(self):
        self.processes = []
        self.workCount = 0
        self.results = []
        self.aggregatedResult = None
        if threadCount > 1:
            from multiprocessing import Process, Queue
            self.toDo = Queue()
            self.resultsQueue = Queue()
            for _ in range(threadCount):
                worker = Process(target=process_work, args=(self.toDo, self.resultsQueue))
                self.processes.append(worker)
                worker.start()
        else:
            self.patterns = prepare_mining_patterns()

    def execute(self, task):
        """Queue ``task`` for the workers, or simulate it right away."""
        if threadCount > 1:
            self.workCount += 1
            self.toDo.put(task)
        else:
            simulateForRegion(task, self.patterns)

    def getResults(self, start):
        """Wait for all queued tasks and return the totals per pattern.

        :param start: ``datetime`` the run started at, used for the progress
            and remaining-time output.
        :return: dict mapping pattern name to a ``Backpack``. The result is
            computed once and cached for later calls.
        """
        if self.aggregatedResult is not None:
            return self.aggregatedResult

        endResult = {}
        if threadCount > 1:
            for done in range(1, self.workCount + 1):
                current = self.resultsQueue.get()
                for name in current:
                    current[name] = Backpack(current[name])
                self.results.append(current)
                elapsed = datetime.datetime.now() - start
                progress = done / self.workCount
                print(str(done), "of", str(self.workCount), "regions done.", 100 * (progress), "%",
                      "Elapsed time:", elapsed, "Remaining:", (elapsed * (1 - progress) / progress))
            # The workers loop forever, so they have to be stopped explicitly.
            for process in self.processes:
                process.terminate()
            # Join the terminated workers so they are reaped instead of
            # lingering as zombie processes.
            for process in self.processes:
                process.join()
            for result in self.results:
                for key, value in result.items():
                    backpack = endResult.get(key)
                    if backpack is None:
                        backpack = Backpack()
                        endResult[key] = backpack
                    # Merges into the object stored in endResult.
                    backpack += value
        else:
            for pattern in self.patterns:
                endResult[pattern.getName()] = pattern.getBackpack()
        self.aggregatedResult = endResult
        return self.aggregatedResult
def simulate(mining_rectangle, start):
    """Simulate all patterns over the rectangle and report the outcome.

    Results are written as rows to the file named by ``csv_filename`` and
    echoed to the console, one line per pattern.

    :param mining_rectangle: ``[x1, z1, x2, z2]`` area to mine.
    :param start: ``datetime`` the run started at, passed on for progress
        output.
    """
    mining_regions = split_area_into_regions(mining_rectangle)
    executor = Executor()
    for mining_area in mining_regions:
        executor.execute(mining_area)
    results = executor.getResults(start)
    with open(csv_filename, "w") as f:
        f.write("Pattern,Diamond Ore Mined,Total Blocks Mined,Diamond Efficiency" + "\n")
        for name, backpack in results.items():
            found = backpack[diamond]
            mined = backpack.getTotalCount()
            # Efficiency is undefined when nothing at all was mined.
            if mined == 0:
                efficiency = "N/A"
            else:
                efficiency = str(100 * found / mined) + "%"
            f.write(",".join((name, str(found), str(mined), efficiency)) + "\n")
            print(name,
                  "Total Diamonds Found: " + str(found),
                  "Total Blocks Mined: " + str(mined),
                  "Diamond Efficiency: " + efficiency,
                  sep='\t')
def timed_simulate():
    """Run the simulation over the configured area and print the duration."""
    print("Starting simulation")
    # (x1, z1, x2, z2); a single-region alternative for quick runs is
    # [-2048, -2048, -1537, -1537].
    area = [-2048, -2048, 2047, 2047]
    started = datetime.datetime.now()
    simulate(area, started)
    print("Finished simulation in ", datetime.datetime.now() - started)
def profile():
    """Run ``timed_simulate()`` under cProfile."""
    import cProfile as profiler
    statement = 'timed_simulate()'
    profiler.run(statement)
if __name__ == "__main__":
    # Script entry point; call profile() here instead to run under cProfile.
    timed_simulate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment