This notebook provides an example of how the provenance of an ML pipeline can be recorded. We use the loan scenario from https://explain.openprovenance.org/loan/.
The original dataset (`accepted_2007_to_2018Q4.csv.gz`) can be downloaded from https://www.kaggle.com/wordsforthewise/lending-club.
# Packages from Python
from datetime import datetime
import gc
import hashlib
import os
from pathlib import Path
import pickle
import platform
import random
import time
import timeit
# ML packages
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier
# Provenance packages
from prov.model import ProvDocument, Namespace, PROV, PROV_TYPE, PROV_VALUE, ProvEntity, ProvAgent
from prov.dot import prov_to_dot
# Namespaces used to qualify identifiers in the provenance document
PD_NS = Namespace('pd', 'https://pandas.pydata.org/#')  # pandas concepts
PY_NS = Namespace('py', 'urn:python:var:')  # in-memory Python objects (identified by object id)
SK_NS = Namespace('sk', 'https://scikit-learn.org/stable/modules/generated/sklearn.')  # scikit-learn classes
LN_NS = Namespace('ln', 'https://plead-project.org/ns/loan#')  # loan-scenario vocabulary
PL_NS = Namespace('pl', 'https://plead-project.org/ns/plead#')  # PLEAD vocabulary
# All the namespaces above are registered with the ProvDocument on creation
NAMESPACES = [
    PD_NS,
    PY_NS,
    SK_NS,
    LN_NS,
    PL_NS,
    Namespace('file', 'file://'),  # local files referenced by path
    Namespace('ex', 'http://example/org')  # example identifiers for staff, machines, records
]
# A context class to measure elapsed time
class Timer:
    """Context manager that measures the wall-clock time of the enclosed block.

    Usage::

        with Timer():
            do_work()

    After exit, the elapsed time (in the units of ``timer``, seconds by
    default) is available as ``self.interval`` and, when ``verbose`` is on,
    printed using ``msg_template``.
    """

    def __init__(self, timer=None, disable_gc=False, verbose=True, msg_template='Time taken: %f seconds'):
        # Fall back to the highest-resolution clock available
        self.timer = timeit.default_timer if timer is None else timer
        self.disable_gc = disable_gc
        self.gc_state = None  # remembers whether GC was enabled before we disabled it
        self.verbose = verbose
        self.msg_template = msg_template
        self.start = None
        self.end = None
        self.interval = None

    def __enter__(self):
        # Optionally switch off garbage collection so GC pauses do not
        # pollute the measurement; remember its prior state for restoration.
        if self.disable_gc:
            self.gc_state = gc.isenabled()
            gc.disable()
        self.start = self.timer()
        return self

    def __exit__(self, *exc_info):
        self.end = self.timer()
        # Restore GC only if it was enabled when we entered
        if self.disable_gc and self.gc_state:
            gc.enable()
        self.gc_state = None
        self.interval = self.end - self.start
        if self.verbose:
            print(self.msg_template % self.interval)
def sha256(filename):
    """Return the hex-encoded SHA-256 digest of the file at ``filename``.

    The file is consumed in 64 KiB chunks so arbitrarily large files can
    be hashed without loading them fully into memory.
    """
    digest = hashlib.sha256()
    with open(filename, "rb") as stream:
        chunk = stream.read(65536)
        while chunk:
            digest.update(chunk)
            chunk = stream.read(65536)
    return digest.hexdigest()
def get_file_entity(prov_doc: ProvDocument, filepath) -> ProvEntity:
    """Create a PROV entity describing the file at ``filepath``.

    The entity is identified by the file path (in the ``file:`` namespace)
    and carries the file's size, SHA-256 digest, and creation time as
    attributes.

    :param prov_doc: the provenance document to add the entity to
    :param filepath: path (str or Path) of an existing file
    :return: the newly created ProvEntity
    """
    sha256_digest = sha256(filepath)
    file_stats = os.stat(filepath)
    # Fix: `st_birthtime` only exists on some platforms (macOS, some BSDs)
    # and raises AttributeError elsewhere; fall back to `st_mtime` (last
    # modification time) as the closest portable approximation.
    created_timestamp = getattr(file_stats, 'st_birthtime', file_stats.st_mtime)
    e_file = prov_doc.entity(
        'file:' + str(filepath), {
            'prov:type': LN_NS['File'], 'ln:filesize': file_stats.st_size,
            'ln:sha256': sha256_digest,
            'ln:created_at': datetime.fromtimestamp(created_timestamp)
        }
    )
    return e_file
def get_agent_machine(prov_doc: ProvDocument) -> ProvAgent:
    """Create a PROV software agent describing the machine running this code.

    The agent is identified by the machine's network name and records the
    OS system/release/version plus the Python version as attributes.

    :param prov_doc: the provenance document to add the agent to
    :return: the newly created ProvAgent
    """
    uname_result = platform.uname()
    machine_attributes = {
        'prov:type': PROV['SoftwareAgent'],
        'ln:machine_system': uname_result.system,
        'ln:machine_release': uname_result.release,
        'ln:machine_version': uname_result.version,
        'ln:machine_python_version': platform.python_version(),
    }
    return prov_doc.agent('ex:machine/' + uname_result.node, machine_attributes)
# we use this unique number to make our identifiers unique for this session
session_id = int(time.time())
# Initialising the provenance document
prov_doc = ProvDocument(namespaces=NAMESPACES)
# The gzipped CSV downloaded from Kaggle (see the note at the top of the notebook)
csv_filepath = 'lending-club/accepted_2007_to_2018Q4.csv.gz'
# Record the input file as a PROV entity before processing it
e_loans_csv_file = get_file_entity(prov_doc, csv_filepath)
loans = pd.read_csv(csv_filepath, compression='gzip', low_memory=False)
n_rows, n_cols = loans.shape
# Checking the initial dataset
loans.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 2260701 entries, 0 to 2260700 Columns: 151 entries, id to settlement_term dtypes: float64(113), object(38) memory usage: 2.5+ GB
# Adding attributes we got from the dataset
e_loans_csv_file.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})
startTime = datetime.now()  # record the start time of this step
with Timer():
    print('Filtering the original dataset')
    print('- Only keep "Fully Paid" and "Charged Off"')
    # Keep only loans whose final outcome is known; in-progress loans are dropped
    loans = loans.loc[loans['loan_status'].isin(['Fully Paid', 'Charged Off'])]
    print('- New dimensions: ', loans.shape)
    print('- Drop features missing more than 30% data...')
    # Fraction of missing values per column, worst offenders first
    missing_fractions = loans.isnull().mean().sort_values(ascending=False)
    print(missing_fractions.head(10))
    # Columns missing more than 30% of their values, sorted for stable output
    drop_list = sorted(list(missing_fractions[missing_fractions > 0.3].index))
    print(drop_list)
    loans.drop(labels=drop_list, axis=1, inplace=True)
    print('Current dimensions: ', loans.shape)
    print('Only keep loan features known to potential investors:')
    # Whitelist of features available at loan-application time
    keep_list = [
        'addr_state', 'annual_inc', 'application_type', 'dti',
        'earliest_cr_line', 'emp_length', 'emp_title',
        'fico_range_high', 'fico_range_low', 'grade',
        'home_ownership', 'id', 'initial_list_status',
        'installment', 'int_rate', 'issue_d', 'loan_amnt', 'loan_status',
        'mort_acc', 'open_acc', 'pub_rec', 'pub_rec_bankruptcies',
        'purpose', 'revol_bal', 'revol_util', 'sub_grade',
        'term', 'title', 'total_acc', 'verification_status', 'zip_code'
    ]
    drop_list = [col for col in loans.columns if col not in keep_list]
    print(drop_list)
    loans.drop(labels=drop_list, axis=1, inplace=True)
    # Refresh the dimensions; recorded on the snapshot entity below
    n_rows, n_cols = loans.shape
    print('Current dimensions: ', loans.shape)
    # Saving a snapshot of the filtered dataset
    filtered_filepath = 'loans_filtered.xz'
    loans.to_pickle(filtered_filepath)
