Great Refactor

This commit is contained in:
Dominik Moritz Roth 2024-06-10 18:10:27 +02:00
parent 3cb01a2e7b
commit d6a7530599
14 changed files with 944 additions and 742 deletions

View File

@ -1,18 +0,0 @@
class State:
    """Empty marker class; defines no attributes or methods."""


class Action:
    """Empty marker class; defines no attributes or methods."""


class BotAction:
    """Empty marker class; defines no attributes or methods."""


class PlayerAction:
    """Empty marker class; defines no attributes or methods."""


class EnvAction:
    """Empty marker class; defines no attributes or methods."""

View File

@ -1,669 +0,0 @@
# Guard against running the library module as a script.
if __name__ == '__main__':
    print('[!] VacuumDecay should not be started directly')
    # exit() is injected by the `site` module and may be absent (python -S);
    # raising SystemExit works everywhere and keeps the exit status of 0.
    raise SystemExit(0)
import os
import io
import time
import random
import threading
import torch
import torch.nn as nn
from torch import optim
from math import sqrt, pow, inf
#from multiprocessing import Event
from abc import ABC, abstractmethod
from threading import Event
from queue import PriorityQueue, Empty
from dataclasses import dataclass, field
from typing import Any
import random
import datetime
import pickle
class Action():
    """Holds the data representing one action.

    Actions are applied to a State in State.mutate.
    """

    def __init__(self, player, data):
        self.player = player
        self.data = data

    def __eq__(self, other):
        # This should be implemented differently
        # Two actions of different generations will never be compared
        # Note: only `data` is compared; `player` is ignored.
        if type(other) is not type(self):
            return False
        return str(self.data) == str(other.data)

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable; hash the same
        # key __eq__ compares so equal actions always share a hash.
        return hash(str(self.data))

    def __str__(self):
        # Visual representation of this action; starts with < and ends with >
        return "<P"+str(self.player)+"-"+str(self.data)+">"
class State(ABC):
    """Abstract representation of a game state.

    Allows retrieving available actions (getAvaibleActions) and applying
    them (mutate). Mutations return a new State and must not change the
    current one. A state can check itself for a win (checkWin) and score
    itself with a simple heuristic (getScoreFor): 0 when won, higher when
    worse, highest when lost. getPriority orders nodes in the work queue.
    """

    def __init__(self, curPlayer=0, generation=0, playersNum=2):
        self.curPlayer = curPlayer
        self.generation = generation
        self.playersNum = playersNum

    @abstractmethod
    def mutate(self, action):
        """Return a new state with `action` performed; self is not changed.

        This default (reachable via super()) only advances the turn and the
        generation. It instantiates the concrete subclass because State
        itself is abstract and cannot be constructed.
        """
        return type(self)(curPlayer=(self.curPlayer+1) % self.playersNum,
                          generation=self.generation+1,
                          playersNum=self.playersNum)

    @abstractmethod
    def getAvaibleActions(self):
        """Return an iterable of all possible actions."""
        return []

    def askUserForAction(self, actions):
        """Let the current player pick one of `actions` interactively."""
        return choose('What does player '+str(self.curPlayer)+' want to do?', actions)

    # improveMe
    def getPriority(self, score, cascadeMemory):
        """Return the queue priority; lower priorities get worked on first.

        Priority should not change for the same root. A higher cascadeMemory
        (more influence on higher-order scores) yields a lower value.
        `score` is currently unused.
        """
        return -cascadeMemory + 100

    @abstractmethod
    def checkWin(self):
        """Return -1 for a draw, None if not ended, or the winning player's index."""
        return None

    # improveMe
    def getScoreFor(self, player):
        """Return a score in [0, 1] for `player`; close to zero means winning."""
        w = self.checkWin()
        if w is None:
            return 0.5
        if w == player:
            return 0
        if w == -1:
            return 0.9
        return 1

    @abstractmethod
    def __str__(self):
        """Return a visual representation of the state."""
        return "[#]"

    @abstractmethod
    def getTensor(self, player=None, phase='default'):
        """Return the state encoded as a tensor from `player`'s perspective."""
        if player is None:
            player = self.curPlayer
        return torch.tensor([0])

    @classmethod
    def getModel(cls, phase='default'):
        """Return a fresh scoring model; this default returns None."""
        pass

    def getScoreNeural(self, model, player=None, phase='default'):
        """Score the state by feeding getTensor() through `model`."""
        return model(self.getTensor(player=player, phase=phase)).item()
class Universe:
    """No-op universe: keeps no queue and hands nodes back unchanged."""

    def __init__(self):
        # Selects how nodes are scored; starts out as the naive heuristic.
        self.scoreProvider = 'naive'

    def newOpen(self, node):
        """Accept a node for later processing; this base class drops it."""
        return None

    def merge(self, node):
        """Return the node itself (nothing is deduplicated here)."""
        return node

    def clearPQ(self):
        """Nothing to clear in the base universe."""
        return None

    def iter(self):
        """Return the (always empty) list of pending nodes."""
        pending = []
        return pending

    def activateEdge(self, head):
        """Hook invoked with the new head; does nothing here."""
        return None
@dataclass(order=True)
class PQItem:
    """Priority-queue entry ordered by `priority` only.

    `data` is excluded from comparisons so payloads that are not orderable
    can share a priority.
    """
    # Priorities come from State.getPriority (-cascadeMemory + 100), which
    # produces floats once cascadeMemory has been halved.
    priority: float
    data: Any = field(compare=False)
class QueueingUniverse(Universe):
    """Universe that keeps opened nodes in a priority queue."""

    def __init__(self):
        super().__init__()
        self.pq = PriorityQueue()

    def newOpen(self, node):
        """Queue `node` under the priority it reports for itself."""
        self.pq.put(PQItem(node.getPriority(), node))

    def merge(self, node):
        """Queue the node, then hand it back unchanged."""
        self.newOpen(node)
        return node

    def clearPQ(self):
        """Drop all queued work by swapping in a fresh queue."""
        self.pq = PriorityQueue()

    def iter(self):
        """Yield queued nodes, lowest priority value first, until the queue is empty."""
        while True:
            try:
                entry = self.pq.get(False)
            except Empty:
                return
            yield entry.data

    def activateEdge(self, head):
        """Delegate to the head node's own edge activation."""
        head._activateEdge()
