diff --git a/README.md b/README.md index 7550d32..78b8a1a 100644 --- a/README.md +++ b/README.md @@ -2,24 +2,27 @@ Project vacuumDecay is a framework for building AIs for games. Avaible architectures are - - those used in Deep Blue (mini-max / expecti-max) - - advanced expecti-max exploration based on utility heuristics - - those used in AlphaGo Zero (knowledge distilation using neural-networks) + +- those used in Deep Blue (mini-max / expecti-max) +- advanced expecti-max exploration based on utility heuristics +- those used in AlphaGo Zero (knowledge distilation using neural-networks) A new AI is created by subclassing the State-class and defining the following functionality (mycelia.py provies a template): - - initialization (generating the gameboard or similar) - - getting avaible actions for the current situation (returns an Action-object, which can be subclassed to add additional functionality) - - applying an action (the state itself should be immutable, a new state should be returned) - - checking for a winning-condition (should return None if game has not yet ended) - - (optional) a getter for a string-representation of the current state - - (optional) a heuristic for the winning-condition (greatly improves capability) - - (optional) a getter for a tensor that describes the current game state (required for knowledge distilation) - - (optional) interface to allow a human to select an action + +- initialization (generating the gameboard or similar) +- getting avaible actions for the current situation (returns an Action-object, which can be subclassed to add additional functionality) +- applying an action (the state itself should be immutable, a new state should be returned) +- checking for a winning-condition (should return None if game has not yet ended) +- (optional) a getter for a string-representation of the current state +- (optional) a heuristic for the winning-condition (greatly improves capability for expecti-max) +- (optional) a getter for a tensor that describes the current game state (required for knowledge distilation) +- (optional) interface to allow a human to select an action ### Current state of the project + The only thing that currently works is the AI for Ultimate TicTacToe. It uses a trained neural heuristic (neuristic) -You can train it or play against it (will also train it) using 'python ultimatetictactoe.py' - +You can train it or play against it (will also train it) using 'python ultimatetictactoe.py' + The performance of the trained neuristic is pretty bad. I have some ideas on what could be the problems but no time to implement fixes. (Focus on the ending of games at the beginning of training; more consistent exploration-depth for expanding while training; ...) diff --git a/pyproject.toml b/pyproject.toml index 1334655..3bd5569 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ name = "vacuumDecay" version = "0.1.0" dependencies = [ "torch", + "numpy", "flask", "flask-socketio", "networkx", diff --git a/vacuumDecay/__init__.py b/vacuumDecay/__init__.py index d9b69f1..076fa7d 100644 --- a/vacuumDecay/__init__.py +++ b/vacuumDecay/__init__.py @@ -1,4 +1,4 @@ from vacuumDecay.runtime import Runtime, NeuralRuntime, Trainer -from vacuumDecay.base import Node, Action, Universe, QueueingUniverse +from vacuumDecay.base import Node, State, Action, Universe, QueueingUniverse from vacuumDecay.utils import choose from vacuumDecay.run import main \ No newline at end of file diff --git a/vacuumDecay/base.py b/vacuumDecay/base.py index 01fe289..cc03509 100644 --- a/vacuumDecay/base.py +++ b/vacuumDecay/base.py @@ -1,19 +1,24 @@ import torch +import time +import random +from math import sqrt from abc import ABC, abstractmethod from queue import PriorityQueue, Empty from dataclasses import dataclass, field from typing import Any +from torch import nn +import torch.nn.functional as F from vacuumDecay.utils import choose class Action(): # Should hold the data representing an action # Actions are applied to a State in State.mutate - def __init__(self, player, data): self.player = player self.data = data + # ImproveMe def __eq__(self, other): # This should be implemented differently # Two actions of different generations will never be compared @@ -21,23 +26,33 @@ class Action(): return False return str(self.data) == str(other.data) + # ImproveMe def __str__(self): # should return visual representation of this action # should start with < and end with > return "" + # ImproveMe def getImage(self, state): # Should return an image representation of this action given the current state # Return None if not implemented return None + # ImproveMe + def getTensor(self, state, player=None): + # Should return a complete description of the action (including previous state) + # This default will work, but may be suboptimal... + return (state.getTensor(), state.mutate(self).getTensor()) + class State(ABC): # Hold a representation of the current game-state # Allows retriving avaible actions (getAvaibleActions) and applying them (mutate) # Mutations return a new State and should not have any effect on the current State # Allows checking itself for a win (checkWin) or scoring itself based on a simple heuristic (getScore) # The calculated score should be 0 when won; higher when in a worse state; highest for loosing - # getPriority is used for prioritising certain Nodes / States when expanding / walking the tree + # getPriority is used for prioritising certain Nodes / States when expanding / walking the tree (TODO: Remove) + + # Abstract Methodas need to be overrieden, improveMe methods can be overrieden def __init__(self, curPlayer=0, generation=0, playersNum=2): self.curPlayer = curPlayer @@ -81,10 +96,10 @@ class State(ABC): if w == None: return 0.5 if w == player: - return 0 + return 1 if w == -1: - return 0.9 - return 1 + return 0.1 + return 0 @abstractmethod def __str__(self): @@ -92,23 +107,40 @@ class State(ABC): return "[#]" @abstractmethod - def getTensor(self, player=None, phase='default'): + def getTensor(self, player=None): if player == None: player = self.curPlayer return torch.tensor([0]) @classmethod - def getModel(cls, phase='default'): + def getVModel(cls): + # input will be output from state.getTensor pass - def getScoreNeural(self, model, player=None, phase='default'): - return model(self.getTensor(player=player, phase=phase)).item() + #improveMe + def getQModel(cls): + # input will be output from action.getTensor + return DefaultQ(cls.getVModel()) + def getScoreNeural(self, model, player=None): + return model(self.getTensor(player=player)).item() + + # improveMe def getImage(self): # Should return an image representation of this state # Return None if not implemented return None +class DefaultQ(nn.Module): + def __init__(self, vModel): + super().__init__() + self.V = vModel + + def forward(self, inp): + s, s_prime = inp + v, v_prime = self.V(s), self.V(s_prime) + return F.sigmoid(v_prime - v) + class Universe(): def __init__(self): self.scoreProvider = 'naive' @@ -160,3 +192,208 @@ class QueueingUniverse(Universe): def activateEdge(self, head): head._activateEdge() + +class Node: + def __init__(self, state, universe=None, parent=None, lastAction=None): + self.state = state + if universe == None: + print('[!] No Universe defined. Spawning one...') + universe = Universe() + self.universe = universe + self.parent = parent + self.lastAction = lastAction + + self._childs = None + self._scores = [None]*self.state.playersNum + self._strongs = [None]*self.state.playersNum + self._alive = True + self._cascadeMemory = 0 # Used for our alternative to alpha-beta pruning + self._winner = -2 + + self.leaf = True + self.last_updated = time.time() # New attribute + + def mark_update(self): + self.last_updated = time.time() + + def kill(self): + self._alive = False + + def revive(self): + self._alive = True + + @property + def childs(self): + if self._childs == None: + self._expand() + return self._childs + + def _expand(self): + self.leaf = False + self._childs = [] + actions = self.state.getAvaibleActions() + for action in actions: + newNode = Node(self.state.mutate(action), + self.universe, self, action) + self._childs.append(self.universe.merge(newNode)) + self.mark_update() + + def getStrongFor(self, player): + if self._strongs[player] != None: + return self._strongs[player] + else: + return self.getScoreFor(player) + + def _pullStrong(self): + strongs = [None]*self.playersNum + has_winner = self.getWinner() != None + for p in range(self.playersNum): + cp = self.state.curPlayer + if has_winner: + strongs[p] = self.getScoreFor(p) + elif cp == p: + best = float('-inf') + for c in self.childs: + if c.getStrongFor(p) > best: + best = c.getStrongFor(p) + strongs[p] = best + else: + scos = [(c.getStrongFor(p), c.getStrongFor(cp)) for c in self.childs] + scos.sort(key=lambda x: x[1], reverse=True) + betterHalf = [sco for sco, osc in scos[:max(3, int(len(scos)/2))]] + strongs[p] = betterHalf[0]*0.9 + sum(betterHalf)/(len(betterHalf))*0.1 + update = False + for s in range(self.playersNum): + if strongs[s] != self._strongs[s]: + update = True + break + self._strongs = strongs + if update: + if self.parent != None: + cascade = self.parent._pullStrong() + else: + cascade = 2 + self._cascadeMemory = self._cascadeMemory/2 + cascade + self.mark_update() + return cascade + 1 + self._cascadeMemory /= 2 + return 0 + + def forceStrong(self, depth=3): + if depth == 0: + self.strongDecay() + else: + if len(self.childs): + for c in self.childs: + c.forceStrong(depth-1) + else: + self.strongDecay() + + def decayEvent(self): + for c in self.childs: + c.strongDecay() + + def strongDecay(self): + if self._strongs == [None]*self.playersNum: + if not self.scoresAvaible(): + self._calcScores() + self._strongs = self._scores + if self.parent: + return self.parent._pullStrong() + return 1 + return None + + def getSelfScore(self): + return self.getScoreFor(self.curPlayer) + + def getScoreFor(self, player): + if self._scores[player] == None: + self._calcScore(player) + return self._scores[player] + + def scoreAvaible(self, player): + return self._scores[player] != None + + def scoresAvaible(self): + for p in self._scores: + if p == None: + return False + return True + + def strongScoresAvaible(self): + for p in self._strongs: + if p == None: + return False + return True + + def askUserForAction(self): + return self.state.askUserForAction(self.avaibleActions) + + def _calcScores(self): + for p in range(self.state.playersNum): + self._calcScore(p) + + def _calcScore(self, player): + winner = self._getWinner() + if winner != None: + if winner == player: + self._scores[player] = 1.0 + elif winner == -1: + self._scores[player] = 0.1 + else: + self._scores[player] = 0.0 + return + if self.universe.scoreProvider == 'naive': + self._scores[player] = self.state.getScoreFor(player) + elif self.universe.scoreProvider == 'neural': + self._scores[player] = self.state.getScoreNeural(self.universe.v_model, player) + else: + raise Exception('Unknown Score-Provider') + + def getPriority(self): + return self.state.getPriority(self.getSelfScore(), self._cascadeMemory) + + @property + def playersNum(self): + return self.state.playersNum + + @property + def avaibleActions(self): + r = [] + for c in self.childs: + r.append(c.lastAction) + return r + + @property + def curPlayer(self): + return self.state.curPlayer + + def _getWinner(self): + return self.state.checkWin() + + def getWinner(self): + if len(self.childs) == 0: + return -1 + if self._winner==-2: + self._winner = self._getWinner() + return self._winner + + def _activateEdge(self, dist=0): + if not self.strongScoresAvaible(): + self.universe.newOpen(self) + else: + for c in self.childs: + if c._cascadeMemory > 0.001*(dist-2) or random.random() < 0.01: + c._activateEdge(dist=dist+1) + self.mark_update() + + def __str__(self): + s = [] + if self.lastAction == None: + s.append("[ {ROOT} ]") + else: + s.append("[ -> "+str(self.lastAction)+" ]") + s.append("[ turn: "+str(self.state.curPlayer)+" ]") + s.append(str(self.state)) + s.append("[ score: "+str(self.getScoreFor(0))+" ]") + return '\n'.join(s) diff --git a/vacuumDecay/games/chess_game.py b/vacuumDecay/games/chess_game.py new file mode 100644 index 0000000..519212a --- /dev/null +++ b/vacuumDecay/games/chess_game.py @@ -0,0 +1,248 @@ +import numpy as np +import torch as th +from torch import nn +import torch.nn.functional as F +from PIL import Image +import chess +import chess.svg +import io + +from vacuumDecay import State, Action, Runtime, NeuralRuntime, Trainer, choose, main + +class ChessAction(Action): + def __init__(self, player, data): + super().__init__(player, data) + + def __str__(self): + return "" + + def getImage(self, state=None): + return Image.open(io.BytesIO(chess.svg.board(board=state.board, format='png', squares=[self.data.from_square, self.data.to_square], arrows=[self.move]))) + + def getTensor(self, state): + board, additionals = state.getTensor() + + tensor = np.zeros((8, 8), dtype=int) # 13 channels for piece types and move squares + + # Mark the from_square and to_square + from_row, from_col = divmod(self.data.from_square, 8) + to_row, to_col = divmod(self.data.to_square, 8) + + tensor[from_row, from_col] = 1 # Mark the "from" square + tensor[to_row, to_col] = 1 # Mark the "to" square + + # Get the piece that was moved + pieceT = np.zeros((12), dtype=int) # 13 channels for piece types and move squares + piece = state.board.piece_at(self.data.from_square) + if piece: + piece_type = { + 'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5, + 'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11 + } + pieceT[piece_type[piece.symbol()]] = 1 + + # Flatten the tensor and return as a PyTorch tensor + return (board, additionals, th.concat(tensor.flatten(), pieceT.flatten())) + +piece_values = { + chess.PAWN: 1, + chess.KNIGHT: 3, + chess.BISHOP: 3, + chess.ROOK: 5, + chess.QUEEN: 9 +} + +class ChessState(State): + def __init__(self, curPlayer=0, generation=0, board=None): + if type(board) == type(None): + board = chess.Board() + self.curPlayer = curPlayer + self.generation = generation + self.playersNum = 2 + self.board = board + + def mutate(self, action): + newBoard = self.board.copy() + newBoard.push(action.data) + return ChessState(curPlayer=(self.curPlayer+1)%2, board=newBoard) + + # Function to calculate total value of pieces for a player + def calculate_piece_value(self, board, color): + value = 0 + for square in chess.scan_reversed(board.occupied_co[color]): + piece = board.piece_at(square) + if piece is not None: + value += piece_values.get(piece.piece_type, 0) + return value + + # Function to calculate winning probability for each player + def calculate_winning_probability(self): + white_piece_value = self.calculate_piece_value(self.board, chess.WHITE) + black_piece_value = self.calculate_piece_value(self.board, chess.BLACK) + total_piece_value = white_piece_value + black_piece_value + + # Calculate winning probabilities + white_probability = white_piece_value / total_piece_value + black_probability = black_piece_value / total_piece_value + + return white_probability, black_probability + + def getScoreFor(self, player): + w = self.checkWin() + if w == None: + return self.calculate_winning_probability()[player] + if w == player: + return 1 + if w == -1: + return 0.1 + return 0 + + def getAvaibleActions(self): + for move in self.board.legal_moves: + yield ChessAction(self.curPlayer, move) + + def checkWin(self): + if self.board.is_checkmate(): + return (self.curPlayer+1)%2 + elif self.board.is_stalemate(): + return -1 + return None + + def __str__(self): + return str(self.board) + + def getTensor(self): + board = self.board + piece_to_plane = { + 'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5, + 'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11 + } + + tensor = np.zeros((12, 8, 8), dtype=int) + for square in chess.SQUARES: + piece = board.piece_at(square) + if piece: + plane = piece_to_plane[piece.symbol()] + row, col = divmod(square, 8) + tensor[plane, row, col] = 1 + + # Side to move + side_to_move = np.array([1 if board.turn == chess.WHITE else 0]) + + # Castling rights + castling_rights = np.array([ + 1 if board.has_kingside_castling_rights(chess.WHITE) else 0, + 1 if board.has_queenside_castling_rights(chess.WHITE) else 0, + 1 if board.has_kingside_castling_rights(chess.BLACK) else 0, + 1 if board.has_queenside_castling_rights(chess.BLACK) else 0 + ]) + + # En passant target square + en_passant = np.zeros((8, 8), dtype=int) + if board.ep_square: + row, col = divmod(board.ep_square, 8) + en_passant[row, col] = 1 + + # Half-move clock and full-move number + half_move_clock = np.array([board.halfmove_clock]) + full_move_number = np.array([board.fullmove_number]) + + additionals = np.concatenate([ + side_to_move, + castling_rights, + en_passant.flatten(), + half_move_clock, + full_move_number + ]) + + return (th.tensor(tensor), th.tensor(additionals)) + + @classmethod + def getVModel(): + return ChessV() + + @classmethod + def getQModel(): + return ChessQ() + + def getImage(self): + return Image.open(io.BytesIO(chess.svg.board(board=self.board, format='png'))) + +class ChessV(nn.Module): + def __init__(self): + super().__init__() + # CNN for the board tensor + self.conv1 = nn.Conv2d(12, 16, kernel_size=3, padding=1) + self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1) + self.fc1 = nn.Linear(32 * 8 * 8, 256) + + # FCNN for the board tensor + self.fc2 = nn.Linear(8 * 8, 64) + + # FCNN for additional info + self.fc_additional1 = nn.Linear(71, 64) + + # Combine all outputs + self.fc_combined1 = nn.Linear(256 + 64 + 64, 128) + self.fc_combined2 = nn.Linear(128, 1) + + def forward(self, inp): + board_tensor, additional_info = inp + # Process the board tensor through the CNN + x = F.relu(self.conv1(board_tensor)) + x = F.relu(self.conv2(x)) + x = x.view(x.size(0), -1) # Flatten the tensor + x = F.relu(self.fc1(x)) + + y = F.relu(self.fc2(board_tensor.view(board_tensor.size(0), -1))) + + # Process the additional info through the FCNN + z = F.relu(self.fc_additional1(additional_info)) + + # Combine the outputs + combined = th.cat((x, y, z), dim=1) + combined = F.relu(self.fc_combined1(combined)) + logit = self.fc_combined2(combined) + + return logit + +class ChessQ(nn.Module): + def __init__(self): + super().__init__() + # CNN for the board tensor + self.conv1 = nn.Conv2d(12, 16, kernel_size=3, padding=1) + self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1) + self.fc1 = nn.Linear(32 * 8 * 8, 256) + + # FCNN for the board tensor + self.fc2 = nn.Linear(8 * 8, 64) + + # FCNN for additional info + self.fc_additional1 = nn.Linear(71, 64) + + # Combine all outputs + self.fc_combined1 = nn.Linear(256 + 64 + 64, 128) + self.fc_combined2 = nn.Linear(128, 1) + + def forward(self, inp): + board_tensor, additional_info, action = inp + # Process the board tensor through the CNN + x = F.relu(self.conv1(board_tensor)) + x = F.relu(self.conv2(x)) + x = x.view(x.size(0), -1) # Flatten the tensor + x = F.relu(self.fc1(x)) + + y = F.relu(self.fc2(board_tensor.view(board_tensor.size(0), -1))) + + # Process the additional info through the FCNN + z = F.relu(self.fc_additional1(additional_info)) + + # Combine the outputs + combined = th.cat((x, y, z), dim=1) + combined = F.relu(self.fc_combined1(combined)) + logit = self.fc_combined2(combined) + + return logit + +if __name__=="__main__": + main(ChessState, start_visualizer=False) \ No newline at end of file diff --git a/vacuumDecay/games/tictactoe.py b/vacuumDecay/games/tictactoe.py index 8da84b5..ac8b7fd 100644 --- a/vacuumDecay/games/tictactoe.py +++ b/vacuumDecay/games/tictactoe.py @@ -25,6 +25,9 @@ class TTTAction(Action): draw.line((x+40, y-40, x-40, y+40), fill='red', width=2) return img + def getTensor(self, state, player=None): + return torch.concat(torch.tensor([self.turn]), torch.tensor(state.board), torch.tensor(state.mutate(self).board)) + class TTTState(State): def __init__(self, curPlayer=0, generation=0, playersNum=2, board=None): if type(board) == type(None): @@ -66,17 +69,29 @@ class TTTState(State): s.append(" ".join([str(p) if p!=None else '.' for p in self.board[l*3:][:3]])) return "\n".join(s) - def getTensor(self): - return torch.tensor([self.turn] + self.board) + def getTensor(self, player=None): + return torch.concat(torch.tensor([self.curPlayer]), torch.tensor(self.board)) @classmethod - def getModel(): + def getVModel(cls): return torch.nn.Sequential( torch.nn.Linear(10, 10), - torch.nn.ReLu(), + torch.nn.ReLU(), torch.nn.Linear(10, 3), + torch.nn.ReLU(), + torch.nn.Linear(3,1), + torch.nn.Sigmoid(), + ) + + @classmethod + def getQModel(cls): + return torch.nn.Sequential( + torch.nn.Linear(20, 12), + torch.nn.ReLU(), + torch.nn.Linear(12, 3), + torch.nn.ReLU(), + torch.nn.Linear(3,1), torch.nn.Sigmoid(), - torch.nn.Linear(3,1) ) def getImage(self): @@ -98,4 +113,4 @@ class TTTState(State): return img if __name__=="__main__": - main(TTTState) \ No newline at end of file + main(TTTState, start_visualizer=False) \ No newline at end of file diff --git a/vacuumDecay/games/ultimatetictactoe.py b/vacuumDecay/games/ultimatetictactoe.py index 4b2dd8f..8f92152 100644 --- a/vacuumDecay/games/ultimatetictactoe.py +++ b/vacuumDecay/games/ultimatetictactoe.py @@ -3,7 +3,7 @@ A lot of this code was stolen from Pulkit Maloo (https://github.com/pulkitmaloo/ """ import numpy as np import torch -from troch import nn +from torch import nn from PIL import Image, ImageDraw from collections import Counter @@ -11,8 +11,11 @@ import itertools from vacuumDecay import State, Action, Runtime, NeuralRuntime, Trainer, choose, main +class UTTTAction(Action): + def __init__(self, player, data): + super().__init__(player, data) -class TTTState(State): +class UTTTState(State): def __init__(self, curPlayer=0, generation=0, playersNum=2, board=None, lastMove=-1): if type(board) == type(None): board = "." * 81 @@ -48,7 +51,7 @@ class TTTState(State): def mutate(self, action): newBoard = self.board[:action.data] + ['O', 'X'][self.curPlayer] + self.board[action.data+1:] - return TTTState(curPlayer=(self.curPlayer+1) % self.playersNum, playersNum=self.playersNum, board=newBoard, lastMove=action.data) + return UTTTState(curPlayer=(self.curPlayer+1) % self.playersNum, playersNum=self.playersNum, board=newBoard, lastMove=action.data) def box(self, x, y): return self.index(x, y) // 9 @@ -67,7 +70,7 @@ class TTTState(State): def getAvaibleActions(self): if self.last_move == -1: for i in range(9*9): - yield Action(self.curPlayer, i) + yield UTTTAction(self.curPlayer, i) return box_to_play = self.next_box(self.last_move) @@ -83,19 +86,6 @@ class TTTState(State): if self.board[ind] == '.': yield Action(self.curPlayer, ind) - # def getScoreFor(self, player): - # p = ['O','X'][player] - # sco = 5 - # for w in self.box_won: - # if w==p: - # sco += 1 - # elif w!='.': - # sco -= 0.5 - # return 1/sco - - # def getPriority(self, score, cascadeMem): - # return -cascadeMem*1 + 100 - def checkWin(self): self.update_box_won() game_won = self.check_small_box(self.box_won) @@ -147,11 +137,15 @@ class TTTState(State): return torch.tensor([self.symbToNum(b) for b in s]) @classmethod - def getModel(cls, phase='default'): - return Model() + def getVModel(cls, phase='default'): + return TTTV() + + @classmethod + def getQModel(cls, phase='default'): + return TTTQ() -class Model(nn.Module): +class TTTV(nn.Module): def __init__(self): super().__init__() @@ -183,13 +177,6 @@ class Model(nn.Module): nn.Linear(self.chansPerSlot*9, self.chansComp), nn.ReLU(), nn.Linear(self.chansComp, 1), - #nn.Linear(9*8, 32), - # nn.ReLU(), - #nn.Linear(32, 8), - # nn.ReLU(), - #nn.Linear(16*9, 12), - # nn.ReLU(), - #nn.Linear(12, 1), nn.Sigmoid() ) @@ -202,5 +189,54 @@ class Model(nn.Module): y = self.out(x) return y +class TTTQ(nn.Module): + def __init__(self): + super().__init__() + + self.chansPerSmol = 24 + self.chansPerSlot = 8 + self.chansComp = 8 + + self.smol = nn.Sequential( + nn.Conv2d( + in_channels=2, + out_channels=self.chansPerSmol, + kernel_size=(3, 3), + stride=3, + padding=0, + ), + nn.ReLU() + ) + self.comb = nn.Sequential( + nn.Conv1d( + in_channels=self.chansPerSmol, + out_channels=self.chansPerSlot, + kernel_size=1, + stride=1, + padding=0, + ), + nn.ReLU() + ) + self.out = nn.Sequential( + nn.Linear(self.chansPerSlot*9*2, self.chansComp), + nn.ReLU(), + nn.Linear(self.chansComp, 4), + nn.ReLU(), + nn.Linear(4, 1), + nn.Sigmoid() + ) + + def forward(self, x): + a, b = x + a = torch.reshape(a, (1, 9, 9)) + b = torch.reshape(b, (1, 9, 9)) + x = torch.stack((a,b)) + x = self.smol(x) + x = torch.reshape(x, (self.chansPerSmol, 9)) + x = self.comb(x) + x = torch.reshape(x, (-1,)) + y = self.out(x) + return y + if __name__=="__main__": - main(TTTState) \ No newline at end of file + main(UTTTState) \ No newline at end of file diff --git a/vacuumDecay/node.py b/vacuumDecay/node.py deleted file mode 100644 index 5b6caee..0000000 --- a/vacuumDecay/node.py +++ /dev/null @@ -1,204 +0,0 @@ -class Node: - def __init__(self, state, universe=None, parent=None, lastAction=None): - self.state = state - if universe == None: - print('[!] No Universe defined. Spawning one...') - universe = Universe() - self.universe = universe - self.parent = parent - self.lastAction = lastAction - - self._childs = None - self._scores = [None]*self.state.playersNum - self._strongs = [None]*self.state.playersNum - self._alive = True - self._cascadeMemory = 0 # Used for our alternative to alpha-beta pruning - - self.last_updated = time.time() # New attribute - - def update(self): - self.last_updated = time.time() - if hasattr(self.universe, 'visualizer'): - self.universe.visualizer.send_update() - - def kill(self): - self._alive = False - - def revive(self): - self._alive = True - - @property - def childs(self): - if self._childs == None: - self._expand() - return self._childs - - def _expand(self): - self._childs = [] - actions = self.state.getAvaibleActions() - for action in actions: - newNode = Node(self.state.mutate(action), - self.universe, self, action) - self._childs.append(self.universe.merge(newNode)) - self.update() - - def getStrongFor(self, player): - if self._strongs[player] != None: - return self._strongs[player] - else: - return self.getScoreFor(player) - - def _pullStrong(self): - strongs = [None]*self.playersNum - for p in range(self.playersNum): - cp = self.state.curPlayer - if cp == p: - best = float('inf') - for c in self.childs: - if c.getStrongFor(p) < best: - best = c.getStrongFor(p) - strongs[p] = best - else: - scos = [(c.getStrongFor(p), c.getStrongFor(cp)) for c in self.childs] - scos.sort(key=lambda x: x[1]) - betterHalf = scos[:max(3, int(len(scos)/3))] - myScores = [bh[0]**2 for bh in betterHalf] - strongs[p] = sqrt(myScores[0]*0.75 + sum(myScores)/(len(myScores)*4)) - update = False - for s in range(self.playersNum): - if strongs[s] != self._strongs[s]: - update = True - break - self._strongs = strongs - if update: - if self.parent != None: - cascade = self.parent._pullStrong() - else: - cascade = 2 - self._cascadeMemory = self._cascadeMemory/2 + cascade - self.update() - return cascade + 1 - self._cascadeMemory /= 2 - return 0 - - def forceStrong(self, depth=3): - if depth == 0: - self.strongDecay() - else: - if len(self.childs): - for c in self.childs: - c.forceStrong(depth-1) - else: - self.strongDecay() - self.update() - - def decayEvent(self): - for c in self.childs: - c.strongDecay() - self.update() - - def strongDecay(self): - if self._strongs == [None]*self.playersNum: - if not self.scoresAvaible(): - self._calcScores() - self._strongs = self._scores - if self.parent: - return self.parent._pullStrong() - return 1 - return None - - def getSelfScore(self): - return self.getScoreFor(self.curPlayer) - - def getScoreFor(self, player): - if self._scores[player] == None: - self._calcScore(player) - self.update() - return self._scores[player] - - def scoreAvaible(self, player): - return self._scores[player] != None - - def scoresAvaible(self): - for p in self._scores: - if p == None: - return False - return True - - def strongScoresAvaible(self): - for p in self._strongs: - if p == None: - return False - return True - - def askUserForAction(self): - return self.state.askUserForAction(self.avaibleActions) - - def _calcScores(self): - for p in range(self.state.playersNum): - self._calcScore(p) - - def _calcScore(self, player): - winner = self._getWinner() - if winner != None: - if winner == player: - self._scores[player] = 0.0 - elif winner == -1: - self._scores[player] = 2/3 - else: - self._scores[player] = 1.0 - self.update() - return - if self.universe.scoreProvider == 'naive': - self._scores[player] = self.state.getScoreFor(player) - elif self.universe.scoreProvider == 'neural': - self._scores[player] = self.state.getScoreNeural(self.universe.model, player) - else: - raise Exception('Unknown Score-Provider') - self.update() - - def getPriority(self): - return self.state.getPriority(self.getSelfScore(), self._cascadeMemory) - - @property - def playersNum(self): - return self.state.playersNum - - @property - def avaibleActions(self): - r = [] - for c in self.childs: - r.append(c.lastAction) - return r - - @property - def curPlayer(self): - return self.state.curPlayer - - def _getWinner(self): - return self.state.checkWin() - - def getWinner(self): - if len(self.childs) == 0: - return -1 - return self._getWinner() - - def _activateEdge(self, dist=0): - if not self.strongScoresAvaible(): - self.universe.newOpen(self) - else: - for c in self.childs: - if c._cascadeMemory > 0.001*(dist-2) or random.random() < 0.01: - c._activateEdge(dist=dist+1) - self.update() - - def __str__(self): - s = [] - if self.lastAction == None: - s.append("[ {ROOT} ]") - else: - s.append("[ -> "+str(self.lastAction)+" ]") - s.append("[ turn: "+str(self.state.curPlayer)+" ]") - s.append(str(self.state)) - s.append("[ score: "+str(self.getScoreFor(0))+" ]") - return '\n'.join(s) diff --git a/vacuumDecay/run.py b/vacuumDecay/run.py index b50f3a2..e066669 100644 --- a/vacuumDecay/run.py +++ b/vacuumDecay/run.py @@ -23,25 +23,25 @@ def aiVsAiLoop(StateClass, start_visualizer=False): trainer = Trainer(init, start_visualizer=start_visualizer) trainer.train() -def humanVsNaive(StateClass, start_visualizer=False): +def humanVsNaive(StateClass, start_visualizer=False, calcDepth=7): run = Runtime(StateClass(), start_visualizer=start_visualizer) - run.game() + run.game(calcDepth=calcDepth) -def main(StateClass): +def main(StateClass, **kwargs): options = ['Play Against AI', 'Play Against AI (AI begins)', 'Play Against AI (Fast Play)', 'Playground', 'Let AI train', 'Play against Naive'] opt = choose('?', options) if opt == options[0]: - humanVsAi(StateClass) + humanVsAi(StateClass,**kwargs) elif opt == options[1]: - humanVsAi(StateClass, bots=[1, 0]) + humanVsAi(StateClass, bots=[1, 0], **kwargs) elif opt == options[2]: - humanVsAi(StateClass, depth=2, noBg=True) + humanVsAi(StateClass, depth=2, noBg=True, **kwargs) elif opt == options[3]: - humanVsAi(StateClass, bots=[None, None]) + humanVsAi(StateClass, bots=[None, None], **kwargs) elif opt == options[4]: - aiVsAiLoop(StateClass) + aiVsAiLoop(StateClass, **kwargs) elif opt == options[5]: - humanVsNaive(StateClass) + humanVsNaive(StateClass, **kwargs) else: - aiVsAiLoop(StateClass) + aiVsAiLoop(StateClass, **kwargs) diff --git a/vacuumDecay/runtime.py b/vacuumDecay/runtime.py index f3af253..9205e03 100644 --- a/vacuumDecay/runtime.py +++ b/vacuumDecay/runtime.py @@ -43,14 +43,14 @@ class Runtime(): def __init__(self, initState, start_visualizer=False): universe = QueueingUniverse() self.head = Node(initState, universe=universe) + self.root = self.head _ = self.head.childs universe.newOpen(self.head) - self.visualizer = None if start_visualizer: self.startVisualizer() def startVisualizer(self): - self.visualizer = Visualizer(self.head.universe) + self.visualizer = Visualizer(self) self.visualizer.start() def spawnWorker(self): @@ -85,11 +85,11 @@ class Runtime(): self.head.forceStrong(calcDepth) opts = [] for c in self.head.childs: - opts.append((c, c.getStrongFor(self.head.curPlayer))) - opts.sort(key=lambda x: x[1]) + opts.append((c, c.getStrongFor(self.head.curPlayer) + random.random()*0.000000001)) + opts.sort(key=lambda x: x[1], reverse=True) print('[i] Evaluated Options:') for o in opts: - print('[ ]' + str(o[0].lastAction) + " (Score: "+str(o[1])+")") + print('[ ]' + str(o[0].lastAction) + " (Win prob: "+str(int((o[1])*10000)/100)+"%)") print('[#] I choose to play: ' + str(opts[0][0].lastAction)) self.performAction(opts[0][0].lastAction) else: @@ -107,22 +107,23 @@ class Runtime(): if bg: self.killWorker() - def saveModel(self, model, gen): - dat = model.state_dict() + def saveModel(self, v_model, q_model, gen): + v_state = v_model.state_dict() + q_model = q_model.state_dict() with open(self.getModelFileName(), 'wb') as f: - pickle.dump((gen, dat), f) + pickle.dump((gen, v_state, q_model), f) - def loadModelState(self, model): + def loadModelState(self, v_model, q_model): with open(self.getModelFileName(), 'rb') as f: - gen, dat = pickle.load(f) - model.load_state_dict(dat) - model.eval() + gen, v_state, q_state = pickle.load(f) + v_model.load_state_dict(v_state) + q_model.load_state_dict(q_state) return gen def loadModel(self): - model = self.head.state.getModel() - gen = self.loadModelState(model) - return model, gen + v_model, q_model = self.head.state.getVModel(), self.head.state.getQModel() + gen = self.loadModelState(v_model, q_model) + return v_model, q_model, gen def getModelFileName(self): return 'brains/uttt.vac' @@ -136,27 +137,29 @@ class NeuralRuntime(Runtime): def __init__(self, initState, **kwargs): super().__init__(initState, **kwargs) - model, gen = self.loadModel() + v_model, q_model, gen = self.loadModel() - self.head.universe.model = model + self.head.universe.v_model = v_model + self.head.universe.q_model = q_model self.head.universe.scoreProvider = 'neural' class Trainer(Runtime): def __init__(self, initState, **kwargs): super().__init__(initState, **kwargs) - #self.universe = Universe() self.universe = self.head.universe self.rootNode = self.head self.terminal = None - def buildDatasetFromModel(self, model, depth=4, refining=True, fanOut=[5, 5, 5, 5, 4, 4, 4, 4], uncertainSec=15, exacity=5): + def buildDatasetFromModel(self, v_model, q_model, depth=4, refining=True, fanOut=[5, 5, 5, 5, 4, 4, 4, 4], uncertainSec=15, exacity=5): print('[*] Building Timeline') - term = self.linearPlay(model, calcDepth=depth, exacity=exacity) + term = self.linearPlay(v_model, q_model, calcDepth=depth, exacity=exacity) if refining: print('[*] Refining Timeline (exploring alternative endings)') cur = term for d in fanOut: cur = cur.parent + if cur == None: + break cur.forceStrong(d) print('.', end='', flush=True) print('') @@ -164,9 +167,10 @@ class Trainer(Runtime): self.timelineExpandUncertain(term, uncertainSec) return term - def linearPlay(self, model, calcDepth=7, exacity=5, verbose=False, firstNRandom=2): + def linearPlay(self, v_model, q_model, calcDepth=7, exacity=5, verbose=False, firstNRandom=2): head = self.rootNode - self.universe.model = model + self.universe.v_model = v_model + self.universe.q_model = q_model self.spawnWorker() while head.getWinner() == None: if verbose: @@ -183,7 +187,7 @@ class Trainer(Runtime): firstNRandom -= 1 ind = int(random.random()*len(opts)) else: - opts.sort(key=lambda x: x[1]) + opts.sort(key=lambda x: x[1], reverse=True) if exacity >= 10: ind = 0 else: @@ -236,31 +240,52 @@ class Trainer(Runtime): self.killWorker() print('') - def trainModel(self, model, lr=0.00005, cut=0.01, calcDepth=4, exacity=5, terms=None, batch=16): + def trainModel(self, v_model, q_model, lr=0.00005, cut=0.01, calcDepth=4, exacity=5, terms=None, batch=2): loss_func = nn.MSELoss() - optimizer = optim.Adam(model.parameters(), lr) + v_optimizer = optim.Adam(v_model.parameters(), lr) + q_optimizer = optim.Adam(q_model.parameters(), lr) + print('[*] Conditioning Brain') if terms == None: terms = [] for i in range(batch): terms.append(self.buildDatasetFromModel( - model, depth=calcDepth, exacity=exacity)) - print('[*] Conditioning Brain') - for r in range(64): + v_model, q_model, depth=calcDepth, exacity=exacity)) + for r in range(16): loss_sum = 0 lLoss = 0 zeroLen = 0 for i, node in enumerate(self.timelineIter(terms)): for p in range(self.rootNode.playersNum): inp = node.state.getTensor(player=p) - gol = torch.tensor( + v = torch.tensor( [node.getStrongFor(p)], dtype=torch.float) - out = model(inp) - loss = loss_func(out, gol) - optimizer.zero_grad() - loss.backward() - optimizer.step() - loss_sum += loss.item() - if loss.item() == 0.0: + qs = [] + q_preds = [] + q_loss = torch.Tensor([0]) + if node.childs: + for child in node.childs: + sa = child.lastAction.getTensor(node.state, player=p) + q = child.getStrongFor(p) + q_pred = q_model(sa) + qs.append(q) + q_preds.append(q_pred) + qs = torch.Tensor(qs) + q_target = torch.zeros_like(qs).scatter_(0, torch.argmax(qs).unsqueeze(0), 1) + q_cur = torch.concat(q_preds) + q_loss = loss_func(q_cur, q_target) + q_optimizer.zero_grad() + q_loss.backward() + q_optimizer.step() + + v_pred = v_model(inp) + v_loss = loss_func(v_pred, v) + v_optimizer.zero_grad() + v_loss.backward() + v_optimizer.step() + + loss = v_loss.item() + q_loss.item() + loss_sum += loss + if v_loss.item() == 0.0: zeroLen += 1 if zeroLen == 5: break @@ -270,31 +295,31 @@ class Trainer(Runtime): lLoss = loss_sum return loss_sum - def main(self, model=None, gens=1024, startGen=0): + def main(self, v_model=None, q_model=None, gens=1024, startGen=0): newModel = False - if model == None: + if v_model == None or q_model==None: print('[!] No brain found. Creating new one...') newModel = True - model = self.rootNode.state.getModel() + v_model, q_model = self.rootNode.state.getVModel(), self.rootNode.state.getQModel() self.universe.scoreProvider = ['neural', 'naive'][newModel] - model.train() + v_model.train(), q_model.train() for gen in range(startGen, startGen+gens): print('[#####] Gen '+str(gen)+' training:') - loss = self.trainModel(model, calcDepth=min( + loss = self.trainModel(v_model, q_model, calcDepth=min( 4, 3+int(gen/16)), exacity=int(gen/3+1), batch=4) print('[L] '+str(loss)) self.universe.scoreProvider = 'neural' - self.saveModel(model, gen) + self.saveModel(v_model, q_model, gen) def trainFromTerm(self, term): - model, gen = self.loadModel() + v_model, q_model, gen = self.loadModel() self.universe.scoreProvider = 'neural' - self.trainModel(model, calcDepth=4, exacity=10, term=term) - self.saveModel(model) + self.trainModel(v_model, q_model, calcDepth=4, exacity=10, term=term) + self.saveModel(v_model, q_model) def train(self): if os.path.exists(self.getModelFileName()): - model, gen = self.loadModel() - self.main(model, startGen=gen+1) + v_model, q_model, gen = self.loadModel() + self.main(v_model, q_model, startGen=gen+1) else: self.main() diff --git a/vacuumDecay/templates/index.html b/vacuumDecay/templates/index.html index e74854f..ad87ab1 100644 --- a/vacuumDecay/templates/index.html +++ b/vacuumDecay/templates/index.html @@ -2,70 +2,124 @@ - Game Tree Visualization + Interactive Tree Visualization + - +
diff --git a/vacuumDecay/visualizer.py b/vacuumDecay/visualizer.py index a4712b2..e1e4b3b 100644 --- a/vacuumDecay/visualizer.py +++ b/vacuumDecay/visualizer.py @@ -3,14 +3,19 @@ import time import networkx as nx from flask import Flask, render_template, jsonify from flask_socketio import SocketIO, emit +from io import BytesIO +import base64 class Visualizer: - def __init__(self, universe): - self.universe = universe + def __init__(self, runtime): + self.runtime = runtime self.graph = nx.DiGraph() self.app = Flask(__name__) self.socketio = SocketIO(self.app) self.init_flask() + self.update_thread = threading.Thread(target=self.update_periodically) + self.update_thread.daemon = True + self.update_thread.start() def init_flask(self): @self.app.route('/') @@ -19,36 +24,19 @@ class Visualizer: @self.app.route('/data') def data(): - nodes_data = [] - edges_data = [] - for node in self.universe.iter(): - nodes_data.append({ - 'id': id(node), - 'image': node.state.getImage().tobytes() if node.state.getImage() else None, - 'value': node.getScoreFor(node.state.curPlayer), - 'last_updated': node.last_updated - }) - for child in node.childs: - edges_data.append({'source': id(node), 'target': id(child)}) - return jsonify(nodes=nodes_data, edges=edges_data) + return jsonify(self.get_data()) @self.socketio.on('connect') def handle_connect(): print('Client connected') def send_update(self): - nodes_data = [] - edges_data = [] - for node in self.universe.iter(): - nodes_data.append({ - 'id': id(node), - 'image': node.state.getImage().tobytes() if node.state.getImage() else None, - 'value': node.getScoreFor(node.state.curPlayer), - 'last_updated': node.last_updated - }) - for child in node.childs: - edges_data.append({'source': id(node), 'target': id(child)}) - self.socketio.emit('update', {'nodes': nodes_data, 'edges': edges_data}) + self.socketio.emit('update', self.get_data()) + + def update_periodically(self): + while True: + self.send_update() + time.sleep(1) def run(self): self.socketio.run(self.app, debug=True, use_reloader=False) @@ -56,3 +44,33 @@ class Visualizer: def start(self): self.thread = threading.Thread(target=self.run) self.thread.start() + + def get_data(self): + nodes_data = [] + edges_data = [] + + def add_node_data(node, depth=0): + img = None + if node.state.getImage(): # depth <= 2: + buffered = BytesIO() + node.state.getImage().save(buffered, format="JPEG") + img = base64.b64encode(buffered.getvalue()).decode("utf-8") + + nodes_data.append({ + 'id': id(node), + 'parentId': id(node.parent) if node.parent else None, + 'image': img, + 'currentPlayer': node.state.curPlayer, + 'winProbs': [node.getStrongFor(i) for i in range(node.state.playersNum)], + 'last_updated': node.last_updated + }) + + for child in node.childs: + edges_data.append({'source': id(node), 'target': id(child)}) + add_node_data(child, depth=depth + 1) + + head_node = self.runtime.head + if head_node: + add_node_data(head_node) + + return {'nodes': nodes_data, 'edges': edges_data}