import json
import pickle

import numpy as np

MAX_STEPS = 20   # maximum number of button presses when generating a board
BOARD_ROWS = 5
BOARD_COLS = 5
LIMIT = 100      # start a new game if solving takes this many moves


class State:
    def __init__(self, p1):
        self.rng = np.random.default_rng()
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.player = p1
        self.isEnd = False
        self.boardHash = None
        # p1 plays first
        self.playerSymbol = 1
        self.previous_action = None  # we don't allow hitting the same button twice in a row
        self.record = {
            'wins': 0,
            'losses': 0,
            'longest': 0,
            'shortest': LIMIT,
            'current_rounds': 0,
            'decaying_average_wins': 0.0,
            'decaying_average_moves': 1.0 * LIMIT,
        }
        self.reset()

    # get unique hash of current board state
    def getHash(self):
        self.boardHash = str(self.board.reshape(BOARD_COLS * BOARD_ROWS))
        return self.boardHash

    def winner(self):
        ''' Return 1 if the board is cleared (a win), -1 if the move limit
        was exceeded (a loss), and None while the game is still going '''
        if self.record['current_rounds'] > LIMIT:
            return -1
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self.board[i, j] != 0:
                    return None
        return 1

    def availablePositions(self):
        ''' We can push any button except the one we just pushed '''
        positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if (i, j) != self.previous_action:
                    positions.append((i, j))  # needs to be a tuple
        return positions

    def _flip(self, value):
        if value == 1:
            return 0
        return 1

    def updateState(self, position):
        ''' Apply the chosen action: invert the pressed light and its four
        orthogonal neighbours (a plus shape) '''
        self.board[position] = self._flip(self.board[position])
        self.previous_action = position
        # Up
        if position[0] > 0:
            self.board[(position[0] - 1, position[1])] = self._flip(self.board[(position[0] - 1, position[1])])
        # Down
        if position[0] < BOARD_ROWS - 1:
            self.board[(position[0] + 1, position[1])] = self._flip(self.board[(position[0] + 1, position[1])])
        # Left
        if position[1] > 0:
            self.board[(position[0], position[1] - 1)] = self._flip(self.board[(position[0], position[1] - 1)])
        # Right
        if position[1] < BOARD_COLS - 1:
            self.board[(position[0], position[1] + 1)] = self._flip(self.board[(position[0], position[1] + 1)])
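
    # Example: pressing (1, 1) on an empty board flips a plus shape.
    #   . . .        . O .
    #   . . .   ->   O O O
    #   . . .        . O .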

    # only called when the game ends
    def giveReward(self):
        result = self.winner()
        # Backpropagate the reward. We could use the result directly, but
        # keeping this switch makes the rewards easy to tune.
        if result == 1:
            self.record['wins'] += 1
            self.record['decaying_average_wins'] = (99.0 * self.record['decaying_average_wins'] + 1) / 100.0
            self.record['decaying_average_moves'] = (99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0
            if self.record['current_rounds'] > self.record['longest']:
                self.record['longest'] = self.record['current_rounds']
            if self.record['current_rounds'] < self.record['shortest']:
                self.record['shortest'] = self.record['current_rounds']
            self.player.feedReward(1)
        elif result == -1:
            self.record['losses'] += 1
            self.record['decaying_average_wins'] = (99.0 * self.record['decaying_average_wins'] + 0) / 100.0
            self.record['decaying_average_moves'] = (99.0 * self.record['decaying_average_moves'] + self.record['current_rounds']) / 100.0
            if self.record['current_rounds'] > self.record['longest']:
                self.record['longest'] = self.record['current_rounds']
            self.player.feedReward(-1)
        else:
            self.player.feedReward(0)
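
    # The 'decaying_average_*' stats are exponential moving averages with
    # weight 1/100 on the newest sample:
    #   avg_new = (99 * avg_old + sample) / 100
    # so roughly the last few hundred games dominate each statistic.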

    def gen_solvable_board(self, steps):
        ''' Generate a random solvable board by starting from an empty board
        and pressing `steps` random buttons '''
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        for _ in range(steps):
            positions = self.availablePositions()
            idx = self.rng.integers(len(positions))
            action = positions[idx]
            self.updateState(action)
        self.previous_action = None
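
    # Why this always yields a solvable board: each press is its own inverse
    # and presses commute (every cell just counts flips mod 2), so replaying
    # the generating presses, in any order, clears the board again.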

    # board reset
    def reset(self):
        ''' Start a fresh random board '''
        self.gen_solvable_board(self.rng.integers(1, MAX_STEPS))
        self.boardHash = self.getHash()
        self.isEnd = False
        self.record['current_rounds'] = 0
        self.previous_action = None

    def play(self, rounds=100):
        showing = False
        for i in range(rounds):
            if i % 100 == 99:
                # show the whole next game, move by move
                showing = True
            elif i % 100 == 0:
                print(f'Round {i}; Stats: {json.dumps(self.record)}')
            while not self.isEnd:
                if showing:
                    self.showBoard()
                # Player's move
                positions = self.availablePositions()
                player_action = self.player.chooseAction(positions, self.board)
                # take the action and update the board state
                if showing:
                    print(f'Step {self.record["current_rounds"]}: Chose position: [{player_action}]')
                self.updateState(player_action)
                board_hash = self.getHash()
                self.player.addState(board_hash)
                # check whether the game has ended
                self.record['current_rounds'] += 1
                win = self.winner()
                if win is not None:
                    # ended with either a win or a timeout loss
                    self.giveReward()
                    self.player.reset()
                    self.reset()
                    showing = False
                    break

    # play with a human
    def play2(self):
        while not self.isEnd:
            self.showBoard()
            positions = self.availablePositions()
            player_action = self.player.chooseAction(positions, self.board)
            # take the action and update the board state
            self.updateState(player_action)
            # count the move so the LIMIT timeout can actually trigger
            self.record['current_rounds'] += 1
            # check whether the game has ended
            win = self.winner()
            if win is not None:
                if win == 1:
                    print("Player wins!")
                else:
                    print("You have extraordinary patience. But lost.")
                self.reset()
                break

    def showBoard(self):
        for i in range(BOARD_ROWS):
            print('-' * (4 * BOARD_COLS + 1))
            out = '| '
            for j in range(BOARD_COLS):
                token = 'O' if self.board[i, j] == 1 else ' '
                out += token + ' | '
            print(out)
        print('-' * (4 * BOARD_COLS + 1))


class Player:
    def __init__(self, name, exp_rate=0.01):
        self.name = name
        self.states = []  # record all board states visited this game
        self.lr = 0.2
        self.exp_rate = exp_rate
        self.decay_gamma = 0.9
        self.states_value = {}  # state -> value

    def getHash(self, board):
        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
        return boardHash

    def _flip(self, value):
        if value == 1:
            return 0
        return 1

    def imagineState(self, newboard, position):
        ''' Return the board that taking this action would produce '''
        newboard[position] = self._flip(newboard[position])
        # Up
        if position[0] > 0:
            newboard[(position[0] - 1, position[1])] = self._flip(newboard[(position[0] - 1, position[1])])
        # Down
        if position[0] < BOARD_ROWS - 1:
            newboard[(position[0] + 1, position[1])] = self._flip(newboard[(position[0] + 1, position[1])])
        # Left
        if position[1] > 0:
            newboard[(position[0], position[1] - 1)] = self._flip(newboard[(position[0], position[1] - 1)])
        # Right
        if position[1] < BOARD_COLS - 1:
            newboard[(position[0], position[1] + 1)] = self._flip(newboard[(position[0], position[1] + 1)])
        return newboard

    def chooseAction(self, positions, current_board):
        ''' Epsilon-greedy: explore with probability exp_rate, otherwise pick
        the position whose imagined next board has the highest learned value '''
        value_max = -999
        found_good_state = False
        if np.random.uniform(0, 1) <= self.exp_rate:
            # take a random action
            idx = np.random.choice(len(positions))
            action = positions[idx]
        else:
            for p in positions:
                next_board = current_board.copy()
                next_board = self.imagineState(next_board, p)
                next_boardHash = self.getHash(next_board)
                value = self.states_value.get(next_boardHash)
                if value is not None:
                    found_good_state = True
                else:
                    value = 0.0
                if value >= value_max:
                    value_max = value
                    action = p
            if not found_good_state:
                # no reachable state has a learned value yet, so explore
                idx = np.random.choice(len(positions))
                action = positions[idx]
        return action
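
    # Note: this is an afterstate method; values are keyed by the resulting
    # board rather than by (state, action) pairs, which keeps the lookup
    # table smaller since different presses can lead to the same board.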

    # append a hashed state to this game's trajectory
    def addState(self, state):
        self.states.append(state)

    # at the end of the game, backpropagate and update the state values
    def feedReward(self, reward):
        for st in reversed(self.states):
            if self.states_value.get(st) is None:
                self.states_value[st] = 0
            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
            reward = self.states_value[st]
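
    # The backward sweep implements the classic value backup
    #   V(s) <- V(s) + lr * (gamma * target - V(s))
    # where the target for the final state is the terminal reward, and the
    # target for each earlier state is the freshly updated value of its
    # successor.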

    def reset(self):
        self.states = []

    def savePolicy(self):
        with open('policy_' + str(self.name), 'wb') as fw:
            pickle.dump(self.states_value, fw)

    def loadPolicy(self, file):
        with open(file, 'rb') as fr:
            self.states_value = pickle.load(fr)


class HumanPlayer:
    def __init__(self, name):
        self.name = name

    def chooseAction(self, positions, current_board):
        while True:
            row = int(input("Input your action row:"))
            col = int(input("Input your action col:"))
            action = (row, col)
            if action in positions:
                return action

    # the remaining methods are no-op hooks so State can drive a human
    # player through the same interface as the learning agent
    def addState(self, state):
        pass

    def feedReward(self, reward):
        pass

    def reset(self):
        pass


if __name__ == "__main__":
    # training
    player = Player("player")
    st = State(player)
    print("training...")
    st.play(50000)
    # player.savePolicy()

    # play with a human
    human = HumanPlayer("human")
    st = State(human)
    st.play2()
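
    # Hypothetical persistence round-trip using the two methods above; the
    # file name is derived from the player's name, so this would write and
    # then reload 'policy_player'.
    # player.savePolicy()
    # trained = Player("player")
    # trained.loadPolicy('policy_player')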