endTime = datetime.now()  # record the end time of this step
Filtering the original dataset - Only keep "Fully Paid" and "Charged Off" - New dimensions: (1345310, 151) - Drop features missing more than 30% data... member_id 1.000000 next_pymnt_d 1.000000 orig_projected_additional_accrued_interest 0.997206 hardship_start_date 0.995723 hardship_end_date 0.995723 payment_plan_start_date 0.995723 hardship_length 0.995723 hardship_dpd 0.995723 hardship_loan_status 0.995723 hardship_last_payment_amount 0.995723 dtype: float64 ['all_util', 'annual_inc_joint', 'debt_settlement_flag_date', 'deferral_term', 'desc', 'dti_joint', 'hardship_amount', 'hardship_dpd', 'hardship_end_date', 'hardship_last_payment_amount', 'hardship_length', 'hardship_loan_status', 'hardship_payoff_balance_amount', 'hardship_reason', 'hardship_start_date', 'hardship_status', 'hardship_type', 'il_util', 'inq_fi', 'inq_last_12m', 'max_bal_bc', 'member_id', 'mths_since_last_delinq', 'mths_since_last_major_derog', 'mths_since_last_record', 'mths_since_rcnt_il', 'mths_since_recent_bc_dlq', 'mths_since_recent_revol_delinq', 'next_pymnt_d', 'open_acc_6m', 'open_act_il', 'open_il_12m', 'open_il_24m', 'open_rv_12m', 'open_rv_24m', 'orig_projected_additional_accrued_interest', 'payment_plan_start_date', 'revol_bal_joint', 'sec_app_chargeoff_within_12_mths', 'sec_app_collections_12_mths_ex_med', 'sec_app_earliest_cr_line', 'sec_app_fico_range_high', 'sec_app_fico_range_low', 'sec_app_inq_last_6mths', 'sec_app_mort_acc', 'sec_app_mths_since_last_major_derog', 'sec_app_num_rev_accts', 'sec_app_open_acc', 'sec_app_open_act_il', 'sec_app_revol_util', 'settlement_amount', 'settlement_date', 'settlement_percentage', 'settlement_status', 'settlement_term', 'total_bal_il', 'total_cu_tl', 'verification_status_joint'] Current dimensions: (1345310, 93) Only keep loan features known to potential investors: ['funded_amnt', 'funded_amnt_inv', 'pymnt_plan', 'url', 'delinq_2yrs', 'inq_last_6mths', 'out_prncp', 'out_prncp_inv', 'total_pymnt', 'total_pymnt_inv', 'total_rec_prncp', 
'total_rec_int', 'total_rec_late_fee', 'recoveries', 'collection_recovery_fee', 'last_pymnt_d', 'last_pymnt_amnt', 'last_credit_pull_d', 'last_fico_range_high', 'last_fico_range_low', 'collections_12_mths_ex_med', 'policy_code', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'total_rev_hi_lim', 'acc_open_past_24mths', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'delinq_amnt', 'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', 'mths_since_recent_bc', 'mths_since_recent_inq', 'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl', 'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl', 'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats', 'num_tl_120dpd_2m', 'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m', 'pct_tl_nvr_dlq', 'percent_bc_gt_75', 'tax_liens', 'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit', 'total_il_high_credit_limit', 'hardship_flag', 'disbursement_method', 'debt_settlement_flag'] Current dimensions: (1345310, 31) Time taken: 4.923831 seconds
# Recording the person who did the filtering
ag_engineer = prov_doc.agent(
    'ex:staff/259', [
        (PROV_TYPE, PROV['Person']),
        (PROV_TYPE, LN_NS['DataEngineer'])
    ]
)
# The institution the engineer acts on behalf of
ag_institution = prov_doc.agent(
    'ex:institution', [
        (PROV_TYPE, PROV['Organization']),
        (PROV_TYPE, LN_NS['CreditProvider']),
    ]
)
# The filtering activity spans the timed step above (startTime..endTime)
a_filtering = prov_doc.activity(
    f'ex:ml/filtering/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['SelectingData']
    }
)
a_filtering.used(e_loans_csv_file)
a_filtering.wasAssociatedWith(ag_engineer)
ag_engineer.actedOnBehalfOf(ag_institution)
# Recording the provenance of the filtered dataset
e_loans_filtered = get_file_entity(prov_doc, filtered_filepath)
e_loans_filtered.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})
e_loans_filtered.wasGeneratedBy(a_filtering)
e_loans_filtered.wasDerivedFrom(e_loans_csv_file)
e_loans_filtered.wasAttributedTo(ag_engineer)
<ProvEntity: file:loans_filtered.xz>
startTime = datetime.now()
with Timer():
    print('*** Preprocessing and dropping specific columns: ***')
    print('- Convert loan term (36/60 months) to number')
    # e.g. ' 36 months' -> 36
    loans['term'] = loans['term'].apply(lambda s: np.int8(s.split()[0]))
    print('- The grade is implied by the subgrade, drop the grade column')
    loans.drop('grade', axis=1, inplace=True)
    print('- There are too many different job titles for this feature to be useful, so we drop it.')
    loans.drop(labels='emp_title', axis=1, inplace=True)
    print('- Convert emp_length to integers:')
    # Normalise the two special categories so the generic parser below works
    loans['emp_length'].replace(to_replace='10+ years', value='10 years', inplace=True)
    loans['emp_length'].replace('< 1 year', '0 years', inplace=True)
    # NaN is preserved; otherwise parse the leading number of years
    emp_length_to_int = lambda s: s if pd.isnull(s) else np.int8(s.split()[0])
    loans['emp_length'] = loans['emp_length'].apply(emp_length_to_int)
    print(loans['emp_length'].value_counts(dropna=False).sort_index())
    print('- Replace the home_ownership values ANY and NONE with OTHER')
    loans['home_ownership'].replace(['NONE', 'ANY'], 'OTHER', inplace=True)
    print(loans['home_ownership'].value_counts(dropna=False))
    print('- Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable.')
    # +1 so that zero incomes map to log10(1) = 0 rather than -inf
    loans['log_annual_inc'] = loans['annual_inc'].apply(lambda x: np.log10(x+1))
    loans.drop('annual_inc', axis=1, inplace=True)
    print('- There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable.')
    loans.drop('title', axis=1, inplace=True)
    print('- There are too many different zip codes, so just keep the state column.')
    loans.drop(labels='zip_code', axis=1, inplace=True)
    print('- Just retain the year number for earliest_cr_line')
    # assumes the value ends in a 4-digit year (e.g. 'Aug-2003') — TODO confirm format
    loans['earliest_cr_line'] = loans['earliest_cr_line'].apply(lambda s: int(s[-4:]))
    print('- We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score:')
    loans['fico_score'] = 0.5*loans['fico_range_low'] + 0.5*loans['fico_range_high']
    loans.drop(['fico_range_high', 'fico_range_low'], axis=1, inplace=True)
    print('- We take a log transform of the total credit revolving balance variable.')
    loans['log_revol_bal'] = loans['revol_bal'].apply(lambda x: np.log10(x + 1))
    loans.drop('revol_bal', axis=1, inplace=True)
    print('Current dimensions: ', loans.shape)
    print('*** Data transformation ***')
    print('- Convert loan status to 0/1 charge-off flag')
    # Prediction target: 1 if the loan was charged off, 0 if fully paid
    loans['charged_off'] = (loans['loan_status'] == 'Charged Off').apply(np.uint8)
    loans.drop('loan_status', axis=1, inplace=True)
    missing_fractions = loans.isnull().mean().sort_values(ascending=False)  # Fraction of data missing for each variable
    print('- Checking for missing data:')
    print(missing_fractions[missing_fractions > 0])  # Print variables that are missing data
    print('- All remaining columns:')
    print(loans.columns)
    print('- Introduce dummy categories')
    # One-hot encode the categorical features; drop_first avoids a redundant
    # (collinear) indicator column per category
    loans = pd.get_dummies(
        loans,
        columns=[
            'sub_grade', 'home_ownership', 'verification_status',
            'purpose', 'addr_state', 'initial_list_status', 'application_type'
        ], drop_first=True
    )
    print('Current dimensions: ', loans.shape)
    print('- Converting issue_d to datetime')
    # issue_d is needed as a datetime for the time-based split later on
    loans['issue_d'] = pd.to_datetime(loans['issue_d'])
    # Saving a snapshot of the processed data
    processed_filepath = 'loans_processed.xz'
    loans.to_pickle(processed_filepath)
