# -*- coding: utf-8 -*-
"""Numerical optimization demos (after "Machine Learning for Hackers", ch. 7).

Part 1 fits ridge regression to the heights/weights data by minimizing the
penalized SSE directly with several ``scipy.optimize`` routines, against an
OLS baseline.  Part 2 breaks a Caesar cipher with the Metropolis algorithm,
scoring candidate decipherings by word probabilities from a lexical database.

Expects ``data/01_heights_weights_genders.csv`` and
``data/lexical_database.csv`` relative to the working directory.
"""
import math
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.optimize as opt
import statsmodels.api as sm
from statsmodels.formula.api import ols

# ---------------------------------------------------------------------------
# Part 1: ridge regression as an explicit optimization problem
# ---------------------------------------------------------------------------
heights_weights = pd.read_csv('data/01_heights_weights_genders.csv')

# OLS baseline via the formula interface; SSE = residual MSE * residual dof.
ols_fit = ols('Weight ~ Height', data=heights_weights).fit()
ols_sse = ols_fit.mse_resid * ols_fit.df_resid
print(np.round(ols_fit.params, 3))
print('MSE %i' % round(ols_sse))

y = heights_weights['Weight'].values
Xmat = sm.add_constant(heights_weights['Height'], prepend=True)


def ridge_error(params, y, Xmat, lam):
    '''Penalized SSE of ridge regression: residual SSE + lam * ||params||^2.'''
    predicted = np.dot(Xmat, params)
    sse = ((y - predicted) ** 2).sum()
    sse += lam * (params ** 2).sum()
    return sse


def ridge_grad(params, y, Xmat, lam):
    '''Gradient of ridge_error w.r.t. params: 2*(X'X p - X'y + lam*p).'''
    grad = np.dot(np.dot(Xmat.T, Xmat), params) - np.dot(Xmat.T, y)
    grad += lam * params
    grad *= 2
    return grad


def ridge_hess(params, y, Xmat, lam):
    '''Hessian of ridge_error: 2*(X'X + lam*I), constant in params.

    The factor of 2 matches ridge_error/ridge_grad (the original code
    omitted it, handing fmin_ncg a Hessian half the true value).
    '''
    return 2 * (np.dot(Xmat.T, Xmat) + np.eye(2) * lam)


params0 = np.array([0.0, 0.0])

# Nelder-Mead simplex: derivative-free.
ridge_fit = opt.fmin(ridge_error, params0, args=(y, Xmat, 1))
print('Solution: a = %8.3f, b = %8.3f ' % tuple(ridge_fit))

# Newton conjugate-gradient with an analytic gradient.
ridge_fit = opt.fmin_ncg(ridge_error, params0, fprime=ridge_grad,
                         args=(y, Xmat, 1))
print('Solution: a = %8.3f, b = %8.3f ' % tuple(ridge_fit))

# Newton conjugate-gradient with gradient and Hessian.
ridge_fit = opt.fmin_ncg(ridge_error, params0, fprime=ridge_grad,
                         fhess=ridge_hess, args=(y, Xmat, 1))
print('Solution: a = %8.3f, b = %8.3f ' % tuple(ridge_fit))

# BFGS quasi-Newton with an analytic gradient.
ridge_fit = opt.fmin_bfgs(ridge_error, params0, fprime=ridge_grad,
                          args=(y, Xmat, 1))
print('Solution: ', ridge_fit)

# ---------------------------------------------------------------------------
# Part 2: code breaking as optimization (Metropolis algorithm)
# ---------------------------------------------------------------------------
# word -> probability Series (.squeeze collapses the single data column;
# the old read_csv(squeeze=True) keyword was removed in pandas 2.0).
lexical_database = pd.read_csv('data/lexical_database.csv', index_col=0,
                               header=None, skiprows=1).squeeze('columns')
lexical_database.index.name = 'word'

letters = list('abcdefghijklmnopqrstuvwxyz')

# Caesar cipher: shift each letter one position forward (z wraps to a).
ceasar_cipher = {i: j for i, j in zip(letters, letters[1:] + letters[:1])}
inverse_ceasar_cipher = {ceasar_cipher[k]: k for k in ceasar_cipher}


def cipher_text(text, cipher_dict=ceasar_cipher):
    '''Encode text with cipher_dict; characters not in the cipher (e.g.
    spaces) pass through unchanged.'''
    return ''.join([cipher_dict.get(x) or x for x in text])


def decipher_text(text, cipher_dict=ceasar_cipher):
    '''Decode text by inverting cipher_dict; unknown characters pass through.'''
    # Invert the cipher dictionary (k, v) -> (v, k).
    decipher_dict = {cipher_dict[k]: k for k in cipher_dict}
    return ''.join([decipher_dict.get(x) or x for x in text])


def generate_random_cipher():
    '''Randomly generate a one-to-one letter -> letter cipher dictionary.

    Used to generate the starting cipher of the algorithm.
    '''
    output = letters[:]
    random.shuffle(output)
    return {k: v for (k, v) in zip(letters, output)}


