An Exploration of Financial Relationships between Physicians and Payers

In [264]:
import random as random
import re
from decimal import *
import locale
import itertools

import sympy
from sympy import *
import pandas
from pandas import DataFrame
import math
import scipy.stats as stats
import numpy as np
from sympy import latex

import matplotlib
from matplotlib import pyplot as plt

from pyspark.sql import SQLContext 
from pyspark.sql import DecimalType 
from pyspark.sql import FloatType 
from pyspark import Row

import seaborn as sns
from IPython.core.display import HTML

#initialize some things for the IPython session
init_printing()
locale.setlocale(locale.LC_ALL, 'en_US')

#Generate a bar chart with data, title, y-label, x-label and whether
#the chart should be bar scale.
def bar_chart(label_to_count, title, y_label, x_label,log):
    OX = [x[0] for x in label_to_count]
    OY = [y[1] for y in label_to_count]
    fig = plt.figure()
    fig.suptitle(title, size=14)
    ax = plt.subplot(111)
    width = .35
    ind = np.arange(len(OY))
    rects = ax.bar(ind, OY, alpha=0.35, color='b', label=y_label)
    for ii,rect in enumerate(rects):
        height = rect.get_height()
        plt.text(rect.get_x()+rect.get_width()/2., 1.02*height, '%.2fM'% (OY[ii]),
                ha='center', va='bottom')
    ax.legend()
    ax.grid(True)
    ax.set_xticks(np.arange(len(OX)) + width)
    ax.set_xticklabels(OX)
    ax.set_ylabel(y_label)
    
    fig.autofmt_xdate()

#Take a 2D array of data and create a dataframe to display
#the data in tabular form
def print_table(column_labels, row_labels, contents):
    tmp = [[t[0]] + t[1] for t in zip(row_labels, contents)]
    df = DataFrame(tmp, columns=column_labels)
    pandas.set_option('display.max_colwidth', 100)
    display(HTML(df.to_html()))

#Truncate long lines on word boundaries
def truncate(line, length=40):
    if(len(line) > length):
        
        output_line = ""
        for token in line.split(' '):
            next = output_line + " " + token
            if len(next ) >= length:
                return next + "..."
            else:
                if len(output_line) == 0:
                    output_line = token
                else:
                    output_line = next
    else:
        return line
In [265]:
#Let's overlay some structure from our raw data

#after the pig job, we get a ctrl-A separated file
raw_data = sc.textFile("/user/hrt_qa/open_payments/general/post/part-m-*")

#create a SQL Context so that we may use spark-sql
#This allows us to use a very simple subset of SQL, for a more complete
#set of SQL available, you can use Hive as the underlying engine
#by using HiveContext instead of SQLContext
sqlContext = SQLContext(sc)

#split up the line into tokens separated on ctrl-a
parts = raw_data.map(lambda l : l.split('\x01'))

#We're only really concerned about a few fields, so we'll project out only
#the fields we're interested in.
def tokens_to_columns(tokens):
    return Row( physician_id=tokens[7]\
              , physician_name="{} {}".format(tokens[8], tokens[10])\
              , physician_specialty=tokens[21] \
              , payer=tokens[43] \
              , reason=tokens[52] \
              , amount_str=tokens[48] \
              , amount=float(tokens[48]) \
              , amount_decimal=Decimal(tokens[48]) \
              )

#Consider rows with either empty or null physician ID's to be bad and we want to ignore those.
payments = parts.map(tokens_to_columns)\
                .filter(lambda row : len(row.physician_id) > 0)
                       
#Now, we can register this as a table called payments.
#This allows us to refer to the table in our SQL statements
schemaPayments = sqlContext.inferSchema(payments)
schemaPayments.registerAsTable('payments')
In [266]:
#Broken down by reasons
count_by_reasons = sqlContext.sql("""select reason, count(*) as num_payments 
                                     from payments 
                                     group by reason 
                                     order by num_payments desc""").collect()

print_table(['Payment Reason', '# of Payments']\
           , [x[0] for x in count_by_reasons]\
           , [ [locale.format("%d", x[1], grouping=True)] for x in count_by_reasons]\
           )
Payment Reason # of Payments
0 Food and Beverage 2,192,057
1 Travel and Lodging 135,235
2 Education 122,839
3 Compensation for services other than consulting, including serving as faculty or as a speaker at... 77,236
4 Consulting Fee 45,525
5 Gift 26,422
6 Honoraria 10,900
7 Compensation for serving as faculty or as a speaker for a non-accredited and noncertified contin... 4,377
8 Royalty or License 3,268
9 Grant 2,881
10 Space rental or facility fees(teaching hospital only) 2,114
11 Current or prospective ownership or investment interest 1,559
12 Entertainment 1,400
13 Charitable Contribution 480
14 Compensation for serving as faculty or as a speaker for an accredited or certified continuing ed... 381
In [267]:
#Which specialties are getting the most reimbursements?
totals_by_specialty = sqlContext.sql("""select physician_specialty, count(*) as cnt, sum(amount) as total
                                        from payments
                                        group by physician_specialty""").collect()