class Node():
    """One position in the game tree.

    Holds a state, the parent node, the action that led here, lazily
    expanded children, per-player direct scores (`_scores`) and per-player
    backed-up scores (`_strongs`).
    """

    def __init__(self, state, universe=None, parent=None, lastAction=None):
        self.state = state
        if universe is None:
            print('[!] No Universe defined. Spawning one...')
            universe = Universe()
        self.universe = universe
        self.parent = parent
        self.lastAction = lastAction
        self._childs = None
        self._scores = [None]*self.state.playersNum
        self._strongs = [None]*self.state.playersNum
        self._alive = True
        self._cascadeMemory = 0  # Used for our alternative to alpha-beta pruning

    def kill(self):
        self._alive = False

    def revive(self):
        self._alive = True

    @property
    def childs(self):
        """Child nodes; expanded from the state's actions on first access."""
        if self._childs is None:
            self._expand()
        return self._childs

    def _expand(self):
        """Create one child per available action and register it with the universe."""
        self._childs = []
        for action in self.state.getAvaibleActions():
            newNode = Node(self.state.mutate(action),
                           self.universe, self, action)
            self._childs.append(self.universe.merge(newNode))

    def getStrongFor(self, player):
        """Return the backed-up score for `player`, falling back to the direct score."""
        if self._strongs[player] is not None:
            return self._strongs[player]
        return self.getScoreFor(player)

    def _pullStrong(self):  # Currently Expecti-Max
        """Recompute the backed-up scores from the children and cascade upwards.

        Returns 0 when nothing changed, otherwise the parent's return value
        (or 2 at the root) plus one.
        """
        strongs = [None]*self.playersNum
        cp = self.state.curPlayer
        for p in range(self.playersNum):
            if cp == p:  # P owns the turn; controls outcome
                best = inf
                for c in self.childs:
                    val = c.getStrongFor(p)
                    if val < best:
                        best = val
                strongs[p] = best
            else:
                # Order the children by how good they are for the mover and
                # blend p's score in the best one with the mean of the top few.
                scos = [(c.getStrongFor(p), c.getStrongFor(cp))
                        for c in self.childs]
                scos.sort(key=lambda x: x[1])
                betterHalf = scos[:max(3, int(len(scos)/3))]
                myScores = [bh[0]**2 for bh in betterHalf]
                strongs[p] = sqrt(myScores[0]*0.75 +
                                  sum(myScores)/(len(myScores)*4))
        update = any(new != old for new, old in zip(strongs, self._strongs))
        self._strongs = strongs
        if update:
            if self.parent is not None:
                cascade = self.parent._pullStrong()
            else:
                cascade = 2
            self._cascadeMemory = self._cascadeMemory/2 + cascade
            return cascade + 1
        self._cascadeMemory /= 2
        return 0

    def forceStrong(self, depth=3):
        """Evaluate the subtree down to `depth` plies (or to leaves without children)."""
        if depth == 0:
            self.strongDecay()
        elif len(self.childs):
            for c in self.childs:
                c.forceStrong(depth-1)
        else:
            self.strongDecay()

    def decayEvent(self):
        """Seed the backed-up scores of every child."""
        for c in self.childs:
            c.strongDecay()

    def strongDecay(self):
        """Seed `_strongs` from the direct scores if it is still unset.

        Returns the parent's `_pullStrong()` result, 1 at the root, or None
        when the backed-up scores were already present.
        """
        if self._strongs == [None]*self.playersNum:
            if not self.scoresAvaible():
                self._calcScores()
            # Copy so later writes to _scores cannot leak into _strongs.
            self._strongs = list(self._scores)
            if self.parent:
                return self.parent._pullStrong()
            return 1
        return None

    def getSelfScore(self):
        return self.getScoreFor(self.curPlayer)

    def getScoreFor(self, player):
        """Return the direct score for `player`, computing it on first use."""
        if self._scores[player] is None:
            self._calcScore(player)
        return self._scores[player]

    def scoreAvaible(self, player):
        return self._scores[player] is not None

    def scoresAvaible(self):
        return all(p is not None for p in self._scores)

    def strongScoresAvaible(self):
        return all(p is not None for p in self._strongs)

    def askUserForAction(self):
        return self.state.askUserForAction(self.avaibleActions)

    def _calcScores(self):
        for p in range(self.state.playersNum):
            self._calcScore(p)

    def _calcScore(self, player):
        """Store the direct score: fixed values for finished games, else the provider's."""
        winner = self._getWinner()
        if winner is not None:
            if winner == player:
                self._scores[player] = 0.0
            elif winner == -1:
                self._scores[player] = 2/3
            else:
                self._scores[player] = 1.0
            return
        if self.universe.scoreProvider == 'naive':
            self._scores[player] = self.state.getScoreFor(player)
        elif self.universe.scoreProvider == 'neural':
            self._scores[player] = self.state.getScoreNeural(
                self.universe.model, player)
        else:
            raise Exception('Uknown Score-Provider')

    def getPriority(self):
        return self.state.getPriority(self.getSelfScore(), self._cascadeMemory)

    @property
    def playersNum(self):
        return self.state.playersNum

    @property
    def avaibleActions(self):
        return [c.lastAction for c in self.childs]

    @property
    def curPlayer(self):
        return self.state.curPlayer

    def _getWinner(self):
        return self.state.checkWin()

    def getWinner(self):
        """Return the winner, -1 for a draw, or None while the game is running.

        The state's own verdict is consulted first: a position without
        children is only a draw if nobody has won it, otherwise a win on
        the last free move would be reported as a draw.
        """
        winner = self._getWinner()
        if winner is None and len(self.childs) == 0:
            return -1
        return winner

    def _activateEdge(self, dist=0):
        """Re-open unevaluated nodes below this one.

        Descends into children with enough cascade memory, plus a 1% random
        sample of the others.
        """
        if not self.strongScoresAvaible():
            self.universe.newOpen(self)
        else:
            for c in self.childs:
                if c._cascadeMemory > 0.001*(dist-2) or random.random() < 0.01:
                    c._activateEdge(dist=dist+1)

    def __str__(self):
        s = []
        if self.lastAction is None:
            s.append("[ {ROOT} ]")
        else:
            s.append("[ -> "+str(self.lastAction)+" ]")
        s.append("[ turn: "+str(self.state.curPlayer)+" ]")
        s.append(str(self.state))
        s.append("[ score: "+str(self.getScoreFor(0))+" ]")
        return '\n'.join(s)
def choose(txt, options):
    """Ask on stdin until the user selects one of `options` and return it.

    A selection can be the 1-based number shown in the menu, the full text
    of an option, or a single character matching an option's first
    character. Anything else prints an error and asks again.
    """
    while True:
        print('[*] '+txt)
        for num, opt in enumerate(options, start=1):
            print('['+str(num)+'] ' + str(opt))
        inp = input('[> ')
        # Only a failed number parse is expected here; everything else
        # should surface instead of being swallowed.
        try:
            n = int(inp)
        except ValueError:
            n = None
        if n is not None and 1 <= n <= len(options):
            return options[n-1]
        for opt in options:
            if inp == str(opt):
                return opt
        if len(inp) == 1:
            for opt in options:
                # startswith also copes with options whose text is empty.
                if str(opt).startswith(inp):
                    return opt
        print('[!] Invalid Input.')
class Worker():
    """Background thread that takes nodes from a universe and calls decayEvent() on them."""

    def __init__(self, universe):
        self.universe = universe
        self._alive = True
        self.thread = None
        # Set by kill() so an idle worker wakes up immediately.
        self._wake = threading.Event()

    def run(self):
        """Start processing in a separate thread."""
        # Daemon thread: the worker keeps polling until kill(), so it must
        # not be able to keep the interpreter alive on its own.
        self.thread = threading.Thread(target=self.runLocal, daemon=True)
        self.thread.start()

    def runLocal(self):
        """Process queued nodes until kill() is called.

        When the universe has nothing to hand out, the worker idles for up
        to a second and then polls again, instead of terminating.
        """
        while self._alive:
            for node in self.universe.iter():
                if not self._alive:
                    return
                if node is None:
                    # Nothing usable right now; back off and keep polling.
                    self._wake.wait(1)
                    continue
                node.decayEvent()
            self._wake.wait(1)

    def kill(self):
        """Ask the worker to stop and wait up to 15 seconds for it."""
        self._alive = False
        self._wake.set()
        if self.thread is not None:
            self.thread.join(15)

    def revive(self):
        self._alive = True
        self._wake.clear()
