import gym
import numpy as np
import pickle
import random
import tensorflow as tf
import json
from collections import deque
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Dense, Embedding, Reshape
from tensorflow.keras.optimizers import Adam
from tf_agents.environments import py_environment
from tf_agents.environments import suite_gym
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts

MAX_STEPS = 20   # Maximum number of steps when generating a board (also caps episode length in _step)
BOARD_ROWS = 5
BOARD_COLS = 5
LIMIT = 100      # Start a new game if it takes this many moves


class LightsOutEnvironment(py_environment.PyEnvironment):

    previous_action = None

    def _winner(self):
        '''Returns 1 if every light is off (we won), otherwise None.'''
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self._state[i, j] != 0:
                    return None
        return 1

    def _flip(self, value):
        if value == 1:
            return 0
        return 1

    def _take_action(self, position, imaginary=False):
        '''Applies the action (press the button at `position`) and returns the new board.'''
        newboard = self._state.copy()
        newboard[position] = self._flip(self._state[position])
        # Left
        if position[0] > 0:
            newboard[(position[0]-1, position[1])] = self._flip(self._state[(position[0]-1, position[1])])
        # Right
        if position[0] < BOARD_COLS-1:
            newboard[(position[0]+1, position[1])] = self._flip(self._state[(position[0]+1, position[1])])
        # Up
        if position[1] > 0:
            newboard[(position[0], position[1]-1)] = self._flip(self._state[(position[0], position[1]-1)])
        # Down
        if position[1] < BOARD_ROWS-1:
            newboard[(position[0], position[1]+1)] = self._flip(self._state[(position[0], position[1]+1)])
        if not imaginary:
            self.previous_action = position
            self._state = newboard
        return newboard

    def _available_positions(self):
        '''We can push any button except the one we just pushed.'''
        positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if (i, j) != self.previous_action:
                    positions.append((i, j))  # needs to be a tuple
        return positions

    def _gen_solvable_board(self):
        '''Generates a new solvable board by pressing random buttons on an empty board.'''
        self._state = np.zeros((BOARD_ROWS, BOARD_COLS), dtype=np.int32)
        steps = self.rng.integers(1, MAX_STEPS)
        self.previous_action = None
        for i in range(steps):
            positions = self._available_positions()
            idx = np.random.choice(len(positions))
            action = positions[idx]
            self._take_action(position=action, imaginary=False)
        self.previous_action = None  # the scramble should not restrict the agent's first move

    def __init__(self):
        self.rng = np.random.default_rng()
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(2,), dtype=np.int32, minimum=0,
            maximum=(BOARD_ROWS - 1, BOARD_COLS - 1), name='action')
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(BOARD_ROWS, BOARD_COLS), dtype=np.int32,
            minimum=0, maximum=1, name='observation')
        self._gen_solvable_board()
        self._episode_ended = False
        self.current_steps = 0

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._gen_solvable_board()
        self._episode_ended = False
        self.current_steps = 0
        return ts.restart(self._state)

    def _step(self, action):
        if self._episode_ended:
            # The last action ended the episode. Ignore the current action and
            # start a new episode.
            return self._reset()

        self.current_steps += 1
        if self.current_steps >= MAX_STEPS:
            self._episode_ended = True
            return ts.termination(self._state, -1)
        elif self._winner():
            self._episode_ended = True
            return ts.termination(self._state, 1)
        else:
            # Index with a tuple so numpy treats the action as a single cell.
            self._take_action(tuple(action))
            return ts.transition(self._state, reward=0.0, discount=1.0)


def main():
    # New tensorflow version
    environment = gym.make("Taxi-v3").env   # "Taxi-v2" was removed from gym; "Taxi-v3" is the current id
    environment.render()

    print('Number of states: {}'.format(environment.observation_space.n))
    print('Number of actions: {}'.format(environment.action_space.n))


class Agent:
    def __init__(self, environment, optimizer):
        # Initialize attributes
        self.environment = environment
        self._state_size = environment.observation_space.n
        self._action_size = environment.action_space.n
        self._optimizer = optimizer

        self.experience_replay = deque(maxlen=2000)

        # Initialize discount and exploration rate
        self.gamma = 0.6
        self.epsilon = 0.1

        # Build networks
        self.q_network = self._build_compile_model()
        self.target_network = self._build_compile_model()
        self.align_target_model()

    def store(self, state, action, reward, next_state, terminated):
        self.experience_replay.append((state, action, reward, next_state, terminated))

    def _build_compile_model(self):
        model = Sequential()
        model.add(Embedding(self._state_size, 10, input_length=1))
        model.add(Reshape((10,)))
        model.add(Dense(50, activation='relu'))
        model.add(Dense(50, activation='relu'))
        model.add(Dense(self._action_size, activation='linear'))
        model.compile(loss='mse', optimizer=self._optimizer)
        return model

    def align_target_model(self):
        self.target_network.set_weights(self.q_network.get_weights())

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return self.environment.action_space.sample()
        q_values = self.q_network.predict(state)
        return np.argmax(q_values[0])

    def retrain(self, batch_size):
        minibatch = random.sample(self.experience_replay, batch_size)
        for state, action, reward, next_state, terminated in minibatch:
            target = self.q_network.predict(state)
            if terminated:
                target[0][action] = reward
            else:
                t = self.target_network.predict(next_state)
                target[0][action] = reward + self.gamma * np.amax(t)
            self.q_network.fit(state, target, epochs=1, verbose=0)


######################
## Old stuff

class State:
    def __init__(self, p1):
        self.rng = np.random.default_rng()
        self._state = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.player = p1
        self.isEnd = False
        self._stateHash = None
        # init p1 plays first
        self.playerSymbol = 1
        self.previous_action = None  # We don't allow ourselves to hit the same button 2x
        self.record = {}
        self.record['wins'] = 0
        self.record['losses'] = 0
        self.record['longest'] = 0
        self.record['shortest'] = LIMIT
        self.record['current_rounds'] = 0
        self.record['decaying_average_wins'] = 0.0
        self.record['decaying_average_moves'] = 1.0 * LIMIT
        self.reset()

    # get unique hash of current board state
    def getHash(self):
        self._stateHash = str(self._state.reshape(BOARD_COLS * BOARD_ROWS))
        return self._stateHash

    def winner(self):
        if self.record['current_rounds'] > LIMIT:
            return -1
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self._state[i, j] != 0:
                    return None
        return 1
    def availablePositions(self):
        '''We can push any button except the one we just pushed.'''
        positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if (i, j) != self.previous_action:
                    positions.append((i, j))  # needs to be a tuple
        return positions

    def _flip(self, value):
        if value == 1:
            return 0
        return 1

    def updateState(self, position):
        '''The chosen action is `position`, so update the board by inverting the lights in a plus shape.'''
        self._state[position] = self._flip(self._state[position])
        self.previous_action = position
        # Left
        if position[0] > 0:
            self._state[(position[0]-1, position[1])] = self._flip(self._state[(position[0]-1, position[1])])
        # Right
        if position[0] < BOARD_COLS-1:
            self._state[(position[0]+1, position[1])] = self._flip(self._state[(position[0]+1, position[1])])
        # Up
        if position[1] > 0:
            self._state[(position[0], position[1]-1)] = self._flip(self._state[(position[0], position[1]-1)])
        # Down
        if position[1] < BOARD_ROWS-1:
            self._state[(position[0], position[1]+1)] = self._flip(self._state[(position[0], position[1]+1)])

    # only when game ends
    def giveReward(self):
        result = self.winner()
        # backpropagate reward
        # While we could use result directly, we may want to tune rewards
        if result == 1:
            # print(f'********* WINNER *************')
            self.record['wins'] += 1
            self.record['decaying_average_wins'] = (99.0 * self.record['decaying_average_wins'] + 1) / 100.0
            self.record['decaying_average_moves'] = (99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0
            if self.record['current_rounds'] > self.record['longest']:
                self.record['longest'] = self.record['current_rounds']
            if self.record['current_rounds'] < self.record['shortest']:
                self.record['shortest'] = self.record['current_rounds']
            self.player.feedReward(1)
        elif result == -1:
            # print(f'--------- LOSER ---------------')
            self.record['losses'] += 1
            self.record['decaying_average_wins'] = (99.0 * self.record['decaying_average_wins'] + 0) / 100.0
            self.record['decaying_average_moves'] = (99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0
            if self.record['current_rounds'] > self.record['longest']:
                self.record['longest'] = self.record['current_rounds']
            self.player.feedReward(-1)
        else:
            self.player.feedReward(0)

    def gen_solvable_board(self, steps):
        '''Generates a random solvable board by starting with an empty board and pressing buttons 'steps' times.'''
        self._state = np.zeros((BOARD_ROWS, BOARD_COLS))
        for i in range(steps):
            positions = self.availablePositions()
            idx = np.random.choice(len(positions))
            action = positions[idx]
            self.updateState(action)
        self.previous_action = None

    # board reset
    def reset(self):
        '''Start over with a new random board.'''
        self.gen_solvable_board(self.rng.integers(1, MAX_STEPS))
        self._stateHash = str(self._state.reshape(BOARD_COLS * BOARD_ROWS))
        self.isEnd = False
        self.record['current_rounds'] = 0
        self.previous_action = None

    def play(self, rounds=100):
        showing = False
        for i in range(rounds):
            if (i % 100) == 99 and not showing:
                showing = True
            if (i % 100) == 0 and not showing:
                # print(f'1000 Rounds. Showing rest of game until win.')
                print(f'Round {i}; Stats: {json.dumps(self.record)}')
                showing = False
            while not self.isEnd:
                if showing:
                    self.showBoard()
                # Player
                positions = self.availablePositions()
                player_action = self.player.chooseAction(positions, self._state)
                # take action and update board state
                if showing:
                    print(f'Step {self.record["current_rounds"]}: Chose position: [{player_action}]')
                self.updateState(player_action)
                board_hash = self.getHash()
                self.player.addState(board_hash)
                # check board status to see if the game has ended
                self.record['current_rounds'] += 1
                win = self.winner()
                if win is not None:
                    # self.showBoard()
                    # game ended with either a win or a loss
                    self.giveReward()
                    self.player.reset()
                    self.reset()
                    showing = False
                    break

    # play with a human
    def play2(self):
        while not self.isEnd:
            self.showBoard()
            positions = self.availablePositions()
            player_action = self.player.chooseAction(positions, self._state)
            # take action and update board state
            self.updateState(player_action)
            # check board status to see if the game has ended
            win = self.winner()
            if win is not None:
                if win == 1:
                    print("Player wins!")
                else:
                    print("You have extraordinary patience. But you lost.")
                self.reset()
                break

    def showBoard(self):
        for i in range(0, BOARD_ROWS):
            print('-' * (4 * BOARD_COLS + 1))
            out = '| '
            for j in range(0, BOARD_COLS):
                if self._state[i, j] == 1:
                    token = 'O'
                if self._state[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('-' * (4 * BOARD_COLS + 1))


class Player:
    def __init__(self, name, exp_rate=0.01):
        self.name = name
        self.states = []   # record all positions taken
        self.lr = 0.2
        self.exp_rate = exp_rate
        self.decay_gamma = 0.9
        self.states_value = {}  # state -> value

    def getHash(self, board):
        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
        return boardHash

    def _flip(self, value):
        if value == 1:
            return 0
        return 1

    def imagineState(self, newboard, position):
        '''Create the board that the action at `position` would lead to.'''
        newboard[position] = self._flip(newboard[position])
        # Left
        if position[0] > 0:
            newboard[(position[0]-1, position[1])] = self._flip(newboard[(position[0]-1, position[1])])
        # Right
        if position[0] < BOARD_COLS-1:
            newboard[(position[0]+1, position[1])] = self._flip(newboard[(position[0]+1, position[1])])
        # Up
        if position[1] > 0:
            newboard[(position[0], position[1]-1)] = self._flip(newboard[(position[0], position[1]-1)])
        # Down
        if position[1] < BOARD_ROWS-1:
            newboard[(position[0], position[1]+1)] = self._flip(newboard[(position[0], position[1]+1)])
        return newboard

    def chooseAction(self, positions, current_board):
        value_max = -999
        found_good_state = False
        if np.random.uniform(0, 1) <= self.exp_rate:
            # take random action
            idx = np.random.choice(len(positions))
            action = positions[idx]
        else:
            for p in positions:
                next_board = current_board.copy()
                next_board = self.imagineState(next_board, p)
                next_boardHash = self.getHash(next_board)
                value = self.states_value.get(next_boardHash)
                if value is not None:
                    found_good_state = True
                else:
                    value = 0.0
                # print("value", value)
                if value >= value_max:
                    value_max = value
                    action = p
            # print("{} takes action {}".format(self.name, action))
            if not found_good_state:
                # We didn't find anything with a value, so explore
                idx = np.random.choice(len(positions))
                action = positions[idx]
        return action

    # append a hash state
    def addState(self, state):
        self.states.append(state)

    # at the end of the game, backpropagate and update state values
    def feedReward(self, reward):
        for st in reversed(self.states):
            if self.states_value.get(st) is None:
                self.states_value[st] = 0
            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
            reward = self.states_value[st]

    def reset(self):
        self.states = []

    def savePolicy(self):
        with open('policy_' + str(self.name), 'wb') as fw:
            pickle.dump(self.states_value, fw)

    def loadPolicy(self, file):
        with open(file, 'rb') as fr:
            self.states_value = pickle.load(fr)


class HumanPlayer:
    def __init__(self, name):
        self.name = name

    def chooseAction(self, positions, current_board):
        while True:
            row = int(input("Input your action row:"))
            col = int(input("Input your action col:"))
            action = (row, col)
            if action in positions:
                return action

    # append a hash state
    def addState(self, state):
        pass

    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        pass

    def reset(self):
        pass


if __name__ == "__main__":
    # training
    player = Player("player")
    st = State(player)
    print("training...")
    st.play(50000)
    # player.savePolicy()

    # play with human
    human = HumanPlayer("human")
    st = State(human)
    st.play2()
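

# --- Sketch (not part of the original): how the Agent above could be trained. ---
# main() only builds the Taxi environment and prints its sizes; it never runs a
# DQN loop. The function below is one plausible wiring of Agent.act / store /
# retrain / align_target_model, assuming the classic gym API where env.step()
# returns (state, reward, done, info). The names `train_taxi_agent`,
# `num_episodes`, `batch_size` and `align_every` are illustrative, not from the
# original code.
def train_taxi_agent(num_episodes=100, batch_size=32, align_every=10):
    environment = gym.make("Taxi-v3").env
    agent = Agent(environment, Adam(learning_rate=0.01))
    for episode in range(num_episodes):
        # The Embedding layer expects a (1, 1) tensor holding the discrete state id.
        state = np.reshape(environment.reset(), [1, 1])
        terminated = False
        while not terminated:
            action = agent.act(state)
            next_state, reward, terminated, info = environment.step(action)
            next_state = np.reshape(next_state, [1, 1])
            agent.store(state, action, reward, next_state, terminated)
            state = next_state
            if len(agent.experience_replay) > batch_size:
                agent.retrain(batch_size)
        if episode % align_every == 0:
            # Periodically copy the online weights into the target network.
            agent.align_target_model()
    return agent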
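

# --- Sketch (not part of the original): exercising LightsOutEnvironment with TF-Agents. ---
# The environment class above is never instantiated in this file. One way to check
# that its specs and time steps are consistent is TF-Agents' validation utility, and
# TFPyEnvironment can wrap it for use with TF-Agents drivers. This is a minimal
# illustration under those assumptions, not a tested training setup.
def check_lights_out_environment():
    from tf_agents.environments import tf_py_environment, utils

    py_env = LightsOutEnvironment()
    # Drives the environment with random actions sampled from its action_spec.
    utils.validate_py_environment(py_env, episodes=3)

    tf_env = tf_py_environment.TFPyEnvironment(LightsOutEnvironment())
    time_step = tf_env.reset()
    print('Observation spec:', tf_env.observation_spec())
    print('First time step:', time_step)
    return tf_env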