import numpy as np
import theano
import theano.tensor as T
import lasagne
# Float dtype name taken from Theano's configuration; it is reused below as the
# dtype of every NumPy array handed to the compiled Theano functions.
floatX = theano.config.floatX
# Bare expression: only echoes the value when run as a notebook cell.
floatX
Using gpu device 0: GeForce GTX 965M (CNMeM is disabled, CuDNN 4007) /usr/local/lib/python3.4/dist-packages/theano/tensor/signal/downsample.py:5: UserWarning: downsample module has been moved to the pool module. warnings.warn("downsample module has been moved to the pool module.")
Couldn't import dot_parser, loading of dot files will not be possible.
'float32'
from IPython.display import HTML, display
%load_ext Cython
%%cython
# cython: infer_types=True, annotation_typing=True
## cython: infer_types.verbose=True
from IPython.display import HTML, display
import numpy as np
# Element type of every board tensor built in this cell.
floatX = np.float32

# binary6[v] is the 6-bit expansion of v (0 <= v < 64), least-significant bit
# first, so entry k tells whether bit k of v is set.
binary6 = np.array(
    [[(value >> bit) & 1 for bit in range(6)] for value in range(64)],
    dtype=floatX,
)

# height[m] is log2(m) when m is one of 1, 2, 4, 8, 16, 32 and -1 for every
# other index in 0..64 (including 64 itself).
height = np.full(65, -1, dtype=np.int32)
for __i in range(6):
    height[1 << __i] = __i