total_count_by_specialty = sum([t[1] for t in totals_by_specialty])
top_count_by_specialty = sorted(totals_by_specialty, key=lambda t : t[1], reverse=True)[0:10]
print_table(['Specialty', '# of Payments', "% of Payments"]\
           , [x[0] for x in top_count_by_specialty]\
           , [ [ locale.format("%d", x[1], grouping=True)\
               , '{0:.2f}%'.format(100*x[1]/total_count_by_specialty)\
               ] \
               for x in top_count_by_specialty]\
           )
Specialty # of Payments % of Payments
0 Allopathic & Osteopathic Physicians/ Family Medicine 430,771 16.00%
1 Allopathic & Osteopathic Physicians/ Internal Medicine 417,155 15.00%
2 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease 160,342 6.00%
3 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Psychiatry 121,067 4.00%
4 Allopathic & Osteopathic Physicians/ Internal Medicine/ Gastroenterology 92,531 3.00%
5 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Neurology 89,673 3.00%
6 Other Service Providers/ Specialist 80,576 3.00%
7 Allopathic & Osteopathic Physicians/ Obstetrics & Gynecology 65,434 2.00%
8 Allopathic & Osteopathic Physicians/ Internal Medicine/ Endocrinology, Diabetes & Metabolism 62,862 2.00%
9 Allopathic & Osteopathic Physicians/ Urology 61,165 2.00%
In [222]:
#Which specialties are getting the most money?
top_total_by_specialty = sorted(totals_by_specialty, key=lambda t : t[2], reverse=True)[0:10]
total_amount_by_specialty = sum([t[2] for t in totals_by_specialty])
print_table(['Specialty', 'Amount of Payments', '% of Total Amount']\
           , [x[0] for x in top_total_by_specialty]\
           , [ ['$' + locale.format('%0.2f', x[2], grouping=True)\
               , '{0:.2f}%'.format(100*x[2]/total_amount_by_specialty)\
               ] \
               for x in top_total_by_specialty\
             ]\
           )
Specialty Amount of Payments % of Total Amount
0 " $209,192,160.39 31.24%
1 Allopathic & Osteopathic Physicians/ Orthopaedic Surgery $80,157,503.04 11.97%
2 Allopathic & Osteopathic Physicians/ Internal Medicine $26,616,544.66 3.98%
3 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease $24,291,090.52 3.63%
4 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Psychiatry $18,724,216.32 2.80%
5 Other Service Providers/ Specialist $17,475,702.37 2.61%
6 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Neurology $16,974,664.19 2.54%
7 Allopathic & Osteopathic Physicians/ Neurological Surgery $15,848,473.39 2.37%
8 Allopathic & Osteopathic Physicians/ Internal Medicine/ Endocrinology, Diabetes & Metabolism $15,525,370.10 2.32%
9 Allopathic & Osteopathic Physicians/ Internal Medicine/ Gastroenterology $14,570,253.86 2.18%
In [223]:
#who is getting the most gifts?
gift_amount_by_physician = sqlContext.sql("""select physician_id, physician_specialty, payer, count(*) as cnt, sum(amount) as total
                                             from payments
                                             where reason = \'Gift\'
                                             group by physician_id, physician_specialty, payer
                                             order by total desc
                                             """).filter(lambda t:len(t[0]) > 3).take(10)

print_table(['Physician','Specialty', 'Payer', 'Number of Gifts', 'Total Amount for Gifts']\
           , [x[0] for x in gift_amount_by_physician]\
           , [ [ x[1] \
               , x[2] \
               , locale.format('%d', x[3], grouping=True)\
               , '$' + locale.format('%0.2f', x[4], grouping=True)\
               ] \
             for x in gift_amount_by_physician]\
           )
Physician Specialty Payer Number of Gifts Total Amount for Gifts
0 225073 Dental Providers/ Dentist/ General Practice Dentalez Alabama, Inc. 1 $56,422.00
1 364744 Other Service Providers/ Specialist Ellman International 1 $37,699.00
2 446958 Dental Providers/ Dentist/ Endodontics Tulsa Dental Products LLC 6 $37,216.28
3 523360 Dental Providers/ Dentist/ Prosthodontics GAC International LLC 14 $23,562.07
4 244739 Allopathic & Osteopathic Physicians/ Family Medicine Mallinckrodt LLC 1 $19,488.75
5 92931 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Neurology Mallinckrodt LLC 1 $19,370.35
6 481461 Dental Providers/ Dentist/ Orthodontics and Dentofacial Orthopedics GAC International LLC 1 $18,573.00
7 9126 Dental Providers/ Dentist/ Orthodontics and Dentofacial Orthopedics GAC International LLC 7 $15,750.58
8 523314 Dental Providers/ Dentist/ Orthodontics and Dentofacial Orthopedics GAC International LLC 1 $15,000.00
9 224960 Laboratories/ Clinical Medical Laboratory Tosoh Bioscience, Inc. 5 $14,001.00
In [224]:
#who is getting the most Food?
food_amount_by_physician = sqlContext.sql("""select physician_id
                                                  , physician_specialty
                                                  , payer, count(*) as cnt
                                                  , sum(amount) as total
                                             from payments
                                             where reason = \'Food and Beverage\'
                                             group by physician_id, physician_specialty, payer
                                             order by total desc
                                             """).filter(lambda t:len(t[0]) > 3).take(10)

