In [2]:
import matplotlib.colors as colors

def compute_check(B):
    for i in B.check_nodes:
        s=0
        for n in B.neighbors(i):
           s+=B.node[n]["value"]
        B.node[i]["value"]=s % 2

def show_graph(B,pos,with_values=False):
    nx.draw_networkx_nodes(B,pos,nodelist=B.variable_nodes,node_shape='o',node_color='b')
    nx.draw_networkx_nodes(B,pos,nodelist=B.check_nodes,node_shape='s')
    nx.draw_networkx_edges(B,pos)
    if with_values:
       nx.draw_networkx_labels(B,pos,nx.get_node_attributes(B,"value"))
    plt.show()
    
def draw_nodes(B,pos,node_shape,values,llrs,node_candidates,Max,Levels):
    step=float(2)*Max/Levels
    nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] < -Max]
    node_color=colors.rgb2hex((1,0,0))
    nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color)
    nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] >= Max]
    node_color=colors.rgb2hex((0,1,0))
    nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color)
    for l in range(Levels):
        nodes=[n for n in node_candidates if (1-2*values[n])*llrs[n] >= l*step-Max and (1-2*values[n])*llrs[n] < (l+1)*step-Max]
        node_color=colors.rgb2hex((float(Levels-l)/(Levels+1),float(l+1)/(Levels+1),0))
        nx.draw_networkx_nodes(B,pos,nodelist=nodes,node_shape=node_shape,node_color=node_color)

# show decoding graph
def show_dgraph(B,pos,with_vcolors=False,with_ccolors=False,vLevels=5,cLevels=5):
    if with_vcolors:
        values=nx.get_node_attributes(B,"value")
        llrs=nx.get_node_attributes(B,"llr")
        vMax=np.log2((1-p)/p)*1.5
        draw_nodes(B,pos,'o',values,llrs,B.variable_nodes,vMax,vLevels)            
    else:
        nx.draw_networkx_nodes(B,pos,nodelist=B.variable_nodes,node_shape='o',node_color='b')
    if with_ccolors:
        values=nx.get_node_attributes(B,"value") # may check if it is pulled already if this takes long
        llrs=nx.get_node_attributes(B,"llr")
        cMax=np.log2((1-p)/p)*0.7
        draw_nodes(B,pos,'s',values,llrs,B.check_nodes,cMax,cLevels)
    else:
        nx.draw_networkx_nodes(B,pos,nodelist=B.check_nodes,node_shape='s')
    nx.draw_networkx_nodes(B,pos,nodelist=B.dummy_nodes,node_shape='s',node_color='k')
    nx.draw_networkx_edges(B,pos)
    
def get_IRA_node_pos(B,g=5): # get node position for plotting IRA code
    M=B.M
    K=B.K
    pos={}
    for i in range(0,M):
        pos[i]=(0,-i*g)
    for i in range(M,M+K):
        pos[i]=(5,-(i-M)*g)
    for i in range(M+K,M+2*K):
        pos[i]=(10,-(i-M-K)*g)
    return pos

def get_LDPC_node_pos(B,g=5): # get node position for plotting code (LDPC for decoding)
    M=B.M
    K=B.K
    pos={}
    for i in range(M+K):
        pos[M+2*K+i]=(i*g,0)
    for i in range(M):
        pos[i]=(i*g,-1)
    for i in range(K):
        pos[M+K+i]=((M+i)*g,-1)
    for i in range(K):
        pos[M+i]=(i*g,-2)
    return pos
        
def assign_random_codeword(B):
    # assign random input for information bits
    M=B.M
    K=B.K
    for i in range(0,M):
        B.node[i]["value"]=np.random.randint(0,2)
    # assign 0 to parity bits
    for i in range(0,K):
        B.node[M+K+i]["value"]=0
    # compute check
    compute_check(B)
    # assign parity bits sequentially to satisfy check
    B.node[M+K]["value"]=B.node[M]["value"]
    for i in range(1,K):
        B.node[M+K+i]["value"]=(B.node[M+i]["value"]+B.node[M+K+i-1]["value"]) % 2
    compute_check(B)

    
def H(p):
    return -p*np.log2(p)-(1-p)*np.log2(1-p)

Generate graph code

In [3]:
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import networkx as nx
from networkx.algorithms import bipartite

def generate_IRA_graph(M=5,K=5):
    B=nx.Graph()
    B.add_nodes_from(range(0,M), bipartite=0) # variable nodes
    B.add_nodes_from(range(M,M+K), bipartite = 1) # check nodes
    B.add_nodes_from(range(M+K,M+2*K), bipartite=0) # variable nodes

    for i in range(0,K-1):
        B.add_edges_from([(M+i,M+i+K),(M+i+1,M+i+K)])
    B.add_edges_from([(M+K-1,M+2*K-1)])

    for i in range(0,M):
        tmp=np.random.permutation(K)
        B.add_edges_from([(i,tmp[0]+M),(i,tmp[1]+M),(i,tmp[2]+M)])

    B.variable_nodes = set(n for n,d in B.nodes(data=True) if d['bipartite']==0)
    B.check_nodes = set(B)-B.variable_nodes
    B.M=M
    B.K=K
    return B