cdef class Connect4:
    """Connect-4 position stored as two 64-bit bitboards.

    Cell (col, row) maps to bit ``col * 7 + row``: each column uses 7
    consecutive bits, of which `move` only ever fills the low 6 (rows 0..5,
    row 0 being the one printed last by `__repr__`).  ``data[0]`` holds the
    pieces of the player who moves on even turns, ``data[1]`` those of the
    player who moves on odd turns; ``turn`` counts the moves played so far.
    """
    cdef public:
        # Number of moves played; turn % 2 selects the bitboard that moves next.
        long turn
        # data[0] / data[1]: one bitboard per player.
        long long[2] data

    cpdef long get_col_row(self, col: long, row: long):
        """Return 0 for an empty cell, 1 if data[0] owns it, 2 if data[1] owns it."""
        pos = col * 7 + row
        mask = (<long long>1) << pos
        if self.data[1] & mask:
            return 2
        # bool(...) becomes 1 or 0 through the `long` return type.
        return bool(self.data[0] & mask)

    cpdef long is_end(self):
        """Return 1 if the player who made the latest move has four in a row, else 0.

        Only ``data[1 - turn % 2]`` is inspected, i.e. the bitboard that was
        written by the most recent call to `move`.
        """
        cdef long long mask
        bitboard = self.data[1-self.turn%2]
        # The highest cell move() can fill is bit 47 (col 6, row 5), so windows
        # are only slid while they stay below bit 48.
        bound = (<long long>1)<<48 # 49 = 7*(6+1)
        # horizontal: 0x204081 = 1|(1<<7)|(1<<14)|(1<<21)
        # vertical: 0xf = 1|(1<<1)|(1<<2)|(1<<3)
        # up-right: 0x1010101 = 1|(1<<8)|(1<<16)|(1<<24)
        # down-right: 0x208208 = (1<<3)|(1<<9)|(1<<15)|(1<<21)
        for mask in [0x204081, 0xf, 0x1010101, 0x208208]:
            # Slide the 4-cell window one bit at a time.  Windows that wrap
            # from one column into the next cover a row-6 bit, which move()
            # never sets, so they cannot match on boards built with move().
            while mask < bound:
                if mask & bitboard == mask:
                    return True
                mask <<= 1
        return False

    cpdef set_col_row(self, col:long, row:long, value:long):
        """Force cell (col, row) to `value`: 1 or 2 assigns an owner, anything else clears it.

        `turn` is not modified.
        """
        # assert value in [0,1,2]
        pos = col * 7 + row
        mask = (<long long>1) << pos
        neg_mask = ~mask
        if value == 1 or value ==2:
            # Set the bit for the chosen player and clear it for the other one.
            self.data[value-1] |= mask
            self.data[2-value] &= neg_mask
        else:
            self.data[0] &= neg_mask
            self.data[1] &= neg_mask

    def __init__(self, data=None, turn=0):
        """Start from a copy of `data` (two bitboards) or from an empty board."""
        if data is not None:
            self.data = data[:]
        else:
            self.data = [0, 0]
        self.turn = turn

    cpdef remove(self, col:long):
        """Clear the topmost piece of column `col` in both bitboards (no-op if empty).

        `turn` is left unchanged.
        """
        shift = col*7
        # Occupied bits of a column are contiguous from bit 0, so adding 1
        # yields a single bit just above the stack ...
        mask = (((self.data[0]|self.data[1]) >> shift) &0x3f) +1
        # ... and halving it selects the top piece (0 when the column is empty).
        mask = (mask >> 1) << shift
        # print(shift, hex(mask), hex(self.data[0]), hex(self.data[1]))
        neg_mask = ~mask
        self.data[0] &= neg_mask
        self.data[1] &= neg_mask

    def _np_branch(self):
        """Return ``(moves, boards)`` for every legal move of the side to move.

        ``moves`` lists the playable columns; ``boards[k]`` is a (2, 7, 6)
        array [channel, column, row] of the position after playing
        ``moves[k]``.  Channel 0 holds the pieces of the side to move
        (including the new piece), channel 1 those of the opponent.
        """
        c = self.turn%2 # who's turn
        base = np.zeros((2,7,6), dtype=floatX)
        pos = []
        moves = []
        red, yellow = self.data
        for i in range(7):
            # Single bit just above the stack; 64 when the column is full.
            mask = ((red|yellow) &0x3f) + 1
            # height[] maps that bit to the landing row, or -1 for a full column.
            p = height[mask]
            if p != -1:
                moves.append(i)
                pos.append(height[mask])
            base[c, i] = binary6[red&0x3f]
            base[1-c, i] = binary6[yellow&0x3f]
            # Advance both bitboards to the next column.
            red >>= 7
            yellow >>= 7
        boards = np.zeros( (len(moves), 2, 7, 6), dtype=floatX)
        for i in range(len(moves)):
            m = moves[i]
            p = pos[i]
            boards[i]=base
            # Drop the mover's piece into the landing cell.
            boards[i, 0, m, p] = 1
        return moves, boards

    def _np_board(self):
        """Return the current position as a (2, 7, 6) array [channel, column, row].

        Channel 0 holds the pieces of the player who made the latest move,
        channel 1 those of the other player.
        """
        c = (self.turn-1)%2 # who played
        board = np.zeros((2, 7, 6), dtype=floatX)
        # NOTE(review): pos and moves are filled below but never used by this method.
        pos = []
        moves = []
        red, yellow = self.data
        for i in range(7):
            mask = ((red|yellow) &0x3f) + 1
            p = height[mask]
            if p != -1:
                moves.append(i)
                pos.append(height[mask])
            board[c, i] = binary6[red&0x3f]
            board[1-c, i] = binary6[yellow&0x3f]
            red >>= 7
            yellow >>= 7
        return board

    cpdef move(self, col:long, test=False):
        """Drop a piece of the side to move into column `col`.

        Returns None when the column is full, otherwise `self`.  With
        ``test=True`` the board and `turn` are left untouched, so the call
        only reports whether the move is legal.
        """
        # assert 0<= col <7
        shift = col*7
        # Single bit just above the column's stack; 64 means all six rows are taken.
        mask = (((self.data[0]|self.data[1]) >> shift) &0x3f) +1
        # print("mask=", mask)
        if mask >= 64:
            return None
        if not test:
            self.data[self.turn%2] |= (mask<<shift)
            self.turn += 1
        return self

    def board_data(self):
        """Yield ``(col, row, owner)`` for every occupied cell, owner being 1 or 2."""
        for i in range(7):
            for j in range(6):
                c = self.get_col_row(i,j)
                if c!=0:
                    yield i,j,c

    def _repr_html_(self):
        """Build an HTML snippet of absolutely positioned images drawing the board.

        Image files are referenced as ``img/empty.png``, ``img/red_coin.png``
        (owner 1) and ``img/yellow_coin.png`` (owner 2).
        """
        def pos(i):
            # Pixel offset of grid index i.
            return int(7+(220-6.5)*i/8)
        imgstr = "<img src='img/%s.png' width='23px' height='23px' style='position: absolute; top: %spx; left: %spx;margin-top: 0;z-index: %d' />"
        header = """<div style="width: 200px; height:180px;position: relative;background: blue">"""
        # Empty-cell images first (z-index 0); row j is drawn at vertical slot 5-j.
        header += "\n".join(imgstr%('empty', pos(5-j), pos(i), 0) for i in range(7) for j in range(6))
        # Coins are layered on top with z-index 2.
        return header +"\n".join(imgstr%('red_coin' if c==1 else 'yellow_coin', pos(5-j), pos(i), 2) for (i,j,c) in self.board_data()) +"</div>"

    def display(self):
        """Render the board through IPython's display() using the HTML representation."""
        display(HTML(self._repr_html_()))

    def __repr__(self):
        """Return six text lines, row 5 first: '.' empty, 'o' data[0], 'x' data[1]."""
        row_str = lambda j: "".join(".ox"[self.get_col_row(i,j)] for i in range(7))
        return "\n".join(row_str(j) for j in range(5,-1,-1))
