359 lines
11 KiB
Python
359 lines
11 KiB
Python
import heapq
import random
import threading
import time
from abc import ABC, abstractmethod
from math import sqrt, inf
#from multiprocessing import Event
from queue import PriorityQueue, Empty
from threading import Event

import torch
|
|
|
|
|
|
class Action():
    """Data describing a single move.

    Actions are applied to a State via ``State.mutate``.
    """

    def __init__(self, player, data):
        """Store the acting ``player`` and the move payload ``data``."""
        self.player = player
        self.data = data

    def __eq__(self, other):
        """Compare two actions by the string form of their ``data``.

        The player is deliberately not part of the comparison (the original
        code compares ``data`` only).
        """
        # This should be implemented differently
        # Two actions of different generations will never be compared
        if type(other) is not type(self):
            return False
        return str(self.data) == str(other.data)

    def __hash__(self):
        """Hash consistently with ``__eq__``.

        Defining ``__eq__`` alone sets ``__hash__`` to None, which makes
        instances unusable in sets and as dict keys.
        """
        return hash(str(self.data))

    def __str__(self):
        """Return the visual representation ``<P{player}-{data}>``."""
        # should start with < and end with >
        return f"<P{self.player}-{self.data}>"
|
|
|
|
class Universe():
    """Do-nothing universe: every hook is a no-op.

    Serves as the default universe for nodes and as the base class whose
    hooks QueueingUniverse overrides.
    """

    def newOpen(self, node):
        """Accept a newly opened node; the base universe ignores it."""
        return None

    def merge(self, node):
        """Return ``node`` unchanged."""
        merged = node
        return merged

    def clearPQ(self):
        """Nothing is queued in the base universe, so nothing is cleared."""
        return None

    def iter(self):
        """Return an empty list: the base universe holds no nodes."""
        return list()

    def activateEdge(self, head):
        """Ignore the new head node."""
        return None
|
|
|
|
class QueueingUniverse(Universe):
    """Universe that keeps opened nodes in a min-heap keyed by ``node.priority``."""

    def __init__(self):
        """Start with an empty priority queue."""
        super().__init__()
        self.pq = []
        # Monotonic insertion counter used as a tie-breaker: without it two
        # entries with equal priority would fall back to comparing the node
        # objects themselves, which define no ordering.
        self._seq = 0

    def newOpen(self, node):
        """Queue ``node`` under its current ``priority`` (lower pops first)."""
        heapq.heappush(self.pq, (node.priority, self._seq, node))
        self._seq += 1

    def clearPQ(self):
        """Drop every queued node."""
        self.pq = []

    def iter(self):
        """Yield ``(priority, node)`` pairs in ascending priority order.

        The heap is drained as it is consumed. Nodes pushed while iterating
        are picked up too, because the heap is re-checked on every step.
        """
        while self.pq:
            priority, _, node = heapq.heappop(self.pq)
            yield priority, node

    def activateEdge(self, head):
        """Let ``head`` register its unevaluated frontier with this universe."""
        head._activateEdge()