print_table(['Physician','Specialty', 'Payer', 'Number of Payments', 'Total Amount for Payments']\
           , [x[0] for x in food_amount_by_physician]\
           , [ [ x[1] \
               , x[2] \
               , locale.format('%d', x[3], grouping=True)\
               , '$' + locale.format('%0.2f', x[4], grouping=True)\
               ] \
             for x in food_amount_by_physician]\
           )
Physician Specialty Payer Number of Payments Total Amount for Payments
0 200720 Allopathic & Osteopathic Physicians/ Surgery Teleflex Medical Incorporated 4 $78,183.81
1 28946 Dental Providers/ Dentist/ General Practice BIOLASE, INC. 6 $29,608.22
2 405597 Allopathic & Osteopathic Physicians/ Ophthalmology Lundbeck LLC 67 $14,955.02
3 29943 Allopathic & Osteopathic Physicians/ Urology Auxilium Pharmaceuticals, Inc. 62 $13,138.35
4 245373 Allopathic & Osteopathic Physicians/ Anesthesiology Depomed, Inc. 36 $12,647.92
5 232708 Allopathic & Osteopathic Physicians/ Neurological Surgery Baxano Surgical, Inc. 28 $10,641.94
6 328465 SonaCare Medical, LLC 4 $9,997.92
7 440053 Allopathic & Osteopathic Physicians/ Internal Medicine Pfizer Inc. 36 $9,690.36
8 201967 Allopathic & Osteopathic Physicians/ Surgery/ Surgical Oncology Intuitive Surgical, Inc. 18 $8,601.26
9 154591 Other Service Providers/ Specialist Ranbaxy Inc. 10 $8,347.49
In [225]:
#Who is paying the most?
amount_by_payer = sqlContext.sql("""select payer, reason, count(*) as cnt, sum(amount) as total
                                             from payments
                                             group by payer, reason
                                             order by total desc
                                             """).filter(lambda t:len(t[0]) > 3).take(10)
print_table(['Payer','Reason', 'Number of Payments', 'Total Amount for Payments']\
           , [x[0] for x in amount_by_payer]\
           , [ [ x[1] \
               , locale.format('%d', x[2], grouping=True)\
               , '$' + locale.format('%0.2f', x[3], grouping=True)\
               ] \
             for x in amount_by_payer]\
           )
Payer Reason Number of Payments Total Amount for Payments
0 Genentech, Inc. Royalty or License 65 $122,548,134.00
1 DePuy Synthes Sales Inc. Royalty or License 247 $27,730,373.58
2 Arthrex, Inc. Royalty or License 259 $11,524,088.26
3 Biomet, Inc. Royalty or License 301 $9,966,304.43
4 AstraZeneca Pharmaceuticals LP Compensation for services other than consulting, including serving as faculty or as a speaker at... 5,237 $9,529,667.44
5 Zimmer Holding Inc Royalty or License 115 $9,132,692.18
6 Pfizer Inc. Grant 107 $7,989,769.90
7 Forest Laboratories, Inc. Compensation for services other than consulting, including serving as faculty or as a speaker at... 5,165 $7,633,516.85
8 Janssen Pharmaceuticals, Inc Compensation for services other than consulting, including serving as faculty or as a speaker at... 3,742 $7,423,565.00
9 Otsuka America Pharmaceutical, Inc. Consulting Fee 4,098 $6,972,416.19
In [226]:
#Take the data above and generate a bar chart with it
bar_chart([ [x[0] + ' - ' + truncate(x[1], length=20) \
            , x[3] /1000000.0 \
            ] for x in amount_by_payer ]\
         , 'Most Paid'\
         , 'Total Paid in $1M'\
         , 'Payer/Reason'\
         , False\
         )
In [227]:
#Some useful functions for more advanced analytics

#Joins in spark take RDD[K,V] x RDD[K,U] => RDD[K, [U,V] ]
#This function returns U
def join_lhs(t):
    return t[1][0]

#Joins in spark take RDD[K,V] x RDD[K,U] => RDD[K, [U,V] ]
#This function returns V
def join_rhs(t):
    return t[1][1]

#Add a key/value to a dictionary and return the dictionary
def annotate_dict(d, k, v):
    d[k] = v
    return d

#Plots a density plot of a set of points representing inliers and outliers
#A rugplot is used to indicate the points and the outliers are marked in red.
def plot_outliers(inliers, outliers, reason):
    fig, ax = plt.subplots(nrows=1)
    sns.distplot(inliers + outliers, ax=ax, rug=True, hist=False)
    ax.plot(outliers, np.zeros_like(outliers), 'ro', clip_on=False)
    fig.suptitle('Distribution for {} Values'.format(reason), size=14)
In [228]:
#Outlier analysis using Median Absolute Divergence

#Using reservoir sampling, uniformly sample N points
#requires O(N) memory
def sample_points(points, N):
    sample = [];
    for i,point in enumerate(points):
        if i < N:
            sample.append(point)
        elif i >= N and random.random() < N/float(i+1):
            replace = random.randint(0,len(sample)-1)
            sample[replace] = point
    return sample

#Returns a function which will extract the median at location 'key'
#a list of dictionaries.
def median_func(key):
    #Right now it uses numpy's median, but probably a quickselect implementation is called for
    #as I expect this doesn't scale
    return lambda partition_value : (partition_value[0], np.median([d[key] for d in partition_value[1]]))

#Compute the modified z-score for use by  as per Iglewicz and Hoaglin:
#Boris Iglewicz and David Hoaglin (1993),
#"Volume 16: How to Detect and Handle Outliers",
#The ASQC Basic References in Quality Control: Statistical Techniques
#, Edward F. Mykytka, Ph.D., Editor.
def get_z_score(reason_to_diff):
    med = join_rhs(reason_to_diff)
    if med > 0:
        return 0.6745 * join_lhs(reason_to_diff)['diff'] / med
    else:
        return 0

def is_outlier(thresh):
    return lambda reason_to_diff : get_mad(reason_to_diff) > thresh

#Return a RDD of a uniform random sample of a specified size per key
def get_inliers(reason_amount_pairs, size=2000):
    group_by_reason = reason_amount_pairs.groupByKey()
    return group_by_reason.map(lambda t : (t[0], sample_points(t[1], size)))


#Return the outliers based on Median Absolute Divergence
#See http://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm for more info.
#The input key structure is reason_specialty => dict(amount, physician, payer, specialty)
def get_outliers(reason_amount_pairs, thresh=3.5):
    """
        This uses the median absolute divergence (MAD) statistic to find
        outliers for each reason x specialty partitions.
        
        Outliers are computed as follows: 
        * Let X be all the payments for a given specialty, reason pair
        * Let x_i be a payment in X
        * Let MAD be the median absolute divergence, defined as
          MAD = median( for all x in X, | x - median(X)| )
        * Let M_i be the modified z-score for payment x_i, defined as
          0.6745*(x_i − median(X) )/MAD
        
        As per the recommendations by Iglewicz and Hoaglin, a payment is
        considered an outlier if the modified z-score, M_i > thresh, which
        is 3.5 by default.
        
        
        REFERENCE:
        Boris Iglewicz and David Hoaglin (1993),
        "Volume 16: How to Detect and Handle Outliers",
        The ASQC Basic References in Quality Control: Statistical Techniques,
        Edward F. Mykytka, Ph.D., Editor.
    """
    
    group_by_reason = reason_amount_pairs.groupByKey()
   
    #Filter by only reason/specialty's with more than 1k entries
    #and compute the median of the amounts across the partition.
    
    #NOTE: There may be some scalability challenges around median, so some care should
    #be taken to reimplement this if partitioning by (reason, specialty)
    #does not yield small enough numbers to handle in an individual map function.
    reason_to_median = group_by_reason.filter(lambda t: len(t[1]) > 1000) \
                                      .map(median_func('amount'))
   
    #Join the base, non-grouped data, with the median per key, consider just the payments more than the median
    #since we're looking for large money outliers and annotate the dictionary for each entry x_i with the following:
    # * diff = |x_i - median(X)| in the parlance of the comment above.
    #   NOTE: Strictly speaking I can drop the absolute value since x_i > median(X), but I choose not to.
    # * median = median(X)
    # 
    reason_abs_dist_from_median = \
        reason_amount_pairs.join(reason_to_median) \
                           .filter(lambda t : join_lhs(t)['amount'] > join_rhs(t)) \
                           .map(lambda t: (t[0],dict( diff=abs(join_lhs(t)['amount'] - join_rhs(t))\
                                                    , row=annotate_dict(join_lhs(t) \
                                                                       , 'median' \
                                                                       , join_rhs(t) \
                                                                       )\
                                                    )\
                                          )\
                                )
                
    # Given diff cached per element, we need only compute the median of the diffs
    # to compute the MAD.  
    #Remember, MAD = median( for all x in X, | x - median(X)| )
    reason_to_MAD = reason_abs_dist_from_median.groupByKey() \
                                               .map(median_func('diff'))
    reason_to_MAD.take(1)
    
    # Joining the grouped data to get both | x_i - median(X) | and MAD in the same place, we can compute
    # the modified z-score, 0.6475*| x_i - median(X)| / MAD, and filter by the ones which are more than threshold
    # we can then do some pivoting of keys and sort by that threshold to give us the ranked list of outliers.
    return reason_abs_dist_from_median.join(reason_to_MAD) \
                                      .filter(is_outlier(thresh))\
                                      .map(lambda t: (get_z_score(t), annotate_dict(join_lhs(t)['row'], 'key', t[0]))) \
                                      .sortByKey(False) \
                                      .map(lambda t: (t[1]['key'], annotate_dict(t[1], 'mad', t[0])))

