Skip to content

Instantly share code, notes, and snippets.

@ozgurgulsuna
Last active February 17, 2022 20:27
Show Gist options
  • Save ozgurgulsuna/03185633ce5b3ba4a53d309faffb5a99 to your computer and use it in GitHub Desktop.
Save ozgurgulsuna/03185633ce5b3ba4a53d309faffb5a99 to your computer and use it in GitHub Desktop.
'''
Written by A.B.Koku & I.Ozcil
Jan 28 2022
'''
# start with imports
import numpy as np
from skimage import io
import random
import matplotlib.pyplot as plt
import random
import time
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
import signal
# continue defining parameters
# maze parameters
nCorr = 8
# assume that corridors and bases have the same width/height
boxSize = 50
maxStep = 2 * boxSize # maximum number of pixels a player can travel in one call
'''
colors are defined in the following dictionary
Each dicitonary entry is a list as follows:
((R,G,B), base_value, number of time it is repeated in the initial maze)
'''
colorz = {
'black':((1,1,1), 0, 13),
'clr100':((225, 1, 1), 100, 1),
'clr50':((1, 255, 1), 50, 2),
'clr30':((1, 1, 255), 30, 2),
'clr20':((200, 200, 1), 20, 2),
'clr10':((255, 1, 255), 10, 2),
'clr9':((1, 255, 255), 9, 3),
'clr8':((1,1,150), 8, 3),
'clr7':((120,120,40), 7, 3),
'clr6':((150,1,150), 6, 3),
'clr5':((1,150,150), 5, 3),
'clr4':((222,55,222), 4, 3),
'clr3':((1, 99, 55), 3, 3),
'clr2':((200, 100, 10),2, 3),
'clr1':((100, 10, 200),1, 3)
}
pathPen = (255,0,0) # red spared for drawing paths
maxTime = 0.2 # number of seconds allowed for a player to return from run()
# derived variables
imSize = nCorr * 2 * boxSize - boxSize
hw = int(boxSize / 2) # half of a box not really need but anyway
# 1-9 digits are used to label different paths on user responses
# let's generate digit images
digits = []
digit_size = (7,5)
base_image = np.full(digit_size, 255, dtype=np.uint8)
for d in range(10):
fname = f'd{d}.png'
img = Image.fromarray(base_image)
ImageDraw.Draw(img).text((0,-2), str(d), 0)
img.save(fname)
digits.append(io.imread(fname))
# set up timers to limit execution time for run() function calls
# setting timeout values for players and game engine
timeout_for_game = 1e6
timeout_for_players = 0.2
# for alarm handling, timeout handler function is defined
def timeout_handler(sign, fr):
'''
this function handles the timeout cases which took longer than 0.2 seconds
for processes took longer than timeout value, it raises exception
'''
# a print as timeout error
print('timeout has occured')
raise Exception()
# a timer to SIGALRM variable of signal module is set
signal.signal(signal.SIGALRM, timeout_handler)
# timer is set here using timeout value defined at top
signal.setitimer(signal.ITIMER_REAL, timeout_for_game)
# finally a utility function before classes
def printIF(mess, printIt):
'''
pretty straight forward, but very useful, but time consuming,
consinder not using at all if performance is your ultimage issue,
and you make zillions of calls to this function
'''
if printIt:
print(mess)
def TotalPoints():
res = 0
for clr in colorz:
res += colorz[clr][1] * colorz[clr][2]
return res
# a class to manage bases, a bases is nothing but one of those colored squres on the maze
class aBase():
digits = []
def __init__(self, name, bbCorner, size, color, points):
self.name = name
self.bbCorner = bbCorner
self.size = size
self.color = color
self.points = points
self.guests = []
self.baseTaken = False
# define the image for points
# c1 = np.concatenate((digits[1], digits[0], 255*np.ones((7,1)), digits[0]), axis = 1)
ds = {f'{d}':d for d in range(10)} # get a string key for a digit from 0 to 9
pstr = str(points)
blank = 255 * np.ones((digits[0].shape[0], 1))
pimg = blank.copy()
for d in pstr:
pimg = np.concatenate((pimg, digits[ds[d]], blank), axis=1)
baseimg = self.color * np.ones((*pimg.shape,3), np.uint8)
baseimg[np.where(pimg == 0)] = 255 # paint the base image with the digit with white
self.pointsImage = baseimg
def paint(self, img, ShowPoints = False):
'''
paints itself on the image
'''
# base index
y,x = self.bbCorner[0], self.bbCorner[1]
# from base index to bounding box coordinates
by = y * 2 * self.size - self.size
bx = x * 2 * self.size - self.size
# draw box on img
img[by : by + self.size,
bx : bx + self.size,:] = self.color
# draw points for base
if ShowPoints:
#img[yL:yL+h, xL:xL+w, 1] = header
dh, dw, dummy = self.pointsImage.shape
dy = by + int((self.size - dh)/2)
dx = bx + int((self.size - dw)/2)
img[dy:dy+dh, dx:dx+dw, :] = self.pointsImage
def __back2black__(self):
# someone conquered this bases, points are granted, goes back to black
self.name = 'black'
#self.points = 0
self.baseTaken = True
self.color = (1,1,1)
self.guests = []
def registerEntry(self, pk, steps, execTime):
# register those who enter the base at the same time step
# then the winner will be the one with maximum step
if not self.baseTaken:
self.guests.append({'ID':pk, 'Remaining': steps, 'Execution': execTime})
#print(f'{pk} enters {self.name}:{self.points}:{self.color}')
def andTheWinnerIs(self):
# return winner if there is one
if len(self.guests) > 0:
resSorted = sorted(self.guests, key = lambda d: d['Remaining'])
winnerz = []
for guest in resSorted:
if guest['Remaining'] == resSorted[-1]['Remaining']: # in case of equality, faster returned code wins
winnerz.append({'ID':guest['ID'], 'Execution': guest['Execution']})
res2Sorted = sorted(winnerz, key = lambda d: d['Execution']) # we assume that the execution times will not be equal in practice
winner = res2Sorted[0] # this is the winning player
pointsWon = self.points
self.__back2black__()
return winner['ID'], pointsWon
else:
return 'John Doe', -1
def __str__(self):
# enable self printing
return f'{self.name}: with color {self.color} at {self.bbCorner}'
class daMaze():
'''
This is the main class that manages the maze that is composed of bases
'''
def __init__(self, numCorridors, colorz):
self.nCorr = numCorridors
self.colorz = colorz
self.bases = {}
#bases will have points in between [1,...,9] x 3, [10, 20, 30, 50] x 2 , [100] x 1
#cmask = np.ones((imSize,imSize,3), dtype=np.uint8) * 255 # mask is white
# get colors
# generate all base indices and shuffle their order
coordz = [(i,j) for i in range(1,self.nCorr) for j in range(1,self.nCorr)]
random.shuffle(coordz)
# color bases
for Dclr in self.colorz:
clr, clrN = self.colorz[Dclr][0], self.colorz[Dclr][2] # get color and its base value using the current key
# color as many times as this color needs to show up in the maze
for i in range(clrN):
cind = coordz.pop()
self.bases[f'{cind[0]},{cind[1]}'] = aBase(name = Dclr, bbCorner = cind, size= boxSize, color = clr, points = self.colorz[Dclr][1])
def paint(self, img, ShowPoints = False):
# paints the maze on the given image: img
for baseK in self.bases:
self.bases[baseK].paint(img, ShowPoints)
def registerPath(self, img, path, playerKey, remSteps, execTime):
'''
path is registered for player playerKey with speed
'''
basecolors= [ np.array([0,0,0]), np.array([255,255,255])]
newcolors = [np.array([-1,-1,-1])]
for i in range(len(path)-1):
y1,x1 = path[i][0], path[i][1]
y2,x2 = path[i+1][0], path[i+1][1]
if x1 == x2: # moving along Y
ind = [[y, x1] for y in range(y1+np.sign(y2-y1), y2, np.sign(y2-y1))]
else: # moving along X
ind = [[y1, x] for x in range(x1+np.sign(x2-x1), x2, np.sign(x2-x1))]
for [yi,xi] in ind:
remSteps -= 1 # decrement speed
pixcolor = img[yi,xi,:]
if not ((pixcolor == basecolors).all(axis=1)).any(): # if this is a new color register it
if not ((pixcolor == newcolors).all(axis=1)).any():
yind = int((yi+99)/100)
xind = int((xi+99)/100)
bKey = f'{yind},{xind}'
# register this user in the base
self.bases[bKey].registerEntry(playerKey, remSteps, execTime)
newcolors.append(pixcolor)
else: # back on no point point region
newcolors = [np.array([-1,-1,-1])]
def AnnounceWinners(self):
# those who made a move that is worth some points are listed and returned
res = {}
for baseK in self.bases:
pk, pt = self.bases[baseK].andTheWinnerIs()
if pt > -1: # this is not john doe
# check if pk is already in the dictionary
if pk in res.keys(): # than we have at least 1 point already registered
res[pk].append(pt) # add the point to the end
else:
res[pk] = [pt] # add a list with the first point in it
return res
def RemainingPoints(self):
# returns the remaining points on the current game board
res = 0
for baseK in self.bases:
if not self.bases[baseK].baseTaken:
res += self.bases[baseK].points
return res
def DrawPolyLine(self, img, cList, header = None, dpen = (255,0,0)):
'''
draws multiple lines using the given list of points, i.e. cList, on to the image: img
header is used for printing the digit IDs of groups
dpne is the pen color
'''
for i in range(len(cList)-1):
P1, P2 = cList[i], cList[i+1]
maxNP = max(( abs(P2[0]-P1[0]) , abs(P2[1]-P1[1])))
img[ np.linspace(P1[0],P2[0], maxNP).astype('int'), np.linspace(P1[1],P2[1], maxNP).astype('int'), :] = dpen
# once line is onde if header is not None, draw the header
if header is not None: # draw the header
h,w = header.shape
yL, xL = cList[-1] # coordinates of the last point
H, W, dummy = img.shape
if yL + h >= H:
yL = H - h - 2
if xL + w >= W:
xL = W - w - 2
img[yL:yL+h, xL:xL+w, 1] = header
def TrimPath(self, path, L, debugMode = False):
'''
path is the candidate path, i.e a list of points [[y1,x1], ... [yn,xn]]
the function assumes that [y1,x1] is where the agent currently is,
and hence [y2,x2] is the first point to move
L is the total distance that the agent can cover
function returns the path trimmed so that it has a length of L
'''
# first veryfy path
res = [path[0]] # initially coordinate is approved by default, others will be added if they are valid and within the span of L
distRemaining = L # originially this is the distance to be covered by the send path
for i in range(len(path)-1): # using pairs move on the path
# move on only any distance to move is left
printIF(f'remaining distance: {distRemaining}', debugMode)
if distRemaining <= 0:
return res
# get the points and their coordinates explicitly
p1, p2 = path[i], path[i+1] # get two consequitive point coordinates
try:
y1, x1 = path[i][0], path[i][1]
y2, x2 = path[i+1][0], path[i+1][1]
dy = y2 - y1 #p2[0] - p1[0]
dx = x2 - x1 #p2[1] - p1[1]
# one of these has to be zero on a N4 path
if abs(dx) >0 and abs(dy)>0: # we have a problem, one of them has to be zero on a N4 path
# just return the valid path found so far
return res
# we also have a problem if consequtive points are the same, if so just ignore the latest one
if not(dx == 0 and dy == 0): #
pathL = max(abs(dy), abs(dx)) # length between p1-p2
if pathL <= distRemaining: # this part of the path (p1 to p2) completely belongs to the resulting path
res.append(p2)
distRemaining -= pathL
printIF(f'moving {pathL} from {[y1,x1]} to {[y2,x2]}', debugMode)
else: # this is the tricky part, some part of the path will belong
# partial path should expand either in X or Y direction
# note that either dx or dy has to be zero at all times
if abs(dx) > 0: # going in X direction
res.append([y1, x1+np.sign(dx)*distRemaining])
printIF(f'moving X {np.sign(dx)*distRemaining} from {[y1,x1]} to {[y1,x1+np.sign(dx)*distRemaining]}', debugMode)
else: # going in Y direction
res.append([y1+np.sign(dy)*distRemaining, x1])
printIF(f'moving Y {np.sign(dy)*distRemaining} from {[y1,x1]} to {[y1+np.sign(dy)*distRemaining,x1]}', debugMode)
return res
except:
printIF(f'most probably {p1}:{p2} did not correspond to a path', debugMode)
return res
class LetsPlayAGame():
def __init__(self, Players, initial_locations, numCorridors, colorz, imageSize, digits, maxStep):
self.numSteps = 0
self.numBoards = 0
self.nCorr = numCorridors
self.colorz = colorz
self.imSize = imageSize
self.maxStep = maxStep
# keep track of players
self.Players = Players
# set up a new board
self.ResetBoard()
# set the digits library for the bases
self.digits = digits
aBase.digits = digits
# generate new board
def ResetBoard(self):
self.aMaze = daMaze(self.nCorr, self.colorz)
self.maze = np.ones((self.imSize,self.imSize,3), dtype=np.uint8) * 255 # mask is white
self.pmaze = self.maze.copy()
self.aMaze.paint(self.maze)
self.aMaze.paint(self.pmaze, True)
self.numBoards += 1
def PlayAStep(self, debugMode = False):
# generate info
info = LetsPlayAGame.GenerateInfo(self.Players)
res = {} # keeps track of user reponse and path
rr = [] # used for sorting / ranking returned results
mess = '' # message to return at the end
# send info to all players
for pk in self.Players:
player = self.Players[pk][0] # get the player object
# player can only play if it has some positive points
if self.Players[pk][-1] > 0:
tStart = time.perf_counter()
signal.setitimer(signal.ITIMER_REAL, timeout_for_players)
#path = player.run(self.maze, info)
try:
path = player.run(self.maze, info)
except Exception as e:
#print(f'{pk} failed!!!\n')
path = []
signal.setitimer(signal.ITIMER_REAL, timeout_for_game)
tExec = time.perf_counter() - tStart
# log result for the current player, -1 is number of pixels to cover and to be updated later
res[pk] = [player, path, tExec, -1]
rr.append({'Playername':pk, 'player':player, 'path':path, 'time2run':tExec})
else:
printIF(f'{pk} did not play, no points left', debugMode)
# if no one can play return
if len(rr) < 1:
return [], 'No one left to play'
# now sort players in terms of their speed if there is more than 1 player
rr_ranked = sorted(rr, key = lambda d: d['time2run'])
fTime = rr_ranked[0]["time2run"] # fastest time
sTime = rr_ranked[-1]["time2run"] # slowest time
#print(f'{fTime}:{sTime}')
LetsPlayAGame.printIF(f'fastest in:{fTime} slowest in: {sTime}\n', debugMode)
# best goes for maxStep, worst goes for maxStep/2, update all players coverage
for fp in rr_ranked: # go from fastest to slowest
pk = fp['Playername'] # get player name / key
# calculate distance for each player
if len(rr_ranked) > 1:
res[pk][3] = int(self.maxStep/2 + self.maxStep/2 * (1- (res[pk][2]-fTime)/(sTime-fTime)) )
else:
res[pk][3] = self.maxStep
# validate path, check if current player can go for res[pk][3] pixels
LetsPlayAGame.printIF(f'{"{0:.>10}".format(pk)} returned in {round(res[pk][2], 4)} \t step size: {res[pk][3]}', debugMode)
pPath = [self.Players[pk][-2], *res[pk][1]] # proposed path from the current point on
LetsPlayAGame.printIF(f'trimmin: {pPath}', debugMode)
tPath = self.aMaze.TrimPath(pPath, res[pk][3], debugMode) # trim propsoed path
self.Players[pk][2].append([pPath, tPath, res[pk][2]]) # keep a history of proposed and accepted paths
LetsPlayAGame.printIF(f'proposed path:{pPath}, \nresulting path:{tPath}\n', debugMode)
# update path on res
res[pk][1] = tPath
self.aMaze.registerPath(self.maze, tPath, pk, res[pk][3], res[pk][2])
# get the winners of the current round and update points to complete the step
winners = self.aMaze.AnnounceWinners()
self.aMaze.paint(self.maze)
self.aMaze.paint(self.pmaze, True)
# prepare message and draw the result on pmaze
for pk in self.Players:
# update is required on the image if the player could have played, i.e. it has non-zero points
if self.Players[pk][-1] > 0:
fullP = res[pk][1] #[ Players[pk][2], *res[pk][1]]
LetsPlayAGame.printIF(fullP, debugMode)
self.aMaze.DrawPolyLine(self.pmaze, fullP, header = self.digits[self.Players[pk][1]])
# assume last point in fullP is reachable
self.Players[pk][-2] = fullP[-1]
# if pk is a winner of more than 0 points provide details:
if pk in winners.keys() and sum(winners[pk]) > 0:
self.Players[pk][-1] -= sum(winners[pk])
mess += f'{"{0: >10}".format(pk)}({self.Players[pk][1]}) currently has {self.Players[pk][-1]} points -> {self.Players[pk][-1] + sum(winners[pk])}-{sum(winners[pk])}={self.Players[pk][-1]}\n'
self.Players[pk][2][-1].append(sum(winners[pk]))
else:
mess += f'{"{0: >10}".format(pk)}({self.Players[pk][1]}) currently has {self.Players[pk][-1]} points\n'
self.Players[pk][2][-1].append(0)
# finally increment step
self.numSteps += 1
# finally return the winners that has more than 0 points, and a session summary
return {f'{pk}':sum(winners[pk]) for pk in winners.keys() if sum(winners[pk])>0}, mess
@staticmethod
def GenerateInfo(Players):
'''
Strips the internal data structure that keeps track of players into a verion
that can be sent to the run() function of players
'''
info = {}
for gName in Players:
# just return the last 2 elements in the list, this is all the players need
# rest is for management of the game
info[gName] = Players[gName][-2:]
return info
@staticmethod
def printIF(mess, printIt):
'''
pretty straight forward, but very useful, but time consuming,
consinder not using at all if performance is your ultimage issue,
and you make zillions of calls to this function
'''
if printIt:
print(mess)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment