## Restricted Boltzmann Machine (RBM)
class FRBM(object):
    """Restricted Boltzmann Machine with binary visible and hidden units.

    An energy-based model whose parameters (W, b_hid, b_vis) live in
    Theano shared variables so they can be trained through the updates
    dictionary of a compiled theano.function. Training uses k-step
    (persistent) contrastive divergence built in get_cost_updates().
    """

    def __init__(self, n_visible, n_hidden,
                 W=None, b_hid=None, b_vis=None,
                 X=None):
        """
        :param n_visible: number of visible units
        :param n_hidden: number of hidden units
        :param W: optional shared (n_visible, n_hidden) weight matrix
        :param b_hid: optional shared hidden-bias vector (n_hidden,)
        :param b_vis: optional shared visible-bias vector (n_visible,)
        :param X: optional symbolic minibatch of visible vectors;
            defaults to a fresh T.matrix('X')
        """
        self.rng = np.random.RandomState(0)
        self.theano_rng = RandomStreams(self.rng.randint(2 ** 30))
        ## model inputs
        self.X = X or T.matrix('X')
        ## model params
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        if W is None:
            # Glorot-style uniform range, scaled by 4 as is conventional
            # for sigmoid units.
            W_bound = np.sqrt(6. / (n_hidden + n_visible))
            W = theano.shared(value=np.asarray(
                self.rng.uniform(
                    low=-4. * W_bound,
                    high=4. * W_bound,
                    size=(n_visible, n_hidden)),
                dtype=theano.config.floatX),
                name='RBM_W', borrow=True)
        if b_hid is None:
            b_hid = theano.shared(np.zeros(n_hidden, dtype=theano.config.floatX),
                                  name='RBM_b_hid', borrow=True)
        if b_vis is None:
            b_vis = theano.shared(np.zeros(n_visible, dtype=theano.config.floatX),
                                  name='RBM_b_vis', borrow=True)
        self.W = W
        self.b_hid = b_hid
        self.b_vis = b_vis
        self.params = [self.W, self.b_hid, self.b_vis]

    ## model cost and error
    def free_energy(self, v_sample):
        """F(v) = -b_vis . v - sum_j log(1 + exp((W^T v + b_hid)_j))."""
        wx_b = T.dot(v_sample, self.W) + self.b_hid
        vbias_term = T.dot(v_sample, self.b_vis)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term

    def propup(self, vis):
        """Propagate visible activations upward.

        Returns (pre_sigmoid_activation, sigmoid(pre_sigmoid_activation));
        the pre-sigmoid value is kept so Theano can later fold the
        sigmoid+cross-entropy into a numerically stable op.
        """
        pre_sigmoid_activation = T.dot(vis, self.W) + self.b_hid
        return (pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation))

    def sample_h_given_v(self, v0_sample):
        """Sample h ~ p(h|v): returns [pre-sigmoid, mean, binary sample]."""
        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
        h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
                                             n=1, p=h1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_h1, h1_mean, h1_sample]

    def propdown(self, hid):
        """Propagate hidden activations downward (mirror of propup)."""
        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.b_vis
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_v_given_h(self, h0_sample):
        """Sample v ~ p(v|h): returns (pre-sigmoid, mean, binary sample)."""
        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
        v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
                                             n=1, p=v1_mean,
                                             dtype=theano.config.floatX)
        return (pre_sigmoid_v1, v1_mean, v1_sample)

    def gibbs_hvh(self, h0_sample):
        """One Gibbs step starting from hidden units: h -> v -> h."""
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return (pre_sigmoid_v1, v1_mean, v1_sample,
                pre_sigmoid_h1, h1_mean, h1_sample)

    def gibbs_vhv(self, v0_sample):
        """One Gibbs step starting from visible units: v -> h -> v."""
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
        return (pre_sigmoid_h1, h1_mean, h1_sample,
                pre_sigmoid_v1, v1_mean, v1_sample)

    def get_cost_updates(self, lr=0.1, persistent=None, k=1):
        """Build the CD-k / PCD-k cost and parameter updates.

        BUGFIX: the original line was `def get_cost_updates(lr,
        persistent = None, k = 1)` — missing `self`, missing colon and
        missing body (a SyntaxError) — even though run_rbm() calls it
        with keywords lr/persistent/k. Implemented following the
        standard contrastive-divergence recipe.

        :param lr: learning rate (cast to floatX before use)
        :param persistent: optional shared variable holding the state of
            the persistent Gibbs chain (PCD); when None, plain CD-k is
            used and the chain starts from the data-driven hidden sample.
        :param k: number of Gibbs steps per parameter update
        :return: (cost, updates) — `cost` is the free-energy gap between
            data and model samples (a monitoring proxy, not the true
            log-likelihood), `updates` is the dict returned by
            theano.scan augmented with the SGD parameter updates (and
            the advanced persistent chain, if any).
        """
        # Positive phase: hidden sample driven by the data.
        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.X)
        # Chain start: data-driven sample (CD) or persistent state (PCD).
        chain_start = ph_sample if persistent is None else persistent
        # Run k steps of alternating Gibbs sampling; only the chain state
        # (last output of gibbs_hvh) is fed back, hence the five Nones.
        ([pre_sigmoid_nvs, nv_means, nv_samples,
          pre_sigmoid_nhs, nh_means, nh_samples],
         updates) = theano.scan(
            self.gibbs_hvh,
            outputs_info=[None, None, None, None, None, chain_start],
            n_steps=k)
        chain_end = nv_samples[-1]
        # Free-energy gap between the data and the model's negative sample.
        cost = T.mean(self.free_energy(self.X)) \
            - T.mean(self.free_energy(chain_end))
        # The chain end is a sample, not a differentiable function of the
        # parameters, so it must be held constant in the gradient.
        gparams = T.grad(cost, self.params, consider_constant=[chain_end])
        for gparam, param in zip(gparams, self.params):
            updates[param] = param - gparam * T.cast(lr,
                                                     dtype=theano.config.floatX)
        if persistent is not None:
            # Advance the persistent chain to the new hidden sample.
            updates[persistent] = nh_samples[-1]
        return cost, updates
def run_rbm(v_train_X, batch_size = 20, learning_rate = 0.1, n_epochs = 15,
n_chains = 20, n_samples = 10, n_hidden = 500):
n_train_batches = v_train_X.get_value(borrow = True).shape[0] / batch_size
index = T.lscalar()
x = T.matrix('x')
## initialize storage for the persistent chain
persistent_chain = theano.shared(np.zeros((batch_size, n_hidden),
dtype = theano.config.floatX),
borrow = True)
rbm = FRBM(X = x, n_visible = 28 * 28, n_hidden = n_hidden)
cost, updates = rbm.get_cost_updates(lr = learning_rate,
persistent = persistent_chain,
k = 15)
## train the rbm
train_rbm = theano.function([index],
cost,
updates = updates,
givens = {
x: v_train_X[index*batch_size:(index+1)*batch_size]
},
name = 'train_rbm')
for epoch in xrange(n_epochs):
mean_cost = np.mean([train_rbm(i) for i in xrange(n_train_batches)])
print 'training epoch %d, cost is ' % epoch, np.mean(mean_cost)