#Filter the outliers by reason and return a RDD with just the outliers of a specified reason.
def get_by_reason(outliers, reason):
    return outliers.filter(lambda t: str.startswith(t[0].encode('ascii', 'ignore'),reason))

#Grab data using Spark-SQL and filter with spark core RDD operations to only yield the data
#we want, ones with physicians, payers and reasons
reason_amount_pairs = sqlContext.sql("select reason, physician_specialty, amount, physician_id, payer from payments")\
                                .filter(lambda row:len(row.reason) > 3 and len(row.physician_id) > 3 and len(row.payer) > 3) \
                                .map(lambda row: ( "{}_{}".format(row.reason, row.physician_specialty)\
                                               , dict(amount=row.amount\
                                                     ,physician_id=row.physician_id\
                                                     ,payer=row.payer\
                                                     ,specialty=row.physician_specialty\
                                                     )\
                                               )\
                                     )

#Get the outliers based on a modified z-score threshold of 3.5
outliers = get_outliers(reason_amount_pairs, 3.5)
#Get a sample per specialty/reason partition
inliers = get_inliers(reason_amount_pairs)
In [231]:
#display the top k outliers in a table and a distribution plot
#of an inlier sample along with the outliers rug-plotted in red
def display_info(inliers_raw, outliers_raw_tmp, reason, k=None):
    outliers_raw = []
    if k is None:
        outliers_raw = sorted(outliers_raw_tmp, key=lambda d:d[1]['amount'], reverse=True)
    else:
        outliers_raw = sorted(outliers_raw_tmp, key=lambda d:d[1]['amount'], reverse=True)[0:k]
    inlier_pts = []
    for i in [d[1] for d in inliers_raw]:
        for j in i:
            inlier_pts.append(j['amount'])
    outlier_pts= [d[1]['amount'] for d in outliers_raw]
    plot_outliers(inlier_pts[0:1500], outlier_pts, reason)

    print_table(['Physician','Specialty', 'Payer', 'Amount']\
               , [d[1]['physician_id'] for d in outliers_raw]\
               , [ [ d[1]['specialty'] \
                   , d[1]['payer'].encode('ascii', 'ignore') \
                   , '$' + locale.format('%0.2f', d[1]['amount'], grouping=True)\
                   ] \
                 for d in outliers_raw]\
               )
In [232]:
#outliers for food and beverage purchases

