Fred Damstra 2 years ago
Commit
e9aeefc829
4 changed files with 1142 additions and 0 deletions
  1. lightsout.py (+519 −0)
  2. lightsout.tablebased.py (+317 −0)
  3. requirements.txt (+4 −0)
  4. rlttt.py (+302 −0)

+ 519 - 0
lightsout.py

@@ -0,0 +1,519 @@
+import gym
+import numpy as np
+import pickle
+import random
+import tensorflow as tf
+import json
+
+from collections import deque
+
+
+from tensorflow.keras import Model, Sequential
+from tensorflow.keras.layers import Dense, Embedding, Reshape
+from tensorflow.keras.optimizers import Adam
+from tf_agents.environments import py_environment
+from tf_agents.environments import suite_gym
+from tf_agents.specs import array_spec
+from tf_agents.trajectories import time_step as ts
+
+
+MAX_STEPS = 20 # Maximum number of steps when generating a board
+BOARD_ROWS = 5
+BOARD_COLS = 5
+LIMIT = 100 # start a new game if it takes this many
+
+class LightsOutEnvironment(py_environment.PyEnvironment):
+    _state = None
+    previous_action = None
+
+    def _winner(self):
+        ''' Returns 1 if every light is out (we won), else None '''
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if self._state[i, j] != 0:
+                    return None
+        return 1
+
+    def _flip(self, value):
+        if value == 1:
+            return 0
+        return 1
+
+    def _take_action(self, position, imaginary=False):
+        ''' Applies the action (toggle the pressed cell and its orthogonal
+            neighbours) and returns the new board '''
+        newboard = self._state.copy()
+        newboard[position] = self._flip(self._state[position])
+        # Up
+        if position[0] > 0:
+            newboard[(position[0]-1, position[1])] = self._flip(self._state[(position[0]-1, position[1])])
+        # Down
+        if position[0] < BOARD_ROWS-1:
+            newboard[(position[0]+1, position[1])] = self._flip(self._state[(position[0]+1, position[1])])
+        # Left
+        if position[1] > 0:
+            newboard[(position[0], position[1]-1)] = self._flip(self._state[(position[0], position[1]-1)])
+        # Right
+        if position[1] < BOARD_COLS-1:
+            newboard[(position[0], position[1]+1)] = self._flip(self._state[(position[0], position[1]+1)])
+
+        if not imaginary:
+            self.previous_action = position
+            self._state = newboard
+
+        return newboard
+
+    def _available_positions(self):
+        ''' We can push any button except the one we just did '''
+        positions = []
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if (i, j) != self.previous_action:
+                    positions.append((i, j))  # need to be tuple
+        return positions
+
+    def _gen_solvable_board(self):
+        ''' Generates a new solvable board: start from the solved (all-off)
+            board and press random buttons, so replaying those presses
+            clears it again '''
+        self._state = np.zeros((BOARD_ROWS, BOARD_COLS), dtype=np.int32)
+        steps = self.rng.integers(1, MAX_STEPS)
+        self.previous_action = None
+        for i in range(steps):
+            positions = self._available_positions()
+            idx = self.rng.choice(len(positions))
+            action = positions[idx]
+            self._take_action(position=action, imaginary=False)
+        self.previous_action = None
+
+    def __init__(self):
+        self.rng = np.random.default_rng()
+        self._action_spec = array_spec.BoundedArraySpec(
+            shape=(2,), dtype=np.int32, minimum=0,
+            maximum=(BOARD_ROWS - 1, BOARD_COLS - 1), name='action')
+        self._observation_spec = array_spec.BoundedArraySpec(
+            shape=(BOARD_ROWS, BOARD_COLS), dtype=np.int32, minimum=0, maximum=1,
+            name='observation')
+        self._gen_solvable_board()
+        self._episode_ended = False
+        self.current_steps = 0
+
+    def action_spec(self):
+        return self._action_spec
+
+    def observation_spec(self):
+        return self._observation_spec
+
+    def _reset(self):
+        self._gen_solvable_board()
+        self._episode_ended = False
+        self.current_steps = 0
+        return ts.restart(self._state)
+
+    def _step(self, action):
+        if self._episode_ended:
+            # The last action ended the episode. Ignore the current action and
+            # start a new episode.
+            return self._reset()
+
+        self.current_steps += 1
+        self._take_action(tuple(action))
+
+        if self._winner():
+            self._episode_ended = True
+            return ts.termination(self._state, 1)
+        elif self.current_steps >= MAX_STEPS:
+            self._episode_ended = True
+            return ts.termination(self._state, -1)
+
+        return ts.transition(self._state, reward=0.0, discount=1.0)
+
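+# A quick smoke test (a sketch, not part of the original commit). tf_agents
+# ships helpers to validate a PyEnvironment and wrap it for TensorFlow; the
+# imports below follow the standard tf_agents layout, but treat the exact
+# calls as an assumption.
+#
+#   from tf_agents.environments import utils, tf_py_environment
+#   env = LightsOutEnvironment()
+#   utils.validate_py_environment(env, episodes=5)
+#   tf_env = tf_py_environment.TFPyEnvironment(env)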
+
+def main():
+    # Newer TensorFlow workflow: inspect the Taxi toy environment
+    environment = gym.make("Taxi-v3").env
+    environment.render()
+
+    print('Number of states: {}'.format(environment.observation_space.n))
+    print('Number of actions: {}'.format(environment.action_space.n))
+
+class Agent:
+    def __init__(self, environment, optimizer):
+        # Initialize attributes
+        self.environment = environment
+        self._state_size = environment.observation_space.n
+        self._action_size = environment.action_space.n
+        self._optimizer = optimizer
+
+        self.experience_replay = deque(maxlen=2000)
+
+        # Initialize discount and exploration rate
+        self.gamma = 0.6
+        self.epsilon = 0.1
+
+        # Build networks
+        self.q_network = self._build_compile_model()
+        self.target_network = self._build_compile_model()
+        self.align_target_model()
+
+    def store(self, state, action, reward, next_state, terminated):
+        self.experience_replay.append((state, action, reward, next_state, terminated))
+    
+    def _build_compile_model(self):
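+        # Discrete-state DQN: the Embedding layer maps the integer state id
+        # to a dense 10-vector, so no one-hot encoding is needed.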
+        model = Sequential()
+        model.add(Embedding(self._state_size, 10, input_length=1))
+        model.add(Reshape((10,)))
+        model.add(Dense(50, activation='relu'))
+        model.add(Dense(50, activation='relu'))
+        model.add(Dense(self._action_size, activation='linear'))
+        
+        model.compile(loss='mse', optimizer=self._optimizer)
+        return model
+
+    def align_target_model(self):
+        self.target_network.set_weights(self.q_network.get_weights())
+
+    def act(self, state):
+        # state is expected batched with shape (1, 1) so the Embedding layer
+        # sees a single state id; predict returns shape (1, action_size)
+        if np.random.rand() <= self.epsilon:
+            return self.environment.action_space.sample()
+
+        q_values = self.q_network.predict(state)
+        return np.argmax(q_values[0])
+
+    def retrain(self, batch_size):
+        minibatch = random.sample(self.experience_replay, batch_size)
+        
+        for state, action, reward, next_state, terminated in minibatch:
+            
+            target = self.q_network.predict(state)
+            
+            if terminated:
+                target[0][action] = reward
+            else:
+                t = self.target_network.predict(next_state)
+                target[0][action] = reward + self.gamma * np.amax(t)
+            
+            self.q_network.fit(state, target, epochs=1, verbose=0)
+
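+# A minimal training-loop sketch (an assumption about intended use; the
+# original commit never wires the Agent up). It follows the usual DQN recipe:
+# act, store the transition, retrain from replay, and re-align the target
+# network when an episode ends.
+#
+#   environment = gym.make("Taxi-v3").env
+#   agent = Agent(environment, Adam(learning_rate=0.01))
+#   batch_size = 32
+#   for episode in range(100):
+#       state = np.reshape(environment.reset(), [1, 1])
+#       for step in range(500):
+#           action = agent.act(state)
+#           next_state, reward, terminated, _ = environment.step(action)
+#           next_state = np.reshape(next_state, [1, 1])
+#           agent.store(state, action, reward, next_state, terminated)
+#           state = next_state
+#           if terminated:
+#               agent.align_target_model()
+#               break
+#           if len(agent.experience_replay) > batch_size:
+#               agent.retrain(batch_size)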
+
+
+######################
+## Old stuff
+class State:
+    def __init__(self, p1):
+        self.rng = np.random.default_rng()
+        self._state = np.zeros((BOARD_ROWS, BOARD_COLS))
+        self.player = p1
+        self.isEnd = False
+        self._stateHash = None
+        # init p1 plays first
+        self.playerSymbol = 1
+        self.previous_action = None # We don't allow ourselves to hit the same button 2x
+        self.record = {
+            'wins': 0,
+            'losses': 0,
+            'longest': 0,
+            'shortest': LIMIT,
+            'current_rounds': 0,
+            'decaying_average_wins': 0.0,
+            'decaying_average_moves': 1.0 * LIMIT,
+        }
+        self.reset()
+
+    # get unique hash of current board state
+    def getHash(self):
+        self._stateHash = str(self._state.reshape(BOARD_COLS * BOARD_ROWS))
+        return self._stateHash
+
+    def winner(self):
+        if self.record['current_rounds'] > LIMIT:
+            return -1
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if self._state[i, j] != 0:
+                    return None
+        return 1
+
+    def availablePositions(self):
+        ''' We can push any button except the one we just did '''
+        positions = []
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if (i, j) != self.previous_action:
+                    positions.append((i, j))  # need to be tuple
+        return positions
+
+    def _flip(self, value):
+        if value == 1:
+            return 0
+        return 1
+
+    def updateState(self, position):
+        ''' Apply the chosen position: invert the pressed light and its
+            orthogonal neighbours (a plus shape) '''
+        self._state[position] = self._flip(self._state[position])
+        self.previous_action = position
+        # Up
+        if position[0] > 0:
+            self._state[(position[0]-1, position[1])] = self._flip(self._state[(position[0]-1, position[1])])
+        # Down
+        if position[0] < BOARD_ROWS-1:
+            self._state[(position[0]+1, position[1])] = self._flip(self._state[(position[0]+1, position[1])])
+        # Left
+        if position[1] > 0:
+            self._state[(position[0], position[1]-1)] = self._flip(self._state[(position[0], position[1]-1)])
+        # Right
+        if position[1] < BOARD_COLS-1:
+            self._state[(position[0], position[1]+1)] = self._flip(self._state[(position[0], position[1]+1)])
+
+
+    # only when game ends
+    def giveReward(self):
+        result = self.winner()
+        # backpropagate reward
+        # While we could use result directly, we may want to tune rewards
+        if result == 1:
+            #print(f'********* WINNER *************')
+            self.record['wins'] += 1
+            self.record['decaying_average_wins'] = ((99.0 * self.record['decaying_average_wins'] + 1) / 100.0)
+            self.record['decaying_average_moves'] = ((99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0)
+            if self.record['current_rounds'] > self.record['longest']:
+                self.record['longest'] = self.record['current_rounds']
+            if self.record['current_rounds'] < self.record['shortest']:
+                self.record['shortest'] = self.record['current_rounds']
+            self.player.feedReward(1)
+        elif result == -1:
+            #print(f'--------- LOSER ---------------')
+            self.record['losses'] += 1
+            self.record['decaying_average_wins'] = ((99.0 * self.record['decaying_average_wins'] + 0) / 100.0)
+            self.record['decaying_average_moves'] = ((99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0)
+            if self.record['current_rounds'] > self.record['longest']:
+                self.record['longest'] = self.record['current_rounds']
+            self.player.feedReward(-1)
+        else:
+            self.player.feedReward(0)
+
+    def gen_solvable_board(self, steps):
+        ''' Generates a random solvable board by starting with an empty board
+            and pressing buttons 'steps' times; replaying the same presses
+            would clear it again, so the board is guaranteed solvable
+        '''
+        self._state = np.zeros((BOARD_ROWS, BOARD_COLS))
+        for i in range(steps):
+            positions = self.availablePositions()
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+            self.updateState(action)
+        self.previous_action = None
+
+    # board reset
+    def reset(self):
+        ''' random board '''
+        self.gen_solvable_board(self.rng.integers(1, MAX_STEPS))
+        self._stateHash = str(self._state.reshape(BOARD_COLS * BOARD_ROWS))
+        self.isEnd = False
+        self.record['current_rounds'] = 0
+        self.previous_action = None
+
+    def play(self, rounds=100):
+        showing = False
+        for i in range(rounds):
+            if (i % 100) == 99 and not showing:
+                showing = True
+            if (i % 100) == 0 and not showing:
+                #print(f'1000 Rounds. Showing rest of game until win.')
+                print(f'Round {i}; Stats: {json.dumps(self.record)}')
+                showing = False
+            while not self.isEnd:
+                if showing:
+                    self.showBoard()
+                # Player
+                positions = self.availablePositions()
+                player_action = self.player.chooseAction(positions, self._state)
+                # take action and update board state
+                if showing:
+                    print(f'Step {self.record["current_rounds"]}: Chose position: [{player_action}]')
+                self.updateState(player_action)
+                board_hash = self.getHash()
+                self.player.addState(board_hash)
+                # check board status if it is end
+                self.record['current_rounds'] += 1
+
+                win = self.winner()
+                if win is not None:
+                    # self.showBoard()
+                    # ended with p1 either win or draw
+                    self.giveReward()
+                    self.player.reset()
+                    self.reset()
+                    showing = False
+                    break
+
+    # play with human
+    def play2(self):
+        while not self.isEnd:
+            self.showBoard()
+            positions = self.availablePositions()
+            player_action = self.player.chooseAction(positions, self._state)
+            # take action and update board state
+            self.updateState(player_action)
+            # check board status if it is end
+            win = self.winner()
+            if win is not None:
+                if win == 1:
+                    print("Player wins!")
+                else:
+                    print("You have extraordinary patience. But lost.")
+                self.reset()
+                break
+
+    def showBoard(self):
+        for i in range(0, BOARD_ROWS):
+            print('-' * (4 * BOARD_COLS + 1))
+            out = '| '
+            for j in range(0, BOARD_COLS):
+                if self._state[i, j] == 1:
+                    token = 'O'
+                if self._state[i, j] == 0:
+                    token = ' '
+                out += token + ' | '
+            print(out)
+        print('-' * (4 * BOARD_COLS + 1))
+
+
+class Player:
+    def __init__(self, name, exp_rate=0.01):
+        self.name = name
+        self.states = []  # record all positions taken
+        self.lr = 0.2
+        self.exp_rate = exp_rate
+        self.decay_gamma = 0.9
+        self.states_value = {}  # state -> value
+
+    def getHash(self, board):
+        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
+        return boardHash
+
+    def _flip(self, value):
+        if value == 1:
+            return 0
+        return 1
+
+    def imagineState(self, newboard, position):
+        ''' Returns the board that would result from pressing position '''
+        newboard[position] = self._flip(newboard[position])
+        # Up
+        if position[0] > 0:
+            newboard[(position[0]-1, position[1])] = self._flip(newboard[(position[0]-1, position[1])])
+        # Down
+        if position[0] < BOARD_ROWS-1:
+            newboard[(position[0]+1, position[1])] = self._flip(newboard[(position[0]+1, position[1])])
+        # Left
+        if position[1] > 0:
+            newboard[(position[0], position[1]-1)] = self._flip(newboard[(position[0], position[1]-1)])
+        # Right
+        if position[1] < BOARD_COLS-1:
+            newboard[(position[0], position[1]+1)] = self._flip(newboard[(position[0], position[1]+1)])
+        return newboard
+
+    def chooseAction(self, positions, current_board):
+        value_max = -999
+        found_good_state = False
+        if np.random.uniform(0, 1) <= self.exp_rate:
+            # take random action
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+        else:
+            for p in positions:
+                next_board = current_board.copy()
+                next_board = self.imagineState(next_board, p)
+                next_boardHash = self.getHash(next_board)
+                value = self.states_value.get(next_boardHash)
+                if value is not None:
+                    found_good_state = True
+                else:    
+                    value = 0.0
+                # print("value", value)
+                if value >= value_max:
+                    value_max = value
+                    action = p
+        # print("{} takes action {}".format(self.name, action))
+        if not found_good_state:
+            # We didn't find anything with a value, so explore
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+
+        return action
+
+    # append a hash state
+    def addState(self, state):
+        self.states.append(state)
+
+    # at the end of game, backpropagate and update states value
+    def feedReward(self, reward):
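+        # TD-style backup, newest state first:
+        #   V(s) <- V(s) + lr * (gamma * target - V(s))
+        # where the target starts as the terminal reward and becomes the
+        # updated value of each successor state as we walk backwards.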
+        for st in reversed(self.states):
+            if self.states_value.get(st) is None:
+                self.states_value[st] = 0
+            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
+            reward = self.states_value[st]
+
+    def reset(self):
+        self.states = []
+
+    def savePolicy(self):
+        with open('policy_' + str(self.name), 'wb') as fw:
+            pickle.dump(self.states_value, fw)
+
+    def loadPolicy(self, file):
+        with open(file, 'rb') as fr:
+            self.states_value = pickle.load(fr)
+
+
+class HumanPlayer:
+    def __init__(self, name):
+        self.name = name
+
+    def chooseAction(self, positions, current_board):
+        while True:
+            row = int(input("Input your action row:"))
+            col = int(input("Input your action col:"))
+            action = (row, col)
+            if action in positions:
+                return action
+
+    # append a hash state
+    def addState(self, state):
+        pass
+
+    # at the end of game, backpropagate and update states value
+    def feedReward(self, reward):
+        pass
+
+    def reset(self):
+        pass
+
+
+if __name__ == "__main__":
+    # training
+    player = Player("player")
+
+    st = State(player)
+    print("training...")
+    st.play(50000)
+
+    #player.savePolicy()
+
+    # play with human
+    human = HumanPlayer("human")
+
+    st = State(human)
+    st.play2()

+ 317 - 0
lightsout.tablebased.py

@@ -0,0 +1,317 @@
+import json
+import numpy as np
+import pickle
+
+MAX_STEPS = 20 # Maximum number of steps when generating a board
+BOARD_ROWS = 5
+BOARD_COLS = 5
+LIMIT = 100 # start a new game if it takes this many
+
+class State:
+    def __init__(self, p1):
+        self.rng = np.random.default_rng()
+        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
+        self.player = p1
+        self.isEnd = False
+        self.boardHash = None
+        # init p1 plays first
+        self.playerSymbol = 1
+        self.previous_action = None # We don't allow ourselves to hit the same button 2x
+        self.record = {
+            'wins': 0,
+            'losses': 0,
+            'longest': 0,
+            'shortest': LIMIT,
+            'current_rounds': 0,
+            'decaying_average_wins': 0.0,
+            'decaying_average_moves': 1.0 * LIMIT,
+        }
+        self.reset()
+
+    # get unique hash of current board state
+    def getHash(self):
+        self.boardHash = str(self.board.reshape(BOARD_COLS * BOARD_ROWS))
+        return self.boardHash
+
+    def winner(self):
+        if self.record['current_rounds'] > LIMIT:
+            return -1
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if self.board[i, j] != 0:
+                    return None
+        return 1
+
+    def availablePositions(self):
+        ''' We can push any button except the one we just did '''
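+        # This rule matters: pressing the same button twice in a row is a
+        # no-op pair, so allowing an immediate repeat would let the player
+        # undo every move and loop forever.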
+        positions = []
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if (i, j) != self.previous_action:
+                    positions.append((i, j))  # need to be tuple
+        return positions
+
+    def _flip(self, value):
+        if value == 1:
+            return 0
+        return 1
+
+    def updateState(self, position):
+        ''' Apply the chosen position: invert the pressed light and its
+            orthogonal neighbours (a plus shape) '''
+        self.board[position] = self._flip(self.board[position])
+        self.previous_action = position
+        # Up
+        if position[0] > 0:
+            self.board[(position[0]-1, position[1])] = self._flip(self.board[(position[0]-1, position[1])])
+        # Down
+        if position[0] < BOARD_ROWS-1:
+            self.board[(position[0]+1, position[1])] = self._flip(self.board[(position[0]+1, position[1])])
+        # Left
+        if position[1] > 0:
+            self.board[(position[0], position[1]-1)] = self._flip(self.board[(position[0], position[1]-1)])
+        # Right
+        if position[1] < BOARD_COLS-1:
+            self.board[(position[0], position[1]+1)] = self._flip(self.board[(position[0], position[1]+1)])
+
+
+    # only when game ends
+    def giveReward(self):
+        result = self.winner()
+        # backpropagate reward
+        # While we could use result directly, we may want to tune rewards
+        if result == 1:
+            #print(f'********* WINNER *************')
+            self.record['wins'] += 1
+            self.record['decaying_average_wins'] = ((99.0 * self.record['decaying_average_wins'] + 1) / 100.0)
+            self.record['decaying_average_moves'] = ((99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0)
+            if self.record['current_rounds'] > self.record['longest']:
+                self.record['longest'] = self.record['current_rounds']
+            if self.record['current_rounds'] < self.record['shortest']:
+                self.record['shortest'] = self.record['current_rounds']
+            self.player.feedReward(1)
+        elif result == -1:
+            #print(f'--------- LOSER ---------------')
+            self.record['losses'] += 1
+            self.record['decaying_average_wins'] = ((99.0 * self.record['decaying_average_wins'] + 0) / 100.0)
+            self.record['decaying_average_moves'] = ((99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0)
+            if self.record['current_rounds'] > self.record['longest']:
+                self.record['longest'] = self.record['current_rounds']
+            self.player.feedReward(-1)
+        else:
+            self.player.feedReward(0)
+
+    def gen_solvable_board(self, steps):
+        ''' Generates a random solvable board by starting with an empty board
+            and pressing buttons 'steps' times; replaying the same presses
+            would clear it again, so the board is guaranteed solvable
+        '''
+        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
+        for i in range(steps):
+            positions = self.availablePositions()
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+            self.updateState(action)
+        self.previous_action = None
+
+    # board reset
+    def reset(self):
+        ''' random board '''
+        self.gen_solvable_board(self.rng.integers(1, MAX_STEPS))
+        self.boardHash = str(self.board.reshape(BOARD_COLS * BOARD_ROWS))
+        self.isEnd = False
+        self.record['current_rounds'] = 0
+        self.previous_action = None
+
+    def play(self, rounds=100):
+        showing = False
+        for i in range(rounds):
+            if (i % 100) == 99 and not showing:
+                showing = True
+            if (i % 100) == 0 and not showing:
+                #print(f'1000 Rounds. Showing rest of game until win.')
+                print(f'Round {i}; Stats: {json.dumps(self.record)}')
+                showing = False
+            while not self.isEnd:
+                if showing:
+                    self.showBoard()
+                # Player
+                positions = self.availablePositions()
+                player_action = self.player.chooseAction(positions, self.board)
+                # take action and update board state
+                if showing:
+                    print(f'Step {self.record["current_rounds"]}: Chose position: [{player_action}]')
+                self.updateState(player_action)
+                board_hash = self.getHash()
+                self.player.addState(board_hash)
+                # check board status if it is end
+                self.record['current_rounds'] += 1
+
+                win = self.winner()
+                if win is not None:
+                    # self.showBoard()
+                    # ended with p1 either win or draw
+                    self.giveReward()
+                    self.player.reset()
+                    self.reset()
+                    showing = False
+                    break
+
+    # play with human
+    def play2(self):
+        while not self.isEnd:
+            self.showBoard()
+            positions = self.availablePositions()
+            player_action = self.player.chooseAction(positions, self.board)
+            # take action and update board state
+            self.updateState(player_action)
+            # check board status if it is end
+            win = self.winner()
+            if win is not None:
+                if win == 1:
+                    print("Player wins!")
+                else:
+                    print("You have extraordinary patience. But lost.")
+                self.reset()
+                break
+
+    def showBoard(self):
+        for i in range(0, BOARD_ROWS):
+            print('-' * (4 * BOARD_COLS + 1))
+            out = '| '
+            for j in range(0, BOARD_COLS):
+                if self.board[i, j] == 1:
+                    token = 'O'
+                if self.board[i, j] == 0:
+                    token = ' '
+                out += token + ' | '
+            print(out)
+        print('-' * (4 * BOARD_COLS + 1))
+
+
+class Player:
+    def __init__(self, name, exp_rate=0.01):
+        self.name = name
+        self.states = []  # record all positions taken
+        self.lr = 0.2
+        self.exp_rate = exp_rate
+        self.decay_gamma = 0.9
+        self.states_value = {}  # state -> value
+
+    def getHash(self, board):
+        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
+        return boardHash
+
+    def _flip(self, value):
+        if value == 1:
+            return 0
+        return 1
+
+    def imagineState(self, newboard, position):
+        ''' Returns the board that would result from pressing position '''
+        newboard[position] = self._flip(newboard[position])
+        # Up
+        if position[0] > 0:
+            newboard[(position[0]-1, position[1])] = self._flip(newboard[(position[0]-1, position[1])])
+        # Down
+        if position[0] < BOARD_ROWS-1:
+            newboard[(position[0]+1, position[1])] = self._flip(newboard[(position[0]+1, position[1])])
+        # Left
+        if position[1] > 0:
+            newboard[(position[0], position[1]-1)] = self._flip(newboard[(position[0], position[1]-1)])
+        # Right
+        if position[1] < BOARD_COLS-1:
+            newboard[(position[0], position[1]+1)] = self._flip(newboard[(position[0], position[1]+1)])
+        return newboard
+
+    def chooseAction(self, positions, current_board):
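+        # Epsilon-greedy over a one-step lookahead: with probability exp_rate
+        # press a random button; otherwise imagine each legal press and take
+        # the one whose successor board has the highest learned value. If no
+        # successor has ever been scored, fall back to a random press.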
+        value_max = -999
+        found_good_state = False
+        if np.random.uniform(0, 1) <= self.exp_rate:
+            # take random action
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+        else:
+            for p in positions:
+                next_board = current_board.copy()
+                next_board = self.imagineState(next_board, p)
+                next_boardHash = self.getHash(next_board)
+                value = self.states_value.get(next_boardHash)
+                if value is not None:
+                    found_good_state = True
+                else:    
+                    value = 0.0
+                # print("value", value)
+                if value >= value_max:
+                    value_max = value
+                    action = p
+        # print("{} takes action {}".format(self.name, action))
+        if not found_good_state:
+            # We didn't find anything with a value, so explore
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+
+        return action
+
+    # append a hash state
+    def addState(self, state):
+        self.states.append(state)
+
+    # at the end of game, backpropagate and update states value
+    def feedReward(self, reward):
+        for st in reversed(self.states):
+            if self.states_value.get(st) is None:
+                self.states_value[st] = 0
+            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
+            reward = self.states_value[st]
+
+    def reset(self):
+        self.states = []
+
+    def savePolicy(self):
+        with open('policy_' + str(self.name), 'wb') as fw:
+            pickle.dump(self.states_value, fw)
+
+    def loadPolicy(self, file):
+        with open(file, 'rb') as fr:
+            self.states_value = pickle.load(fr)
+
+
+class HumanPlayer:
+    def __init__(self, name):
+        self.name = name
+
+    def chooseAction(self, positions, current_board):
+        while True:
+            row = int(input("Input your action row:"))
+            col = int(input("Input your action col:"))
+            action = (row, col)
+            if action in positions:
+                return action
+
+    # append a hash state
+    def addState(self, state):
+        pass
+
+    # at the end of game, backpropagate and update states value
+    def feedReward(self, reward):
+        pass
+
+    def reset(self):
+        pass
+
+
+if __name__ == "__main__":
+    # training
+    player = Player("player")
+
+    st = State(player)
+    print("training...")
+    st.play(50000)
+
+    #player.savePolicy()
+
+    # play with human
+    human = HumanPlayer("human")
+
+    st = State(human)
+    st.play2()

+ 4 - 0
requirements.txt

@@ -0,0 +1,4 @@
+gym[atari]
+numpy
+tensorflow-macos
+tf-agents

+ 302 - 0
rlttt.py

@@ -0,0 +1,302 @@
+import json
+import numpy as np
+import pickle
+
+BOARD_ROWS = 3
+BOARD_COLS = 3
+
+
+class State:
+    def __init__(self, p1, p2):
+        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
+        self.p1 = p1
+        self.p2 = p2
+        self.isEnd = False
+        self.boardHash = None
+        # init p1 plays first
+        self.playerSymbol = 1
+        self.wins = {}
+        self.wins[p1.name] = 0
+        self.wins[p2.name] = 0
+        self.wins["tie"] = 0
+
+
+    # get unique hash of current board state
+    def getHash(self):
+        self.boardHash = str(self.board.reshape(BOARD_COLS * BOARD_ROWS))
+        return self.boardHash
+
+    def winner(self):
+        # row
+        for i in range(BOARD_ROWS):
+            if sum(self.board[i, :]) == 3:
+                self.isEnd = True
+                self.wins[self.p1.name] += 1
+                return 1
+            if sum(self.board[i, :]) == -3:
+                self.isEnd = True
+                self.wins[self.p2.name] += 1
+                return -1
+        # col
+        for i in range(BOARD_COLS):
+            if sum(self.board[:, i]) == 3:
+                self.isEnd = True
+                self.wins[self.p1.name] += 1
+                return 1
+            if sum(self.board[:, i]) == -3:
+                self.isEnd = True
+                self.wins[self.p2.name] += 1
+                return -1
+        # diagonal
+        diag_sum1 = sum([self.board[i, i] for i in range(BOARD_COLS)])
+        diag_sum2 = sum([self.board[i, BOARD_COLS - i - 1] for i in range(BOARD_COLS)])
+        diag_sum = max(abs(diag_sum1), abs(diag_sum2))
+        if diag_sum == 3:
+            self.isEnd = True
+            if diag_sum1 == 3 or diag_sum2 == 3:
+                self.wins[self.p1.name] += 1
+                return 1
+            else:
+                self.wins[self.p2.name] += 1
+                return -1
+
+        # tie
+        # no available positions
+        if len(self.availablePositions()) == 0:
+            self.isEnd = True
+            self.wins["tie"] += 1
+            return 0
+        # not end
+        self.isEnd = False
+        return None
+
+    def availablePositions(self):
+        positions = []
+        for i in range(BOARD_ROWS):
+            for j in range(BOARD_COLS):
+                if self.board[i, j] == 0:
+                    positions.append((i, j))  # need to be tuple
+        return positions
+
+    def updateState(self, position):
+        self.board[position] = self.playerSymbol
+        # switch to another player
+        self.playerSymbol = -1 if self.playerSymbol == 1 else 1
+
+    # only when game ends
+    def giveReward(self):
+        result = self.winner()
+        # backpropagate reward
+        if result == 1:
+            self.p1.feedReward(1)
+            self.p2.feedReward(0)
+        elif result == -1:
+            self.p1.feedReward(0)
+            self.p2.feedReward(1)
+        else:
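+            # Tie: p1 gets 0.1 but p2 gets 0.5, presumably because p1 moves
+            # first and a draw is a worse result for the first player; the
+            # asymmetry nudges p1 toward pressing for wins. (A reading of
+            # intent; not documented in the original.)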
+            self.p1.feedReward(0.1)
+            self.p2.feedReward(0.5)
+
+    # board reset
+    def reset(self):
+        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
+        self.boardHash = None
+        self.isEnd = False
+        self.playerSymbol = 1
+
+    def play(self, rounds=100):
+        for i in range(rounds):
+            if i % 1000 == 0:
+                print(f"Rounds {i}; Current Results: {json.dumps(self.wins)}")
+            while not self.isEnd:
+                # Player 1
+                positions = self.availablePositions()
+                p1_action = self.p1.chooseAction(positions, self.board, self.playerSymbol)
+                # take action and update board state
+                self.updateState(p1_action)
+                board_hash = self.getHash()
+                self.p1.addState(board_hash)
+                # check board status if it is end
+
+                win = self.winner()
+                if win is not None:
+                    # self.showBoard()
+                    # ended with p1 either win or draw
+                    self.giveReward()
+                    self.p1.reset()
+                    self.p2.reset()
+                    self.reset()
+                    break
+
+                else:
+                    # Player 2
+                    positions = self.availablePositions()
+                    p2_action = self.p2.chooseAction(positions, self.board, self.playerSymbol)
+                    self.updateState(p2_action)
+                    board_hash = self.getHash()
+                    self.p2.addState(board_hash)
+
+                    win = self.winner()
+                    if win is not None:
+                        # self.showBoard()
+                        # ended with p2 either win or draw
+                        self.giveReward()
+                        self.p1.reset()
+                        self.p2.reset()
+                        self.reset()
+                        break
+
+    # play with human
+    def play2(self):
+        while not self.isEnd:
+            # Player 1
+            positions = self.availablePositions()
+            p1_action = self.p1.chooseAction(positions, self.board, self.playerSymbol)
+            # take action and update board state
+            self.updateState(p1_action)
+            self.showBoard()
+            # check board status if it is end
+            win = self.winner()
+            if win is not None:
+                if win == 1:
+                    print(self.p1.name, "wins!")
+                else:
+                    print("tie!")
+                self.reset()
+                break
+
+            else:
+                # Player 2
+                positions = self.availablePositions()
+                p2_action = self.p2.chooseAction(positions)
+
+                self.updateState(p2_action)
+                self.showBoard()
+                win = self.winner()
+                if win is not None:
+                    if win == -1:
+                        print(self.p2.name, "wins!")
+                    else:
+                        print("tie!")
+                    self.reset()
+                    break
+
+    def showBoard(self):
+        # p1: x  p2: o
+        for i in range(0, BOARD_ROWS):
+            print('-------------')
+            out = '| '
+            for j in range(0, BOARD_COLS):
+                if self.board[i, j] == 1:
+                    token = 'x'
+                if self.board[i, j] == -1:
+                    token = 'o'
+                if self.board[i, j] == 0:
+                    token = ' '
+                out += token + ' | '
+            print(out)
+        print('-------------')
+
+
+class Player:
+    def __init__(self, name, exp_rate=0.3):
+        self.name = name
+        self.states = []  # record all positions taken
+        self.lr = 0.2
+        self.exp_rate = exp_rate
+        self.decay_gamma = 0.9
+        self.states_value = {}  # state -> value
+
+    def getHash(self, board):
+        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
+        return boardHash
+
+    def chooseAction(self, positions, current_board, symbol):
+        if np.random.uniform(0, 1) <= self.exp_rate:
+            # take random action
+            idx = np.random.choice(len(positions))
+            action = positions[idx]
+        else:
+            value_max = -999
+            for p in positions:
+                next_board = current_board.copy()
+                next_board[p] = symbol
+                next_boardHash = self.getHash(next_board)
+                value = 0 if self.states_value.get(next_boardHash) is None else self.states_value.get(next_boardHash)
+                # print("value", value)
+                if value >= value_max:
+                    value_max = value
+                    action = p
+        # print("{} takes action {}".format(self.name, action))
+        return action
+
+    # append a hash state
+    def addState(self, state):
+        self.states.append(state)
+
+    # at the end of game, backpropagate and update states value
+    def feedReward(self, reward):
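+        # Walk the episode backwards: each state's value is nudged toward
+        # gamma times the propagated target, which starts as the terminal
+        # reward and becomes the updated value of the successor state.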
+        for st in reversed(self.states):
+            if self.states_value.get(st) is None:
+                self.states_value[st] = 0
+            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
+            reward = self.states_value[st]
+
+    def reset(self):
+        self.states = []
+
+    def savePolicy(self):
+        with open('policy_' + str(self.name), 'wb') as fw:
+            pickle.dump(self.states_value, fw)
+
+    def loadPolicy(self, file):
+        with open(file, 'rb') as fr:
+            self.states_value = pickle.load(fr)
+
+
+class HumanPlayer:
+    def __init__(self, name):
+        self.name = name
+
+    def chooseAction(self, positions):
+        while True:
+            row = int(input("Input your action row:"))
+            col = int(input("Input your action col:"))
+            action = (row, col)
+            if action in positions:
+                return action
+
+    # append a hash state
+    def addState(self, state):
+        pass
+
+    # at the end of game, backpropagate and update states value
+    def feedReward(self, reward):
+        pass
+
+    def reset(self):
+        pass
+
+
+if __name__ == "__main__":
+    # training
+    p1 = Player("p1")
+    p2 = Player("p2")
+
+    st = State(p1, p2)
+    print("training...")
+    st.play(50000)
+
+    p1.savePolicy()
+    p2.savePolicy()
+
+    # play with human
+    p1 = Player("computer", exp_rate=0)
+    p1.loadPolicy("policy_p1")
+
+    p2 = HumanPlayer("human")
+
+    st = State(p1, p2)
+    st.play2()