Parallel Processing Example

In [6]:
import numpy.random as npr
import numpy as np
import pandas as pd
from pandas import DataFrame, date_range

from datetime import datetime, timedelta
In [7]:
# Seed NumPy's global generator so the random test data is repeatable.
# (npr is numpy.random, so this seed also governs the npr.* calls below.)
np.random.seed(111)

def CreateDataSet(Number=1):
    """Build a DataFrame of randomly generated user/status/position records.

    Every batch contributes one record per hourly timestamp between
    10/1/2012 and 10/31/2012. For each batch the draws are made in this
    order: all 'lat' values, all 'long' values, all statuses, all users.

    Parameters
    ----------
    Number : int
        How many batches to generate and stack.

    Returns
    -------
    pandas.DataFrame
        Columns 'user', 'status', 'date', 'long', 'lat'.
    """
    # Pools the random user / status picks are drawn from
    user_pool = ['sally','derik','james','bob','ryan','chris']
    status_pool = [0,1]

    records = []

    for _batch in range(Number):

        # Hourly timestamps covering the month
        hours = date_range(start='10/1/2012', end='10/31/2012', freq='H')
        n_hours = len(hours)

        # Normally distributed coordinates (std 5000 around a fixed centre)
        lat_values = npr.normal(4815862, 5000,size=n_hours)
        long_values = npr.normal(687993, 5000,size=n_hours)

        # One random status per timestamp
        status_draws = []
        for _ in range(n_hours):
            pick = npr.randint(low=0,high=len(status_pool))
            status_draws.append(status_pool[pick])

        # One random user per timestamp
        user_draws = []
        for _ in range(n_hours):
            pick = npr.randint(low=0,high=len(user_pool))
            user_draws.append(user_pool[pick])

        # Column order here must match the column names given below
        for row in zip(user_draws, status_draws, hours, long_values, lat_values):
            records.append(row)

    column_names = ['user', 'status', 'date', 'long', 'lat']
    return pd.DataFrame(records, columns = column_names)
In [8]:
# Generate 20 stacked batches of test records and preview the first five rows
data = CreateDataSet(Number=20)
data.head(n=5)
Out[8]:
user status date long lat
0 ryan 0 2012-10-01 00:00:00 692823.716714 4810192.808328
1 ryan 1 2012-10-01 01:00:00 679549.965772 4817783.595967
2 bob 0 2012-10-01 02:00:00 686339.324152 4823344.768882
3 ryan 0 2012-10-01 03:00:00 677609.798732 4814085.088514
4 sally 1 2012-10-01 04:00:00 689556.379975 4811924.332295

5 rows × 5 columns

In [167]:
# Two time offsets: eight hours and one minute.
# (work() below defines identical local copies for use on the engines.)
before, after = timedelta(hours=8), timedelta(minutes=1)
In [168]:
# IPython.parallel was split out into the standalone ipyparallel package;
# prefer it when installed and fall back to the legacy location otherwise.
try:
    from ipyparallel import Client
except ImportError:
    from IPython.parallel import Client

# Create a single client (the cell previously built two and discarded the
# first) and take a direct view over every available engine.
cli = Client()
dview = cli[:]

# sync_imports() runs each import both locally and on every engine. It
# ignores "as" aliases on the engines, so plain module names are used here:
# code that executes remotely (see work) must refer to numpy / pandas /
# timedelta by exactly these names.
with dview.sync_imports():
    import numpy
    import os
    from datetime import timedelta
    import pandas
importing numpy on engine(s)
importing os on engine(s)
importing timedelta from datetime on engine(s)
importing pandas on engine(s)
In [194]:
@dview.parallel(block=True)
def work(df):
    """Sum the statuses of other users' records near each row in time and space.

    For every row i of ``df`` the candidates are the rows that
      * belong to a different user than row i, and
      * have a date between 8 hours before and 1 minute after row i's date.
    Among those candidates, the ones whose (long, lat) point lies within
    1000 units (Euclidean distance, in the units of the long/lat columns)
    of row i have their 'status' values summed.

    Parameters
    ----------
    df : DataFrame with 'user', 'status', 'date', 'long' and 'lat' columns.

    Returns
    -------
    pandas.DataFrame
        One unnamed column; positional row i holds the status sum for row i
        of ``df`` (0 when no candidate qualifies).

    Notes
    -----
    The body runs on the engines, so it uses the names ``numpy``, ``pandas``
    and ``timedelta`` exactly as they were imported through sync_imports().
    NOTE(review): dview.parallel presumably hands each engine only a slice of
    ``df``; if so, neighbours that fall in another engine's slice are not
    seen here -- confirm this is acceptable.
    """
    before = timedelta(hours=8)
    after = timedelta(minutes=1)
    max_distance = 1000

    # Pull the columns out once instead of on every iteration
    dates = df['date']
    users = df['user']
    longs = df['long'].values
    lats = df['lat'].values
    statuses = df['status'].values

    output = []
    for i in range(len(df)):
        date_i = dates.iloc[i]

        # Rows inside the time window around row i ...
        date_mask = (dates >= date_i - before) & (dates <= date_i + after)
        # ... that belong to someone other than row i's user
        user_mask = users != users.iloc[i]
        # Positional boolean mask, independent of df's index labels
        candidates = (date_mask & user_mask).values

        # Euclidean distance from row i to every candidate at once. This
        # also covers the first candidate, which the former index loop
        # (starting at 1) silently skipped.
        dists = numpy.hypot(longs[candidates] - longs[i],
                            lats[candidates] - lats[i])

        # An empty selection sums to 0, so no special-casing is needed
        output.append(statuses[candidates][dists <= max_distance].sum())

    return pandas.DataFrame(output)
In [195]:
# Wall-clock timing of one blocking run of the parallel work() over the data.
start = datetime.now()
out = work(data)
# Parenthesised single-argument print behaves the same on Python 2 and 3
print(datetime.now() - start)
in sync results <function __call__ at 0x108cccaa0>
0:00:08.475609
In [0]:
 
In [124]:
c = np.array((data['long'].values, data['lat'].values))
a = np.array((data['long'].iloc[0], data['lat'].iloc[0]))

def t():
    l = []
    for j in range(1, 2000):
                x = np.linalg.norm(a-np.array((b[0][j], b[1][j])))
                
                if x <=1000:
                    l.append(j)
                else:
                    pass
In [125]:
%timeit t
10000000 loops, best of 3: 30.4 ns per loop