endTime = datetime.now()
*** Preprocessing and dropping specific columns: *** - Convert loan term (36/60 months) to number - The grade is implied by the subgrade, drop the grade column - There are too many different job titles for this feature to be useful, so we drop it. - Convert emp_length to integers: 0.0 108061 1.0 88494 2.0 121743 3.0 107597 4.0 80556 5.0 84154 6.0 62733 7.0 59624 8.0 60701 9.0 50937 10.0 442199 NaN 78511 Name: emp_length, dtype: int64 - Replace the home_ownership values ANY and NONE with OTHER MORTGAGE 665579 RENT 534421 OWN 144832 OTHER 478 Name: home_ownership, dtype: int64 - Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable. - There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable. - There are too many different zip codes, so just keep the state column. - Just retain the year number for earliest_cr_line - We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score: - We take a log transform of the total credit revolving balance variable. 
Current dimensions: (1345310, 26) *** Data transformation *** - Convert loan status to 0/1 charge-off flag - Checking for missing data: emp_length 0.058359 mort_acc 0.035145 revol_util 0.000637 pub_rec_bankruptcies 0.000518 dti 0.000278 dtype: float64 - All remaining columns: Index(['id', 'loan_amnt', 'term', 'int_rate', 'installment', 'sub_grade', 'emp_length', 'home_ownership', 'verification_status', 'issue_d', 'purpose', 'addr_state', 'dti', 'earliest_cr_line', 'open_acc', 'pub_rec', 'revol_util', 'total_acc', 'initial_list_status', 'application_type', 'mort_acc', 'pub_rec_bankruptcies', 'log_annual_inc', 'fico_score', 'log_revol_bal', 'charged_off'], dtype='object') - Introduce dummy categories Current dimensions: (1345310, 123) - Converting issue_d to datetime Time taken: 13.586206 seconds
# Recording the provenance of the above step
a_transforming = prov_doc.activity(
    f'ex:ml/preprocessing/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['TransformingData']
    }
)
a_transforming.used(e_loans_filtered)
a_transforming.wasAssociatedWith(ag_engineer)  # the same engineer did this, reusing the agent `ag_engineer`
# Recording the provenance of the processed dataset snapshot
e_loans_processed = get_file_entity(prov_doc, processed_filepath)
e_loans_processed.add_attributes({'ln:n_rows': loans.shape[0], 'ln:n_cols': loans.shape[1]})
e_loans_processed.wasGeneratedBy(a_transforming)
e_loans_processed.wasDerivedFrom(e_loans_filtered)
e_loans_processed.wasAttributedTo(ag_engineer)
<ProvEntity: file:loans_processed.xz>
startTime = datetime.now()
with Timer():
    print('*** Train/Test Data Spliting ***')
    # Time-based split: the most recent ~10% of loans (by issue date) form
    # the test set, the earlier ~90% the training set
    loans_train = loans.loc[loans['issue_d'] < loans['issue_d'].quantile(0.9)]
    loans_test = loans.loc[loans['issue_d'] >= loans['issue_d'].quantile(0.9)]
    print('- Number of loans in the partitions: ', loans_train.shape[0] + loans_test.shape[0])
    print('- Number of loans in the full dataset:', loans.shape[0])
    print('- Test/Train ratio:', loans_test.shape[0] / loans.shape[0])
    # Release the full dataset; only the partitions are needed from here on
    del loans
    train_filepath = 'loans_train.xz'
    test_filepath = 'loans_test.xz'
    loans_train.to_pickle(train_filepath)
    loans_test.to_pickle(test_filepath)