class Runtime():
    """Interactive game loop around a tree of nodes rooted at `head`."""

    def __init__(self, initState):
        universe = QueueingUniverse()
        self.head = Node(initState, universe=universe)
        _ = self.head.childs  # expand the root right away
        universe.newOpen(self.head)

    def spawnWorker(self):
        self.worker = Worker(self.head.universe)
        self.worker.run()

    def killWorker(self):
        self.worker.kill()

    def performAction(self, action):
        """Advance `head` to the child reached by `action`.

        Raises:
            Exception: if no child of the current head matches `action`.
        """
        for c in self.head.childs:
            if action == c.lastAction:
                self.head.universe.clearPQ()
                self.head.kill()
                self.head = c
                self.head.universe.activateEdge(self.head)
                return
        raise Exception('No such action avaible...')

    def turn(self, bot=None, calcDepth=3, bg=True):
        """Play one turn.

        `bot` None asks interactively who should move (or to undo / show the
        queue length); a truthy `bot` lets the engine move; a falsy non-None
        value asks the human for an action.
        """
        print(str(self.head))
        if bot is None:
            c = choose('Select action?', ['human', 'bot', 'undo', 'qlen'])
            if c == 'undo':
                # At the root there is nothing to undo; keep head valid.
                if self.head.parent is None:
                    print('[!] Nothing to undo.')
                    return
                self.head = self.head.parent
                # performAction() killed this node when it was left.
                self.head.revive()
                return
            elif c == 'qlen':
                print(self.head.universe.pq.qsize())
                return
            bot = c == 'bot'
        if bot:
            self.head.forceStrong(calcDepth)
            opts = [(c, c.getStrongFor(self.head.curPlayer))
                    for c in self.head.childs]
            opts.sort(key=lambda x: x[1])
            print('[i] Evaluated Options:')
            for o in opts:
                print('[ ]' + str(o[0].lastAction) + " (Score: "+str(o[1])+")")
            print('[#] I choose to play: ' + str(opts[0][0].lastAction))
            self.performAction(opts[0][0].lastAction)
        else:
            action = self.head.askUserForAction()
            self.performAction(action)

    def game(self, bots=None, calcDepth=7, bg=True):
        """Run turns until the head node reports a winner, then announce it.

        `bots` holds one entry per player as understood by turn(); None
        means every turn asks interactively. With `bg` a background worker
        runs for the duration of the game.
        """
        if bg:
            self.spawnWorker()
        try:
            if bots is None:
                bots = [None]*self.head.playersNum
            while self.head.getWinner() is None:
                self.turn(bots[self.head.curPlayer], calcDepth, bg=bg)
            # A draw is reported as -1, which indexes the last entry.
            print(['O', 'X', 'No one'][self.head.getWinner()] + ' won!')
        finally:
            # Stop the worker even when a turn raised.
            if bg:
                self.killWorker()

    def saveModel(self, model, gen):
        """Pickle (gen, state_dict) to the model file."""
        dat = model.state_dict()
        with open(self.getModelFileName(), 'wb') as f:
            pickle.dump((gen, dat), f)

    def loadModelState(self, model):
        """Load the stored weights into `model`, switch it to eval mode, return the generation."""
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only load model files from a trusted source.
        with open(self.getModelFileName(), 'rb') as f:
            gen, dat = pickle.load(f)
        model.load_state_dict(dat)
        model.eval()
        return gen

    def loadModel(self):
        model = self.head.state.getModel()
        gen = self.loadModelState(model)
        return model, gen

    def getModelFileName(self):
        return 'brains/uttt.vac'

    def saveToMemoryBank(self, term):
        # NOTE(review): persisting games is switched off by this early
        # return; the code below never runs until it is removed.
        return
        with open('memoryBank/uttt/'+datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')+'_'+str(int(random.random()*99999))+'.vdm', 'wb') as f:
            pickle.dump(term, f)
class NeuralRuntime(Runtime):
    """Runtime whose universe scores nodes with the stored model."""

    def __init__(self, initState):
        super().__init__(initState)
        # The stored generation number is not needed for playing.
        brain, _gen = self.loadModel()
        universe = self.head.universe
        universe.model = brain
        universe.scoreProvider = 'neural'
class Trainer(Runtime):
    """Self-play driver: plays games from the root and fits the scoring model on them."""

    def __init__(self, initState):
        super().__init__(initState)
        self.universe = self.head.universe
        self.rootNode = self.head
        self.terminal = None

    def buildDatasetFromModel(self, model, depth=4, refining=True, fanOut=(5, 5, 5, 5, 4, 4, 4, 4), uncertainSec=15, exacity=5):
        """Play one game and optionally deepen the tree around it.

        Returns the last node of the played line. With `refining`, the
        ancestors of that node are searched to the depths in `fanOut`
        (nearest ancestor first), then the background worker runs for
        `uncertainSec` seconds.
        """
        print('[*] Building Timeline')
        term = self.linearPlay(model, calcDepth=depth, exacity=exacity)
        if refining:
            print('[*] Refining Timeline (exploring alternative endings)')
            cur = term
            for d in fanOut:
                # Short games have fewer ancestors than fanOut has entries.
                if cur.parent is None:
                    break
                cur = cur.parent
                cur.forceStrong(d)
                print('.', end='', flush=True)
            print('')
            print('[*] Refining Timeline (exploring uncertain regions)')
            self.timelineExpandUncertain(term, uncertainSec)
        return term

    def linearPlay(self, model, calcDepth=7, exacity=5, verbose=False, firstNRandom=2):
        """Play a single game from the root and return its final node.

        The first `firstNRandom` moves are uniformly random. Afterwards the
        options are sorted best-first and picked with a bias towards the
        front that grows with `exacity`; from 10 on the best is always taken.
        """
        head = self.rootNode
        self.universe.model = model
        self.spawnWorker()
        try:
            while head.getWinner() is None:
                if verbose:
                    print(head)
                else:
                    print('.', end='', flush=True)
                head.forceStrong(calcDepth)
                if len(head.childs) == 0:
                    break
                opts = [(c, c.getStrongFor(head.curPlayer))
                        for c in head.childs]
                if firstNRandom:
                    firstNRandom -= 1
                    ind = int(random.random()*len(opts))
                else:
                    opts.sort(key=lambda x: x[1])
                    if exacity >= 10:
                        ind = 0
                    else:
                        ind = int(pow(random.random(), exacity)*(len(opts)-1))
                head = opts[ind][0]
        finally:
            # Never leave the worker running when a move raised.
            self.killWorker()
        if verbose:
            print(head)
        print(' => '+['O', 'X', 'No one'][head.getWinner()] + ' won!')
        return head

    def timelineIterSingle(self, term):
        """Iterate the timeline of a single terminal node."""
        for i in self.timelineIter([term]):
            yield i

    def timelineIter(self, terms, altChildPerNode=-1):
        """Walk from each node in `terms` up to the root, in lockstep.

        Yields every node on the way and, for each, its children: all of
        them when `altChildPerNode` is -1, otherwise a random sample of at
        most that many (capped at half the child count).
        """
        # Work on a copy: the cursor list is overwritten while walking, and
        # callers iterate the same `terms` once per training epoch.
        heads = list(terms)
        batch = len(heads)
        while True:
            empty = True
            for b in range(batch):
                head = heads[b]
                if head is None:
                    continue
                empty = False
                yield head
                if len(head.childs):
                    if altChildPerNode == -1:  # all
                        for child in head.childs:
                            yield child
                    else:
                        for j in range(min(altChildPerNode, int(len(head.childs)/2))):
                            yield random.choice(head.childs)
                heads[b] = head.parent
            if empty:
                return

    def timelineExpandUncertain(self, term, secs):
        """Let the background worker evaluate open nodes for `secs` seconds."""
        self.rootNode.universe.clearPQ()
        self.rootNode.universe.activateEdge(self.rootNode)
        self.spawnWorker()
        try:
            for s in range(secs):
                time.sleep(1)
                print('.', end='', flush=True)
            self.rootNode.universe.clearPQ()
        finally:
            self.killWorker()
        print('')

    def trainModel(self, model, lr=0.000001, cut=0.01, calcDepth=4, exacity=5, terms=None, batch=16):
        """Fit `model` to the backed-up scores along the given timelines.

        Without `terms`, `batch` fresh games are played first. Runs at most
        64 epochs; after epoch 16 it stops early once the mean loss per
        visited node drops below `cut` or the summed loss stops changing.
        Returns the summed loss of the last epoch.
        """
        loss_func = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr)
        if terms is None:
            terms = [self.buildDatasetFromModel(model, depth=calcDepth, exacity=exacity)
                     for _ in range(batch)]
        print('[*] Conditioning Brain')
        loss_sum = 0
        lLoss = None  # previous epoch's loss; must survive across epochs
        for r in range(64):
            loss_sum = 0
            zeroLen = 0
            visited = 0
            for node in self.timelineIter(terms):
                visited += 1
                for p in range(self.rootNode.playersNum):
                    inp = node.state.getTensor(player=p)
                    gol = torch.tensor(
                        [node.getStrongFor(p)], dtype=torch.float)
                    out = model(inp)
                    loss = loss_func(out, gol)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    loss_sum += loss.item()
                    if loss.item() == 0.0:
                        zeroLen += 1
                # NOTE(review): indentation was lost in the source; this is
                # assumed to end the epoch after five exact-zero losses.
                if zeroLen >= 5:
                    break
            mean_loss = loss_sum / max(visited, 1)
            print(mean_loss)
            if r > 16 and (mean_loss < cut or lLoss == loss_sum):
                return loss_sum
            lLoss = loss_sum
        return loss_sum

    def main(self, model=None, gens=1024, startGen=0):
        """Train for `gens` generations, saving the model after each one."""
        newModel = False
        if model is None:
            print('[!] No brain found. Creating new one...')
            newModel = True
            model = self.rootNode.state.getModel()
        # A brand-new model is not trusted yet: start with the naive scorer.
        self.universe.scoreProvider = 'naive' if newModel else 'neural'
        model.train()
        for gen in range(startGen, startGen+gens):
            print('[#####] Gen '+str(gen)+' training:')
            loss = self.trainModel(model, calcDepth=min(
                4, 3+int(gen/16)), exacity=int(gen/3+1), batch=4)
            print('[L] '+str(loss))
            self.universe.scoreProvider = 'neural'
            self.saveModel(model, gen)

    def trainFromTerm(self, term):
        """Train the stored model on the single game ending in `term` and save it."""
        model, gen = self.loadModel()
        self.universe.scoreProvider = 'neural'
        self.trainModel(model, calcDepth=4, exacity=10, terms=[term])
        self.saveModel(model, gen)

    def train(self):
        """Continue training the stored model, or start a new one if none exists."""
        if os.path.exists(self.getModelFileName()):
            model, gen = self.loadModel()
            self.main(model, startGen=gen+1)
        else:
            self.main()