from random import randint
def random_play(init_data=None, init_turn=0, display=False):
    """Play uniformly random moves from the given position until the game ends.

    Args:
        init_data: optional pair of bitboards passed to ``Connect4``.
        init_turn: move counter of the starting position.
        display: when truthy, show the final board.

    Returns:
        The move counter at which a player completed four in a row, or 0 if
        move 42 was reached without a winner.
    """
    board = Connect4(init_data, init_turn)
    while board.turn < 42 and not board.is_end():
        # Rejection sampling: redraw a column until move() accepts it.
        accepted = board.move(randint(0, 6))
        while accepted is None:
            accepted = board.move(randint(0, 6))
    if display:
        board.display()
    return board.turn if board.is_end() else 0
def MC_agent(_game, N=200):
    """Choose a column by flat Monte-Carlo evaluation of every legal move.

    For each playable column the move is applied to a copy of `_game`; if it
    wins immediately that column is returned at once.  Otherwise `N` random
    playouts are run from the resulting position and scored
    ``+0.95 ** d`` when the side to move in `_game` eventually wins and
    ``-0.95 ** d`` when the opponent does, where ``d`` is the number of moves
    played after the candidate move.  Draws score 0.

    Args:
        _game: position to move from; it is never modified.
        N: number of random playouts per candidate column.

    Returns:
        The column with the highest average score.  Ties go to the highest
        column index, as determined by ``max`` over ``(score, column)`` pairs.
    """
    mover = _game.turn % 2
    # Full columns keep -N, which is below any playout average for N >= 1.
    score = [-1.0 * N] * 7
    for i in range(7):
        game = Connect4(_game.data, _game.turn)
        if game.move(i) is None:
            continue
        if game.is_end():
            return i
        s = 0
        for _ in range(N):
            r = random_play(game.data, game.turn)
            if r == 0:
                continue
            # Discount by how many moves after the current position the game ended.
            reward = 0.95 ** (r - _game.turn - 1)
            if (r - 1) % 2 == mover:
                s += reward
            else:
                s -= reward
        score[i] = s / N
    return max(zip(score, range(7)))[1]
def random_vs_MC(init_data=None, init_turn=0, display=False):
    """Play a uniformly random player (even turns) against ``MC_agent`` (odd turns).

    Args:
        init_data: optional pair of bitboards passed to ``Connect4``.
        init_turn: move counter of the starting position.
        display: ``'all'`` shows the board after every move; any truthy value
            shows the final board.

    Returns:
        The move counter at which a player completed four in a row, or 0 if
        move 42 was reached without a winner.
    """
    board = Connect4(init_data, init_turn)
    while board.turn < 42 and not board.is_end():
        if board.turn % 2:
            # Odd turn: Monte-Carlo player.
            board.move(MC_agent(board))
        else:
            # Even turn: redraw a random column until move() accepts it.
            while board.move(randint(0, 6)) is None:
                pass
        if display == 'all':
            board.display()
    if display:
        board.display()
    return board.turn if board.is_end() else 0