endTime = datetime.now()
*** Train/Test Data Spliting *** - Number of loans in the partitions: 1345310 - Number of loans in the full dataset: 1345310 - Test/Train ratio: 0.11111639696426846 Time taken: 0.795344 seconds
# Recording the provenance of the above step
a_splitting = prov_doc.activity(
    f'ex:ml/splitting/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['SplittingTestData']
    }
)
a_splitting.used(e_loans_processed)
a_splitting.wasAssociatedWith(ag_engineer)  # reusing the same agent `ag_engineer`
# provenance of the train data
e_loans_train = get_file_entity(prov_doc, train_filepath)
e_loans_train.add_attributes({'ln:n_rows': loans_train.shape[0], 'ln:n_cols': loans_train.shape[1]})
e_loans_train.wasGeneratedBy(a_splitting)
# Fix: the splitting activity consumed the *processed* dataset (see
# a_splitting.used above), so the partitions derive from e_loans_processed,
# not from the earlier filtered snapshot as previously (mis)recorded.
e_loans_train.wasDerivedFrom(e_loans_processed)
e_loans_train.wasAttributedTo(ag_engineer)
# provenance of the test data
e_loans_test = get_file_entity(prov_doc, test_filepath)
e_loans_test.add_attributes({'ln:n_rows': loans_test.shape[0], 'ln:n_cols': loans_test.shape[1]})
e_loans_test.wasGeneratedBy(a_splitting)
e_loans_test.wasDerivedFrom(e_loans_processed)
e_loans_test.wasAttributedTo(ag_engineer)
<ProvEntity: file:loans_test.xz>
startTime = datetime.now()
with Timer():
    print('*** Pipeline Training ***')
    # issue_d was only needed for the time-based split above; not a feature
    loans_train.drop('issue_d', axis=1, inplace=True)
    loans_test.drop('issue_d', axis=1, inplace=True)
    print('- IDs are all unique, hence not useful for predicting loan status')
    loans_train.drop('id', axis=1, inplace=True)
    loans_test.drop('id', axis=1, inplace=True)
    # Separate the target (the 0/1 charge-off flag) from the features
    y_train = loans_train['charged_off']
    y_test = loans_test['charged_off']
    X_train = loans_train.drop('charged_off', axis=1)
    X_test = loans_test.drop('charged_off', axis=1)
    # Free the original partition frames; X/y views are what we train on
    del loans_train, loans_test
    # Impute missing values (SimpleImputer defaults), then fit a shallow
    # decision tree; max_depth=5 keeps the model small and interpretable
    dt_pipeline = Pipeline([
        ('imputer', SimpleImputer(copy=False)),
        ('model', DecisionTreeClassifier(max_depth=5))
    ])
    dt_pipeline.fit(X_train, y_train)