4
vacuumDecay/__init__.py Normal file
View File

@ -0,0 +1,4 @@
# Public names re-exported by the vacuumDecay package.
from vacuumDecay.runtime import Runtime, NeuralRuntime, Trainer
# State is imported from the package by the game modules, so export it too.
from vacuumDecay.base import State, Action, Universe, QueueingUniverse
# Node is defined in node.py, not in base.py.
from vacuumDecay.node import Node
from vacuumDecay.utils import choose
from vacuumDecay.run import main

162
vacuumDecay/base.py Normal file
View File

@ -0,0 +1,162 @@
import torch
from abc import ABC, abstractmethod
from queue import PriorityQueue, Empty
from dataclasses import dataclass, field
from typing import Any
from vacuumDecay.utils import choose
class Action():
    """Holds the data representing one action.

    Actions are applied to a State in State.mutate.
    """

    def __init__(self, player, data):
        self.player = player
        self.data = data

    def __eq__(self, other):
        # This should be implemented differently
        # Two actions of different generations will never be compared
        # Note: only `data` is compared; `player` is ignored.
        if type(other) is not type(self):
            return False
        return str(self.data) == str(other.data)

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable; hash the same
        # key __eq__ compares so equal actions always share a hash.
        return hash(str(self.data))

    def __str__(self):
        # Visual representation of this action; starts with < and ends with >
        return "<P"+str(self.player)+"-"+str(self.data)+">"

    def getImage(self, state):
        """Return an image of this action in the given state, or None if not implemented."""
        return None
class State(ABC):
    """Abstract representation of a game state.

    Allows retrieving available actions (getAvaibleActions) and applying
    them (mutate). Mutations return a new State and must not change the
    current one. A state can check itself for a win (checkWin) and score
    itself with a simple heuristic (getScoreFor): 0 when won, higher when
    worse, highest when lost. getPriority orders nodes in the work queue.
    """

    def __init__(self, curPlayer=0, generation=0, playersNum=2):
        self.curPlayer = curPlayer
        self.generation = generation
        self.playersNum = playersNum

    @abstractmethod
    def mutate(self, action):
        """Return a new state with `action` performed; self is not changed.

        This default (reachable via super()) only advances the turn and the
        generation. It instantiates the concrete subclass because State
        itself is abstract and cannot be constructed.
        """
        return type(self)(curPlayer=(self.curPlayer+1) % self.playersNum,
                          generation=self.generation+1,
                          playersNum=self.playersNum)

    @abstractmethod
    def getAvaibleActions(self):
        """Return an iterable of all possible actions."""
        return []

    def askUserForAction(self, actions):
        """Let the current player pick one of `actions` interactively."""
        return choose('What does player '+str(self.curPlayer)+' want to do?', actions)

    # improveMe
    def getPriority(self, score, cascadeMemory):
        """Return the queue priority; lower priorities get worked on first.

        Priority should not change for the same root. A higher cascadeMemory
        (more influence on higher-order scores) yields a lower value.
        `score` is currently unused.
        """
        return -cascadeMemory + 100

    @abstractmethod
    def checkWin(self):
        """Return -1 for a draw, None if not ended, or the winning player's index."""
        return None

    # improveMe
    def getScoreFor(self, player):
        """Return a score in [0, 1] for `player`; close to zero means winning."""
        w = self.checkWin()
        if w is None:
            return 0.5
        if w == player:
            return 0
        if w == -1:
            return 0.9
        return 1

    @abstractmethod
    def __str__(self):
        """Return a visual representation of the state."""
        return "[#]"

    @abstractmethod
    def getTensor(self, player=None, phase='default'):
        """Return the state encoded as a tensor from `player`'s perspective."""
        if player is None:
            player = self.curPlayer
        return torch.tensor([0])

    @classmethod
    def getModel(cls, phase='default'):
        """Return a fresh scoring model; this default returns None."""
        pass

    def getScoreNeural(self, model, player=None, phase='default'):
        """Score the state by feeding getTensor() through `model`."""
        return model(self.getTensor(player=player, phase=phase)).item()

    def getImage(self):
        """Return an image of this state, or None if not implemented."""
        return None
class Universe:
    """No-op universe: keeps no queue and hands nodes back unchanged."""

    def __init__(self):
        # Selects how nodes are scored; starts out as the naive heuristic.
        self.scoreProvider = 'naive'

    def newOpen(self, node):
        """Accept a node for later processing; this base class drops it."""
        return None

    def merge(self, node):
        """Return the node itself (nothing is deduplicated here)."""
        return node

    def clearPQ(self):
        """Nothing to clear in the base universe."""
        return None

    def iter(self):
        """Return the (always empty) list of pending nodes."""
        pending = []
        return pending

    def activateEdge(self, head):
        """Hook invoked with the new head; does nothing here."""
        return None
@dataclass(order=True)
class PQItem:
    """Priority-queue entry ordered by `priority` only.

    `data` is excluded from comparisons so payloads that are not orderable
    can share a priority.
    """
    # Priorities come from State.getPriority (-cascadeMemory + 100), which
    # produces floats once cascadeMemory has been halved.
    priority: float
    data: Any = field(compare=False)
class QueueingUniverse(Universe):
    """Universe that keeps opened nodes in a priority queue."""

    def __init__(self):
        super().__init__()
        self.pq = PriorityQueue()

    def newOpen(self, node):
        """Queue `node` under the priority it reports for itself."""
        self.pq.put(PQItem(node.getPriority(), node))

    def merge(self, node):
        """Queue the node, then hand it back unchanged."""
        self.newOpen(node)
        return node

    def clearPQ(self):
        """Drop all queued work by swapping in a fresh queue."""
        self.pq = PriorityQueue()

    def iter(self):
        """Yield queued nodes, lowest priority value first, until the queue is empty."""
        while True:
            try:
                entry = self.pq.get(False)
            except Empty:
                return
            yield entry.data

    def activateEdge(self, head):
        """Delegate to the head node's own edge activation."""
        head._activateEdge()

View File

@ -1,23 +1,46 @@
from vacuumDecay import * from vacuumDecay import *
import numpy as np import numpy as np
from enum import Enum
class TTTState(State):
def __init__(self, curPlayer=0, generation=0, playersNum=2, board=None): class Face(Enum):
if type(board) == type(None): TANK = 1
board = np.array([None]*9) LASER = 2
self.curPlayer = curPlayer HUMAN = 3
COW = 4
CHICKEN = 5
@property
def num_faces(self):
return 2 if self == Face.LASER else 1
@property
def prob(self):
return self.num_faces/6
@property
def is_collectable(self):
return not self in [Face.TANK, Face.LASER]
@property
def force_pickup(self):
return self in [Face.TANK]
class MCState(State):
def __init__(self, generation=0, hand_dices_num=12, table_dices=[0]*5):
self.generation = generation self.generation = generation
self.playersNum = playersNum self.hand_dices_num = hand_dices_num
self.board = board self.table_dices = table_dices
def mutate(self, action): def mutate(self, action):
newBoard = np.copy(self.board) newBoard = np.copy(self.board)
newBoard[action.data] = self.curPlayer newBoard[action.data] = self.curPlayer
return TTTState(curPlayer=(self.curPlayer+1)%self.playersNum, playersNum=self.playersNum, board=newBoard) return MCState(curPlayer=(self.curPlayer+1) % self.playersNum, playersNum=self.playersNum, board=newBoard)
def getAvaibleActions(self): def getAvaibleActions(self):
for i in range(9): for i in range(9):
if self.board[i]==None: if self.board[i] == None:
yield Action(self.curPlayer, i) yield Action(self.curPlayer, i)
def checkWin(self): def checkWin(self):
@ -39,7 +62,8 @@ class TTTState(State):
def __str__(self): def __str__(self):
s = [] s = []
for l in range(3): for l in range(3):
s.append(" ".join([str(p) if p!=None else '.' for p in self.board[l*3:][:3]])) s.append(
" ".join([str(p) if p != None else '.' for p in self.board[l*3:][:3]]))
return "\n".join(s) return "\n".join(s)
def getTensor(self): def getTensor(self):
@ -52,9 +76,10 @@ class TTTState(State):
torch.nn.ReLu(), torch.nn.ReLu(),
torch.nn.Linear(10, 3), torch.nn.Linear(10, 3),
torch.nn.Sigmoid(), torch.nn.Sigmoid(),
torch.nn.Linear(3,1) torch.nn.Linear(3, 1)
) )
if __name__=="__main__":
run = Runtime(TTTState()) if __name__ == "__main__":
run = Runtime(MCState())
run.game() run.game()