def modify_cipher(cipher_dict, input, new_output):
    '''Swap a single key in a cipher dictionary, keeping it one-to-one.

    Old: a -> b, ..., m -> n, ...
    New: a -> n, ..., m -> b, ...
    '''
    decipher_dict = {cipher_dict[k]: k for k in cipher_dict}
    old_output = cipher_dict[input]
    new_cipher = cipher_dict.copy()
    new_cipher[input] = new_output
    # The letter that previously mapped to new_output takes the old output.
    new_cipher[decipher_dict[new_output]] = old_output
    return new_cipher


def propose_modified_cipher_from_cipher(text, cipher_dict,
                                        lexical_db=lexical_database):
    '''Propose a new cipher by swapping one uniformly random key.

    ``text`` and ``lexical_db`` are unused; the signature matches the other
    proposal rule so the two are interchangeable.
    '''
    key = random.choice(list(cipher_dict))
    new_output = random.choice(letters)
    return modify_cipher(cipher_dict, key, new_output)


def propose_modified_cipher_from_text(text, cipher_dict,
                                      lexical_db=lexical_database):
    '''Propose a new cipher, sampling the key only from letters appearing in
    words of the current deciphering that are NOT in the lexical database.'''
    deciphered = decipher_text(text, cipher_dict).split()
    letters_to_sample = ''.join([w for w in deciphered
                                 if lexical_db.get(w) is None])
    # Fall back to all letters in the text when every word is in the db.
    letters_to_sample = letters_to_sample or ''.join(set(deciphered))
    key = random.choice(letters_to_sample)
    new_output = random.choice(letters)
    return modify_cipher(cipher_dict, key, new_output)


def one_gram_prob(one_gram, lexical_db=lexical_database):
    '''Probability of a single word; machine epsilon for unknown words so
    the log-probability stays finite.'''
    return lexical_db.get(one_gram) or np.finfo(float).eps


def text_logp(text, cipher_dict, lexical_db=lexical_database):
    '''Log-probability of the deciphered text, treating words as independent.'''
    deciphered = decipher_text(text, cipher_dict).split()
    return sum(math.log(one_gram_prob(w, lexical_db)) for w in deciphered)


def metropolis_step(text, cipher_dict, proposal_rule,
                    lexical_db=lexical_database):
    '''One Metropolis step: always accept a proposal that raises the text
    log-probability; otherwise accept with probability exp(lp2 - lp1).'''
    proposed_cipher = proposal_rule(text, cipher_dict)
    lp1 = text_logp(text, cipher_dict, lexical_db)
    lp2 = text_logp(text, proposed_cipher, lexical_db)
    if lp2 > lp1:
        return proposed_cipher
    if random.random() < math.exp(lp2 - lp1):
        return proposed_cipher
    return cipher_dict


message = 'here is some sample text'
ciphered_text = cipher_text(message, ceasar_cipher)
niter = 250000


def metropolis_decipher(ciphered_text, proposal_rule, niter, seed=4):
    '''Run niter Metropolis steps starting from a random cipher.

    Returns a DataFrame (index 1..niter) with the deciphered text and its
    log-probability at each iteration.
    '''
    random.seed(seed)
    cipher = generate_random_cipher()
    deciphered_text_list = []
    logp_list = []
    for _ in range(niter):
        logp_list.append(text_logp(ciphered_text, cipher))
        deciphered_text_list.append(decipher_text(ciphered_text, cipher))
        cipher = metropolis_step(ciphered_text, cipher, proposal_rule)
    results = pd.DataFrame({'deciphered_text': deciphered_text_list,
                            'logp': logp_list})
    results.index = np.arange(1, niter + 1)
    return results


results0 = metropolis_decipher(ciphered_text,
                               propose_modified_cipher_from_cipher, niter)
print(results0.loc[10000::10000])

results1 = metropolis_decipher(ciphered_text,
                               propose_modified_cipher_from_text, niter)
print(results1.loc[10000::10000])

plt.figure(figsize=(10, 7))
results0.logp.plot(style='r-', label='propose from cipher')
results1.logp.plot(style='k-', label='propose from text')
plt.xlabel('Iteration')
plt.ylabel('Log Probability')
plt.axhline(text_logp(ciphered_text, ceasar_cipher),
            label='log prob of true text')
plt.legend(loc='best')


def maxprob_message(word_lens=(4, 2, 4, 6, 4), lexical_db=lexical_database):
    '''For each length in word_lens pick the highest-probability word of that
    length from the database; return (word list, total log-probability).'''
    db_word_series = pd.Series(lexical_db.index)
    db_word_len = db_word_series.str.len()
    max_prob_wordlist = []
    logp = 0.0
    for n in word_lens:
        words_n = list(db_word_series[db_word_len == n])
        max_prob_wordlist.append(lexical_db[words_n].idxmax())
        logp += math.log(lexical_db[words_n].max())
    return max_prob_wordlist, logp


print(maxprob_message())