endTime = datetime.now()
*** Pipeline Training *** - IDs are all unique, hence not useful for predicting loan status Time taken: 17.191541 seconds
# Recording the provenance of the above step
a_training = prov_doc.activity(
    f'ex:ml/training/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['FittingData']
    }
)
a_training.used(e_loans_train)
a_training.wasAssociatedWith(ag_engineer)
# The fitted pipeline only exists in memory at this point, so it is
# identified by the session id plus its Python object id
e_pipeline = prov_doc.entity(
    f'py:{session_id}/{id(dt_pipeline)}', {  # this is the in-memory object of the Pipeline
        PROV_TYPE: SK_NS['pipeline.Pipeline']
    }
)
e_pipeline.wasGeneratedBy(a_training)
e_pipeline.wasDerivedFrom(e_loans_train)
e_pipeline.wasAttributedTo(ag_engineer)
<ProvEntity: py:1574096047/4455456720>
startTime = datetime.now()
with Timer():
    print('*** Pipeline Validation ***')
    # Mean accuracy of the fitted pipeline on the held-out (most recent) loans
    score = dt_pipeline.score(X_test, y_test)
    print('- Accuracy score: ', score)
endTime = datetime.now()
*** Pipeline Validation *** - Accuracy score: 0.7957935860214335 Time taken: 0.247086 seconds
# Recording the provenance of the above step
a_validating = prov_doc.activity(
    f'ex:ml/validating/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['AssessingPerformance']
    }
)
a_validating.used(e_loans_test)
a_validating.used(e_pipeline)
a_validating.wasAssociatedWith(ag_engineer)
# The score is an in-memory value (identified by object id); the actual
# accuracy number is captured via the prov:value attribute
e_score = prov_doc.entity(
    f'py:{session_id}/{id(score)}', {
        PROV_TYPE: PL_NS['AccuracyScore'],
        PROV_VALUE: score
    }
)
e_score.wasGeneratedBy(a_validating)
e_score.wasDerivedFrom(e_loans_test)
e_score.wasDerivedFrom(e_pipeline)
<ProvEntity: py:1574096047/5006562704>
startTime = datetime.now()
with Timer():
    print('*** Pipeline Approval and Saving ***')
    print('- simulating a human approval')
    # Sleep a random sub-second interval to stand in for a human decision
    time.sleep(random.random())
    # NOTE(review): approval_time is captured but not used anywhere below —
    # confirm whether it should be recorded on the approval activity
    approval_time = datetime.now()
    # Saving the approved pipeline
    pipeline_filepath = Path('dt_pipeline.pickled')
    with pipeline_filepath.open('wb') as f:
        pickle.dump(dt_pipeline, f)