Showing an IRA code

In [4]:
B=generate_IRA_graph(5,5)
pos=get_IRA_node_pos(B)
show_graph(B,pos)

IRA Code encoding Explain

In [5]:
# assign random input
for i in B.variable_nodes:
    B.node[i]["value"]=np.random.randint(0,2)

compute_check(B)
show_graph(B,pos,True)
In [6]:
# assign random input for information bits
for i in range(0,B.M):
    B.node[i]["value"]=np.random.randint(0,2)
# assign 0 to parity bits
for i in range(0,B.K):
    B.node[B.M+B.K+i]["value"]=0
# compute check
compute_check(B)
show_graph(B,pos,True)
In [7]:
# assign parity bits sequentially to satisfy check
B.node[B.M+B.K]["value"]=B.node[B.M]["value"]
for i in range(1,B.K):
    B.node[B.M+B.K+i]["value"]=(B.node[B.M+i]["value"]+B.node[B.M+B.K+i-1]["value"]) % 2
# recompute check
compute_check(B)
show_graph(B,pos,True)

Decoding Explain

Create code

In [18]:
M=100
K=200
B=generate_IRA_graph(M,K)
# add dummy nodes for decoding
B.dummy_nodes=range(M+2*K,M+2*K+M+K)
B.add_nodes_from(B.dummy_nodes, bipartite=2) # dummy factor nodes
# add dummy edges for decoding
for i in range(M):
    B.add_edges_from([(i,M+2*K+i)])
for i in range(K):
    B.add_edges_from([(M+K+i,M+2*K+M+i)])
    
pos=get_LDPC_node_pos(B)
show_dgraph(B,pos)

Define message passing functions

In [9]:
# function for BP/message passing algorithm
def compute_variable_node_prob(B):
    for n in B.variable_nodes:
        s=0
        for j in B.neighbors(n):
            s=s+B[n][j]["f2v"]
        B.node[n]["llr"]=s
        
def variable_node_update(B):
    compute_variable_node_prob(B)
    for n in B.variable_nodes:
        s=B.node[n]["llr"]
        for j in B.neighbors(n):
            B[n][j]["v2f"]=s-B[n][j]["f2v"]
            
def compute_factor_node_prob(B):
    for n in B.check_nodes:
        p=1
        for j in B.neighbors(n):
            p=p*np.tanh(B[n][j]["v2f"]/2)
        B.node[n]["llr"]=2*np.arctanh(p)
        
def factor_node_update(B):
    compute_factor_node_prob(B)
    for n in B.check_nodes:
        p=np.tanh(B.node[n]["llr"]/2)
        for j in B.neighbors(n):
            B[n][j]["f2v"]=2*np.arctanh(p/np.tanh(B[n][j]["v2f"]/2))
            
def compute_error_rate(B):
    values=nx.get_node_attributes(B,"value")
    llrs=nx.get_node_attributes(B,"llr")
    return float(len([n for n in B.variable_nodes if (1-2*values[n])*llrs[n] < 0]))/(M+K)

def compute_error_check_rate(B):
    values=nx.get_node_attributes(B,"value")
    llrs=nx.get_node_attributes(B,"llr")
    return float(len([n for n in B.check_nodes if (1-2*values[n])*llrs[n] < 0]))/(K)

Add channel noise

In [22]:
p=0.10
assign_random_codeword(B)
for n in B.variable_nodes:
    err = 1 if np.random.rand() < p else 0
    B.node[n]["received"]=(B.node[n]["value"]+err) % 2
    
for a,b in B.edges():
    B[a][b]["f2v"]=0 # set all factor to variable node message to 0
    
llr = np.log((1-p)/p)
for i in range(0,M):
    B[i][M+2*K+i]["f2v"]=llr if B.node[i]["received"]==0 else -llr
for i in range(0,K):
    B[M+K+i][M+2*K+M+i]["f2v"]=llr if B.node[M+K+i]["received"]==0 else -llr
  
variable_node_update(B)
factor_node_update(B)
show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)

    
In [23]:
#compute_variable_node_prob(B)
for _ in range(20):
    variable_node_update(B)
    factor_node_update(B)
    #show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)
    print ("error rate: %f, check failure rate: %f" % (compute_error_rate(B),compute_error_check_rate(B)))
#show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)
error rate: 0.066667, check failure rate: 0.285000
error rate: 0.040000, check failure rate: 0.210000
error rate: 0.043333, check failure rate: 0.200000
error rate: 0.030000, check failure rate: 0.130000
error rate: 0.020000, check failure rate: 0.090000
error rate: 0.010000, check failure rate: 0.030000
error rate: 0.000000, check failure rate: 0.005000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
error rate: 0.000000, check failure rate: 0.000000
/home/phsamuel/anaconda3/lib/python3.5/site-packages/ipykernel/__main__.py:28: RuntimeWarning: invalid value encountered in double_scalars

show_dgraph(B,pos,with_vcolors=True,with_ccolors=True)

In [15]:
1-H(0.15)
Out[15]:
0.39015969528359962