"""Random walks on one-dimensional "line" hypergraphs (notebook export).

The script builds line hypergraphs, turns their Markov transition matrices
into ``pykov`` chains, simulates random walks on them, fits a normal
distribution to the visited positions and compares the fitted standard
deviation with ``compute_expected_std``.

The statements between the function definitions are notebook cells and are
executed at import time, in order.
"""
import functools
from collections import Counter

import numpy as np
import pykov
from matplotlib import pyplot as plt
from scipy.optimize import curve_fit
from scipy.stats import norm

from hypergraph.hypergraph_models import HyperGraph
from hypergraph.markov_diffusion import (
    create_markov_matrix_model_nodes,
    create_markov_matrix_model_hyper_edges,
)
from hypergraph.utils import plot_transition_matrix

# Notebook magic (`%matplotlib inline`): only meaningful inside IPython, and a
# syntax error in a plain .py file, so it is issued through the IPython API
# and skipped when the script runs under a regular interpreter.
try:
    get_ipython().run_line_magic("matplotlib", "inline")
except NameError:
    pass


def create_line_hypergraph(N, cardinality):
    """Return a HyperGraph whose hyperedges are windows of consecutive nodes.

    Nodes are ``0 .. N - 1``; there is one hyperedge
    ``range(i, i + cardinality)`` for every ``i`` in
    ``range(N + 1 - cardinality)``.
    """
    HG = HyperGraph()
    HG.add_nodes_from(range(N))
    HG.add_edges_from(
        [range(i, i + cardinality) for i in range(N + 1 - cardinality)]
    )
    return HG


def transition_matrix_to_pykov_chain(matrix):
    """Copy every entry ``matrix[i][j]`` into a ``pykov.Chain`` under key ``(i, j)``."""
    chain = pykov.Chain()
    for i, row in enumerate(matrix):
        for j, probability in enumerate(row):
            chain[(i, j)] = probability
    return chain


np.random.multinomial(1000, [1/3.]*3, size=1)


def fit_normal_distribution(data):
    """Fit a normal distribution to ``data``, draw its PDF and return the std.

    The PDF is drawn over the x-limits of the current matplotlib axes (so it
    overlays whatever was plotted last), the fitted parameters go into the
    title, and ``plt.show()`` is called before returning.
    """
    # Fit a normal distribution to the data:
    mu, std = norm.fit(data)

    # Plot the PDF.
    xmin, xmax = plt.xlim()
    x = np.linspace(xmin, xmax, 100)
    plt.plot(x, norm.pdf(x, mu, std), 'k', linewidth=2)
    plt.title("Fit results: mu = %.2f, std = %.2f" % (mu, std))
    plt.show()
    return std


def centralize_bins(xs):
    """Shift all bin edges but the last by half the width of the first bin.

    ``xs`` must support array arithmetic (e.g. the edges array returned by
    ``plt.hist``). The result is the bin centres provided all bins have the
    same width as the first one.
    """
    # 0.5 rather than (1 / 2): the latter is 0 under Python 2 integer division.
    return xs[:-1] + 0.5 * (xs[1] - xs[0])


def simulate_walk_on_line(size_of_edge=3, steps=1000):
    """Simulate 200 node-model walks on a 500-node line hypergraph.

    Every walk starts at the middle node. A normalised 25-bin histogram of
    all visited states is drawn on a new figure.

    Returns:
        ``(bin_centres, histogram_values, all_visited_states)``.
    """
    max_len = 500
    # Floor division keeps the start state an int, like the chain's states.
    start = max_len // 2
    HG = create_line_hypergraph(max_len, size_of_edge)
    chain = transition_matrix_to_pykov_chain(create_markov_matrix_model_nodes(HG))

    all_items = []
    for _ in range(200):
        all_items.extend(chain.walk(steps=steps, start=start))

    plt.figure(figsize=(10, 8))
    # `normed` is no longer accepted by Matplotlib's hist(); `density` is the
    # replacement keyword.
    ys, bins, _ = plt.hist(all_items, bins=25, density=True)
    return centralize_bins(bins), ys, all_items