# Symbolic inputs: a batch of boards (4-D tensor) and one scalar target per board.
input_var = T.tensor4('inputs')
target_var = T.vector('targets')
# Value network: boards of shape (batch, 2, 7, 6) -> dense 400 -> dense 200 ->
# dropout(0.2) -> dense 40 -> a single tanh unit.
l_in = lasagne.layers.InputLayer(shape=(None, 2, 7, 6), input_var=input_var)
#l_in_drop = lasagne.layers.DropoutLayer(l_in, p=0.2)
l_hidden = lasagne.layers.DenseLayer(l_in, num_units=400, nonlinearity=lasagne.nonlinearities.rectify, W=lasagne.init.GlorotUniform())
l_hidden2 = lasagne.layers.DenseLayer(l_hidden, num_units=200, nonlinearity=lasagne.nonlinearities.rectify, W=lasagne.init.GlorotUniform())
l_hidden2_drop = lasagne.layers.DropoutLayer(l_hidden2, p=0.2)
l_hidden3 = lasagne.layers.DenseLayer(l_hidden2_drop, num_units=40, nonlinearity=lasagne.nonlinearities.rectify, W=lasagne.init.GlorotUniform())
l_out = lasagne.layers.DenseLayer(l_hidden3, num_units=1, nonlinearity=lasagne.nonlinearities.tanh, W=lasagne.init.GlorotUniform())
# Flatten the (batch, 1) network output to one value per board.
prediction = lasagne.layers.get_output(l_out).flatten()
# V(boards) -> predicted values; used for move selection by player_NN and run_game.
# NOTE(review): this output is requested without deterministic=True, so the
# dropout layer presumably stays active whenever V is evaluated - confirm
# against the Lasagne get_output documentation whether that is intended.
V = theano.function([input_var], prediction)
#loss = lasagne.objectives.binary_crossentropy(prediction, target_var)
# Mean squared error between predicted and target values.
loss = lasagne.objectives.squared_error(prediction, target_var)
loss = loss.mean()
params = lasagne.layers.get_all_params(l_out, trainable=True)
updates = lasagne.updates.adam(loss, params)
# train_fn(boards, targets) applies one Adam update and returns the loss.
train_fn = theano.function([input_var, target_var], loss, updates=updates)
# Deterministic forward pass; test_V reports only whether each prediction is > 0.
test_prediction = lasagne.layers.get_output(l_out, deterministic=True).flatten()
test_V = theano.function([input_var], T.gt(test_prediction, 0.))
from random import random, randint
def player_NN(game):
    """Return the legal column whose resulting board the network `V` rates highest."""
    columns, candidate_boards = game._np_branch()
    best = np.argmax(V(candidate_boards))
    return columns[best]
def player_random(game):
    """Return a uniformly random legal column for `game`.

    Legality is probed with ``game.move(col, test=True)``, which leaves the
    position untouched.

    Raises:
        ValueError: if no column accepts a move.  The previous
            rejection-sampling loop never terminated in that situation.
    """
    from random import choice

    legal = [col for col in range(7) if game.move(col, test=True) is not None]
    if not legal:
        raise ValueError("player_random: no legal move, every column is full")
    return choice(legal)
def get_player_MC(N=100):
    """Return a player callable that forwards to ``MC_agent`` with `N` playouts per move."""
    return lambda game: MC_agent(game, N=N)
def get_player_mixed(*settings):
    """Build a player that picks one of several players at random on every call.

    Args:
        *settings: ``(player, proportion)`` pairs.  One uniform draw in
            [0, 1) is reduced by each proportion in order, and the first
            player that brings it to zero or below makes the move.

    Returns:
        A callable ``player(game)``.  If the proportions are exhausted
        without selecting anyone, it falls back to ``player_random``.
    """
    def player(game):
        remaining = random()
        for candidate, share in settings:
            remaining -= share
            if remaining <= 0:
                return candidate(game)
        return player_random(game)

    return player
def vs(player1, player2, display=False):
    """Play one game from the empty board; `player1` moves on even turns.

    Args:
        player1, player2: callables taking the game and returning a column.
        display: ``'all'`` shows the board after every move; any truthy value
            shows the final board.

    Returns:
        ``(result, history)`` where `history` lists the columns returned by
        the players in order and `result` is the move counter at which a
        player completed four in a row, or 0 if move 42 was reached without
        a winner.
    """
    game = Connect4()
    history = []
    while game.turn < 42 and not game.is_end():
        mover = player2 if game.turn % 2 else player1
        column = mover(game)
        game.move(column)
        history.append(column)
        if display == 'all':
            game.display()
    if display:
        game.display()
    outcome = game.turn if game.is_end() else 0
    return outcome, history
