#!/usr/bin/env python # coding: utf-8 # In[2]: import matplotlib.colors as colors def compute_check(B): for i in B.check_nodes: s=0 for n in B.neighbors(i): s+=B.node[n]["value"] B.node[i]["value"]=s % 2 def show_graph(B,pos,with_values=False): nx.draw_networkx_nodes(B,pos,nodelist=B.variable_nodes,node_shape='o',node_color='b') nx.draw_networkx_nodes(B,pos,nodelist=B.check_nodes,node_shape='s') nx.draw_networkx_edges(B,pos) if with_values: nx.draw_networkx_labels(B,pos,nx.get_node_attributes(B,"value")) plt.show() def draw_nodes(B,pos,node_shape,values,llrs,node_candidates,Max,Levels): step=float(2)*Max/Levels nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] < -Max] node_color=colors.rgb2hex((1,0,0)) nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color) nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] >= Max] node_color=colors.rgb2hex((0,1,0)) nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color) for l in range(Levels): nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] >= l*step-Max and (1-2*values[n])*llrs[n] < (l+1)*step-Max] node_color=colors.rgb2hex((float(Levels-l)/(Levels+1),float(l+1)/(Levels+1),0)) nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color) # show decoding graph def show_dgraph(B,pos,with_vcolors=False,with_ccolors=False,vLevels=5,cLevels=5): if with_vcolors: values=nx.get_node_attributes(B,"value") llrs=nx.get_node_attributes(B,"llr") vMax=np.log2((1-p)/p)*1.5 draw_nodes(B,pos,'o',values,llrs,B.variable_nodes,vMax,vLevels) else: nx.draw_networkx_nodes(B,pos,nodelist=B.variable_nodes,node_shape='o',node_color='b') if with_ccolors: values=nx.get_node_attributes(B,"value") # may check if it is pulled already if this takes long llrs=nx.get_node_attributes(B,"llr") cMax=np.log2((1-p)/p)*0.7 draw_nodes(B,pos,'s',values,llrs,B.check_nodes,cMax,cLevels) else: nx.draw_networkx_nodes(B,pos,nodelist=B.check_nodes,node_shape='s') nx.draw_networkx_nodes(B,pos,nodelist=B.dummy_nodes,node_shape='s',node_color='k') nx.draw_networkx_edges(B,pos) def get_IRA_node_pos(B,g=5): # get node position for plotting IRA code M=B.M K=B.K pos={} for i in range(0,M): pos[i]=(0,-i*g) for i in range(M,M+K): pos[i]=(5,-(i-M)*g) for i in range(M+K,M+2*K): pos[i]=(10,-(i-M-K)*g) return pos def get_LDPC_node_pos(B,g=5): # get node position for plotting code (LDPC for decoding) M=B.M K=B.K pos={} for i in range(M+K): pos[M+2*K+i]=(i*g,0) for i in range(M): pos[i]=(i*g,-1) for i in range(K): pos[M+K+i]=((M+i)*g,-1) for i in range(K): pos[M+i]=(i*g,-2) return pos def assign_random_codeword(B): # assign random input for information bits M=B.M K=B.K for i in range(0,M): B.node[i]["value"]=np.random.randint(0,2) # assign 0 to parity bits for i in range(0,K): B.node[M+K+i]["value"]=0 # compute check compute_check(B) # assign parity bits sequentially to satisfy check B.node[M+K]["value"]=B.node[M]["value"] for i in range(1,K): B.node[M+K+i]["value"]=(B.node[M+i]["value"]+B.node[M+K+i-1]["value"]) % 2 compute_check(B) def H(p): return -p*np.log2(p)-(1-p)*np.log2(1-p) # ## Generate graph code # In[3]: import matplotlib.pyplot as plt get_ipython().run_line_magic('matplotlib', 'inline') import numpy as np import networkx as nx from networkx.algorithms import bipartite def generate_IRA_graph(M=5,K=5): B=nx.Graph() B.add_nodes_from(range(0,M), bipartite=0) # variable nodes B.add_nodes_from(range(M,M+K), bipartite = 1) # check nodes B.add_nodes_from(range(M+K,M+2*K), bipartite=0) # variable nodes for i in range(0,K-1): B.add_edges_from([(M+i,M+i+K),(M+i+1,M+i+K)]) B.add_edges_from([(M+K-1,M+2*K-1)]) for i in range(0,M): tmp=np.random.permutation(K) B.add_edges_from([(i,tmp[0]+M),(i,tmp[1]+M),(i,tmp[2]+M)]) B.variable_nodes = set(n for n,d in B.nodes(data=True) if d['bipartite']==0) B.check_nodes = set(B)-B.variable_nodes B.M=M B.K=K return B # ## Showing an IRA code # In[4]: B=generate_IRA_graph(5,5) pos=get_IRA_node_pos(B) show_graph(B,pos) # # IRA Code encoding Explain # In[5]: # assign random input for i in B.variable_nodes: B.node[i]["value"]=np.random.randint(0,2) compute_check(B) show_graph(B,pos,True) # In[6]: # assign random input for information bits for i in range(0,B.M): B.node[i]["value"]=np.random.randint(0,2) # assign 0 to parity bits for i in range(0,B.K): B.node[B.M+B.K+i]["value"]=0 # compute check compute_check(B) show_graph(B,pos,True) # In[7]: # assign parity bits sequentially to satisfy check B.node[B.M+B.K]["value"]=B.node[B.M]["value"] for i in range(1,B.K): B.node[B.M+B.K+i]["value"]=(B.node[B.M+i]["value"]+B.node[B.M+B.K+i-1]["value"]) % 2 # recompute check compute_check(B) show_graph(B,pos,True) # # Decoding Explain # ## Create code # In[18]: M=100 K=200 B=generate_IRA_graph(M,K) # add dummy nodes for decoding B.dummy_nodes=range(M+2*K,M+2*K+M+K) B.add_nodes_from(B.dummy_nodes, bipartite=2) # dummy factor nodes # add dummy edges for decoding for i in range(M): B.add_edges_from([(i,M+2*K+i)]) for i in range(K): B.add_edges_from([(M+K+i,M+2*K+M+i)]) pos=get_LDPC_node_pos(B) show_dgraph(B,pos) # ## Define message passing functions # In[9]: # function for BP/message passing algorithm def compute_variable_node_prob(B): for n in B.variable_nodes: s=0 for j in B.neighbors(n): s=s+B[n][j]["f2v"] B.node[n]["llr"]=s def variable_node_update(B): compute_variable_node_prob(B) for n in B.variable_nodes: s=B.node[n]["llr"] for j in B.neighbors(n): B[n][j]["v2f"]=s-B[n][j]["f2v"] def compute_factor_node_prob(B): for n in B.check_nodes: p=1 for j in B.neighbors(n): p=p*np.tanh(B[n][j]["v2f"]/2) B.node[n]["llr"]=2*np.arctanh(p) def factor_node_update(B): compute_factor_node_prob(B) for n in B.check_nodes: p=np.tanh(B.node[n]["llr"]/2) for j in B.neighbors(n): B[n][j]["f2v"]=2*np.arctanh(p/np.tanh(B[n][j]["v2f"]/2)) def compute_error_rate(B): values=nx.get_node_attributes(B,"value") llrs=nx.get_node_attributes(B,"llr") return float(len([n for n in B.variable_nodes if (1-2*values[n])*llrs[n] < 0]))/(M+K) def compute_error_check_rate(B): values=nx.get_node_attributes(B,"value") llrs=nx.get_node_attributes(B,"llr") return float(len([n for n in B.check_nodes if (1-2*values[n])*llrs[n] < 0]))/(K) # ## Add channel noise # # In[22]: p=0.10 assign_random_codeword(B) for n in B.variable_nodes: err = 1 if np.random.rand() < p else 0 B.node[n]["received"]=(B.node[n]["value"]+err) % 2 for a,b in B.edges(): B[a][b]["f2v"]=0 # set all factor to variable node message to 0 llr = np.log((1-p)/p) for i in range(0,M): B[i][M+2*K+i]["f2v"]=llr if B.node[i]["received"]==0 else -llr for i in range(0,K): B[M+K+i][M+2*K+M+i]["f2v"]=llr if B.node[M+K+i]["received"]==0 else -llr variable_node_update(B) factor_node_update(B) show_dgraph(B,pos,with_vcolors=True,with_ccolors=True) # In[23]: #compute_variable_node_prob(B) for _ in range(20): variable_node_update(B) factor_node_update(B) #show_dgraph(B,pos,with_vcolors=True,with_ccolors=True) print ("error rate: %f, check failure rate: %f" % (compute_error_rate(B),compute_error_check_rate(B))) #show_dgraph(B,pos,with_vcolors=True,with_ccolors=True) # ## show_dgraph(B,pos,with_vcolors=True,with_ccolors=True) # In[15]: 1-H(0.15) # In[ ]: