import matplotlib.colors as colors
def compute_check(B):
for i in B.check_nodes:
s=0
for n in B.neighbors(i):
s+=B.node[n]["value"]
B.node[i]["value"]=s % 2
def show_graph(B,pos,with_values=False):
nx.draw_networkx_nodes(B,pos,nodelist=B.variable_nodes,node_shape='o',node_color='b')
nx.draw_networkx_nodes(B,pos,nodelist=B.check_nodes,node_shape='s')
nx.draw_networkx_edges(B,pos)
if with_values:
nx.draw_networkx_labels(B,pos,nx.get_node_attributes(B,"value"))
plt.show()
def draw_nodes(B,pos,node_shape,values,llrs,node_candidates,Max,Levels):
step=float(2)*Max/Levels
nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] < -Max]
node_color=colors.rgb2hex((1,0,0))
nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color)
nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] >= Max]
node_color=colors.rgb2hex((0,1,0))
nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color)
for l in range(Levels):
nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] >= l*step-Max and (1-2*values[n])*llrs[n] < (l+1)*step-Max]
node_color=colors.rgb2hex((float(Levels-l)/(Levels+1),float(l+1)/(Levels+1),0))
nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color)
# show decoding graph
def show_dgraph(B,pos,with_vcolors=False,with_ccolors=False,vLevels=5,cLevels=5):
if with_vcolors:
values=nx.get_node_attributes(B,"value")
llrs=nx.get_node_attributes(B,"llr")
vMax=np.log2((1-p)/p)*1.5
draw_nodes(B,pos,'o',values,llrs,B.variable_nodes,vMax,vLevels)
else:
nx.draw_networkx_nodes(B,pos,nodelist=B.variable_nodes,node_shape='o',node_color='b')
if with_ccolors:
values=nx.get_node_attributes(B,"value") # may check if it is pulled already if this takes long
llrs=nx.get_node_attributes(B,"llr")
cMax=np.log2((1-p)/p)*0.7
draw_nodes(B,pos,'s',values,llrs,B.check_nodes,cMax,cLevels)
else:
nx.draw_networkx_nodes(B,pos,nodelist=B.check_nodes,node_shape='s')
nx.draw_networkx_nodes(B,pos,nodelist=B.dummy_nodes,node_shape='s',node_color='k')
nx.draw_networkx_edges(B,pos)
def get_IRA_node_pos(B,g=5): # get node position for plotting IRA code
M=B.M
K=B.K
pos={}
for i in range(0,M):
pos[i]=(0,-i*g)
for i in range(M,M+K):
pos[i]=(5,-(i-M)*g)
for i in range(M+K,M+2*K):
pos[i]=(10,-(i-M-K)*g)
return pos
def get_LDPC_node_pos(B,g=5): # get node position for plotting code (LDPC for decoding)
M=B.M
K=B.K
pos={}
for i in range(M+K):
pos[M+2*K+i]=(i*g,0)
for i in range(M):
pos[i]=(i*g,-1)
for i in range(K):
pos[M+K+i]=((M+i)*g,-1)
for i in range(K):
pos[M+i]=(i*g,-2)
return pos
def assign_random_codeword(B):
# assign random input for information bits
M=B.M
K=B.K
for i in range(0,M):
B.node[i]["value"]=np.random.randint(0,2)
# assign 0 to parity bits
for i in range(0,K):
B.node[M+K+i]["value"]=0
# compute check
compute_check(B)
# assign parity bits sequentially to satisfy check
B.node[M+K]["value"]=B.node[M]["value"]
for i in range(1,K):
B.node[M+K+i]["value"]=(B.node[M+i]["value"]+B.node[M+K+i-1]["value"]) % 2
compute_check(B)
def H(p):
return -p*np.log2(p)-(1-p)*np.log2(1-p)
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import networkx as nx
from networkx.algorithms import bipartite
def generate_IRA_graph(M=5,K=5):
B=nx.Graph()
B.add_nodes_from(range(0,M), bipartite=0) # variable nodes
B.add_nodes_from(range(M,M+K), bipartite = 1) # check nodes
B.add_nodes_from(range(M+K,M+2*K), bipartite=0) # variable nodes
for i in range(0,K-1):
B.add_edges_from([(M+i,M+i+K),(M+i+1,M+i+K)])
B.add_edges_from([(M+K-1,M+2*K-1)])
for i in range(0,M):
tmp=np.random.permutation(K)
B.add_edges_from([(i,tmp[0]+M),(i,tmp[1]+M),(i,tmp[2]+M)])
B.variable_nodes = set(n for n,d in B.nodes(data=True) if d['bipartite']==0)
B.check_nodes = set(B)-B.variable_nodes
B.M=M
B.K=K
return B
B=generate_IRA_graph(5,5)
pos=get_IRA_node_pos(B)
show_graph(B,pos)
# assign random input
for i in B.variable_nodes:
B.node[i]["value"]=np.random.randint(0,2)
compute_check(B)
show_graph(B,pos,True)
# assign random input for information bits
for i in range(0,B.M):
B.node[i]["value"]=np.random.randint(0,2)
# assign 0 to parity bits
for i in range(0,B.K):
B.node[B.M+B.K+i]["value"]=0
# compute check
compute_check(B)
show_graph(B,pos,True)
# assign parity bits sequentially to satisfy check
B.node[B.M+B.K]["value"]=B.node[B.M]["value"]
for i in range(1,B.K):
B.node[B.M+B.K+i]["value"]=(B.node[B.M+i]["value"]+B.node[B.M+B.K+i-1]["value"]) % 2
# recompute check
compute_check(B)
show_graph(B,pos,True)
M=100
K=200
B=generate_IRA_graph(M,K)
# add dummy nodes for decoding
B.dummy_nodes=range(M+2*K,M+2*K+M+K)
B.add_nodes_from(B.dummy_nodes, bipartite=2) # dummy factor nodes
# add dummy edges for decoding
for i in range(M):
B.add_edges_from([(i,M+2*K+i)])
for i in range(K):
B.add_edges_from([(M+K+i,M+2*K+M+i)])
pos=get_LDPC_node_pos(B)
show_dgraph(B,pos)
# function for BP/message passing algorithm
def compute_variable_node_prob(B):
for n in B.variable_nodes:
s=0
for j in B.neighbors(n):
s=s+B[n][j]["f2v"]
B.node[n]["llr"]=s
def variable_node_update(B):
compute_variable_node_prob(B)
for n in B.variable_nodes:
s=B.node[n]["llr"]
for j in B.neighbors(n):
B[n][j]["v2f"]=s-B[n][j]["f2v"]
def compute_factor_node_prob(B):
for n in B.check_nodes:
p=1
for j in B.neighbors(n):
p=p*np.tanh(B[n][j]["v2f"]/2)
B.node[n]["llr"]=2*np.arctanh(p)
def factor_node_update(B):
compute_factor_node_prob(B)
for n in B.check_nodes:
p=np.tanh(B.node[n]["llr"]/2)
for j in B.neighbors(n):
B[n][j]["f2v"]=2*np.arctanh(p/np.tanh(B[n][j]["v2f"]/2))
def compute_error_rate(B):
values=nx.get_node_attributes(B,"value")
llrs=nx.get_node_attributes(B,"llr")
return float(len([n for n in B.variable_nodes if (1-2*values[n])*llrs[n] < 0]))/(M+K)
def compute_error_check_rate(B):
values=nx.get_node_attributes(B,"value")
llrs=nx.get_node_attributes(B,"llr")
return float(len([n for n in B.check_nodes if (1-2*values[n])*llrs[n] < 0]))/(K)
p=0.10
assign_random_codeword(B)
for n in B.variable_nodes:
err = 1 if np.random.rand() < p else 0
B.node[n]["received"]=(B.node[n]["value"]+err) % 2
for a,b in B.edges():
B[a][b]["f2v"]=0 # set all factor to variable node message to 0
llr = np.log((1-p)/p)
for i in range(0,M):
B[i][M+2*K+i]["f2v"]=llr if B.node[i]["received"]==0 else -llr
for i in range(0,K):
B[M+K+i][M+2*K+M+i]["f2v"]=llr if B.node[M+K+i]["received"]==0 else -llr
variable_node_update(B)
factor_node_update(B)
show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)
#compute_variable_node_prob(B)
for _ in range(20):
variable_node_update(B)
factor_node_update(B)
#show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)
print ("error rate: %f, check failure rate: %f" % (compute_error_rate(B),compute_error_check_rate(B)))
#show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)
error rate: 0.066667, check failure rate: 0.285000 error rate: 0.040000, check failure rate: 0.210000 error rate: 0.043333, check failure rate: 0.200000 error rate: 0.030000, check failure rate: 0.130000 error rate: 0.020000, check failure rate: 0.090000 error rate: 0.010000, check failure rate: 0.030000 error rate: 0.000000, check failure rate: 0.005000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000 error rate: 0.000000, check failure rate: 0.000000
/home/phsamuel/anaconda3/lib/python3.5/site-packages/ipykernel/__main__.py:28: RuntimeWarning: invalid value encountered in double_scalars
1-H(0.15)
0.39015969528359962