food_outliers = get_by_reason(outliers, 'Food and Beverage').collect()
food_inliers = get_by_reason(inliers, 'Food and Beverage').collect()
display_info(food_inliers, food_outliers, 'Food and Beverage', 4)
Physician Specialty Payer Amount
0 200720 Allopathic & Osteopathic Physicians/ Surgery Teleflex Medical Incorporated $68,750.00
1 28946 Dental Providers/ Dentist/ General Practice BIOLASE, INC. $13,297.15
2 28946 Dental Providers/ Dentist/ General Practice BIOLASE, INC. $8,111.82
3 28946 Dental Providers/ Dentist/ General Practice BIOLASE, INC. $8,111.82
In [233]:
travel_outliers = get_by_reason(outliers, 'Travel and Lodging').collect()
travel_inliers = get_by_reason(inliers, 'Travel and Lodging').collect()
display_info(travel_inliers, travel_outliers, 'Travel and Lodging', 10)
Physician Specialty Payer Amount
0 106320 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Neurology Boehringer Ingelheim Pharma GmbH & Co.KG $155,772.00
1 472722 Allopathic & Osteopathic Physicians/ Internal Medicine/ Nephrology Merck Sharp & Dohme Corporation $75,000.00
2 371379 Allopathic & Osteopathic Physicians/ Orthopaedic Surgery Exactech, Inc. $65,798.00
3 198801 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease Medtronic Vascular, Inc. $41,232.80
4 382697 Allopathic & Osteopathic Physicians/ Internal Medicine/ Nephrology Genentech, Inc. $39,978.80
5 169095 Allopathic & Osteopathic Physicians/ Surgery Medtronic Vascular, Inc. $37,683.00
6 80052 Allopathic & Osteopathic Physicians/ Family Medicine Boehringer Ingelheim Pharma GmbH & Co.KG $24,911.25
7 202461 Allopathic & Osteopathic Physicians/ Thoracic Surgery (Cardiothoracic Vascular Surgery) Covidien LP $21,594.51
8 378722 Allopathic & Osteopathic Physicians/ Internal Medicine GlaxoSmithKline, LLC. $20,112.40
9 243205 Allopathic & Osteopathic Physicians/ Internal Medicine/ Interventional Cardiology Medtronic Vascular, Inc. $19,273.90
In [234]:
consulting_outliers = get_by_reason(outliers, 'Consulting Fee').collect()
consulting_inliers = get_by_reason(inliers, 'Consulting Fee').collect()
display_info(consulting_inliers, consulting_outliers, 'Consulting Fee', 10)
Physician Specialty Payer Amount
0 104930 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Neurology Teva Pharmaceuticals USA, Inc. $207,500.00
1 151515 Other Service Providers/ Specialist Alcon Research Ltd $150,000.00
2 309376 Allopathic & Osteopathic Physicians/ Internal Medicine Teva Pharmaceuticals USA, Inc. $137,559.67
3 231913 Allopathic & Osteopathic Physicians/ Orthopaedic Surgery Exactech, Inc. $108,125.00
4 465481 Allopathic & Osteopathic Physicians/ Internal Medicine/ Rheumatology Vision Quest Industries Inc. $102,196.09
5 409799 Allopathic & Osteopathic Physicians/ Internal Medicine/ Endocrinology, Diabetes & Metabolism Pfizer Inc. $100,000.00
6 206227 Allopathic & Osteopathic Physicians/ Orthopaedic Surgery DePuy Synthes Sales Inc. $93,750.00
7 436192 Allopathic & Osteopathic Physicians/ Internal Medicine Pfizer Inc. $90,000.00
8 306965 Allopathic & Osteopathic Physicians/ Psychiatry & Neurology/ Neurology Teva Pharmaceuticals USA, Inc. $64,125.00
9 163888 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease Boehringer Ingelheim Pharmaceuticals, Inc. $61,025.00
In [235]:
gift_outliers = get_by_reason(outliers, 'Gift').collect()
gift_inliers = get_by_reason(inliers, 'Gift').collect()
display_info(gift_inliers, gift_outliers, 'Gift', 10)
Physician Specialty Payer Amount
0 225073 Dental Providers/ Dentist/ General Practice Dentalez Alabama, Inc. $56,422.00
1 167931 Dental Providers/ Dentist DENTSPLY IH Inc. $8,672.50
2 380517 Dental Providers/ Dentist DENTSPLY IH Inc. $8,672.50
3 380073 Dental Providers/ Dentist/ General Practice Benco Dental Supply Co. $7,570.00
4 403926 Dental Providers/ Dentist A-dec, Inc. $5,430.00
5 429612 Dental Providers/ Dentist PureLife, LLC $5,058.72
6 404935 Dental Providers/ Dentist A-dec, Inc. $5,040.00
7 8601 Dental Providers/ Dentist/ General Practice DentalEZ, Inc. $3,876.35
8 385314 Dental Providers/ Dentist/ General Practice Henry Schein, Inc. $3,789.99
9 389592 Dental Providers/ Dentist/ General Practice Henry Schein, Inc. $3,789.99
In [236]:
education_outliers = get_by_reason(outliers, 'Education').collect()
education_inliers = get_by_reason(inliers, 'Education').collect()
display_info(education_inliers, education_outliers, 'Education', 10)
Physician Specialty Payer Amount
0 165014 Allopathic & Osteopathic Physicians/ Urology KARLSTORZ Endoscopy-America $19,574.44
1 141882 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease The Spectranetics Corporation $17,500.00
2 123702 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease The Spectranetics Corporation $17,500.00
3 223653 Allopathic & Osteopathic Physicians/ Orthopaedic Surgery Smith & Nephew, Inc. $15,750.00
4 168262 Allopathic & Osteopathic Physicians/ Internal Medicine/ Cardiovascular Disease The Spectranetics Corporation $15,000.00
5 95007 Allopathic & Osteopathic Physicians/ Internal Medicine/ Gastroenterology Olympus America Inc. $13,050.00
6 95007 Allopathic & Osteopathic Physicians/ Internal Medicine/ Gastroenterology Olympus America Inc. $13,050.00
7 221294 Other Service Providers/ Specialist KARLSTORZ Endoscopy-America $12,500.00
8 331174 Allopathic & Osteopathic Physicians/ Internal Medicine Olympus America Inc. $12,285.00
9 299899 Allopathic & Osteopathic Physicians/ Orthopaedic Surgery Smith & Nephew, Inc. $11,666.67
In [237]:
#Compute benford's distribution for first and second digit respectively
benford_1=np.array([0] + [math.log10(1+1.0/i) for i in xrange(1,10)])
benford_2=np.array([ sum([ math.log10(1 + 1.0/(j*10 + i)) for j in xrange(1, 10) ]) for i in xrange(0,10)])
In [238]:
#Return a numpy array of zeros of specified $length except at
#position $index, which has value $value
def array_with_value(length, index, value):
    arr = np.zeros(length)
    arr[index] = value
    return arr