def train_if(results):
    """Return a training callback that only learns from selected game outcomes.

    Args:
        results: collection of outcome codes to train on: -1 for a game
            without a winner, 0 when the player moving on even turns won,
            1 when the player moving on odd turns won.

    Returns:
        ``train(r, history, old_histories, γ=0.95)``.  `r` and `history` are
        the values returned by ``vs``.  When the outcome is selected, the
        game is replayed, appended to `old_histories` (mutated in place) as
        ``(boards, targets)`` and one ``train_fn`` step is run on everything
        stored in `old_histories`.  The callback always returns None.
    """
    def train(r, history, old_histories, γ=0.95):
        outcome = (r - 1) % 2 if r != 0 else -1
        if outcome not in results:
            return
        # Replay the recorded columns to rebuild the board after every move.
        replay = Connect4()
        board_history = []
        for column in history:
            replay.move(column)
            board_history.append(replay._np_board())
        # Targets: 1 for the final board of a decided game (0 if undecided),
        # then multiplied by -γ for each step back towards the opening.
        estimate_V = np.zeros(len(history), dtype=floatX)
        target = 0. if outcome == -1 else 1.
        for step in reversed(range(len(history))):
            estimate_V[step] = target
            target *= -γ
        old_histories.append((board_history, estimate_V))
        all_boards = np.array(
            [board for entry in old_histories for board in entry[0]],
            dtype=floatX,
        )
        all_targets = np.array(
            [value for entry in old_histories for value in entry[1]],
            dtype=floatX,
        )
        train_fn(all_boards, all_targets)

    return train
def vs_test(player1, player2, old_histories=None, ngames=1000, train=None):
    """Play `ngames` games between two players and tally the outcomes.

    Args:
        player1: callable moving on even turns.
        player2: callable moving on odd turns.
        old_histories: optional list of previously stored games handed to
            `train`.  Defaults to a fresh list on every call; the former
            shared ``[]`` default accumulated games across calls.
        ngames: number of games to play.
        train: optional callback ``train(r, history, old_histories)`` invoked
            after every game.

    Returns:
        ``[no_winner, player1_wins, player2_wins]``.
    """
    if old_histories is None:
        old_histories = []
    result = [0, 0, 0]
    for _ in range(ngames):
        r, history = vs(player1, player2)
        if r == 0:
            result[0] += 1
        else:
            result[1 + (r - 1) % 2] += 1
        if train is not None:
            train(r, history, old_histories)
            # Keep only the ten most recent stored games for the next step.
            old_histories = old_histories[-10:]
    return result
# Training callbacks for vs_test, keyed by which outcomes they learn from
# (-1: no winner, 0: the even-turn player won, 1: the odd-turn player won).
# NOTE: each line is a chained assignment, so the global name `train` is also
# rebound every time and ends up equal to train_all.
# train2: games with no winner or won by the even-turn player.
train2 = train=train_if([-1,0])
# train1: games with no winner or won by the odd-turn player.
train1 = train=train_if([-1,1])
# train_all: every game.
train_all = train=train_if([-1,0,1])
import time
import sys
# Wall-clock reference for the "time:" progress lines printed during training.
start_time=time.time()
def run_game(V, verbose = False, ɛ=0.1, γ=0.95):
    """Play one ɛ-greedy self-play game and train the network on it.

    Both sides pick the move whose resulting board `V` rates highest, except
    that with probability `ɛ` a uniformly random legal move is played
    instead.  After the game every visited board gets a target: 1 for the
    final board if the game was won (0 otherwise), multiplied by ``-γ`` for
    each step back towards the opening.

    Args:
        V: callable mapping a batch of boards to one value per board.
        verbose: accepted for compatibility; not used.
        ɛ: exploration probability per move.
        γ: discount applied per move when propagating the result backwards.

    Returns:
        The loss reported by ``train_fn`` for this game.
    """
    game = Connect4()
    history = []
    while game.turn < 42 and not game.is_end():
        moves, boards = game._np_branch()
        if random() < ɛ:
            # Explore: uniformly random legal move, network not evaluated.
            idx = randint(0, len(moves) - 1)
        else:
            idx = np.argmax(V(boards))
        game.move(moves[idx])
        history.append(boards[idx])
    # 1 when the last move completed four in a row, 0 for a full board without a winner.
    r = 1. if game.is_end() else 0.
    estimate_V = np.zeros(len(history), dtype=floatX)
    for i in range(len(history) - 1, -1, -1):
        estimate_V[i] = r
        r *= -γ
    return train_fn(np.array(history, dtype=floatX), estimate_V)