|
|
|
|
|
|
class State(ABC):
    """Abstract representation of the current game state.

    Allows retrieving the available actions (getAvaibleActions) and applying
    them (mutate). Mutations return a new State and should not have any
    effect on the current State.
    Allows checking itself for a win (checkWin) or scoring itself based on a
    simple heuristic (getScoreFor). The calculated score should be 0 when
    won; higher when in a worse state; highest for losing.
    getPriority is used for prioritising certain Nodes / States when
    expanding / walking the tree.
    """

    def __init__(self, curPlayer=0, generation=0, playersNum=2):
        """Store whose turn it is, the depth (generation) and the player count."""
        self.curPlayer = curPlayer
        self.generation = generation
        self.playersNum = playersNum

    @abstractmethod
    def mutate(self, action):
        """Return a new state with the supplied action performed.

        ``self`` should not be changed.
        """
        # NOTE(review): this body only illustrates the bookkeeping a subclass
        # has to do (next player, generation + 1). State itself is abstract,
        # so executing this line via super() cannot construct an instance.
        return State(curPlayer=(self.curPlayer+1) % self.playersNum, generation=self.generation+1, playersNum=self.playersNum)

    @abstractmethod
    def getAvaibleActions(self):
        """Return a list of all possible actions (empty by default)."""
        return []

    def askUserForAction(self, actions):
        """Prompt the current player to pick one of ``actions`` via ``choose``."""
        return choose('What does player '+str(self.curPlayer)+' want to do?', actions)

    # improveMe
    def getPriority(self, score, cascadeMemory=None):
        """Return the ordering key used for the priority queue.

        Priority should not change for the same root.
        Lower priorities get worked on first.
        Higher generations should have higher priority.
        Higher cascadeMemory (more influence on higher-order-scores) should
        have lower priority.

        ``cascadeMemory=None`` is treated as 0, so the parameter is truly
        optional.
        """
        if cascadeMemory is None:
            cascadeMemory = 0
        return score + self.generation*0.5 - cascadeMemory*0.35

    @abstractmethod
    def checkWin(self):
        """Report the game result.

        -1 -> Draw
        None -> Not ended
        n (natural number) -> player n won
        """
        return None

    # improveMe
    def getScoreFor(self, player):
        """Return a heuristic score for ``player`` based on ``checkWin``.

        0 <= score <= 1; close to zero when ``player`` is winning:
        0.5 while undecided, 0 for a win, 0.9 for a draw, 1 for a loss.
        """
        w = self.checkWin()
        if w is None:
            return 0.5
        if w == player:
            return 0
        if w == -1:
            return 0.9
        return 1

    @abstractmethod
    def __str__(self):
        """Return a visual representation of the state."""
        return "[#]"

    @abstractmethod
    def getTensor(self, phase='default'):
        """Return a tensor encoding of this state (placeholder by default)."""
        return torch.tensor([0])

    @classmethod
    def getModel(cls):
        """Hook for supplying a model; the base implementation returns None."""
        pass

    def getScoreNeural(self):
        """Score this state by feeding ``getTensor()`` to ``self.model``."""
        # NOTE(review): ``model`` is not set anywhere in this class; it is
        # presumably provided by a subclass or assigned externally - confirm.
        return self.model(self.getTensor())
|
|
|
|
|
|
class Node():
    """A game-tree node: a State plus lazily computed children and scores.

    ``_scores`` caches the per-player heuristic score of this node's own
    state. ``_strongs`` holds per-player scores that are seeded from
    ``_scores`` in strongDecay and recomputed from the children in
    _pullStrong.
    """

    def __init__(self, state, universe=None, parent=None, lastAction=None):
        """Wrap ``state``; ``lastAction`` is the action that led here from ``parent``."""
        self.state = state
        if universe is None:
            universe = Universe()
        self.universe = universe
        self.parent = parent
        self.lastAction = lastAction

        self._childs = None  # expanded lazily by the ``childs`` property
        self._scores = [None]*self.state.playersNum
        self._strongs = [None]*self.state.playersNum
        self._alive = True
        self._cascadeMemory = 0 # Used for our alternative to alpha-beta pruning

    def kill(self):
        """Mark this node as no longer alive."""
        self._alive = False

    def revive(self):
        """Mark this node as alive again."""
        self._alive = True

    @property
    def childs(self):
        """Child nodes, expanded on first access."""
        if self._childs is None:
            self._expand()
        return self._childs

    def _expand(self):
        """Create one child per available action, passed through ``universe.merge``."""
        self._childs = [
            self.universe.merge(Node(self.state.mutate(action), self.universe, self, action))
            for action in self.state.getAvaibleActions()
        ]

    def getStrongFor(self, player):
        """Return the strong score for ``player``, else the plain heuristic score."""
        strong = self._strongs[player]
        if strong is not None:
            return strong
        return self.getScoreFor(player)

    def _pullStrong(self): # Currently Expecti-Max
        """Recompute the strong scores from the children and cascade upwards.

        Returns 0 when nothing changed, otherwise the parent's return value
        plus one (the root counts as 2).
        """
        children = self.childs
        cp = self.state.curPlayer
        strongs = [None]*self.playersNum
        for p in range(self.playersNum):
            if cp == p: # P owns the turn; controlls outcome
                strongs[p] = min((c.getStrongFor(p) for c in children), default=inf)
            else:
                # Pairs of (score for p, score for the player to move),
                # ordered by what the player to move prefers (lowest first).
                scos = [(c.getStrongFor(p), c.getStrongFor(cp)) for c in children]
                scos.sort(key=lambda x: x[1])
                # Keep the moves the opponent likes best: a third of them,
                # but never fewer than three (or all, if there are fewer).
                betterHalf = scos[:max(3, len(scos)//3)]
                myScores = [bh[0]**2 for bh in betterHalf]
                # Quadratic mean weighted 3:1 towards the opponent's top pick.
                strongs[p] = sqrt(myScores[0]*0.75 + sum(myScores)/(len(myScores)*4))
        update = any(new != old for new, old in zip(strongs, self._strongs))
        self._strongs = strongs
        if update:
            if self.parent is not None:
                cascade = self.parent._pullStrong()
            else:
                cascade = 2
            self._cascadeMemory = self._cascadeMemory/2 + cascade
            return cascade + 1
        self._cascadeMemory /= 2
        return 0

    def forceStrong(self, depth=3):
        """Evaluate the subtree ``depth`` plies deep, seeding strong scores at its leaves."""
        # depth == 0 is checked first so the node is not expanded needlessly.
        if depth == 0 or not self.childs:
            self.strongDecay()
        else:
            for c in self.childs:
                c.forceStrong(depth-1)

    def strongDecay(self):
        """Seed the strong scores from the heuristic scores, then update the parent.

        Does nothing when any strong score is already set. Returns the
        parent's ``_pullStrong`` result, or None when nothing was seeded or
        the node has no parent.
        """
        if all(s is None for s in self._strongs):
            if not self.scoresAvaible():
                self._calcScores()
            # Copy so later writes to one list can never leak into the other.
            self._strongs = list(self._scores)
            if self.parent is None:
                # Root node: there is nobody to propagate to.
                return None
            return self.parent._pullStrong()
        return None

    def getSelfScore(self):
        """Heuristic score from the point of view of the player to move."""
        return self.getScoreFor(self.curPlayer)

    def getScoreFor(self, player):
        """Heuristic score for ``player``, computed once and cached."""
        if self._scores[player] is None:
            self._calcScore(player)
        return self._scores[player]

    def scoreAvaible(self, player):
        """True if the heuristic score for ``player`` is already cached."""
        return self._scores[player] is not None

    def scoresAvaible(self):
        """True if the heuristic scores of all players are cached."""
        return all(s is not None for s in self._scores)

    def strongScoresAvaible(self):
        """True if the strong scores of all players are set."""
        return all(s is not None for s in self._strongs)

    def _calcScores(self):
        """Compute and cache the heuristic score of every player."""
        for p in range(self.state.playersNum):
            self._calcScore(p)

    def _calcScore(self, player):
        """Compute and cache the heuristic score of ``player``."""
        self._scores[player] = self.state.getScoreFor(player)

    @property
    def priority(self):
        """Queue priority of this node as defined by ``State.getPriority``."""
        # The original read a non-existent ``self.score`` attribute.
        # NOTE(review): the mover's heuristic score is used as the score
        # input; confirm this is the intended quantity.
        return self.state.getPriority(self.getSelfScore(), self._cascadeMemory)

    @property
    def playersNum(self):
        """Number of players, taken from the state."""
        return self.state.playersNum

    @property
    def avaibleActions(self):
        """Actions leading to each child, in child order."""
        return [c.lastAction for c in self.childs]

    @property
    def curPlayer(self):
        """Player to move, taken from the state."""
        return self.state.curPlayer

    def _activateEdge(self):
        """Hand the unevaluated frontier below this node to the universe.

        A node without complete strong scores is reported via ``newOpen``;
        otherwise the search continues in its children.
        """
        if not self.strongScoresAvaible():
            self.universe.newOpen(self)
        else:
            for c in self.childs:
                c._activateEdge()

    def __str__(self):
        """Multi-line description: last action, player to move, state, score."""
        s = []
        if self.lastAction is None:
            s.append("[ {ROOT} ]")
        else:
            s.append("[ -> "+str(self.lastAction)+" ]")
        s.append("[ turn: "+str(self.state.curPlayer)+" ]")
        s.append(str(self.state))
        s.append("[ score: "+str(self.getSelfScore())+" ]")
        return '\n'.join(s)
|
|
|
|
def choose(txt, options):
    """Ask the user on the console to pick one of ``options``.

    Prints ``txt`` and a numbered menu, then accepts, in this order: the
    1-based number of an option, the exact string form of an option, or a
    single character matching the first character of an option's string
    form. Repeats until the input is valid and returns the chosen option.
    """
    while True:
        print('[*] '+txt)
        for num, opt in enumerate(options):
            print('['+str(num+1)+'] ' + str(opt))
        inp = input('[> ')
        try:
            n = int(inp)
        except ValueError:
            # Not a number; fall through to the textual matches below.
            pass
        else:
            if 1 <= n <= len(options):
                return options[n-1]
        for opt in options:
            if inp == str(opt):
                return opt
        if len(inp) == 1:
            for opt in options:
                # Slicing (not indexing) so an option whose string form is
                # empty simply does not match instead of raising IndexError.
                if inp == str(opt)[:1]:
                    return opt
        print('[!] Invalid Input.')
|
|
|
|
class Runtime():
    """Interactive game loop that walks a tree of Nodes from an initial state."""

    def __init__(self, initState):
        """Create the root node for ``initState`` in a fresh Universe."""
        universe = Universe()
        self.head = Node(initState, universe=universe)

    def performAction(self, action):
        """Advance ``head`` to the child reached by ``action``.

        Clears the universe's queue, kills the old head and activates the
        new one. Raises Exception if no child was produced by ``action``.
        """
        for c in self.head.childs:
            if action == c.lastAction:
                self.head.universe.clearPQ()
                self.head.kill()
                self.head = c
                self.head.universe.activateEdge(self.head)
                return
        raise Exception('No such action avaible...')

    def turn(self, bot=None, calcDepth=7):
        """Play one turn.

        ``bot`` selects who moves: True lets the engine pick, False asks the
        user, None first asks whether a human or the bot should move (or the
        last move should be undone). ``calcDepth`` is the search depth used
        for the engine's evaluation.
        """
        print(str(self.head))
        if bot is None:
            c = choose('Select action?', ['human', 'bot', 'undo'])
            if c == 'undo':
                if self.head.parent is None:
                    # Already at the root; stepping back would leave no head.
                    print('[!] Nothing to undo.')
                    return
                self.head = self.head.parent
                # performAction killed this node when play moved past it.
                self.head.revive()
                return
            bot = c == 'bot'
        if bot:
            self.head.forceStrong(calcDepth)
            player = self.head.curPlayer
            opts = [(c, c.getStrongFor(player)) for c in self.head.childs]
            # Lower score is better for the player to move.
            opts.sort(key=lambda x: x[1])
            print('[i] Evaluated Options:')
            for o in opts:
                #print('['+str(o[0])+']' + str(o[0].lastAction) + " (Score: "+str(o[1])+")")
                print('[ ]' + str(o[0].lastAction) + " (Score: "+str(o[1])+")")
            print('[#] I choose to play: ' + str(opts[0][0].lastAction))
            self.performAction(opts[0][0].lastAction)
        else:
            # askUserForAction is defined on the State, not on the Node.
            action = self.head.state.askUserForAction(self.head.avaibleActions)
            self.performAction(action)

    def game(self, bots=None, calcDepth=7):
        """Run turns until the state reports a result, then announce it.

        ``bots`` holds one entry per player, each interpreted as the ``bot``
        argument of ``turn``; by default every player is asked each turn.
        """
        if bots is None:
            bots = [None]*self.head.playersNum
        # checkWin: None -> not ended, -1 -> draw, n -> player n won.
        while self.head.state.checkWin() is None:
            self.turn(bots[self.head.curPlayer], calcDepth)
        print(str(self.head))
        winner = self.head.state.checkWin()
        if winner == -1:
            print('[*] Game over: draw.')
        else:
            print('[*] Game over: player '+str(winner)+' won.')
|