#Perform chi-square test between an expected probability
#distribution and a list of empirical frequencies.
#Returns the chi-square statistic and the p-value for the test.
def goodness_of_fit(emp_counts, expected_probabilities):
    #convert from probabilities to counts
    exp_distr = expected_probabilities*np.sum(emp_counts)
    return stats.chisquare(emp_counts, f_exp=exp_distr)
    
        
#For each (reason, payer) pair compute the first and second digit distribution
#for all payments.  Return a RDD with a ranked list based on likely goodness of fit to the
#distribution of first digits predicted by Benford's "Law".
def benfords_law(min_payments=350):
    """
    Benford's "law" is a rough observation that the distribution of numbers for each digit
    position of certain data fits a specific distribution.  It holds for quite a bit real-world
    data and, thus, has become of interest to forensic accountants.
    
    This function computes the distribution of first and second digits for each (reason, payer) pair
    and ranks them by goodness of fit to Benford's Law based on the first digit distribution.
    In particular, the goodness of fit metric that it is ranked by is kullback-liebler divergence, but
    chi-squared goodness of fit test is performed and the results are cached.
    """
    
    #We use this one quite a bit in reducers, so it's nice to have it handy here
    sum_values = lambda x,y:x+y

    #Project out the reason, payer, amount, and amount_str, throwing away values < 10
    #since they don't have 2nd digits.  This probably skews the results, so in real-life, I'd
    #not throw out entries so cavalierly, but for the purpose of simplicity, I've done it here.
    
    #Also, we're pulling out the first and second digits here
    reason_payer_amount_info = sqlContext.sql("""select reason, payer, amount, amount_str 
                                                 from payments
                                              """)\
                                    .filter(lambda t:len(t[0]) > 3 and t[2] > 9.99) \
                                    .map(lambda t: ( (t[1], t[0]) \
                                                   ,dict( payer=t[1]\
                                                        , reason=t[0] \
                                                        , first_digit=t[3][0] \
                                                        , second_digit=t[3][1] \
                                                        )\
                                                   )\
                                        )
    reason_payer_amount_info.take(1)
    
    #filter out the reason / payer combos that have fewer payments than the minimum number of payments                       
    reason_payer_count = reason_payer_amount_info.map(lambda t: (t[0],1)) \
                                                 .reduceByKey(sum_values) \
                                                 .filter(lambda t: t[1] > min_payments)
                                              
    #inner join with the reason/payer's that fit the count requirement and annotate value with the num payments
    reason_payer_digits = reason_payer_amount_info.join(reason_payer_count) \
                                                  .map(lambda t: (t[0] \
                                                                 , annotate_dict(join_lhs(t)\
                                                                                , 'num_payments'\
                                                                                , join_rhs(t)\
                                                                                )\
                                                                  )\
                                                       )
    #compute the first digit distribution.
    #First we count each of the 9 possible first digits, then we translate that count into a vector of dimension 10
    #with count for digit i in position i.  We then sum those vectors, thereby getting the full frequency
    #per digit.
    first_digit_distribution = reason_payer_digits.map(lambda t: ( (t[0], t[1]['first_digit'] ) , 1) ) \
                                                       .reduceByKey(sum_values) \
                                                       .map(lambda t: (t[0][0]\
                                                                      , array_with_value(10\
                                                                                        , int(t[0][1])\
                                                                                        , t[1]\
                                                                                        )\
                                                                      )\
                                                            ) \
                                                       .reduceByKey(sum_values)
    #same thing with the 2nd digit
    second_digit_distribution = reason_payer_digits.map(lambda t: ( (t[0], t[1]['second_digit']) , 1) ) \
                                                       .reduceByKey(sum_values) \
                                                       .map(lambda t: (t[0][0]\
                                                                      , array_with_value(10\
                                                                                        , int(t[0][1])\
                                                                                        , t[1]\
                                                                                        )\
                                                                      )\
                                                            ) \
                                                       .reduceByKey(lambda x,y:np.array(x) + np.array(y))
    #we join the two, compute the goodness of fit based on chi-square test and the distance from benford's
    #distribution based on kl divergence.  Finally we sort by kl-divergence ascending (good fits come first).
    return \
    first_digit_distribution.join(second_digit_distribution) \
                            .map(lambda t : (t[0], dict( payer=t[0][0] \
                                                       , reason=t[0][1] \
                                                       , first_digit_distr=join_lhs(t) \
                                                       , second_digit_distr=join_rhs(t) \
                                                       , first_digit_fit = goodness_of_fit(join_lhs(t)[1:10] \
                                                                                          , benford_1[1:10] \
                                                                                          ) \
                                                       , second_digit_fit = goodness_of_fit(join_rhs(t), benford_2) \
                                                       , kl_divergence=stats.entropy( benford_1[1:10], join_lhs(t)[1:10])
                                                       ) \
                                             ) \
                                 ) \
                             .map(lambda t : (t[1]['kl_divergence'], t[1]) )\
                             .sortByKey(True) \
                             .map(lambda t : ( (t[1]['payer'], t[1]['reason']), t[1]) )