# Training driver: 100*N self-play games, with an evaluation report every N games.
# Loss accumulated since the last report.
total_loss = 0
# Reporting interval, in games.
N = 5000
print("time:",time.time()-start_time)
# Baseline before any training: 100 games against the 100-playout Monte-Carlo
# player, once with the network moving second and once moving first.
print("inital result: mc100 vs nn: %s, nn vs mc100 %s"%(vs_test(get_player_MC(100), player_NN, ngames=100),
                                                         vs_test(player_NN, get_player_MC(100), ngames=100)))
print("time:",time.time()-start_time)
for i in range(100*N):
    total_loss += run_game(V)
    if i%N==N-1:
        print("time:",time.time()-start_time)
        if i%(10*N)==10*N-1:
            # Every tenth report: evaluate against the Monte-Carlo player.
            # train2 / train1 also train on games the network did not win.
            print("#%d midterm avgloss=%f"%(i+1, total_loss/N))
            print(" mc100 vs nn: %s, nn vs mc100 %s"%(vs_test(get_player_MC(100), player_NN, ngames=100, train=train2),
                                                        vs_test(player_NN, get_player_MC(100), ngames=100, train=train1)) )
        else:
            # Regular report: 1000 games against the random player in each seat.
            print("#%d avgloss=%f, rand vs nn: %s, nn vs rand %s"%(i+1, total_loss/N, vs_test(player_random, player_NN, ngames=1000, train=train2),
                                                                     vs_test(player_NN, player_random, ngames=1000, train=train1)) )
        # Start a new averaging window and push buffered output to the notebook.
        total_loss = 0
        sys.stdout.flush()
time: 0.0003426074981689453 inital result: mc100 vs nn: [0, 100, 0], nn vs mc100 [0, 0, 100] time: 11.120651960372925 time: 61.95448708534241 #5000 avgloss=0.406109, rand vs nn: [2, 307, 691], nn vs rand [1, 752, 247] time: 116.81879782676697 #10000 avgloss=0.424629, rand vs nn: [2, 128, 870], nn vs rand [1, 870, 129] time: 171.35795402526855 #15000 avgloss=0.412438, rand vs nn: [1, 164, 835], nn vs rand [1, 848, 151] time: 231.119473695755 #20000 avgloss=0.397607, rand vs nn: [0, 40, 960], nn vs rand [0, 967, 33] time: 285.53651309013367 #25000 avgloss=0.383559, rand vs nn: [1, 31, 968], nn vs rand [1, 959, 40] time: 340.74573159217834 #30000 avgloss=0.394265, rand vs nn: [0, 48, 952], nn vs rand [0, 987, 13] time: 394.1741552352905 #35000 avgloss=0.383031, rand vs nn: [1, 22, 977], nn vs rand [0, 998, 2] time: 447.6618604660034 #40000 avgloss=0.384509, rand vs nn: [0, 12, 988], nn vs rand [0, 993, 7] time: 502.2312698364258 #45000 avgloss=0.369094, rand vs nn: [0, 11, 989], nn vs rand [0, 997, 3] time: 560.1054155826569 #50000 midterm avgloss=0.351583
--------------------------------------------------------------------------- KeyboardInterrupt Traceback (most recent call last) <ipython-input-12-8b7cf2b99936> in <module>() 45 if i%(10*N)==10*N-1: 46 print("#%d midterm avgloss=%f"%(i+1, total_loss/N)) ---> 47 print(" mc100 vs nn: %s, nn vs mc100 %s"%(vs_test(get_player_MC(100), player_NN, ngames=100, train=train2), 48 vs_test(player_NN, get_player_MC(100), ngames=100, train=train1)) ) 49 else: <ipython-input-10-88f1f5773411> in vs_test(player1, player2, old_histories, ngames, train) 26 result = [0,0,0] 27 for i in range(ngames): ---> 28 r, history = vs(player1, player2) 29 if r == 0: 30 result[0]+=1 <ipython-input-9-0542074bfb98> in vs(player1, player2, display) 30 while game.turn < 42 and not game.is_end(): 31 if game.turn%2 == 0: ---> 32 m = player1(game) 33 else: 34 m = player2(game) <ipython-input-9-0542074bfb98> in player(game) 11 def get_player_MC(N=100): 12 def player(game): ---> 13 return MC_agent(game, N=N) 14 return player 15 <ipython-input-5-eb10225d808a> in MC_agent(_game, N) 9 for j in range(N): 10 #print("move", i, "case", j) ---> 11 r = random_play(game.data, game.turn) 12 turn = (r-1)%2 13 if r == 0: KeyboardInterrupt: