Tracking Provenance of an ML Pipeline

This notebook provides an example of how the provenance of an ML pipeline can be recorded. We use the loan scenario from https://explain.openprovenance.org/loan/.

The original dataset (accepted_2007_to_2018Q4.csv.gz) can be downloaded from https://www.kaggle.com/wordsforthewise/lending-club.

Importing required packages

In [1]:
# Packages from Python
from datetime import datetime
import gc  # used by the Timer helper below when disable_gc=True
import hashlib
import os
from pathlib import Path
import pickle
import platform
import random
import time
import timeit
In [2]:
# ML packages
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
In [3]:
# Provenance packages
from prov.model import ProvDocument, Namespace, PROV, PROV_TYPE, PROV_VALUE, ProvEntity, ProvAgent
from prov.dot import prov_to_dot

Setting up namespaces for identifiers

In [4]:
PD_NS = Namespace('pd', 'https://pandas.pydata.org/#')
PY_NS = Namespace('py', 'urn:python:var:')
SK_NS = Namespace('sk', 'https://scikit-learn.org/stable/modules/generated/sklearn.')
LN_NS = Namespace('ln', 'https://plead-project.org/ns/loan#')
PL_NS = Namespace('pl', 'https://plead-project.org/ns/plead#')

NAMESPACES = [
    PD_NS,
    PY_NS,
    SK_NS,
    LN_NS,
    PL_NS,
    Namespace('file', 'file://'),
    Namespace('ex', 'http://example.org/')
]
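
As a quick illustration (not part of the pipeline below), a namespace maps a short prefix to a base URI, so identifiers created with it resolve to fully qualified names. A minimal sketch using the `ln` namespace defined above:

# Illustration only: building a qualified identifier from a namespace
demo_doc = ProvDocument(namespaces=NAMESPACES)
demo_entity = demo_doc.entity(LN_NS['Example'])
print(demo_entity.identifier)      # ln:Example
print(demo_entity.identifier.uri)  # https://plead-project.org/ns/loan#Example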

Convenience functions

In [5]:
# A context manager class to measure elapsed time

class Timer:
    def __init__(self, timer=None, disable_gc=False, verbose=True, msg_template='Time taken: %f seconds'):
        if timer is None:
            timer = timeit.default_timer
        self.timer = timer
        self.disable_gc = disable_gc
        self.gc_state = None
        self.verbose = verbose
        self.msg_template = msg_template
        self.start = self.end = self.interval = None

    def __enter__(self):
        if self.disable_gc:
            self.gc_state = gc.isenabled()
            gc.disable()
        self.start = self.timer()
        return self

    def __exit__(self, *args):
        self.end = self.timer()
        if self.disable_gc and self.gc_state:
            gc.enable()
            self.gc_state = None
        self.interval = self.end - self.start
        if self.verbose:
            print(self.msg_template % self.interval)
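
A minimal usage sketch (illustration only, not part of the pipeline): the timer prints the elapsed time when the `with` block exits.

# Illustration only: time an arbitrary block of code
with Timer(msg_template='Sleeping took %f seconds'):
    time.sleep(0.1)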
In [6]:
def sha256(filename):
    hash_sha256 = hashlib.sha256()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hash_sha256.update(chunk)
    return hash_sha256.hexdigest()

Definition of a file entity

In [7]:
def get_file_entity(prov_doc: ProvDocument, filepath) -> ProvEntity:
    sha256_digest = sha256(filepath)
    file_stats = os.stat(filepath)
    e_file = prov_doc.entity(
        'file:' + str(filepath), {
            'prov:type': LN_NS['File'], 'ln:filesize': file_stats.st_size,
            'ln:sha256': sha256_digest,
            # note: st_birthtime is not available on every platform (e.g. Linux)
            'ln:created_at': datetime.fromtimestamp(file_stats.st_birthtime)
        }
    )

    return e_file

Definition of a 'machine' agent

In [8]:
def get_agent_machine(prov_doc: ProvDocument) -> ProvAgent:
    uname_result = platform.uname()
    ag_machine = prov_doc.agent(
        'ex:machine/' + uname_result.node, {
            'prov:type': PROV['SoftwareAgent'],
            'ln:machine_system': uname_result.system,
            'ln:machine_release': uname_result.release,
            'ln:machine_version': uname_result.version,
            'ln:machine_python_version': platform.python_version()
        }
    )
    return ag_machine
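
This helper is not invoked in the remainder of the notebook; as a sketch, the machine a step runs on could be recorded as an additional agent and associated with the corresponding activity.

# Illustration only: record the current host as a software agent
demo_doc = ProvDocument(namespaces=NAMESPACES)
ag_machine = get_agent_machine(demo_doc)
print(ag_machine.identifier)  # e.g. ex:machine/<hostname>
# a step's activity could then be linked via activity.wasAssociatedWith(ag_machine)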

Creating the pipeline and recording the provenance

In [9]:
# A session-specific number to keep the identifiers generated below unique across runs
session_id = int(time.time())

# Initialising the provenance document
prov_doc = ProvDocument(namespaces=NAMESPACES)

Importing source data

In [10]:
csv_filepath = 'lending-club/accepted_2007_to_2018Q4.csv.gz'
e_loans_csv_file = get_file_entity(prov_doc, csv_filepath)

loans = pd.read_csv(csv_filepath, compression='gzip', low_memory=False)
n_rows, n_cols = loans.shape
In [11]:
# Checking the initial dataset
loans.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2260701 entries, 0 to 2260700
Columns: 151 entries, id to settlement_term
dtypes: float64(113), object(38)
memory usage: 2.5+ GB
In [12]:
# Adding attributes we got from the dataset
e_loans_csv_file.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})

Filtering data according to requirements

In [13]:
startTime = datetime.now()  # record the start time of this step
with Timer():
    print('Filtering the original dataset')
    print('- Only keep "Fully Paid" and "Charged Off"')
    loans = loans.loc[loans['loan_status'].isin(['Fully Paid', 'Charged Off'])]
    print('- New dimensions: ', loans.shape)

    print('- Drop features missing more than 30% data...')
    missing_fractions = loans.isnull().mean().sort_values(ascending=False)
    print(missing_fractions.head(10))

    drop_list = sorted(list(missing_fractions[missing_fractions > 0.3].index))
    print(drop_list)

    loans.drop(labels=drop_list, axis=1, inplace=True)
    print('Current dimensions: ', loans.shape)

    print('Only keep loan features known to potential investors:')
    keep_list = [
        'addr_state', 'annual_inc', 'application_type', 'dti',
        'earliest_cr_line', 'emp_length', 'emp_title',
        'fico_range_high', 'fico_range_low', 'grade',
        'home_ownership', 'id', 'initial_list_status',
        'installment', 'int_rate', 'issue_d', 'loan_amnt', 'loan_status',
        'mort_acc', 'open_acc', 'pub_rec', 'pub_rec_bankruptcies',
        'purpose', 'revol_bal', 'revol_util', 'sub_grade',
        'term', 'title', 'total_acc', 'verification_status', 'zip_code'
    ]

    drop_list = [col for col in loans.columns if col not in keep_list]
    print(drop_list)

    loans.drop(labels=drop_list, axis=1, inplace=True)
    n_rows, n_cols = loans.shape
    print('Current dimensions: ', loans.shape)

    # Saving a snapshot of the filtered dataset
    filtered_filepath = 'loans_filtered.xz'
    loans.to_pickle(filtered_filepath)

endTime = datetime.now()  # record the end time of this step
Filtering the original dataset
- Only keep "Fully Paid" and "Charged Off"
- New dimensions:  (1345310, 151)
- Drop features missing more than 30% data...
member_id                                     1.000000
next_pymnt_d                                  1.000000
orig_projected_additional_accrued_interest    0.997206
hardship_start_date                           0.995723
hardship_end_date                             0.995723
payment_plan_start_date                       0.995723
hardship_length                               0.995723
hardship_dpd                                  0.995723
hardship_loan_status                          0.995723
hardship_last_payment_amount                  0.995723
dtype: float64
['all_util', 'annual_inc_joint', 'debt_settlement_flag_date', 'deferral_term', 'desc', 'dti_joint', 'hardship_amount', 'hardship_dpd', 'hardship_end_date', 'hardship_last_payment_amount', 'hardship_length', 'hardship_loan_status', 'hardship_payoff_balance_amount', 'hardship_reason', 'hardship_start_date', 'hardship_status', 'hardship_type', 'il_util', 'inq_fi', 'inq_last_12m', 'max_bal_bc', 'member_id', 'mths_since_last_delinq', 'mths_since_last_major_derog', 'mths_since_last_record', 'mths_since_rcnt_il', 'mths_since_recent_bc_dlq', 'mths_since_recent_revol_delinq', 'next_pymnt_d', 'open_acc_6m', 'open_act_il', 'open_il_12m', 'open_il_24m', 'open_rv_12m', 'open_rv_24m', 'orig_projected_additional_accrued_interest', 'payment_plan_start_date', 'revol_bal_joint', 'sec_app_chargeoff_within_12_mths', 'sec_app_collections_12_mths_ex_med', 'sec_app_earliest_cr_line', 'sec_app_fico_range_high', 'sec_app_fico_range_low', 'sec_app_inq_last_6mths', 'sec_app_mort_acc', 'sec_app_mths_since_last_major_derog', 'sec_app_num_rev_accts', 'sec_app_open_acc', 'sec_app_open_act_il', 'sec_app_revol_util', 'settlement_amount', 'settlement_date', 'settlement_percentage', 'settlement_status', 'settlement_term', 'total_bal_il', 'total_cu_tl', 'verification_status_joint']
Current dimensions:  (1345310, 93)
Only keep loan features known to potential investors:
['funded_amnt', 'funded_amnt_inv', 'pymnt_plan', 'url', 'delinq_2yrs', 'inq_last_6mths', 'out_prncp', 'out_prncp_inv', 'total_pymnt', 'total_pymnt_inv', 'total_rec_prncp', 'total_rec_int', 'total_rec_late_fee', 'recoveries', 'collection_recovery_fee', 'last_pymnt_d', 'last_pymnt_amnt', 'last_credit_pull_d', 'last_fico_range_high', 'last_fico_range_low', 'collections_12_mths_ex_med', 'policy_code', 'acc_now_delinq', 'tot_coll_amt', 'tot_cur_bal', 'total_rev_hi_lim', 'acc_open_past_24mths', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'delinq_amnt', 'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', 'mths_since_recent_bc', 'mths_since_recent_inq', 'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl', 'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl', 'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats', 'num_tl_120dpd_2m', 'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m', 'pct_tl_nvr_dlq', 'percent_bc_gt_75', 'tax_liens', 'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit', 'total_il_high_credit_limit', 'hardship_flag', 'disbursement_method', 'debt_settlement_flag']
Current dimensions:  (1345310, 31)
Time taken: 4.923831 seconds
In [14]:
# Recording the person who did the filtering
ag_engineer = prov_doc.agent(
    'ex:staff/259', [
        (PROV_TYPE, PROV['Person']),
        (PROV_TYPE, LN_NS['DataEngineer'])
    ]
)
ag_institution = prov_doc.agent(
    'ex:institution', [
        (PROV_TYPE, PROV['Organization']),
        (PROV_TYPE, LN_NS['CreditProvider']),
    ]
)

a_filtering = prov_doc.activity(
    f'ex:ml/filtering/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['SelectingData']
    }
)
a_filtering.used(e_loans_csv_file)
a_filtering.wasAssociatedWith(ag_engineer)
ag_engineer.actedOnBehalfOf(ag_institution)

# Recording the provenance of the filtered dataset
e_loans_filtered = get_file_entity(prov_doc, filtered_filepath)
e_loans_filtered.add_attributes({'ln:n_rows': n_rows, 'ln:n_cols': n_cols})
e_loans_filtered.wasGeneratedBy(a_filtering)
e_loans_filtered.wasDerivedFrom(e_loans_csv_file)
e_loans_filtered.wasAttributedTo(ag_engineer)
Out[14]:
<ProvEntity: file:loans_filtered.xz>
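
At this point the document already records the two agents, the filtering activity and both file entities. As a quick check (not part of the original flow), the provenance assembled so far can be inspected in PROV-N notation at any time:

# Illustration only: inspect the provenance collected so far
print(prov_doc.get_provn())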

Preprocessing the filtered dataset

In [15]:
startTime = datetime.now()
with Timer():
    print('*** Preprocessing and dropping specific columns: ***')

    print('- Convert loan term (36/60 months) to number')
    loans['term'] = loans['term'].apply(lambda s: np.int8(s.split()[0]))

    print('- The grade is implied by the subgrade, drop the grade column')
    loans.drop('grade', axis=1, inplace=True)

    print('- There are too many different job titles for this feature to be useful, so we drop it.')
    loans.drop(labels='emp_title', axis=1, inplace=True)

    print('- Convert emp_length to integers:')
    loans['emp_length'].replace(to_replace='10+ years', value='10 years', inplace=True)
    loans['emp_length'].replace('< 1 year', '0 years', inplace=True)
    emp_length_to_int = lambda s: s if pd.isnull(s) else np.int8(s.split()[0])
    loans['emp_length'] = loans['emp_length'].apply(emp_length_to_int)
    print(loans['emp_length'].value_counts(dropna=False).sort_index())

    print('- Replace the home_ownership values ANY and NONE with OTHER')
    loans['home_ownership'].replace(['NONE', 'ANY'], 'OTHER', inplace=True)
    print(loans['home_ownership'].value_counts(dropna=False))

    print('- Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable.')
    loans['log_annual_inc'] = loans['annual_inc'].apply(lambda x: np.log10(x+1))
    loans.drop('annual_inc', axis=1, inplace=True)

    print('- There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable.')
    loans.drop('title', axis=1, inplace=True)

    print('- There are too many different zip codes, so just keep the state column.')
    loans.drop(labels='zip_code', axis=1, inplace=True)

    print('- Just retain the year number for earliest_cr_line')
    loans['earliest_cr_line'] = loans['earliest_cr_line'].apply(lambda s: int(s[-4:]))

    print('- We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score:')
    loans['fico_score'] = 0.5*loans['fico_range_low'] + 0.5*loans['fico_range_high']
    loans.drop(['fico_range_high', 'fico_range_low'], axis=1, inplace=True)

    print('- We take a log transform of the total credit revolving balance variable.')
    loans['log_revol_bal'] = loans['revol_bal'].apply(lambda x: np.log10(x + 1))
    loans.drop('revol_bal', axis=1, inplace=True)

    print('Current dimensions: ', loans.shape)

    print('*** Data transformation ***')

    print('- Convert loan status to 0/1 charge-off flag')
    loans['charged_off'] = (loans['loan_status'] == 'Charged Off').apply(np.uint8)
    loans.drop('loan_status', axis=1, inplace=True)

    missing_fractions = loans.isnull().mean().sort_values(ascending=False)  # Fraction of data missing for each variable
    print('- Checking for missing data:')
    print(missing_fractions[missing_fractions > 0])  # Print variables that are missing data
    print('- All remaining columns:')
    print(loans.columns)

    print('- Introduce dummy categories')
    loans = pd.get_dummies(
        loans,
        columns=[
            'sub_grade', 'home_ownership', 'verification_status',
            'purpose', 'addr_state', 'initial_list_status', 'application_type'
        ], drop_first=True
    )
    print('Current dimensions: ', loans.shape)

    print('- Converting issue_d to datetime')
    loans['issue_d'] = pd.to_datetime(loans['issue_d'])

    # Saving a snapshot of the processed data
    processed_filepath = 'loans_processed.xz'
    loans.to_pickle(processed_filepath)
endTime = datetime.now()
*** Preprocessing and dropping specific columns: ***
- Convert loan term (36/60 months) to number
- The grade is implied by the subgrade, drop the grade column
- There are too many different job titles for this feature to be useful, so we drop it.
- Convert emp_length to integers:
0.0     108061
1.0      88494
2.0     121743
3.0     107597
4.0      80556
5.0      84154
6.0      62733
7.0      59624
8.0      60701
9.0      50937
10.0    442199
NaN      78511
Name: emp_length, dtype: int64
- Replace the home_ownership values ANY and NONE with OTHER
MORTGAGE    665579
RENT        534421
OWN         144832
OTHER          478
Name: home_ownership, dtype: int64
- Annual income ranges from 0 to 10,000,000, with a median of 65,000. Because of the large range of incomes, we should take a log transform of the annual income variable.
- There are 61,665 different titles in the dataset, and based on the top 10 titles, the purpose variable appears to already contain this information. So we drop the title variable.
- There are too many different zip codes, so just keep the state column.
- Just retain the year number for earliest_cr_line
- We only need to keep one of the FICO scores. We take the average of fico_range_low and fico_range_high and call it fico_score:
- We take a log transform of the total credit revolving balance variable.
Current dimensions:  (1345310, 26)
*** Data transformation ***
- Convert loan status to 0/1 charge-off flag
- Checking for missing data:
emp_length              0.058359
mort_acc                0.035145
revol_util              0.000637
pub_rec_bankruptcies    0.000518
dti                     0.000278
dtype: float64
- All remaining columns:
Index(['id', 'loan_amnt', 'term', 'int_rate', 'installment', 'sub_grade',
       'emp_length', 'home_ownership', 'verification_status', 'issue_d',
       'purpose', 'addr_state', 'dti', 'earliest_cr_line', 'open_acc',
       'pub_rec', 'revol_util', 'total_acc', 'initial_list_status',
       'application_type', 'mort_acc', 'pub_rec_bankruptcies',
       'log_annual_inc', 'fico_score', 'log_revol_bal', 'charged_off'],
      dtype='object')
- Introduce dummy categories
Current dimensions:  (1345310, 123)
- Converting issue_d to datetime
Time taken: 13.586206 seconds
In [16]:
# Recording the provenance of the above step
a_transforming = prov_doc.activity(
    f'ex:ml/preprocessing/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['TransformingData']
    }
)
a_transforming.used(e_loans_filtered)
a_transforming.wasAssociatedWith(ag_engineer)  # the same engineer did this, reusing the agent `ag_engineer`

# Recording the provenance of the processed dataset
e_loans_processed = get_file_entity(prov_doc, processed_filepath)
e_loans_processed.add_attributes({'ln:n_rows': loans.shape[0], 'ln:n_cols': loans.shape[1]})
e_loans_processed.wasGeneratedBy(a_transforming)
e_loans_processed.wasDerivedFrom(e_loans_filtered)

e_loans_processed.wasAttributedTo(ag_engineer)
Out[16]:
<ProvEntity: file:loans_processed.xz>

Splitting train and test data

In [17]:
startTime = datetime.now()
with Timer():
    print('*** Train/Test Data Splitting ***')

    loans_train = loans.loc[loans['issue_d'] < loans['issue_d'].quantile(0.9)]
    loans_test = loans.loc[loans['issue_d'] >= loans['issue_d'].quantile(0.9)]
    print('- Number of loans in the partitions:  ', loans_train.shape[0] + loans_test.shape[0])
    print('- Number of loans in the full dataset:', loans.shape[0])
    print('- Test/Train ratio:', loans_test.shape[0] / loans.shape[0])
    del loans

    train_filepath = 'loans_train.xz'
    test_filepath = 'loans_test.xz'
    loans_train.to_pickle(train_filepath)
    loans_test.to_pickle(test_filepath)
endTime = datetime.now()
*** Train/Test Data Splitting ***
- Number of loans in the partitions:   1345310
- Number of loans in the full dataset: 1345310
- Test/Train ratio: 0.11111639696426846
Time taken: 0.795344 seconds
In [18]:
# Recording the provenance of the above step
a_splitting = prov_doc.activity(
    f'ex:ml/splitting/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['SplittingTestData']
    }
)
a_splitting.used(e_loans_processed)
a_splitting.wasAssociatedWith(ag_engineer)  # reusing the same agent `ag_engineer`

# provenance of the train data
e_loans_train = get_file_entity(prov_doc, train_filepath)
e_loans_train.add_attributes({'ln:n_rows': loans_train.shape[0], 'ln:n_cols': loans_train.shape[1]})
e_loans_train.wasGeneratedBy(a_splitting)
e_loans_train.wasDerivedFrom(e_loans_processed)
e_loans_train.wasAttributedTo(ag_engineer)

# provenance of the test data
e_loans_test = get_file_entity(prov_doc, test_filepath)
e_loans_test.add_attributes({'ln:n_rows': loans_test.shape[0], 'ln:n_cols': loans_test.shape[1]})
e_loans_test.wasGeneratedBy(a_splitting)
e_loans_test.wasDerivedFrom(e_loans_processed)
e_loans_test.wasAttributedTo(ag_engineer)
Out[18]:
<ProvEntity: file:loans_test.xz>

Training the pipeline

In [19]:
startTime = datetime.now()
with Timer():
    print('*** Pipeline Training ***')
    loans_train.drop('issue_d', axis=1, inplace=True)
    loans_test.drop('issue_d', axis=1, inplace=True)

    print('- IDs are all unique, hence not useful for predicting loan status')
    loans_train.drop('id', axis=1, inplace=True)
    loans_test.drop('id', axis=1, inplace=True)

    y_train = loans_train['charged_off']
    y_test = loans_test['charged_off']
    X_train = loans_train.drop('charged_off', axis=1)
    X_test = loans_test.drop('charged_off', axis=1)

    del loans_train, loans_test

    dt_pipeline = Pipeline([
        ('imputer', SimpleImputer(copy=False)),
        ('model', DecisionTreeClassifier(max_depth=5))
    ])
    dt_pipeline.fit(X_train, y_train)
endTime = datetime.now()
*** Pipeline Training ***
- IDs are all unique, hence not useful for predicting loan status
Time taken: 17.191541 seconds
In [20]:
# Recording the provenance of the above step
a_training = prov_doc.activity(
    f'ex:ml/training/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['FittingData']
    }
)
a_training.used(e_loans_train)
a_training.wasAssociatedWith(ag_engineer)

e_pipeline = prov_doc.entity(
    f'py:{session_id}/{id(dt_pipeline)}', {  # this is the in-memory object of the Pipeline
        PROV_TYPE: SK_NS['pipeline.Pipeline']
    }
)
e_pipeline.wasGeneratedBy(a_training)
e_pipeline.wasDerivedFrom(e_loans_train)
e_pipeline.wasAttributedTo(ag_engineer)
Out[20]:
<ProvEntity: py:1574096047/4455456720>

Validating the trained pipeline

In [21]:
startTime = datetime.now()
with Timer():
    print('*** Pipeline Validation ***')
    score = dt_pipeline.score(X_test, y_test)
    print('- Accuracy score: ', score)
endTime = datetime.now()
*** Pipeline Validation ***
- Accuracy score:  0.7957935860214335
Time taken: 0.247086 seconds
In [22]:
# Recording the provenance of the above step
a_validating = prov_doc.activity(
    f'ex:ml/validating/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['AssessingPerformance']
    }
)
a_validating.used(e_loans_test)
a_validating.used(e_pipeline)
a_validating.wasAssociatedWith(ag_engineer)
e_score = prov_doc.entity(
    f'py:{session_id}/{id(score)}', {
        PROV_TYPE: PL_NS['AccuracyScore'],
        PROV_VALUE: score
    }
)
e_score.wasGeneratedBy(a_validating)
e_score.wasDerivedFrom(e_loans_test)
e_score.wasDerivedFrom(e_pipeline)
Out[22]:
<ProvEntity: py:1574096047/5006562704>

Simulating a manager's approval of the pipeline

In [23]:
startTime = datetime.now()
with Timer():
    print('*** Pipeline Approval and Saving ***')
    print('- simulating a human approval')
    time.sleep(random.random())
    approval_time = datetime.now()

    # Saving the approved pipeline
    pipeline_filepath = Path('dt_pipeline.pickled')
    with pipeline_filepath.open('wb') as f:
        pickle.dump(dt_pipeline, f)

endTime = datetime.now()
*** Pipeline Approval and Saving ***
- simulating a human approval
Time taken: 0.711091 seconds
In [24]:
# Recording the provenance of the above step
a_approving = prov_doc.activity(
    f'ex:ml/approving/{session_id}', startTime, endTime, {
        PROV_TYPE: PL_NS['ApprovingPipeline']
    }
)
a_approving.used(e_score)

# the manager who approved the pipeline for deployment
ag_manager = prov_doc.agent(
    'ex:staff/37', [
        (PROV_TYPE, PROV['Person']),
        (PROV_TYPE, LN_NS['Manager'])
    ]
)
ag_manager.actedOnBehalfOf(ag_institution)
a_approving.wasAssociatedWith(ag_manager)

# provenance of the pipeline saved in the previous step
e_pipeline_file = get_file_entity(prov_doc, pipeline_filepath)
e_pipeline_file.wasGeneratedBy(a_approving)
e_pipeline_file.wasDerivedFrom(e_pipeline)

# an approval record is also created
e_approval_record = prov_doc.entity(
    f'ex:records/{session_id}', {
        PROV_TYPE: LN_NS['ApprovalRecord'],
        'ln:signature': sha256(pipeline_filepath),
        'ln:pipeline': e_pipeline_file,
    })
e_approval_record.wasGeneratedBy(a_approving)
e_approval_record.wasDerivedFrom(e_score)
e_approval_record.wasAttributedTo(ag_manager)
Out[24]:
<ProvEntity: ex:records/1574096047>
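
The signature stored in the approval record makes it possible to check later that a deployed pipeline file is the one that was approved. A minimal sketch of such a check:

# Illustration only: re-compute the pipeline file's digest and compare it
# with the ln:signature value stored in the approval record
print(sha256(pipeline_filepath))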

Saving the provenance

In [25]:
provenance_filepath = Path("output/training.provn")
provenance_filepath.parent.mkdir(parents=True, exist_ok=True)  # make sure the output folder exists
print('Writing provenance to:', provenance_filepath)

with provenance_filepath.open('w') as f:
    f.write(prov_doc.get_provn())

# Visualise the provenance in a graphical representation
dot = prov_to_dot(prov_doc)
dot.write_png(provenance_filepath.with_suffix('.png'))
dot.write_pdf(provenance_filepath.with_suffix('.pdf'))
Writing provenance to: output/training.provn
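
Besides the PROV-N file and the graphical exports above, the same document could also be serialized to other formats supported by the prov package; a small sketch, with assumed output filenames:

# Optional sketch: serialize the same provenance document to PROV-JSON and PROV-XML
prov_doc.serialize(str(provenance_filepath.with_suffix('.json')), format='json')
prov_doc.serialize(str(provenance_filepath.with_suffix('.xml')), format='xml')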