View File

@ -0,0 +1,101 @@
import numpy as np
import torch
from PIL import Image, ImageDraw
from vacuumDecay import State, Action, Runtime, NeuralRuntime, Trainer, choose, main
class TTTAction(Action):
    """A tic-tac-toe move: `data` is the cell index 0..8 in row-major order."""

    def __init__(self, player, data):
        super().__init__(player, data)

    def getImage(self, state=None):
        """Return the state's image with this move drawn on it.

        Returns None when `state` is not a TTTState or has no image.
        """
        if not isinstance(state, TTTState):
            return None
        canvas = state.getImage()
        if canvas is None:
            return canvas
        pen = ImageDraw.Draw(canvas)
        # Centre of the targeted 100x100 cell.
        col, row = self.data % 3, self.data // 3
        x = col * 100 + 50
        y = row * 100 + 50
        if self.player != 0:
            # Every player other than 0 is drawn as a red cross.
            pen.line((x-40, y-40, x+40, y+40), fill='red', width=2)
            pen.line((x+40, y-40, x-40, y+40), fill='red', width=2)
        else:
            pen.ellipse((x-40, y-40, x+40, y+40), outline='blue', width=2)
        return canvas
class TTTState(State):
    """Tic-tac-toe position.

    `board` is a 9-element array in row-major order; each cell holds None
    or the index of the player who marked it.
    """

    # Cell triples that win the game, in the order they are checked.
    _LINES = ((0, 3, 6), (0, 1, 2), (1, 4, 7), (3, 4, 5),
              (2, 5, 8), (6, 7, 8), (0, 4, 8), (2, 4, 6))

    def __init__(self, curPlayer=0, generation=0, playersNum=2, board=None):
        super().__init__(curPlayer=curPlayer, generation=generation,
                         playersNum=playersNum)
        if board is None:
            board = np.array([None]*9)
        self.board = board

    def mutate(self, action):
        """Return the state after the current player marks cell `action.data`."""
        newBoard = np.copy(self.board)
        newBoard[action.data] = self.curPlayer
        return TTTState(curPlayer=(self.curPlayer+1) % self.playersNum,
                        generation=self.generation+1,
                        playersNum=self.playersNum, board=newBoard)

    def getAvaibleActions(self):
        """Yield one action per empty cell."""
        for i in range(9):
            if self.board[i] is None:
                yield TTTAction(self.curPlayer, i)

    def checkWin(self):
        """Return the winner's index, -1 for a full board without winner, else None."""
        s = self.board
        for a, b, c in self._LINES:
            if s[a] is not None and s[a] == s[b] == s[c]:
                return s[a]
        if any(cell is None for cell in s):
            return None
        return -1

    def __str__(self):
        rows = []
        for l in range(3):
            rows.append(" ".join(
                [str(p) if p is not None else '.' for p in self.board[l*3:][:3]]))
        return "\n".join(rows)

    def getTensor(self, player=None, phase='default'):
        """Encode the state as a float tensor of length 10 for `player`.

        Element 0 is 1.0 when it is `player`'s turn, else 0.0. The nine
        cells follow: 0.0 empty, 1.0 marked by `player`, -1.0 marked by
        anyone else. The length matches the input size of getModel().
        """
        if player is None:
            player = self.curPlayer
        cells = [0.0 if mark is None else (1.0 if mark == player else -1.0)
                 for mark in self.board]
        return torch.tensor([float(self.curPlayer == player)] + cells,
                            dtype=torch.float)

    @classmethod
    def getModel(cls, phase='default'):
        """Return a small untrained network mapping the 10-element tensor to one score."""
        return torch.nn.Sequential(
            torch.nn.Linear(10, 10),
            torch.nn.ReLU(),
            torch.nn.Linear(10, 3),
            torch.nn.Sigmoid(),
            torch.nn.Linear(3, 1)
        )

    def getImage(self):
        """Render the board as a 300x300 image: blue circles for player 0, red crosses otherwise."""
        img = Image.new('RGB', (300, 300), color='white')
        draw = ImageDraw.Draw(img)
        for i in range(1, 3):
            draw.line((0, 100*i, 300, 100*i), fill='black', width=2)
            draw.line((100*i, 0, 100*i, 300), fill='black', width=2)
        for i, mark in enumerate(self.board):
            if mark is not None:
                x = (i % 3) * 100 + 50
                y = (i // 3) * 100 + 50
                if mark == 0:
                    draw.ellipse((x-40, y-40, x+40, y+40), outline='blue', width=2)
                else:
                    draw.line((x-40, y-40, x+40, y+40), fill='red', width=2)
                    draw.line((x+40, y-40, x-40, y+40), fill='red', width=2)
        return img
if __name__=="__main__":
main(TTTState)

View File

@ -1,11 +1,16 @@
""" """
A lot of this code was stolen from Pulkit Maloo (https://github.com/pulkitmaloo/Ultimate-Tic-Tac-Toe) A lot of this code was stolen from Pulkit Maloo (https://github.com/pulkitmaloo/Ultimate-Tic-Tac-Toe)
""" """
import numpy as np
import torch
from torch import nn
from PIL import Image, ImageDraw
from vacuumDecay import *
from collections import Counter from collections import Counter
import itertools import itertools
from vacuumDecay import State, Action, Runtime, NeuralRuntime, Trainer, choose, main
class TTTState(State): class TTTState(State):
def __init__(self, curPlayer=0, generation=0, playersNum=2, board=None, lastMove=-1): def __init__(self, curPlayer=0, generation=0, playersNum=2, board=None, lastMove=-1):
@ -46,7 +51,7 @@ class TTTState(State):
return TTTState(curPlayer=(self.curPlayer+1) % self.playersNum, playersNum=self.playersNum, board=newBoard, lastMove=action.data) return TTTState(curPlayer=(self.curPlayer+1) % self.playersNum, playersNum=self.playersNum, board=newBoard, lastMove=action.data)
def box(self, x, y): def box(self, x, y):
return index(x, y) // 9 return self.index(x, y) // 9
def next_box(self, i): def next_box(self, i):
return i % 9 return i % 9
@ -197,43 +202,5 @@ class Model(nn.Module):
y = self.out(x) y = self.out(x)
return y return y
if __name__=="__main__":
def humanVsAi(train=True, remember=False, depth=3, bots=[0, 1], noBg=False): main(TTTState)
init = TTTState()
run = NeuralRuntime(init)
run.game(bots, depth, bg=not noBg)
if remember or train:
trainer = Trainer(init)
if remember:
trainer.saveToMemoryBank(run.head)
print('[!] Your cognitive and strategic destinctiveness was added to my own! (Game inserted into memoryBank)')
if train:
print(
"[!] Your knowledge will be assimilated!!! Please stand by.... (Updating Neuristic)")
trainer.trainFromTerm(run.head)
print('[!] I have become smart! Destroyer of human Ultimate-TicTacToe players! (Neuristic update completed)')
print('[!] This marks the beginning of the end of humankind!')
print('[i] Thanks for playing! Goodbye...')
def aiVsAiLoop():
init = TTTState()
trainer = Trainer(init)
trainer.train()
if __name__ == '__main__':
options = ['Play Against AI',
'Play Against AI (AI begins)', 'Play Against AI (Fast Play)', 'Playground', 'Let AI train']
opt = choose('?', options)
if opt == options[0]:
humanVsAi()
elif opt == options[1]:
humanVsAi(bots[1, 0])
elif opt == options[2]:
humanVsAi(depth=2, noBg=True)
elif opt == options[3]:
humanVsAi(bots=[None, None])
else:
aiVsAiLoop()

204
vacuumDecay/node.py Normal file
View File

@ -0,0 +1,204 @@
class Node:
def __init__(self, state, universe=None, parent=None, lastAction=None):
self.state = state
if universe == None:
print('[!] No Universe defined. Spawning one...')
universe = Universe()
self.universe = universe
self.parent = parent
self.lastAction = lastAction
self._childs = None
self._scores = [None]*self.state.playersNum
self._strongs = [None]*self.state.playersNum
self._alive = True
self._cascadeMemory = 0 # Used for our alternative to alpha-beta pruning
self.last_updated = time.time() # New attribute
def update(self):
self.last_updated = time.time()
if hasattr(self.universe, 'visualizer'):
self.universe.visualizer.send_update()
def kill(self):
self._alive = False
def revive(self):
self._alive = True
@property
def childs(self):
if self._childs == None:
self._expand()
return self._childs
def _expand(self):
self._childs = []
actions = self.state.getAvaibleActions()
for action in actions:
newNode = Node(self.state.mutate(action),
self.universe, self, action)
self._childs.append(self.universe.merge(newNode))
self.update()
def getStrongFor(self, player):
if self._strongs[player] != None:
return self._strongs[player]
else:
return self.getScoreFor(player)
def _pullStrong(self):
strongs = [None]*self.playersNum
for p in range(self.playersNum):
cp = self.state.curPlayer
if cp == p:
best = float('inf')
for c in self.childs:
if c.getStrongFor(p) < best:
best = c.getStrongFor(p)
strongs[p] = best
else:
scos = [(c.getStrongFor(p), c.getStrongFor(cp)) for c in self.childs]
scos.sort(key=lambda x: x[1])
betterHalf = scos[:max(3, int(len(scos)/3))]
myScores = [bh[0]**2 for bh in betterHalf]
strongs[p] = sqrt(myScores[0]*0.75 + sum(myScores)/(len(myScores)*4))
update = False
for s in range(self.playersNum):
if strongs[s] != self._strongs[s]:
update = True
break
self._strongs = strongs
if update:
if self.parent != None:
cascade = self.parent._pullStrong()
else:
cascade = 2
self._cascadeMemory = self._cascadeMemory/2 + cascade
self.update()
return cascade + 1
self._cascadeMemory /= 2
return 0
def forceStrong(self, depth=3):
if depth == 0:
self.strongDecay()
else:
if len(self.childs):
for c in self.childs:
c.forceStrong(depth-1)
else:
self.strongDecay()
self.update()
def decayEvent(self):
for c in self.childs:
c.strongDecay()
self.update()
def strongDecay(self):
if self._strongs == [None]*self.playersNum:
if not self.scoresAvaible():
self._calcScores()
self._strongs = self._scores
if self.parent:
return self.parent._pullStrong()
return 1
return None
def getSelfScore(self):
return self.getScoreFor(self.curPlayer)
def getScoreFor(self, player):
if self._scores[player] == None:
self._calcScore(player)
self.update()
return self._scores[player]
def scoreAvaible(self, player):
return self._scores[player] != None
def scoresAvaible(self):
for p in self._scores:
if p == None:
return False
return True
def strongScoresAvaible(self):
for p in self._strongs:
if p == None:
return False
return True
def askUserForAction(self):
return self.state.askUserForAction(self.avaibleActions)
def _calcScores(self):
for p in range(self.state.playersNum):
self._calcScore(p)
def _calcScore(self, player):
winner = self._getWinner()
if winner != None:
if winner == player:
self._scores[player] = 0.0
elif winner == -1:
self._scores[player] = 2/3
else:
self._scores[player] = 1.0
self.update()
return
if self.universe.scoreProvider == 'naive':
self._scores[player] = self.state.getScoreFor(player)
elif self.universe.scoreProvider == 'neural':
self._scores[player] = self.state.getScoreNeural(self.universe.model, player)
else:
raise Exception('Unknown Score-Provider')
self.update()
def getPriority(self):
return self.state.getPriority(self.getSelfScore(), self._cascadeMemory)
@property
def playersNum(self):
return self.state.playersNum
@property
def avaibleActions(self):
r = []
for c in self.childs:
r.append(c.lastAction)
return r
@property
def curPlayer(self):
return self.state.curPlayer
def _getWinner(self):
return self.state.checkWin()
def getWinner(self):
if len(self.childs) == 0:
return -1
return self._getWinner()
def _activateEdge(self, dist=0):
if not self.strongScoresAvaible():
self.universe.newOpen(self)
else:
for c in self.childs:
if c._cascadeMemory > 0.001*(dist-2) or random.random() < 0.01:
c._activateEdge(dist=dist+1)
self.update()
def __str__(self):
    """Render the node as text.

    The lines are: a header (root marker or the action that led here),
    the player to move, the state's own string form, and the score for
    player 0.
    """
    # Identity check so the action's own ``__eq__`` is never involved.
    if self.lastAction is None:
        header = "[ {ROOT} ]"
    else:
        header = "[ -> "+str(self.lastAction)+" ]"
    lines = [
        header,
        "[ turn: "+str(self.state.curPlayer)+" ]",
        str(self.state),
        "[ score: "+str(self.getScoreFor(0))+" ]",
    ]
    return '\n'.join(lines)

47
vacuumDecay/run.py Normal file
View File

@ -0,0 +1,47 @@
from vacuumDecay.runtime import NeuralRuntime, Runtime, Trainer
from vacuumDecay.utils import choose
def humanVsAi(StateClass, train=True, remember=False, depth=3, bots=(0, 1), noBg=False, start_visualizer=False):
    """Play one game against the neural runtime and optionally learn from it.

    Args:
        StateClass: zero-argument callable producing the initial state.
        train: when true, train the model from the finished game.
        remember: when true, store the finished game in the memory bank.
        depth: calculation depth handed to ``run.game``.
        bots: per-player entries handed to ``run.game``; indexed by the
            current player. The default is an immutable tuple so it cannot
            be shared-and-mutated between calls.
        noBg: when true, the game runs without a background worker.
        start_visualizer: forwarded to the runtime.
    """
    init = StateClass()
    run = NeuralRuntime(init, start_visualizer=start_visualizer)
    run.game(bots, depth, bg=not noBg)
    if remember or train:
        trainer = Trainer(init)
        if remember:
            trainer.saveToMemoryBank(run.head)
            print('[!] Your cognitive and strategic distinctiveness was added to my own! (Game inserted into memoryBank)')
        if train:
            print("[!] Your knowledge will be assimilated!!! Please stand by.... (Updating Neuristic)")
            trainer.trainFromTerm(run.head)
            print('[!] I have become smart! Destroyer of human Ultimate-TicTacToe players! (Neuristic update completed)')
            print('[!] This marks the beginning of the end of humankind!')
    print('[i] Thanks for playing! Goodbye...')
def aiVsAiLoop(StateClass, start_visualizer=False):
    """Create a trainer on a fresh initial state and run its training."""
    startState = StateClass()
    coach = Trainer(startState, start_visualizer=start_visualizer)
    coach.train()
def humanVsNaive(StateClass, start_visualizer=False):
    """Play one game on a plain ``Runtime`` with its default settings."""
    startState = StateClass()
    session = Runtime(startState, start_visualizer=start_visualizer)
    session.game()
def main(StateClass):
    """Show the mode menu and start whatever the user picked.

    Falls back to the AI-vs-AI training loop when the answer matches no
    menu entry.
    """
    menu = [
        ('Play Against AI', lambda: humanVsAi(StateClass)),
        ('Play Against AI (AI begins)', lambda: humanVsAi(StateClass, bots=[1, 0])),
        ('Play Against AI (Fast Play)', lambda: humanVsAi(StateClass, depth=2, noBg=True)),
        ('Playground', lambda: humanVsAi(StateClass, bots=[None, None])),
        ('Let AI train', lambda: aiVsAiLoop(StateClass)),
        ('Play against Naive', lambda: humanVsNaive(StateClass)),
    ]
    labels = [label for label, _ in menu]
    picked = choose('?', labels)
    for label, launch in menu:
        if picked == label:
            launch()
            return
    aiVsAiLoop(StateClass)

300
vacuumDecay/runtime.py Normal file
View File

@ -0,0 +1,300 @@
import os
import time
import datetime
import pickle
import torch
import torch.nn as nn
from torch import optim
from math import pow
import random
import datetime
import pickle
from vacuumDecay.base import QueueingUniverse, Node
from vacuumDecay.utils import choose
from vacuumDecay.visualizer import Visualizer
class Worker():
def __init__(self, universe):
self.universe = universe
self._alive = True
def run(self):
import threading
self.thread = threading.Thread(target=self.runLocal)
self.thread.start()
def runLocal(self):
for i, node in enumerate(self.universe.iter()):
if node == None:
time.sleep(1)
if not self._alive:
return
node.decayEvent()
def kill(self):
self._alive = False
self.thread.join(15)
def revive(self):
self._alive = True
class Runtime():
    """Interactive game session around a tree of ``Node`` objects.

    ``self.head`` is the node of the current position; all nodes share the
    universe created in ``__init__``.
    """

    def __init__(self, initState, start_visualizer=False):
        """
        Args:
            initState: state of the starting position.
            start_visualizer: when true, start the visualizer right away.
        """
        universe = QueueingUniverse()
        self.head = Node(initState, universe=universe)
        # Touch the attribute before the root is queued.
        # NOTE(review): presumably this forces lazy child expansion — confirm in Node.
        _ = self.head.childs
        universe.newOpen(self.head)
        self.visualizer = None
        if start_visualizer:
            self.startVisualizer()

    def startVisualizer(self):
        """Create a ``Visualizer`` for the universe and start it."""
        self.visualizer = Visualizer(self.head.universe)
        self.visualizer.start()

    def spawnWorker(self):
        """Start a background ``Worker`` on the universe."""
        self.worker = Worker(self.head.universe)
        self.worker.run()

    def killWorker(self):
        """Stop the worker started by ``spawnWorker``."""
        self.worker.kill()

    def performAction(self, action):
        """Advance ``head`` to the child reached by ``action``.

        Raises:
            Exception: if no child of the head was reached by ``action``.
        """
        for c in self.head.childs:
            if action == c.lastAction:
                self.head.universe.clearPQ()
                self.head.kill()
                self.head = c
                self.head.universe.activateEdge(self.head)
                return
        raise Exception('No such action avaible...')

    def turn(self, bot=None, calcDepth=3, bg=True):
        """Play a single turn.

        Args:
            bot: truthy -> the bot moves, falsy (but not None) -> the user
                moves, None -> ask who should move (also offers 'undo'
                and 'qlen').
            calcDepth: depth handed to ``forceStrong`` for bot moves.
            bg: accepted for interface compatibility; not used here.
        """
        print(str(self.head))
        if bot is None:
            c = choose('Select action?', ['human', 'bot', 'undo', 'qlen'])
            if c == 'undo':
                if self.head.parent is None:
                    # At the root there is nothing to go back to; keeping
                    # the head avoids replacing it with None.
                    print('[!] Nothing to undo.')
                    return
                self.head = self.head.parent
                return
            elif c == 'qlen':
                print(self.head.universe.pq.qsize())
                return
            bot = c == 'bot'
        if bot:
            self.head.forceStrong(calcDepth)
            opts = []
            for c in self.head.childs:
                opts.append((c, c.getStrongFor(self.head.curPlayer)))
            # Lower score is better, so the best option ends up first.
            opts.sort(key=lambda x: x[1])
            print('[i] Evaluated Options:')
            for o in opts:
                print('[ ]' + str(o[0].lastAction) + " (Score: "+str(o[1])+")")
            print('[#] I choose to play: ' + str(opts[0][0].lastAction))
            self.performAction(opts[0][0].lastAction)
        else:
            action = self.head.askUserForAction()
            self.performAction(action)

    def game(self, bots=None, calcDepth=7, bg=True):
        """Play turns until the head reports a result, then announce it.

        Args:
            bots: per-player ``bot`` values for ``turn``; None means every
                player is asked each turn.
            calcDepth: depth handed to ``turn``.
            bg: when true, a background worker runs during the game.
        """
        if bg:
            self.spawnWorker()
        try:
            if bots is None:
                bots = [None]*self.head.playersNum
            while self.head.getWinner() is None:
                self.turn(bots[self.head.curPlayer], calcDepth, bg=bg)
            print(['O', 'X', 'No one'][self.head.getWinner()] + ' won!')
        finally:
            # Always stop the worker thread, even when a turn raised.
            if bg:
                self.killWorker()

    def saveModel(self, model, gen):
        """Pickle ``(gen, model.state_dict())`` to the model file."""
        dat = model.state_dict()
        path = self.getModelFileName()
        folder = os.path.dirname(path)
        if folder:
            os.makedirs(folder, exist_ok=True)
        with open(path, 'wb') as f:
            pickle.dump((gen, dat), f)

    def loadModelState(self, model):
        """Load the pickled state dict into ``model`` and return its generation.

        The model is switched to eval mode afterwards.
        """
        # NOTE: pickle executes code from the file; only load model files
        # you created yourself.
        with open(self.getModelFileName(), 'rb') as f:
            gen, dat = pickle.load(f)
        model.load_state_dict(dat)
        model.eval()
        return gen

    def loadModel(self):
        """Build the state's model, load the saved weights, return (model, gen)."""
        model = self.head.state.getModel()
        gen = self.loadModelState(model)
        return model, gen

    def getModelFileName(self):
        """Return the relative path of the model file."""
        return 'brains/uttt.vac'

    def saveToMemoryBank(self, term):
        """Pickle ``term`` into a new, timestamped file in the memory bank."""
        folder = 'memoryBank/uttt/'
        os.makedirs(folder, exist_ok=True)
        with open(folder+datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')+'_'+str(int(random.random()*99999))+'.vdm', 'wb') as f:
            pickle.dump(term, f)
class NeuralRuntime(Runtime):
    """Runtime whose universe scores positions with the saved model."""

    def __init__(self, initState, **kwargs):
        """Set up the runtime, then load the model and select 'neural' scoring."""
        super().__init__(initState, **kwargs)
        brain, _generation = self.loadModel()
        universe = self.head.universe
        universe.model = brain
        universe.scoreProvider = 'neural'
class Trainer(Runtime):
    """Runtime that plays games against itself and trains the model on them.

    A "term" is the final node of a played game; its chain of ``parent``
    links is the timeline of that game.
    """

    def __init__(self, initState, **kwargs):
        super().__init__(initState, **kwargs)
        self.universe = self.head.universe
        self.rootNode = self.head
        self.terminal = None

    def buildDatasetFromModel(self, model, depth=4, refining=True, fanOut=(5, 5, 5, 5, 4, 4, 4, 4), uncertainSec=15, exacity=5):
        """Play one game and optionally deepen the tree around its timeline.

        Args:
            model: model used for scoring during play.
            depth: calculation depth for ``linearPlay``.
            refining: when true, explore alternatives after the game.
            fanOut: ``forceStrong`` depth for each ancestor of the final
                node, nearest ancestor first.
            uncertainSec: seconds spent in ``timelineExpandUncertain``.
            exacity: forwarded to ``linearPlay``.

        Returns:
            The final node of the played game.
        """
        print('[*] Building Timeline')
        term = self.linearPlay(model, calcDepth=depth, exacity=exacity)
        if refining:
            print('[*] Refining Timeline (exploring alternative endings)')
            cur = term
            for d in fanOut:
                cur = cur.parent
                if cur is None:
                    # Game was shorter than fanOut: the root is behind us.
                    break
                cur.forceStrong(d)
                print('.', end='', flush=True)
            print('')
            print('[*] Refining Timeline (exploring uncertain regions)')
            self.timelineExpandUncertain(term, uncertainSec)
        return term

    def linearPlay(self, model, calcDepth=7, exacity=5, verbose=False, firstNRandom=2):
        """Play one game from the root node and return its final node.

        Args:
            model: stored on the universe before play starts.
            calcDepth: depth handed to ``forceStrong`` on every move.
            exacity: values >= 10 always pick the best-scored move; lower
                values pick index ``int(random() ** exacity * (n - 1))``
                of the score-sorted moves.
            verbose: print every position instead of a progress dot.
            firstNRandom: number of opening moves chosen uniformly at
                random.
        """
        head = self.rootNode
        self.universe.model = model
        self.spawnWorker()
        try:
            while head.getWinner() is None:
                if verbose:
                    print(head)
                else:
                    print('.', end='', flush=True)
                head.forceStrong(calcDepth)
                if len(head.childs) == 0:
                    break
                opts = [(c, c.getStrongFor(head.curPlayer)) for c in head.childs]
                if firstNRandom:
                    firstNRandom -= 1
                    ind = int(random.random()*len(opts))
                else:
                    opts.sort(key=lambda x: x[1])
                    if exacity >= 10:
                        ind = 0
                    else:
                        ind = int(pow(random.random(), exacity)*(len(opts)-1))
                head = opts[ind][0]
        finally:
            # Never leave the worker thread running behind an exception.
            self.killWorker()
        if verbose:
            print(head)
        print(' => '+['O', 'X', 'No one'][head.getWinner()] + ' won!')
        return head

    def timelineIterSingle(self, term):
        """Iterate the timeline of a single final node (see ``timelineIter``)."""
        for i in self.timelineIter([term]):
            yield i

    def timelineIter(self, terms, altChildPerNode=-1):
        """Walk several timelines from their final nodes back to the root.

        The timelines are advanced in lock-step, one ``parent`` step per
        round. For every visited node the node itself is yielded, followed
        by its children (all of them for ``altChildPerNode == -1``,
        otherwise up to ``altChildPerNode`` random ones, capped at half the
        child count).

        Args:
            terms: final nodes to start from; the list is not modified.
            altChildPerNode: number of sampled children per node, -1 for
                all.
        """
        # Work on a copy: the cursor list is overwritten while walking and
        # the caller re-uses ``terms`` for every training epoch.
        heads = list(terms)
        batch = len(heads)
        while True:
            empty = True
            for b in range(batch):
                head = heads[b]
                if head is None:
                    continue
                empty = False
                yield head
                if len(head.childs):
                    if altChildPerNode == -1:  # all
                        for child in head.childs:
                            yield child
                    else:
                        for j in range(min(altChildPerNode, int(len(head.childs)/2))):
                            yield random.choice(head.childs)
                # parent is None at the root, which retires this timeline.
                heads[b] = head.parent
            if empty:
                return

    def timelineExpandUncertain(self, term, secs):
        """Let a background worker expand the tree for ``secs`` seconds.

        The queue is cleared and re-seeded from the root first, and cleared
        again before the worker is stopped. ``term`` is currently unused.
        """
        self.rootNode.universe.clearPQ()
        self.rootNode.universe.activateEdge(self.rootNode)
        self.spawnWorker()
        for s in range(secs):
            time.sleep(1)
            print('.', end='', flush=True)
        self.rootNode.universe.clearPQ()
        self.killWorker()
        print('')

    def trainModel(self, model, lr=0.00005, cut=0.01, calcDepth=4, exacity=5, terms=None, batch=16):
        """Fit ``model`` to the strong scores along the given timelines.

        Runs up to 64 epochs of per-sample Adam steps with MSE loss. From
        epoch 18 on, training stops early when the mean loss per visited
        node drops below ``cut`` or the summed loss equals that of the
        previous epoch.

        Args:
            model: torch module mapping a state tensor to a score.
            lr: learning rate for Adam.
            cut: early-stop threshold on the mean loss per visited node.
            calcDepth, exacity: used when games have to be generated.
            terms: final nodes to train on; when None, ``batch`` games are
                generated with ``buildDatasetFromModel``.
            batch: number of games to generate when ``terms`` is None.

        Returns:
            The summed loss of the last epoch that ran.
        """
        loss_func = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr)
        if terms is None:
            terms = [self.buildDatasetFromModel(model, depth=calcDepth, exacity=exacity)
                     for _ in range(batch)]
        print('[*] Conditioning Brain')
        loss_sum = 0
        lLoss = None  # summed loss of the previous epoch
        for r in range(64):
            loss_sum = 0
            zeroLen = 0
            visited = 0
            for node in self.timelineIter(terms):
                visited += 1
                for p in range(self.rootNode.playersNum):
                    inp = node.state.getTensor(player=p)
                    gol = torch.tensor(
                        [node.getStrongFor(p)], dtype=torch.float)
                    out = model(inp)
                    loss = loss_func(out, gol)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    loss_sum += loss.item()
                    if loss.item() == 0.0:
                        zeroLen += 1
                    # NOTE(review): this only leaves the per-player loop,
                    # not the epoch — confirm that is the intent.
                    if zeroLen == 5:
                        break
            # Guard against an empty dataset instead of dividing by zero.
            meanLoss = loss_sum / max(visited, 1)
            print(meanLoss)
            if r > 16 and (meanLoss < cut or lLoss == loss_sum):
                return loss_sum
            lLoss = loss_sum
        return loss_sum

    def main(self, model=None, gens=1024, startGen=0):
        """Train for ``gens`` generations, saving the model after each one.

        Args:
            model: model to continue training; a new one is created from
                the root state when None.
            gens: number of generations to run.
            startGen: number of the first generation.
        """
        newModel = False
        if model is None:
            print('[!] No brain found. Creating new one...')
            newModel = True
            model = self.rootNode.state.getModel()
        # A brand-new model starts on naive scores; it switches to neural
        # scoring after its first generation.
        self.universe.scoreProvider = ['neural', 'naive'][newModel]
        model.train()
        for gen in range(startGen, startGen+gens):
            print('[#####] Gen '+str(gen)+' training:')
            loss = self.trainModel(model, calcDepth=min(
                4, 3+int(gen/16)), exacity=int(gen/3+1), batch=4)
            print('[L] '+str(loss))
            self.universe.scoreProvider = 'neural'
            self.saveModel(model, gen)

    def trainFromTerm(self, term):
        """Train the saved model on the timeline of one finished game.

        The model is saved back under the generation it was loaded with.
        """
        model, gen = self.loadModel()
        self.universe.scoreProvider = 'neural'
        self.trainModel(model, calcDepth=4, exacity=10, terms=[term])
        self.saveModel(model, gen)

    def train(self):
        """Resume training from the saved model, or start from scratch."""
        if os.path.exists(self.getModelFileName()):
            model, gen = self.loadModel()
            self.main(model, startGen=gen+1)
        else:
            self.main()

21
vacuumDecay/utils.py Normal file
View File

@ -0,0 +1,21 @@
def choose(txt, options):
    """Prompt on the console until the user selects one of ``options``.

    An answer is accepted, in this order, as: the 1-based number of an
    option, the exact text of an option, or (for one-character answers)
    the first character of an option's text.

    Args:
        txt: prompt text.
        options: sequence of choices; each is shown via ``str()``.

    Returns:
        The selected element of ``options``.
    """
    while True:
        print('[*] '+txt)
        for num, opt in enumerate(options):
            print('['+str(num+1)+'] ' + str(opt))
        inp = input('[> ')
        try:
            n = int(inp)
        except ValueError:
            # Not a number: fall through to the text matches below.
            n = None
        if n is not None and 1 <= n <= len(options):
            return options[n-1]
        for opt in options:
            if inp == str(opt):
                return opt
        if len(inp) == 1:
            for opt in options:
                # Slice instead of index so an empty option text cannot raise.
                if inp == str(opt)[:1]:
                    return opt
        print('[!] Invalid Input.')

58
vacuumDecay/visualizer.py Normal file
View File

@ -0,0 +1,58 @@
import threading
import time
import networkx as nx
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
class Visualizer:
    """Small Flask / Socket.IO server exposing the nodes of a universe."""

    def __init__(self, universe):
        """
        Args:
            universe: object whose ``iter()`` yields the nodes to show.
        """
        self.universe = universe
        self.graph = nx.DiGraph()
        self.app = Flask(__name__)
        self.socketio = SocketIO(self.app)
        self.init_flask()

    def init_flask(self):
        """Register the HTTP routes and the Socket.IO connect handler."""
        @self.app.route('/')
        def index():
            return render_template('index.html')

        @self.app.route('/data')
        def data():
            # NOTE(review): 'image' holds raw bytes when a state has an
            # image; confirm the JSON provider in use can serialize that.
            nodes_data, edges_data = self._collect_graph()
            return jsonify(nodes=nodes_data, edges=edges_data)

        @self.socketio.on('connect')
        def handle_connect():
            print('Client connected')

    def _collect_graph(self):
        """Return (nodes, edges) describing everything ``universe.iter()`` yields.

        Each node entry has the keys 'id', 'image', 'value' and
        'last_updated'; each edge entry links a node id to a child id.
        """
        nodes_data = []
        edges_data = []
        for node in self.universe.iter():
            # Fetch the image once and reuse it for the check and the bytes.
            image = node.state.getImage()
            nodes_data.append({
                'id': id(node),
                'image': image.tobytes() if image else None,
                'value': node.getScoreFor(node.state.curPlayer),
                'last_updated': node.last_updated
            })
            for child in node.childs:
                edges_data.append({'source': id(node), 'target': id(child)})
        return nodes_data, edges_data

    def send_update(self):
        """Emit the current nodes and edges as an 'update' Socket.IO event."""
        nodes_data, edges_data = self._collect_graph()
        self.socketio.emit('update', {'nodes': nodes_data, 'edges': edges_data})

    def run(self):
        """Run the Socket.IO server (blocking)."""
        self.socketio.run(self.app, debug=True, use_reloader=False)

    def start(self):
        """Run the server in a separate thread."""
        self.thread = threading.Thread(target=self.run)
        self.thread.start()