benford_data = benfords_law(400)                   
In [239]:
#Plot the distribution of first and second digit side-by-side for a set of payers.
def plot_figure(title,entries):
    num_rows = len(entries)
    fig, axes = plt.subplots(nrows=len(entries), ncols=2, figsize=(12,12))
    fig.tight_layout()
    plt.subplots_adjust(top=0.91, hspace=0.55, wspace=0.3)
    fig.suptitle(title, size=14)
    
    bar_width = .4
    
    for i,entry in enumerate(entries):
        first_ax = axes[i][0]
        first_digit_distr = entry[1]['first_digit_distr'][1:10]
        sample_label_1 = """$\chi$={}, $p$={}, kl={}, n={}""".format(locale.format('%0.2f'\
                                                                                  , float(entry[1]['first_digit_fit'][0])\
                                                                                  ) \
                                                                    , locale.format('%0.2f'\
                                                                                   , float(entry[1]['first_digit_fit'][1])\
                                                                                   )\
                                                                    , locale.format('%0.2f'\
                                                                                   , float(entry[1]['kl_divergence'])\
                                                                                   )\
                                                                    , int(np.sum(first_digit_distr))\
                                                                    )
        first_digit_distr = first_digit_distr/np.sum(first_digit_distr)
        
        first_ax.bar(np.arange(1,10) ,first_digit_distr, alpha=0.35, color='blue', width=bar_width, label="Sample")
        first_ax.bar(np.arange(1,10)+bar_width ,benford_1[1:10], alpha=0.35, color='red', width=bar_width, label='Benford')
        first_ax.set_xticks(np.arange(1,10))
        first_ax.legend()
        first_ax.grid()
        
        first_ax.set_ylabel('Probability')
        first_ax.set_title("{} First Digit\n{}".format(entry[0][0].encode('ascii', 'ignore'), sample_label_1))
        
        second_ax = axes[i][1]
        second_digit_distr = entry[1]['second_digit_distr']
        sample_label_2 = '$\chi$={}, $p$={}, n={}'.format(locale.format('%0.2f', float(entry[1]['second_digit_fit'][0])) \
                                                 , locale.format('%0.2f', float(entry[1]['second_digit_fit'][1]))\
                                                 , int(np.sum(second_digit_distr))
                                                 )
        second_digit_distr = second_digit_distr/np.sum(second_digit_distr)
        
        second_ax.bar(np.arange(0,10) ,second_digit_distr, alpha=0.35, color='blue', width=bar_width, label="Sample")
        second_ax.bar(np.arange(0,10) + bar_width,benford_2, alpha=0.35, color='red', width=bar_width, label='Benford')
        second_ax.set_xticks(np.arange(0,10))
        second_ax.legend()
        second_ax.grid()
        second_ax.set_ylabel('Probability')
        second_ax.set_title("{} Second Digit\n{}".format(entry[0][0].encode('ascii', 'ignore'), sample_label_2))
        
#Take n-worst or best (depending on t) entries for reason based on goodness of fit for benford's law
#and plot the first/second digit distributions versus benford's distribution side-by-side
#as well as the distribution of kl-divergences.
def benford_summary(reason, data = benford_data, n=5, t='best'):
    raw_data = data.filter(lambda t:t[0][1] == reason).collect()
    s = []
    if t == 'best':
        s=raw_data[0:n]
        plot_figure("Top 5 Best Fitting Benford Analysis for {}".format(reason), s)
    else:
        s=raw_data[-n:][::-1]
        plot_figure("Top 5 Worst Fitting Benford Analysis for {}".format(reason), s)
    plot_outliers([d[1]['kl_divergence'] for d in raw_data], [d[1]['kl_divergence'] for d in s], reason + " KL Divergence")
In [240]:
# Gift Benford Analysis (Best)
benford_summary('Gift', t='best')
In [241]:
benford_summary('Gift', t='worst')
In [242]:
# Travel and Lodging Benford Analysis
benford_summary("Travel and Lodging")
In [243]:
benford_summary("Travel and Lodging", t='worst')
In [244]:
benford_summary("Consulting Fee")
In [245]:
benford_summary("Consulting Fee", t='worst')