def compute_expected_std(n, steps):
    """Return ``sqrt(var(0 .. n - 1) * steps)``.

    The ``n // 2`` shift applied to the integers does not change their
    variance; it is kept so the numbers match the original computation.
    """
    return np.sqrt(np.var([(i - n // 2) for i in range(n)]) * steps)


compute_expected_std(3, 2000)


def parametrize(parameter_name, parameter_values):
    """Decorator factory: call the function once per value of one keyword.

    The decorated function is invoked with ``parameter_name`` set to each
    element of ``parameter_values`` in turn (overriding any value passed by
    the caller) and the list of results is returned. Stacked decorators
    therefore yield nested lists, outermost decorator first.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return_values = []
            for parameter_value in parameter_values:
                kwargs[parameter_name] = parameter_value
                return_values.append(fn(*args, **kwargs))
            return return_values
        return wrapper
    return decorator


def _plot_fit_vs_expected(steps, sizes, results):
    """Plot fitted vs. expected std against cardinality, one series per step count.

    ``results[k]`` holds the ``(std, expected_std)`` pairs obtained for
    ``steps[k]``, one pair per entry of ``sizes``.
    """
    plt.figure(figsize=(12, 9))
    for step, series in zip(steps, results):
        first, second = zip(*series)
        plt.plot(sizes, first, 'o', label='real, %s steps' % step, markersize=10)
        plt.plot(sizes, second, label='expected, %s steps' % step)
    plt.xlabel('Hyperedge cardinality')
    plt.ylabel('Stdev of normal fit')
    plt.legend(loc=0)
    plt.title('Random walks 1D using hypergraphs')


steps = [100, 200, 300, 1000, 2000]
sizes = [3, 4, 5, 6, 7]


@parametrize("steps", steps)
@parametrize("size", sizes)
def show_walks(size, steps):
    """Run the node-model simulation and return ``(fitted_std, expected_std)``."""
    print("size", size)
    print("steps", steps)
    plt.figure()
    xs, ys, all_items = simulate_walk_on_line(size, steps)
    std = fit_normal_distribution(all_items)
    expected_std = compute_expected_std(size, steps)
    print(std, expected_std)
    return std, expected_std


res = show_walks()

_plot_fit_vs_expected(steps, sizes, res)

xs = [3, 5, 7, 9]
ys = [compute_expected_std(x, 10000) for x in xs]
plt.plot(xs, ys)


def compute_expected_std_easy(n):
    """Return ``compute_expected_std`` for a single step."""
    return compute_expected_std(n, 1)


def compute_expected_std_easy2(n):
    """Return ``compute_expected_std`` for 10000 steps."""
    return compute_expected_std(n, 10000)


xs = np.arange(1, 101, 1)
ys = [compute_expected_std_easy(x) for x in xs]
ys2 = [compute_expected_std_easy2(x) for x in xs]


def line(x, a, b):
    """Straight line ``a * x + b`` used as the model for ``curve_fit``."""
    return a * x + b


# The first point (n = 1) is left out of the fit.
curve_fit(line, xs[1:], ys[1:])

max_len = 500
# Floor division keeps the start state an int; it is read as a global by
# simulate_walk_on_line_edge below.
start = max_len // 2
size = 3
HG = create_line_hypergraph(max_len, size)
tiny_HG = create_line_hypergraph(10, size)
hypergraph_line_walk_edges_matrix = create_markov_matrix_model_hyper_edges(HG)
tiny_hypergraph_line_walk_edges_matrix = create_markov_matrix_model_hyper_edges(tiny_HG)
plot_transition_matrix(tiny_hypergraph_line_walk_edges_matrix)
tiny_HG.hyper_edges()


def convert_covered_edges_to_nodes(all_items, hyperedges):
    """Replace every visited hyperedge index by the nodes of that hyperedge.

    Each item is converted with ``int`` and used to index ``hyperedges``; the
    nodes of all looked-up hyperedges are concatenated into one flat list.
    """
    all_nodes = []
    for item in all_items:
        all_nodes.extend(hyperedges[int(item)])
    return all_nodes


def simulate_walk_on_line_edge(HG, steps=1000):
    """Simulate 2000 hyperedge-model walks on ``HG`` and histogram covered nodes.

    The start state is the module-level ``start``. Visited hyperedges are
    expanded to their nodes before the normalised 25-bin histogram is drawn
    on a new figure.

    Returns:
        ``(bin_centres, histogram_values, all_covered_nodes)``.
    """
    chain = transition_matrix_to_pykov_chain(
        create_markov_matrix_model_hyper_edges(HG)
    )
    print('matrix ready')

    all_items = []
    for _ in range(2000):
        all_items.extend(chain.walk(steps=steps, start=start))

    plt.figure(figsize=(10, 8))
    all_nodes = convert_covered_edges_to_nodes(all_items, HG.hyper_edges())
    # `normed` is no longer accepted by Matplotlib's hist(); `density` is the
    # replacement keyword.
    ys, bins, _ = plt.hist(all_nodes, bins=25, density=True)
    return centralize_bins(bins), ys, all_nodes


# The original cell passed the integer 3 as the first argument, but the
# function needs a hypergraph; HG is the cardinality-3 line hypergraph.
# NOTE(review): confirm that a cardinality-3 graph was the intent.
simulate_walk_on_line_edge(HG, 100);

steps = [100, 1000]
sizes = [3]


@parametrize("steps", steps)
@parametrize("size", sizes)
def show_walks(size, steps):
    """Run the hyperedge-model simulation and return ``(fitted_std, expected_std)``."""
    print("size", size)
    print("steps", steps)
    plt.figure()
    max_len = 500
    HG = create_line_hypergraph(max_len, size)
    xs, ys, all_nodes = simulate_walk_on_line_edge(HG, steps)
    std = fit_normal_distribution(all_nodes)
    expected_std = compute_expected_std(size, steps)
    print(std, expected_std)
    return std, expected_std


res = show_walks()

_plot_fit_vs_expected(steps, sizes, res)

# Float literal so the value is not truncated to 0 under Python 2.
np.sqrt(1.0 / 12)


def prepare_random_walk_transition_matrix(N):
    """Return the N x N transition matrix of a simple random walk on a segment.

    Interior states move one step left or right with probability 0.5 each;
    the first state always moves to the second and the last state always
    moves to the second-to-last.
    """
    matrix = np.zeros((N, N))
    for i in range(1, N - 1):
        matrix[i][i + 1] = 0.5
        matrix[i][i - 1] = 0.5
    matrix[0][1] = 1
    matrix[-1][-2] = 1
    return matrix


matrix_size = 10
markov_matrix = prepare_random_walk_transition_matrix(matrix_size)
plot_transition_matrix(markov_matrix)


def show_walks(markov_matrix, size, steps):
    """Run the hyperedge-model simulation on the module-level ``HG``.

    NOTE(review): ``markov_matrix`` is accepted but never used; the walk is
    simulated on the global ``HG``. Presumably this was meant to walk on the
    given matrix - confirm before relying on it.
    """
    print("size", size)
    print("steps", steps)
    plt.figure()
    xs, ys, all_nodes = simulate_walk_on_line_edge(HG, steps)
    std = fit_normal_distribution(all_nodes)
    expected_std = compute_expected_std(size, steps)
    print(std, expected_std)
    return std, expected_std


ws = np.linspace(0.002, 10, 1000)
# Equals sqrt((ws**2 - 1) / 12); the radicand is negative for ws < 1, so
# those entries come out as NaN.
sigmas = np.sqrt((ws - 1) / 2 * (ws + 1) /2 / 3)
plt.figure(figsize=(8, 6))
plt.plot(ws, sigmas, linewidth=5, label='analytical')

xs = range(2, 10)
ys = [compute_expected_std(x, 1) for x in xs]
plt.plot(xs, ys, 'o', markersize=10, label='discrete')
plt.tick_params(axis='both', which='major', labelsize=20)
plt.xlabel('Hyperedge cardinality', size=20)
plt.ylabel('Std for one step walk', size=20)
plt.legend(loc=0)