#!/usr/bin/env python
# coding: utf-8

# # Tracking Provenance of an ML Pipeline
#
# This notebook provides an example of how the provenance of an ML pipeline can be recorded. We use the loan scenario from https://explain.openprovenance.org/loan/.
#
# The original dataset (`accepted_2007_to_2018Q4.csv.gz`) can be downloaded from https://www.kaggle.com/wordsforthewise/lending-club.

# ## Importing required packages

# In[1]:


# Packages from Python
from datetime import datetime
import gc  # needed by the Timer class below when garbage collection is disabled
import hashlib
import os
from pathlib import Path
import pickle
import platform
import random
import time
import timeit


# In[2]:


# ML packages
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer


# In[3]:


# Provenance packages
from prov.model import ProvDocument, Namespace, PROV, PROV_TYPE, PROV_VALUE, ProvEntity, ProvAgent
from prov.dot import prov_to_dot


# ## Setting up namespaces for identifiers

# In[4]:


PD_NS = Namespace('pd', 'https://pandas.pydata.org/#')
PY_NS = Namespace('py', 'urn:python:var:')
SK_NS = Namespace('sk', 'https://scikit-learn.org/stable/modules/generated/sklearn.')
LN_NS = Namespace('ln', 'https://plead-project.org/ns/loan#')
PL_NS = Namespace('pl', 'https://plead-project.org/ns/plead#')

NAMESPACES = [
    PD_NS, PY_NS, SK_NS, LN_NS, PL_NS,
    Namespace('file', 'file://'),
    Namespace('ex', 'http://example/org')
]


# ## Convenience functions

# In[5]:


# A context-manager class to measure elapsed time
class Timer:
    def __init__(self, timer=None, disable_gc=False, verbose=True,
                 msg_template='Time taken: %f seconds'):
        if timer is None:
            timer = timeit.default_timer
        self.timer = timer
        self.disable_gc = disable_gc
        self.gc_state = None
        self.verbose = verbose
        self.msg_template = msg_template
        self.start = self.end = self.interval = None

    def __enter__(self):
        if self.disable_gc:
            self.gc_state = gc.isenabled()
            gc.disable()
        self.start = self.timer()
        return self

    def __exit__(self, *args):
        self.end = self.timer()
        if self.disable_gc and self.gc_state:
            gc.enable()
            self.gc_state = None
        self.interval = self.end - self.start
        if self.verbose:
            print(self.msg_template % self.interval)


# In[6]:


def sha256(filename):
    # Hash the file in 64 KB chunks to avoid loading it entirely into memory
    hash_sha256 = hashlib.sha256()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hash_sha256.update(chunk)
    return hash_sha256.hexdigest()


# ## Definition of a file entity

# In[7]:


def get_file_entity(prov_doc: ProvDocument, filepath) -> ProvEntity:
    sha256_digest = sha256(filepath)
    file_stats = os.stat(filepath)
    # st_birthtime is only available on some platforms (e.g. macOS/BSD);
    # fall back to the last-modified time elsewhere
    created_timestamp = getattr(file_stats, 'st_birthtime', file_stats.st_mtime)
    e_file = prov_doc.entity(
        'file:' + str(filepath), {
            'prov:type': LN_NS['File'],
            'ln:filesize': file_stats.st_size,
            'ln:sha256': sha256_digest,
            'ln:created_at': datetime.fromtimestamp(created_timestamp)
        }
    )
    return e_file


# ## Definition of a 'machine' agent

# In[8]:


def get_agent_machine(prov_doc: ProvDocument) -> ProvAgent:
    uname_result = platform.uname()
    ag_machine = prov_doc.agent(
        'ex:machine/' + uname_result.node, {
            'prov:type': PROV['SoftwareAgent'],
            'ln:machine_system': uname_result.system,
            'ln:machine_release': uname_result.release,
            'ln:machine_version': uname_result.version,
            'ln:machine_python_version': platform.python_version()
        }
    )
    return ag_machine


# ## Creating the pipeline and recording the provenance

# In[9]:


# we use this unique number to make our identifiers unique for this session
session_id = int(time.time())

# Initialising the provenance document
prov_doc = ProvDocument(namespaces=NAMESPACES)
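# As a quick sanity check (not part of the original pipeline), the two helper functions above can be exercised on a throwaway file and a separate `ProvDocument`; the temporary file and the `_check_`/`_e_tmp` names below are illustrative only.

# In[ ]:


import tempfile

_check_doc = ProvDocument(namespaces=NAMESPACES)
with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as tmp:
    tmp.write(b'hello provenance')
_e_tmp = get_file_entity(_check_doc, tmp.name)  # records file size, SHA-256 digest and creation time
_ag_machine = get_agent_machine(_check_doc)     # records the current machine as a software agent
_e_tmp.wasAttributedTo(_ag_machine)
print(_check_doc.get_provn())                   # PROV-N view of the two records and their relation
os.remove(tmp.name)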
# ### Importing source data

# In[10]:


csv_filepath = 'lending-club/accepted_2007_to_2018Q4.csv.gz'

e_loans_csv_file = get_file_entity(prov_doc, csv_filepath)

loans = pd.read_csv(csv_filepath, compression='gzip', low_memory=False)
n_rows, n_cols = loans.shape


# In[11]:


# Checking the initial dataset
loans.info()


# In[12]:


# Adding attributes we got from the dataset
e_loans_csv_file.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})


# ### Filtering data according to requirements

# In[13]:


startTime = datetime.now()  # record the start time of this step
with Timer():
    print('Filtering the original dataset')
    print('- Only keep "Fully Paid" and "Charged Off"')
    loans = loans.loc[loans['loan_status'].isin(['Fully Paid', 'Charged Off'])]
    print('- New dimensions: ', loans.shape)

    print('- Drop features missing more than 30% data...')
    missing_fractions = loans.isnull().mean().sort_values(ascending=False)
    print(missing_fractions.head(10))
    drop_list = sorted(list(missing_fractions[missing_fractions > 0.3].index))
    print(drop_list)
    loans.drop(labels=drop_list, axis=1, inplace=True)
    print('Current dimensions: ', loans.shape)

    print('Only keep loan features known to potential investors:')
    keep_list = [
        'addr_state', 'annual_inc', 'application_type', 'dti', 'earliest_cr_line',
        'emp_length', 'emp_title', 'fico_range_high', 'fico_range_low', 'grade',
        'home_ownership', 'id', 'initial_list_status', 'installment', 'int_rate',
        'issue_d', 'loan_amnt', 'loan_status', 'mort_acc', 'open_acc', 'pub_rec',
        'pub_rec_bankruptcies', 'purpose', 'revol_bal', 'revol_util', 'sub_grade',
        'term', 'title', 'total_acc', 'verification_status', 'zip_code'
    ]
    drop_list = [col for col in loans.columns if col not in keep_list]
    print(drop_list)
    loans.drop(labels=drop_list, axis=1, inplace=True)
    n_rows, n_cols = loans.shape
    print('Current dimensions: ', loans.shape)

    # Saving a snapshot of the filtered dataset
    filtered_filepath = 'loans_filtered.xz'
    loans.to_pickle(filtered_filepath)
endTime = datetime.now()  # record the end time of this step


# In[14]:


# Recording the person who did the filtering
ag_engineer = prov_doc.agent(
    'ex:staff/259', [
        (PROV_TYPE, PROV['Person']),
        (PROV_TYPE, LN_NS['DataEngineer'])
    ]
)
ag_institution = prov_doc.agent(
    'ex:institution', [
        (PROV_TYPE, PROV['Organization']),
        (PROV_TYPE, LN_NS['CreditProvider']),
    ]
)
a_filtering = prov_doc.activity(
    f'ex:ml/filtering/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['SelectingData']
    }
)
a_filtering.used(e_loans_csv_file)
a_filtering.wasAssociatedWith(ag_engineer)
ag_engineer.actedOnBehalfOf(ag_institution)

# Recording the provenance of the filtered dataset
e_loans_filtered = get_file_entity(prov_doc, filtered_filepath)
e_loans_filtered.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})
e_loans_filtered.wasGeneratedBy(a_filtering)
e_loans_filtered.wasDerivedFrom(e_loans_csv_file)
e_loans_filtered.wasAttributedTo(ag_engineer)
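# At this point the document already links the source CSV file, the filtering activity, the engineer and the institution. Listing the document in PROV-N is an easy way to check what has been recorded so far; this is an optional inspection step, not part of the original pipeline.

# In[ ]:


print(prov_doc.get_provn())  # PROV-N listing of all records captured up to this point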
# ### Preprocessing the filtered dataset

# In[15]:


startTime = datetime.now()
with Timer():
    print('*** Preprocessing and dropping specific columns: ***')
    print('- Convert loan term (36/60 months) to number')
    loans['term'] = loans['term'].apply(lambda s: np.int8(s.split()[0]))

    print('- The grade is implied by the subgrade, drop the grade column')
    loans.drop('grade', axis=1, inplace=True)

    print('- There are too many different job titles for this feature to be useful, so we drop it.')
    loans.drop(labels='emp_title', axis=1, inplace=True)

    print('- Convert emp_length to integers:')
    loans['emp_length'].replace(to_replace='10+ years', value='10 years', inplace=True)
    loans['emp_length'].replace('< 1 year', '0 years', inplace=True)
    emp_length_to_int = lambda s: s if pd.isnull(s) else np.int8(s.split()[0])
    loans['emp_length'] = loans['emp_length'].apply(emp_length_to_int)
    print(loans['emp_length'].value_counts(dropna=False).sort_index())

    print('- Replace the home_ownership values ANY and NONE with OTHER')
    loans['home_ownership'].replace(['NONE', 'ANY'], 'OTHER', inplace=True)
    print(loans['home_ownership'].value_counts(dropna=False))

    print('- Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable.')
    loans['log_annual_inc'] = loans['annual_inc'].apply(lambda x: np.log10(x + 1))
    loans.drop('annual_inc', axis=1, inplace=True)

    print('- There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable.')
    loans.drop('title', axis=1, inplace=True)

    print('- There are too many different zip codes, so just keep the state column.')
    loans.drop(labels='zip_code', axis=1, inplace=True)

    print('- Just retain the year number for earliest_cr_line')
    loans['earliest_cr_line'] = loans['earliest_cr_line'].apply(lambda s: int(s[-4:]))

    print('- We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score:')
    loans['fico_score'] = 0.5 * loans['fico_range_low'] + 0.5 * loans['fico_range_high']
    loans.drop(['fico_range_high', 'fico_range_low'], axis=1, inplace=True)

    print('- We take a log transform of the total credit revolving balance variable.')
    loans['log_revol_bal'] = loans['revol_bal'].apply(lambda x: np.log10(x + 1))
    loans.drop('revol_bal', axis=1, inplace=True)
    print('Current dimensions: ', loans.shape)

    print('*** Data transformation ***')
    print('- Convert loan status to 0/1 charge-off flag')
    loans['charged_off'] = (loans['loan_status'] == 'Charged Off').apply(np.uint8)
    loans.drop('loan_status', axis=1, inplace=True)

    missing_fractions = loans.isnull().mean().sort_values(ascending=False)  # Fraction of data missing for each variable
    print('- Checking for missing data:')
    print(missing_fractions[missing_fractions > 0])  # Print variables that are missing data

    print('- All remaining columns:')
    print(loans.columns)

    print('- Introduce dummy categories')
    loans = pd.get_dummies(
        loans,
        columns=[
            'sub_grade', 'home_ownership', 'verification_status', 'purpose',
            'addr_state', 'initial_list_status', 'application_type'
        ],
        drop_first=True
    )
    print('Current dimensions: ', loans.shape)

    print('- Converting issue_d to datetime')
    loans['issue_d'] = pd.to_datetime(loans['issue_d'])

    # Saving a snapshot of the processed data
    processed_filepath = 'loans_processed.xz'
    loans.to_pickle(processed_filepath)
endTime = datetime.now()


# In[16]:


# Recording the provenance of the above step
a_transforming = prov_doc.activity(
    f'ex:ml/preprocessing/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['TransformingData']
    }
)
a_transforming.used(e_loans_filtered)
a_transforming.wasAssociatedWith(ag_engineer)  # the same engineer did this, reusing the agent `ag_engineer`

# Recording the provenance of the processed dataset
e_loans_processed = get_file_entity(prov_doc, processed_filepath)
e_loans_processed.add_attributes({'ln:n_rows': loans.shape[0], 'ln:n_cols': loans.shape[1]})
e_loans_processed.wasGeneratedBy(a_transforming)
e_loans_processed.wasDerivedFrom(e_loans_filtered)
e_loans_processed.wasAttributedTo(ag_engineer)
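# The activity above only carries its wall-clock start and end times. If the elapsed duration is also of interest, it can be attached to the activity as an extra attribute; this is an illustrative addition, and `ln:duration_seconds` is a hypothetical attribute name rather than part of the loan vocabulary.

# In[ ]:


# Illustrative only: derive the step duration from the recorded timestamps and
# attach it to the preprocessing activity (hypothetical 'ln:duration_seconds' attribute).
a_transforming.add_attributes({'ln:duration_seconds': (endTime - startTime).total_seconds()})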
# ### Splitting train and test data

# In[17]:


startTime = datetime.now()
with Timer():
    print('*** Train/Test Data Splitting ***')
    loans_train = loans.loc[loans['issue_d'] < loans['issue_d'].quantile(0.9)]
    loans_test = loans.loc[loans['issue_d'] >= loans['issue_d'].quantile(0.9)]

    print('- Number of loans in the partitions: ', loans_train.shape[0] + loans_test.shape[0])
    print('- Number of loans in the full dataset:', loans.shape[0])
    print('- Test/Train ratio:', loans_test.shape[0] / loans.shape[0])
    del loans

    train_filepath = 'loans_train.xz'
    test_filepath = 'loans_test.xz'
    loans_train.to_pickle(train_filepath)
    loans_test.to_pickle(test_filepath)
endTime = datetime.now()


# In[18]:


# Recording the provenance of the above step
a_splitting = prov_doc.activity(
    f'ex:ml/splitting/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['SplittingTestData']
    }
)
a_splitting.used(e_loans_processed)
a_splitting.wasAssociatedWith(ag_engineer)  # reusing the same agent `ag_engineer`

# provenance of the train data
e_loans_train = get_file_entity(prov_doc, train_filepath)
e_loans_train.add_attributes({'ln:n_rows': loans_train.shape[0], 'ln:n_cols': loans_train.shape[1]})
e_loans_train.wasGeneratedBy(a_splitting)
e_loans_train.wasDerivedFrom(e_loans_processed)
e_loans_train.wasAttributedTo(ag_engineer)

# provenance of the test data
e_loans_test = get_file_entity(prov_doc, test_filepath)
e_loans_test.add_attributes({'ln:n_rows': loans_test.shape[0], 'ln:n_cols': loans_test.shape[1]})
e_loans_test.wasGeneratedBy(a_splitting)
e_loans_test.wasDerivedFrom(e_loans_processed)
e_loans_test.wasAttributedTo(ag_engineer)


# ### Training the pipeline

# In[19]:


startTime = datetime.now()
with Timer():
    print('*** Pipeline Training ***')
    loans_train.drop('issue_d', axis=1, inplace=True)
    loans_test.drop('issue_d', axis=1, inplace=True)

    print('- IDs are all unique, hence not useful for predicting loan status')
    loans_train.drop('id', axis=1, inplace=True)
    loans_test.drop('id', axis=1, inplace=True)

    y_train = loans_train['charged_off']
    y_test = loans_test['charged_off']
    X_train = loans_train.drop('charged_off', axis=1)
    X_test = loans_test.drop('charged_off', axis=1)
    del loans_train, loans_test

    dt_pipeline = Pipeline([
        ('imputer', SimpleImputer(copy=False)),
        ('model', DecisionTreeClassifier(max_depth=5))
    ])
    dt_pipeline.fit(X_train, y_train)
endTime = datetime.now()


# In[20]:


# Recording the provenance of the above step
a_training = prov_doc.activity(
    f'ex:ml/training/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['FittingData']
    }
)
a_training.used(e_loans_train)
a_training.wasAssociatedWith(ag_engineer)

e_pipeline = prov_doc.entity(
    f'py:{session_id}/{id(dt_pipeline)}', {  # this is the in-memory object of the Pipeline
        PROV_TYPE: SK_NS['pipeline.Pipeline']
    }
)
e_pipeline.wasGeneratedBy(a_training)
e_pipeline.wasDerivedFrom(e_loans_train)
e_pipeline.wasAttributedTo(ag_engineer)


# ### Validating the trained pipeline

# In[21]:


startTime = datetime.now()
with Timer():
    print('*** Pipeline Validation ***')
    score = dt_pipeline.score(X_test, y_test)
    print('- Accuracy score: ', score)
endTime = datetime.now()


# In[22]:


# Recording the provenance of the above step
a_validating = prov_doc.activity(
    f'ex:ml/validating/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['AssessingPerformance']
    }
)
a_validating.used(e_loans_test)
a_validating.used(e_pipeline)
a_validating.wasAssociatedWith(ag_engineer)

e_score = prov_doc.entity(
    f'py:{session_id}/{id(score)}', {
        PROV_TYPE: PL_NS['AccuracyScore'],
        PROV_VALUE: score
    }
)
e_score.wasGeneratedBy(a_validating)
e_score.wasDerivedFrom(e_loans_test)
e_score.wasDerivedFrom(e_pipeline)
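# Only the in-memory identity of the fitted pipeline is captured in the `e_pipeline` entity above. Its hyper-parameters could also be recorded so that the provenance describes how the model was configured. The sketch below is illustrative: it assumes that flattened scikit-learn `get_params()` values are acceptable as literal attribute values and uses hypothetical `ln:param/...` attribute names.

# In[ ]:


# Illustrative only: copy the pipeline's primitive hyper-parameters onto the
# e_pipeline entity; the 'ln:param/...' attribute names are hypothetical.
param_attributes = {
    f'ln:param/{name}': value
    for name, value in dt_pipeline.get_params(deep=True).items()
    if isinstance(value, (bool, int, float, str))
}
e_pipeline.add_attributes(param_attributes)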
# ### Simulating a manager's approval of the pipeline

# In[23]:


startTime = datetime.now()
with Timer():
    print('*** Pipeline Approval and Saving ***')
    print('- Simulating a human approval')
    time.sleep(random.random())
    approval_time = datetime.now()

    # Saving the approved pipeline
    pipeline_filepath = Path('dt_pipeline.pickled')
    with pipeline_filepath.open('wb') as f:
        pickle.dump(dt_pipeline, f)
endTime = datetime.now()


# In[24]:


# Recording the provenance of the above step
a_approving = prov_doc.activity(
    f'ex:ml/approving/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['ApprovingPipeline']
    }
)
a_approving.used(e_score)

# the manager who approved the pipeline for deployment
ag_manager = prov_doc.agent(
    'ex:staff/37', [
        (PROV_TYPE, PROV['Person']),
        (PROV_TYPE, LN_NS['Manager'])
    ]
)
ag_manager.actedOnBehalfOf(ag_institution)
a_approving.wasAssociatedWith(ag_manager)

# provenance of the pipeline saved in the previous step
e_pipeline_file = get_file_entity(prov_doc, pipeline_filepath)
e_pipeline_file.wasGeneratedBy(a_approving)
e_pipeline_file.wasDerivedFrom(e_pipeline)

# a record of the approval is also recorded
e_approval_record = prov_doc.entity(
    f'ex:records/{session_id}', {
        PROV_TYPE: LN_NS['ApprovalRecord'],
        'ln:signature': sha256(pipeline_filepath),
        'ln:pipeline': e_pipeline_file,
    })
e_approval_record.wasGeneratedBy(a_approving)
e_approval_record.wasDerivedFrom(e_score)
e_approval_record.wasAttributedTo(ag_manager)


# ## Saving the provenance

# In[25]:


provenance_filepath = Path("output/training.provn")
provenance_filepath.parent.mkdir(parents=True, exist_ok=True)  # make sure the output folder exists
print('Writing provenance to:', provenance_filepath)
with provenance_filepath.open('w') as f:
    f.write(prov_doc.get_provn())

# Visualise the provenance in a graphical representation
dot = prov_to_dot(prov_doc)
dot.write_png(provenance_filepath.with_suffix('.png'))
dot.write_pdf(provenance_filepath.with_suffix('.pdf'))
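# The approval record stores a SHA-256 signature of the pickled pipeline, so a downstream consumer can recompute the digest before trusting and reloading the file. The sketch below is illustrative rather than part of the original pipeline; it reuses the `sha256` helper, reloads the pipeline to confirm it still scores on the held-out data, and additionally keeps a PROV-JSON copy of the document (the default format of `ProvDocument.serialize`).

# In[ ]:


# Illustrative only: recompute the pipeline file's digest, reload the approved
# pipeline, and store a machine-readable PROV-JSON copy next to the PROV-N file.
print('SHA-256 of the saved pipeline:', sha256(pipeline_filepath))

with pipeline_filepath.open('rb') as f:
    reloaded_pipeline = pickle.load(f)
print('Reloaded pipeline accuracy:', reloaded_pipeline.score(X_test, y_test))

prov_doc.serialize(str(provenance_filepath.with_suffix('.json')))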