endTime = datetime.now()
*** Pipeline Approval and Saving *** - simulating a human approval Time taken: 0.711091 seconds
# Recording the provenance of the above step
a_approving = prov_doc.activity(
    f'ex:ml/approving/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['ApprovingPipeline']
    }
)
a_approving.used(e_score)
# the manager who approved the pipeline for deployment
ag_manager = prov_doc.agent(
    'ex:staff/37', [
        (PROV_TYPE, PROV['Person']),
        (PROV_TYPE, LN_NS['Manager'])
    ]
)
ag_manager.actedOnBehalfOf(ag_institution)
a_approving.wasAssociatedWith(ag_manager)
# provenance of the pipeline saved in the previous step
e_pipeline_file = get_file_entity(prov_doc, pipeline_filepath)
e_pipeline_file.wasGeneratedBy(a_approving)
e_pipeline_file.wasDerivedFrom(e_pipeline)
# a record of the approval is also recorded
# Its signature is the SHA-256 digest of the pickled pipeline file, tying
# the approval to the exact bytes that were approved
e_approval_record = prov_doc.entity(
    f'ex:records/{session_id}', {
        PROV_TYPE: LN_NS['ApprovalRecord'],
        'ln:signature': sha256(pipeline_filepath),
        'ln:pipeline': e_pipeline_file,
    })
e_approval_record.wasGeneratedBy(a_approving)
e_approval_record.wasDerivedFrom(e_score)
e_approval_record.wasAttributedTo(ag_manager)
<ProvEntity: ex:records/1574096047>
provenance_filepath = Path("output/training.provn")
# Fix: ensure the output directory exists before writing — Path.open('w')
# does not create parent directories and would raise FileNotFoundError on a
# fresh checkout without an `output/` folder.
provenance_filepath.parent.mkdir(parents=True, exist_ok=True)
print('Writing provenance to:', provenance_filepath)
# Serialise the document in the PROV-N notation
with provenance_filepath.open('w') as f:
    f.write(prov_doc.get_provn())
# Visualise the provenance in a graphical representation
dot = prov_to_dot(prov_doc)
# Cast Path to str: older pydot versions expect string paths in write_*
dot.write_png(str(provenance_filepath.with_suffix('.png')))
dot.write_pdf(str(provenance_filepath.with_suffix('.pdf')))
Writing provenance to: output/training.provn