"""~This file is part of the Aliro library~
Copyright (C) 2023 Epistasis Lab,
Center for Artificial Intelligence Research and Education (CAIRE),
Department of Computational Biomedicine (CBM),
Cedars-Sinai Medical Center.
Aliro is maintained by:
- Hyunjun Choi (hyunjun.choi@cshs.org)
- Miguel Hernandez (miguel.e.hernandez@cshs.org)
- Nick Matsumoto (nicholas.matsumoto@cshs.org)
- Jay Moran (jay.moran@cshs.org)
- and many other generous open source contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
(Autogenerated header, do not modify)
"""
import __main__
from sys import version
import warnings
from sklearn import __version__ as skl_version
import joblib
from mlxtend.evaluate import feature_importance_permutation
from sklearn.utils import safe_sqr, check_X_y
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, LabelEncoder
from sklearn.model_selection import GridSearchCV, cross_validate, StratifiedKFold, RepeatedStratifiedKFold, KFold
from sklearn.metrics import SCORERS, roc_curve, auc, make_scorer, confusion_matrix
import itertools
import json
import os
import matplotlib.pyplot as plt
from matplotlib import rcParams
import shap
import numpy as np
import pandas as pd
import matplotlib as mpl
from sklearn import decomposition
import matplotlib.colors as mcolors
from matplotlib.patches import Patch
from sklearn.manifold import TSNE
from sklearn.model_selection import learning_curve
mpl.use('Agg')
# if the system environment allows exporting figures
figure_export = True
if 'MAKEPLOTS' in os.environ:
if str(os.environ['MAKEPLOTS']) == '1':
figure_export = True
# if system environment has a random seed
random_state = None
if 'RANDOM_SEED' in os.environ:
random_state = int(os.environ['RANDOM_SEED'])
# max number of bars in the importance_score plot and the decision tree plot
max_bar_num = 10
# The maximum depth of the decision tree for plot
DT_MAX_DEPTH = 6
if 'DT_MAX_DEPTH' in os.environ:
DT_MAX_DEPTH = int(os.environ['DT_MAX_DEPTH'])
# Number of samples used for SHAP Explainers
max_samples_kernel_explainer = 50
if 'MACHINE_SHAP_SAMPLES_KERNEL_EXPLAINER' in os.environ:
max_samples_kernel_explainer = int(
os.environ['MACHINE_SHAP_SAMPLES_KERNEL_EXPLAINER'])
max_samples_other_explainer = 100
if 'MACHINE_SHAP_SAMPLES_OTHER_EXPLAINER' in os.environ:
max_samples_other_explainer = int(
os.environ['MACHINE_SHAP_SAMPLES_OTHER_EXPLAINER'])
def balanced_accuracy(y_true, y_pred):
"""Default scoring function of classification: balanced accuracy.
Balanced accuracy computes each class' accuracy on a per-class basis using a
one-vs-rest encoding, then computes an unweighted average of the class accuracies.
Parameters
----------
y_true: numpy.ndarray {n_samples}
True class labels
y_pred: numpy.ndarray {n_samples}
Predicted class labels by the estimator
Returns
-------
fitness: float
Returns a float value indicating balanced accuracy
0.5 is as good as chance, and 1.0 is perfect predictive accuracy
"""
all_classes = list(set(np.append(y_true, y_pred)))
all_class_accuracies = []
for this_class in all_classes:
this_class_sensitivity = 0.
this_class_specificity = 0.
if sum(y_true == this_class) != 0:
this_class_sensitivity = \
float(sum((y_pred == this_class) & (y_true == this_class))) /\
float(sum((y_true == this_class)))
this_class_specificity = \
float(sum((y_pred != this_class) & (y_true != this_class))) /\
float(sum((y_true != this_class)))
this_class_accuracy = (this_class_sensitivity +
this_class_specificity) / 2.
all_class_accuracies.append(this_class_accuracy)
return np.mean(all_class_accuracies)
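# A minimal illustrative sketch (not part of the Aliro pipeline), showing how the
# per-class sensitivity/specificity averaging above plays out on toy labels:
#
#     >>> y_true = np.array([0, 1, 1, 0])
#     >>> y_pred = np.array([0, 1, 0, 0])
#     >>> float(balanced_accuracy(y_true, y_pred))
#     0.75
#
# Class 0 scores sensitivity 1.0 and specificity 0.5, class 1 scores 0.5 and 1.0,
# so the unweighted mean of the two per-class accuracies is 0.75.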
def pearsonr(y_true, y_pred):
"""Scoring function: Calculates a Pearson correlation coefficient.
Parameters
----------
y_true: numpy.ndarray {n_samples}
True target values
y_pred: numpy.ndarray {n_samples}
Predicted target values by the estimator
Returns
-------
fitness: float
Returns a float value indicating Pearson correlation coefficient
"""
from scipy.stats import pearsonr
r = pearsonr(y_true, y_pred)[0]
if np.isnan(r):
r = 0.0
return r
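# Illustrative sketch: for perfectly linear toy data such as
# pearsonr(np.array([1., 2., 3.]), np.array([2., 4., 6.])) the wrapper returns a
# correlation of (approximately) 1.0; when scipy reports NaN (e.g. for a constant
# input array) the wrapper returns 0.0 instead, as handled above.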
# make new SCORERS
SCORERS['balanced_accuracy'] = make_scorer(balanced_accuracy)
SCORERS['pearsonr'] = make_scorer(pearsonr)
def get_column_names_from_ColumnTransformer(column_transformer, feature_names):
"""Get column names after applying Column Transformations
Parameters
----------
column_transformer: sklearn.compose._column_transformer.ColumnTransformer
A list of categorical, ordinal and remaining transformations
feature_names: list of strings
Original list of column/feature names
Returns
-------
new_feature_names: list of strings
Feature names generated after column transformations
"""
new_feature_names = []
for transformer_in_columns in column_transformer.transformers_:
_, transformer, col_indices = transformer_in_columns
feature_columns = [feature_names[i] for i in col_indices]
try: # Only works for OneHotEncoder transforms
names = transformer.get_feature_names(feature_columns)
new_feature_names += list(names)
except AttributeError:  # transformer has no get_feature_names (e.g. passthrough)
new_feature_names += feature_columns
return new_feature_names
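# Illustrative sketch, assuming a toy dataset with a categorical column 'color'
# ('red'/'blue') and a numeric column 'age': after fitting
# ColumnTransformer([('categorical_encoder', OneHotEncoder(), [0])], remainder='passthrough')
# on that data, this helper would return names such as
# ['color_blue', 'color_red', 'age'] -- the expanded one-hot columns first,
# followed by the passthrough column under its original name.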
# decision rule for choosing number of folds based on the class distribution in the given dataset
def decision_rule_fold_cv_based_on_classes(each_class):
"""
Adjusts the number of cross-validation folds based on the class distribution.
Parameters
----------
each_class : dict
A dictionary where keys are the classes and the values are the number of samples per class.
Returns
-------
cv : int
The suitable number of cross-validation folds ensuring that each fold can include instances of each class.
"""
# Find the minimum class count to ensure every fold can contain at least one instance of every class.
min_class_count = min(each_class.values())
# The maximum number of folds is determined by the smallest class to ensure representation in each fold.
# However, we cannot have more folds than the minimum class count.
n_folds = min(10, min_class_count) # Starting with a default max of 10 folds
# Ensure at least 2 folds for meaningful cross-validation.
n_folds = max(n_folds, 2)
return n_folds
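# Illustrative sketch of the fold rule above: {'case': 30, 'control': 4} yields 4
# folds (capped by the smallest class), {'case': 50, 'control': 45} yields the
# default maximum of 10 folds, and a degenerate count such as
# {'case': 100, 'control': 1} still yields the 2-fold minimum.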
def generate_results(model, input_data,
tmpdir, _id, target_name='class',
mode='classification', figure_export=figure_export,
random_state=random_state,
filename=['test_dataset'],
categories=None,
ordinals=None,
encoding_strategy="OneHotEncoder",
param_grid={}
):
"""Generate reaults for applying a model on dataset in Aliro.
Parameters
----------
model: scikit-learn Estimator
A machine learning model following scikit-learn API
input_data: pandas.Dataframe or list of two pandas.Dataframe
pandas.DataFrame: Aliro will use 10-fold CV to estimate train/test scores.
list of two pandas.DataFrame: The 1st pandas.DataFrame is training dataset,
while the 2nd one is testing dataset
target_name: string
Target name in input data
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment id
mode: string
'classification': Run classification analysis
'regression': Run regression analysis
figure_export: boolean
If figure_export is True, the figures will be generated and exported.
random_state: int
Random seed
filename: list
Filename for input dataset
categories: list
List of column names for one-hot encoding
ordinals: dict
Dictionary of ordinal features:
Keys of dictionary: categorical feature name(s)
Values of dictionary: categorical values
encoding_strategy: string
Encoding strategy for categorical features defined in projects.json
param_grid: dict
If param_grid is a non-empty dictionary, the experiment will
do parameter tuning via GridSearchCV, report the best result to the UI
and save all results to the knowledge base.
Returns
-------
None
"""
print('loading..')
if isinstance(input_data, list):
training_data = input_data[0]
testing_data = input_data[1]
input_data = pd.concat([training_data, testing_data])
train_rows = training_data.shape[0]
total_rows = input_data.shape[0]
cv = [
np.array(
range(train_rows)), np.array(
range(
train_rows, total_rows))]
else:
cv = 10
feature_names = np.array(
[x for x in input_data.columns.values if x != target_name])
num_classes = input_data[target_name].unique().shape[0]
# calculate number of each class
each_class = input_data[target_name].value_counts()
features = input_data.drop(target_name, axis=1).values
target = input_data[target_name].values
target_arr = np.array(target)
len_target = len(target_arr)
classes = list(set(target))
class_perc = {}
for OneClass in classes:
occ = np.count_nonzero(target_arr == OneClass)
class_perc["class_"+str(OneClass)] = occ/len_target
file_name = "class_percentage" + ".json"
# show percentage of each class
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=class_perc)
features, target = check_X_y(
features, target, dtype=None, order="C", force_all_finite=True)
# fix random_state
model = setup_model_params(model, 'random_state', random_state)
# set class_weight
model = setup_model_params(model, 'class_weight', 'balanced')
# use OneHotEncoder or OrdinalEncoder to convert categorical features
if categories or ordinals:
transform_cols = []
feature_names_list = list(feature_names)
if categories:
col_idx = get_col_idx(feature_names_list, categories)
if encoding_strategy == "OneHotEncoder":
transform_cols.append(
("categorical_encoder", OneHotEncoder(
handle_unknown='ignore'), col_idx))
elif encoding_strategy == "OrdinalEncoder":
ordinal_map = OrdinalEncoder().fit(
features[:, col_idx]).categories_
transform_cols.append(
("categorical_encoder", OrdinalEncoder(
categories=ordinal_map), col_idx))
if ordinals:
ordinal_features = sorted(list(ordinals.keys()))
col_idx = get_col_idx(feature_names_list, ordinal_features)
ordinal_map = [ordinals[k] for k in ordinal_features]
transform_cols.append(("ordinalencoder",
OrdinalEncoder(categories=ordinal_map),
col_idx))
ct = ColumnTransformer(
transformers=transform_cols,
remainder='passthrough',
sparse_threshold=0
)
model = make_pipeline(ct, model)
scores = {}
# print('Args used in model:', model.get_params())
if mode == "classification":
if (num_classes > 2):
scoring = ["balanced_accuracy",
"precision_macro",
"recall_macro",
"f1_macro"]
scores['roc_auc_score'] = 'not supported for multiclass'
scores['train_roc_auc_score'] = 'not supported for multiclass'
else:
# https://en.wikipedia.org/wiki/Confusion_matrix
# scoring = ["balanced_accuracy",
# "precision",
# "recall",
# "f1",
# "roc_auc"]
scoring = ["balanced_accuracy",
"precision",
"recall",
"f1",
"roc_auc"]
metric = "accuracy"
else:
scoring = ["r2",
"neg_mean_squared_error",
"neg_mean_absolute_error",
"explained_variance",
"pearsonr"]
metric = 'r2'
with warnings.catch_warnings():
warnings.simplefilter('ignore')
if param_grid:
print("param_grid")
if isinstance(model, Pipeline):
parameters = {}
for key, val in param_grid.items():
step_name = model.steps[-1][0]
pipe_key = "{}__{}".format(step_name, key)
parameters[pipe_key] = val
else:
parameters = param_grid
clf = GridSearchCV(estimator=model,
param_grid=parameters,
scoring=metric,
cv=5,
refit=True,
verbose=0,
error_score=-float('inf'),
return_train_score=True)
clf.fit(features, target)
cv_results = clf.cv_results_
# rename params name from pipeline object
fmt_result = []
for param, ts in zip(
cv_results['params'], cv_results['mean_train_score']):
row_dict = {'mean_train_score': ts}
for key, val in param.items():
row_dict[key.split('__')[-1]] = val
fmt_result.append(row_dict)
fmt_result = pd.DataFrame(fmt_result)
fmt_result_file = '{0}{1}/grid_search_results_{1}.csv'.format(
tmpdir, _id)
fmt_result.to_csv(fmt_result_file, index=False)
model = clf.best_estimator_
else:
print("param_grid else")
plot_learning_curve(tmpdir, _id, model, features,
target, cv, return_times=True)
model.fit(features, target)
cv_scores = cross_validate(
estimator=model,
X=features,
y=target,
scoring=scoring,
cv=cv,
return_train_score=True,
return_estimator=True
)
for s in scoring:
train_scores = cv_scores['train_' + s]
test_scores = cv_scores['test_' + s]
print("train_scores", train_scores)
print("test_scores", test_scores)
# if abs(train_scores.mean()) is np.nan OR abs(test_scores.mean()) is np.nan
if np.isnan(abs(train_scores.mean())) or np.isnan(abs(test_scores.mean())):
print("777-NaN")
print("train_scores", train_scores)
print("test_scores", test_scores)
# remove _macro
score_name = s.replace('_macro', '')
# make balanced_accuracy as default score
if score_name in ["balanced_accuracy", "neg_mean_squared_error"]:
scores['train_score'] = abs(train_scores.mean())
scores['test_score'] = abs(test_scores.mean())
# Temporary fix to handle NaN values
if np.isnan(scores['train_score']):
scores['train_score'] = np.nanmean(train_scores)
if np.isnan(scores['test_score']):
scores['test_score'] = np.nanmean(test_scores)
# for api will fix later
if score_name == "balanced_accuracy":
scores['accuracy_score'] = test_scores.mean()
# Temporary fix to handle NaN values
if not np.isnan(np.nanmean(test_scores)):
scores['accuracy_score'] = np.nanmean(test_scores)
else:
scores['accuracy_score'] = 0
# for experiment tables
if score_name == "balanced_accuracy" or score_name == "r2":
scores['exp_table_score'] = test_scores.mean()
# Temporary fix to handle NaN values
if not np.isnan(np.nanmean(test_scores)):
scores['exp_table_score'] = np.nanmean(test_scores)
else:
scores['exp_table_score'] = 0
if score_name in ["neg_mean_squared_error", "neg_mean_absolute_error"]:
scores['train_{}_score'.format(score_name)] = abs(
train_scores.mean())
# Temporary fix to handle NaN values
if not np.isnan(np.nanmean(train_scores)):
scores['train_{}_score'.format(score_name)] = np.nanmean(
train_scores)
else:
scores['train_{}_score'.format(score_name)] = 0
scores['{}_score'.format(score_name)] = abs(test_scores.mean())
# Temporary fix to handle NaN values
if not np.isnan(np.nanmean(test_scores)):
scores['{}_score'.format(score_name)] = np.nanmean(test_scores)
else:
scores['{}_score'.format(score_name)] = 0
else:
scores['train_{}_score'.format(score_name)] = train_scores.mean()
# Temporary fix to handle NaN values
if not np.isnan(np.nanmean(train_scores)):
scores['train_{}_score'.format(score_name)] = np.nanmean(
train_scores)
else:
scores['train_{}_score'.format(score_name)] = 0
scores['{}_score'.format(score_name)] = test_scores.mean()
# Temporary fix to handle NaN values
if not np.isnan(np.nanmean(test_scores)):
scores['{}_score'.format(score_name)] = np.nanmean(test_scores)
else:
scores['{}_score'.format(score_name)] = 0
# dump fitted module as pickle file
export_model(tmpdir, _id, model, filename, target_name, mode, random_state)
# get predicted classes
predicted_classes = model.predict(features)
# exporting/computing importance score
coefs, imp_score_type = compute_imp_score(model, metric,
features,
target,
random_state)
feature_importances = {
'feature_names': feature_names.tolist(),
'feature_importances': coefs.tolist(),
'feature_importance_type': imp_score_type
}
print("feature_importances", feature_importances)
top_feature_importances = {}
# if there are more than 10 features, then only the top 10 features
# whose importance scores are greater than or equal to 0 are returned
if len(feature_importances['feature_importances']) > 10:
for i in range(len(feature_importances['feature_importances'])):
if feature_importances['feature_importances'][i] >= 0:
top_feature_importances[feature_importances['feature_names']
[i]] = feature_importances['feature_importances'][i]
# sort the dictionary in descending order of feature_importances
top_feature_importances = dict(
sorted(top_feature_importances.items(), key=lambda item: item[1], reverse=True))
# get top 10 features
top_feature_importances = dict(
list(top_feature_importances.items())[:10])
top_feature_importances = {
'feature_names': list(top_feature_importances.keys()),
'feature_importances': list(top_feature_importances.values()),
'feature_importance_type': imp_score_type
}
feature_importances = top_feature_importances
save_json_fmt(
outdir=tmpdir,
_id=_id,
fname="feature_importances.json",
content=feature_importances)
dtree_train_score = None
if figure_export:
top_features_names, indices = plot_imp_score(
tmpdir, _id, coefs, feature_names, imp_score_type)
if not categories and not ordinals:
dtree_train_score = plot_dot_plot(tmpdir, _id, features,
target,
top_features_names,
indices,
random_state,
mode)
# save metrics
scores['dtree_train_score'] = dtree_train_score
if mode == 'classification':
plot_confusion_matrix(tmpdir,
_id,
features,
target,
model.classes_,
cv_scores,
figure_export)
plot_pca_2d(tmpdir, _id, features, target)
# plot_pca_3d(tmpdir,_id,features,target)
# plot_pca_3d_iris(tmpdir,_id,features,target)
plot_tsne_2d(tmpdir, _id, features, target)
if type(model).__name__ == 'Pipeline':
step_names = [step[0] for step in model.steps]
column_transformer = model[step_names[0]]
classifier_model = model[step_names[1]]
modified_feature_names = get_column_names_from_ColumnTransformer(
column_transformer, feature_names)
modified_features = column_transformer.transform(features.copy())
plot_shap_analysis_curve(tmpdir,
_id,
classifier_model,
modified_features,
modified_feature_names,
classifier_model.classes_,
target)
else:
plot_shap_analysis_curve(tmpdir,
_id,
model,
features.copy(), # Send a copy of features as it may get modified
feature_names,
model.classes_,
target)
if num_classes == 2:
plot_roc_curve(
tmpdir,
_id,
features,
target,
cv_scores,
figure_export)
else: # regression
if figure_export:
plot_cv_pred(tmpdir, _id, features, target, cv_scores)
metrics_dict = {'_scores': scores}
value_file_name = "value" + ".json"
save_json_fmt(outdir=tmpdir, _id=_id,
fname=value_file_name, content=metrics_dict)
prediction_dict = {'prediction_values': predicted_classes.tolist()}
prediction_file_name = "prediction_values" + ".json"
save_json_fmt(outdir=tmpdir, _id=_id,
fname=prediction_file_name, content=prediction_dict)
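# Illustrative sketch of a typical call to generate_results (paths, id and dataset
# are hypothetical): passing a single DataFrame df with a 'class' column, e.g.
# generate_results(model=DecisionTreeClassifier(), input_data=df,
#                  tmpdir='/tmp/results/', _id='exp001', target_name='class',
#                  mode='classification', filename=['my_dataset.csv'])
# runs 10-fold cross-validation and writes the JSON metrics, pickled model and
# exported plots into /tmp/results/exp001/.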
def get_col_idx(feature_names_list, columns):
"""Get unique indexes of columns based on list of column names.
Parameters
----------
feature_names_list: list
List of column names on dataset
columns: list
List of selected column names
Returns
-------
col_idx: list
list of selected column indexes
"""
return [feature_names_list.index(c) for c in columns]
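# Illustrative sketch:
#
#     >>> get_col_idx(['age', 'sex', 'bmi'], ['sex', 'age'])
#     [1, 0]
#
# Indices are returned in the order of the requested columns.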
def setup_model_params(model, parameter_name, value):
"""Assign value to a parameter in a model.
Parameters
----------
model: scikit-learn Estimator
Machine learning model following scikit-learn API
parameter_name: string
Parameter name in the scikit-learn model
value: object
Values for assigning to the parameter
Returns
-------
model: scikit-learn Estimator
The scikit-learn model with the updated parameter
"""
# fix random_state
if hasattr(model, parameter_name):
setattr(model, parameter_name, value)
return model
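# Illustrative sketch: setup_model_params(DecisionTreeClassifier(), 'random_state', 42)
# returns the same estimator with random_state set to 42; estimators that do not
# expose the named attribute are returned unchanged.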
def compute_imp_score(model, metric, features, target, random_state):
"""Compute permutation importance scores for features.
Parameters
----------
model: scikit-learn Estimator
A fitted scikit-learn model
metric: str, callable
The metric for evaluating the feature importance through
permutation. By default, the string 'accuracy' is
recommended for classifiers and the string 'r2' is
recommended for regressors. Optionally, a custom
scoring function (e.g., `metric=scoring_func`) that
accepts two arguments, y_true and y_pred, which have
similar shape to the `y` array.
features: np.ndarray/pd.DataFrame
Features in training dataset
target: np.ndarray/pd.DataFrame
Target in training dataset
random_state: int
Random seed for permutation importances
Returns
-------
coefs: np.ndarray
Feature importance scores
imp_score_type: string
Importance score type
"""
coefs, _ = feature_importance_permutation(
predict_method=model.predict,
X=features,
y=target,
num_rounds=5,
metric=metric,
seed=random_state,
)
imp_score_type = "Permutation Feature Importance"
return coefs, imp_score_type
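# Illustrative sketch: for a fitted classifier clf trained on (X, y) with five
# feature columns, compute_imp_score(clf, 'accuracy', X, y, 42) returns a
# length-5 array of importance scores (one per feature, averaged over the five
# permutation rounds) together with the string "Permutation Feature Importance".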
def save_json_fmt(outdir, _id, fname, content):
"""Save results into json format.
Parameters
----------
outdir: string
Path of output directory
_id: string
Experiment ID in Aliro
fname: string
File name
content: list or dictionary
Content for results
Returns
-------
None
"""
expdir = outdir + _id + '/'
with open(os.path.join(expdir, fname), 'w') as outfile:
json.dump(content, outfile)
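# Illustrative sketch (hypothetical paths): save_json_fmt(outdir='/tmp/', _id='exp001',
# fname='value.json', content={'_scores': {}}) writes the dictionary to
# /tmp/exp001/value.json; the experiment subdirectory is assumed to already exist.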
def plot_confusion_matrix(
tmpdir,
_id,
X,
y,
class_names,
cv_scores,
figure_export):
"""Make plot for confusion matrix.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
X: np.ndarray/pd.DataFrame
Features in training dataset
y: np.ndarray/pd.DataFrame
Target in training dataset
class_names: list
List of class names
cv_scores: dictionary
Return from sklearn.model_selection.cross_validate
figure_export: boolean
If true, then export the confusion matrix plot
Returns
-------
None
"""
pred_y = np.empty(y.shape)
cv = StratifiedKFold(n_splits=10)
# Temporary fix to handle NaN values
# cv = StratifiedKFold(n_splits=8)
for cv_split, est in zip(cv.split(X, y), cv_scores['estimator']):
train, test = cv_split
pred_y[test] = est.predict(X[test])
cnf_matrix = confusion_matrix(
y, pred_y, labels=class_names)
cnf_matrix_dict = {
'cnf_matrix': cnf_matrix.tolist(),
'class_names': class_names.tolist()
}
file_name = "cnf_matrix" + ".json"
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=cnf_matrix_dict)
# export plot
if figure_export:
cm = cnf_matrix
classes = class_names
np.set_printoptions(precision=2)
plt.figure()
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, cm[i, j],
ha="center", va="center",
color="gray" if cm[i, j] > thresh else "black")
plt.subplots_adjust(bottom=0.15)
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(tmpdir + _id + '/confusion_matrix' + '.png')
plt.close()
def get_example_subset(y_predictions, y_true, select_class):
"""
Returns a subset of positive and negative example indices for a given class
along with misclassified labels.
Parameters
----------
y_predictions: List
Denotes model predictions for the test set
y_true: List
Denotes true labels for the test set
select_class: Integer
The class ID of the positive class
"""
y_predictions = pd.Series(y_predictions)
y_true = pd.Series(y_true)
y_pos = y_true[y_true == select_class]
y_neg = y_true[y_true != select_class]
num_positive = min(10, len(y_pos))
num_negative = min(10, len(y_neg))
examples = pd.Series([], dtype='int')
if num_positive != 0:
examples = examples.append(y_pos.sample(n=num_positive))
if num_negative != 0:
examples = examples.append(y_neg.sample(n=num_negative))
y_predictions_subset = y_predictions[examples.index]
y_true_subset = y_true[examples.index]
misclassified = y_predictions_subset != y_true_subset
return list(examples.index), misclassified
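# Illustrative sketch: with y_true = [1, 0, 1, 0, 1], y_predictions = [1, 1, 0, 0, 1]
# and select_class = 1, up to 10 positive and 10 negative examples are sampled
# (here all five), and the returned boolean Series flags indices 1 and 2 as
# misclassified.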
def plot_shap_analysis_curve(
tmpdir,
_id,
model,
features,
feature_names,
class_names,
target,
n_features=20
):
"""
Generate SHAP Analysis for Classification models
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
model: scikit-learn estimator
A fitted scikit-learn model
features: np.ndarray/pd.DataFrame
Features in training dataset
feature_names: np.array
List of feature names
class_names: list
List of class names
target: np.ndarray
Target in test dataset
n_features: int
Maximum count of features to use for the plots
"""
# Ensure that images are saved properly
rcParams.update({'figure.autolayout': True})
# SHAP value calculation raises error if dtype is non-numeric
features = np.array(features).astype('float64')
# Determine model name for sklearn-based models
model_name = type(model).__name__.lower()
# convert target to Pandas Series type if not already
y_test = pd.Series(target)
# Sample 100 examples for Tree Explainer / Linear Explainer
if model_name in ['decisiontreeclassifier', 'randomforestclassifier', 'logisticregression', 'linearsvc']:
max_num_samples = max_samples_other_explainer
elif model_name == 'gradientboostingclassifier' and len(class_names) == 2:
max_num_samples = max_samples_other_explainer
# Sample 50 examples for Kernel Explainer
else:
max_num_samples = max_samples_kernel_explainer
num_samples = min(max_num_samples, len(features))
sampled_row_indices = np.random.choice(
features.shape[0], size=num_samples, replace=False)
features = features[sampled_row_indices]
y_test = y_test[sampled_row_indices].reset_index(drop=True)
# Select explainer and set shap values
if model_name in ['decisiontreeclassifier', 'randomforestclassifier']:
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(features)
expected_values = explainer.expected_value
elif model_name == 'gradientboostingclassifier' and len(class_names) == 2:
# Gradient Boosting Tree Explainer is only supported for Binary Classification
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(features)
expected_values = explainer.expected_value
elif 'linear' in model_name or 'logistic' in model_name:
# Supports Logistic Regression, LinearSVC
explainer = shap.LinearExplainer(model, features)
shap_values = explainer.shap_values(features)
expected_values = explainer.expected_value
elif 'svc' in model_name:
# Handle special case for SVC (probability=False)
return
else:
# KernelExplainer
explainer = shap.KernelExplainer(model.predict_proba, features)
# l1_reg not set to 'auto' to suppress warnings
shap_values = explainer.shap_values(
features, l1_reg='num_features(10)')
expected_values = explainer.expected_value
# Generate predictions for the final features
y_predictions = pd.Series(model.predict(features))
# Determine link for decision plot
if 'linear' in model_name or 'logistic' in model_name:
link = 'logit'
else:
link = 'identity'
# Handle the case of multi-class SHAP outputs
if isinstance(shap_values, list):
for i, class_name in enumerate(class_names):
save_path = tmpdir + _id + '/shap_summary_curve' + \
_id + '_' + str(class_name) + '_.png'
examples_subset_index, misclassified = get_example_subset(
y_predictions, y_test, i)
combine_summary_decision_curve(
shap_values[i], expected_values[i], features, feature_names,
n_features, examples_subset_index, misclassified, link, save_path
)
else:
save_path = tmpdir + _id + '/shap_summary_curve' + _id + '_0_.png'
examples_subset_index, misclassified = get_example_subset(
y_predictions, y_test, class_names[1])
combine_summary_decision_curve(
shap_values, expected_values, features, feature_names,
n_features, examples_subset_index, misclassified, link, save_path
)
# Summary stats
shap_summary_dict = {
'shap_explainer': explainer.__class__.__name__,
'shap_num_samples': num_samples,
# 'shap_values': shap_values
}
file_name = 'shap_summary_curve' + '.json'
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=shap_summary_dict)
def combine_summary_decision_curve(
shap_value,
expected_value,
features,
feature_names,
n_features,
examples_subset_index,
misclassified,
link,
save_path
):
"""
Generate a combined SHAP Summary and Decision Plot
Parameters
----------
shap_value: np.ndarray
SHAP value for a particular output class
expected_value: float
Average of the classifier/model output over training dataset
features: np.ndarray
Features in testing dataset
feature_names: np.array
List of feature names
n_features: int
Maximum count of features to use for the plots
examples_subset_index: list of indices
Samples to select for the decision plot
misclassified: list of Boolean values
Denotes which selected samples are misclassified (set to true)
link: string
Link type to be used for the decision plot
save_path: string
File path where the plot will be saved
"""
figsize = (12, 6)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
# Generate Summary plot
plt.sca(ax1)
shap.summary_plot(shap_value,
features=features,
feature_names=feature_names,
sort=True,
show=False,
max_display=n_features)
# Generate Decision plot
plt.gcf().set_size_inches(figsize)
plt.sca(ax2)
feature_order = np.argsort(np.sum(np.abs(shap_value), axis=0))
shap.decision_plot(expected_value,
shap_value[examples_subset_index],
features[examples_subset_index, :],
feature_names=list(feature_names),
feature_display_range=slice(
None, -(n_features + 1), -1),
ignore_warnings=True,
highlight=misclassified,
show=False,
plot_color='viridis',
feature_order=feature_order,
link=link)
plt.plot([0.5, 0.5], [0, n_features], ':k', alpha=0.3)
ax2.set_yticklabels([])
plt.savefig(save_path)
plt.close()
# After switching to dynamic charts, possibly disable outputting graphs
# from this function
def plot_roc_curve(tmpdir, _id, X, y, cv_scores, figure_export):
"""
Plot ROC Curve.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
X: np.ndarray/pd.DataFrame
Features in training dataset
y: np.ndarray/pd.DataFrame
Target in training dataset
cv_scores: dictionary
Return from sklearn.model_selection.cross_validate
figure_export: boolean
If true, then export roc curve plot
Returns
-------
None
"""
from scipy.stats import sem, t
cv = StratifiedKFold(n_splits=10)
# Temporary fix to handle NaN values
# cv = StratifiedKFold(n_splits=8)
tprs = []
aucs = []
mean_fpr = np.linspace(0, 1, 100)
for cv_split, est in zip(cv.split(X, y), cv_scores['estimator']):
train, test = cv_split
try:
probas_ = est.predict_proba(X[test])[:, 1]
except AttributeError:
probas_ = est.decision_function(X[test])
# print(SCORERS['roc_auc'](est, X[train], y[train]))
# Compute ROC curve and area the curve
classes_encoded = np.array(
[list(est.classes_).index(c)
for c in y[test]], dtype=int
)
# print("Each classes_encoded:", classes_encoded)
fpr, tpr, thresholds = roc_curve(classes_encoded, probas_)
# Temporary fix to handle NaN values
# When the given data is extremely unbalanced (e.g. classes_encoded consists
# solely of class 0), both true positives (TP) and false negatives (FN) are
# zero, so the true positive rate TPR = TP / (TP + FN) is undefined (NaN) due
# to division by zero. For example, roc_curve([0, 0, 0], [0, 0.9, 0]) has no
# positive instances in the true labels. For visualization and further
# analysis, such NaN values are replaced with 0 to indicate the absence of
# true positives under these conditions.
fpr = np.nan_to_num(fpr)
tpr = np.nan_to_num(tpr)
tprs.append(np.interp(mean_fpr, fpr, tpr))
tprs[-1][0] = 0.0
roc_auc = auc(fpr, tpr)
aucs.append(roc_auc)
if figure_export:
plt.plot(fpr, tpr, lw=1, alpha=0.3)
mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0
mean_auc = auc(mean_fpr, mean_tpr)
std_auc = np.std(aucs)
std_err = sem(tprs, axis=0)
h = std_err * t.ppf(1.95 / 2, len(mean_tpr) - 1)
tprs_upper = np.minimum(mean_tpr + h, 1)
tprs_lower = np.maximum(mean_tpr - h, 0)
if figure_export:
plt.plot([0, 1], [0, 1], linestyle='--', lw=2, color='r',
label='Chance', alpha=.8)
plt.plot(
mean_fpr,
mean_tpr,
color='b',
label=r'Mean ROC (AUC = %0.2f $\pm$ %0.2f)' %
(mean_auc,
std_auc),
lw=2,
alpha=.8)
plt.fill_between(
mean_fpr,
tprs_lower,
tprs_upper,
color='grey',
alpha=.2,
label=r'95% Confidence Interval')
plt.xlim([-0.05, 1.05])
plt.ylim([-0.05, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC curve')
plt.legend(loc="lower right")
plt.savefig(tmpdir + _id + '/roc_curve' + '.png')
plt.close()
roc_curve_dict = {
'fpr': mean_fpr.tolist(),
'tpr': mean_tpr.tolist(),
'roc_auc_score': mean_auc
}
print("roc_curve_dict:", roc_curve_dict)
file_name = 'roc_curve' + '.json'
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=roc_curve_dict)
def plot_imp_score(tmpdir, _id, coefs, feature_names, imp_score_type):
"""Plot importance scores for features.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
coefs: array
Feature importance scores
feature_names: np.array
List of feature names
imp_score_type: string
Importance score type
Returns
-------
top_features: list
Top features with high importance score
indices: ndarray
Array of indices of top important features
"""
# plot bar charts for top important features
num_bar = min(max_bar_num, len(coefs))
indices = np.argsort(coefs)[-num_bar:]
h = plt.figure()
plt.title(imp_score_type)
plt.barh(range(num_bar), coefs[indices], color="r", align="center")
top_features = list(feature_names[indices])
plt.yticks(range(num_bar), feature_names[indices])
plt.ylim([-1, num_bar])
h.tight_layout()
plt.savefig(tmpdir + _id + '/imp_score' + '.png')
plt.close()
return top_features, indices
def plot_learning_curve(tmpdir, _id, model, features, target, cv, return_times=True):
"""Make learning curve.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
model: scikit-learn Estimator
A user-specified machine learning model
features: np.ndarray/pd.DataFrame
Features in training dataset
target: np.ndarray/pd.DataFrame
Target in training dataset
cv: int, cross-validation generator or an iterable
Returns
-------
None
"""
features = np.array(features)
target = np.array(target)
target[target == -1] = 0
train_sizes, train_scores, test_scores, fit_times, _ = learning_curve(
model, features, target, train_sizes=np.linspace(0.1, 1.0, 5), cv=cv,
return_times=True)
plt.xlabel("Training examples")
plt.ylabel("Score")
plt.ylim([-0.05, 1.05])
train_scores_mean = np.mean(train_scores, axis=1)
train_scores_std = np.std(train_scores, axis=1)
test_scores_mean = np.mean(test_scores, axis=1)
test_scores_std = np.std(test_scores, axis=1)
plt.grid()
plt.fill_between(train_sizes, train_scores_mean - train_scores_std,
train_scores_mean + train_scores_std, alpha=0.1,
color="r")
plt.fill_between(train_sizes, test_scores_mean - test_scores_std,
test_scores_mean + test_scores_std, alpha=0.1, color="b")
plt.plot(train_sizes, np.mean(train_scores, axis=1),
'r', label=r'Training score')
plt.plot(train_sizes, np.mean(test_scores, axis=1),
'b', label=r'Cross-validation score')
plt.title('Learning curve')
plt.legend(loc='best')
plt.savefig(tmpdir + _id + '/learning_curve' + '.png')
plt.close()
if np.isnan(train_sizes.tolist()).all():
# replace nan with -1
train_sizes = np.nan_to_num(train_sizes, nan=-1)
if np.isnan(train_scores.tolist()).all():
# replace nan with -1
train_scores = np.nan_to_num(train_scores, nan=-1)
if np.isnan(test_scores.tolist()).all():
# replace nan with -1
test_scores = np.nan_to_num(test_scores, nan=-1)
# temp solution for nan values
train_sizes = np.nan_to_num(train_sizes, nan=-1)
train_scores = np.nan_to_num(train_scores, nan=-1)
test_scores = np.nan_to_num(test_scores, nan=-1)
print("train_sizes.tolist():", train_sizes.tolist())
print("train_scores.tolist():", train_scores.tolist())
print("test_scores.tolist():", test_scores.tolist())
learning_curve_dict = {
'train_sizes': train_sizes.tolist(),
'train_scores': train_scores.tolist(),
'test_scores': test_scores.tolist()
}
file_name = 'learning_curve' + '.json'
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=learning_curve_dict)
def plot_pca_2d(tmpdir, _id, features, target):
"""Make PCA on 2D.
Parameters
----------
tmpdir: string
Temporary directory for saving 2d pca plot and json file
_id: string
Experiment ID in Aliro
features: np.ndarray/pd.DataFrame
Features in training dataset
target: np.ndarray/pd.DataFrame
Target in training dataset
Returns
-------
None
"""
X = np.array(features)
y = np.array(target)
print(set(y))
plt.cla()
pca = decomposition.PCA(n_components=2)
pca.fit(X)
X = pca.transform(X)
num_classes = len(set(y))
# generate the number of colors equal to the number of classes
colors = plt.cm.Set1(np.linspace(0, 1, num_classes))
plt.scatter(X[:, 0], X[:, 1], c=y, cmap=mcolors.ListedColormap(colors))
# plot the legend where the colors are mapped to the classes
plt.legend(handles=[Patch(color=colors[i], label="class_"+str(i))
for i in range(num_classes)])
# write x axis as pc1 and y axis as pc2
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.savefig(tmpdir + _id + '/pca' + '.png')
plt.close()
# save X and y to json file
pca_dict = {
'X_pca': X.tolist(),
'y_pca': y.tolist()
}
file_name = 'pca-json' + '.json'
# save json file
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=pca_dict)
def plot_pca_3d(tmpdir, _id, features, target):
"""Make a 3D PCA plot."""
X = np.array(features)
y = np.array(target)
y[y == -1] = 0
fig = plt.figure(1, figsize=(4, 3))
plt.clf()
ax = fig.add_subplot(111, projection="3d", elev=48, azim=134)
# ax = fig.add_subplot(111, projection="2d", elev=48, azim=134)
ax.set_position([0, 0, 0.95, 1])
plt.cla()
pca = decomposition.PCA(n_components=3)
pca.fit(X)
X = pca.transform(X)
# for name, label in [("Setosa", 0), ("Versicolour", 1), ("Virginica", 2)]:
# ax.text3D(
# X[y == label, 0].mean(),
# X[y == label, 1].mean() + 1.5,
# X[y == label, 2].mean(),
# name,
# horizontalalignment="center",
# bbox=dict(alpha=0.5, edgecolor="w", facecolor="w"),
# )
classes_y = list(set(y))
for each_classes_y in classes_y:
name_label = (str(each_classes_y), each_classes_y)
name = name_label[0]
label = name_label[1]
ax.text3D(
X[y == label, 0].mean(),
# X[y == label, 1].mean() + 1.5,
X[y == label, 1].mean() + 1.0,
X[y == label, 2].mean(),
name,
horizontalalignment="center",
bbox=dict(alpha=0.5, edgecolor="w", facecolor="w"),
)
# for name, label in [("0", 0), ("1", 1)]:
# ax.text3D(
# X[y == label, 0].mean(),
# # X[y == label, 1].mean() + 1.5,
# X[y == label, 1].mean() + 1.0,
# X[y == label, 2].mean(),
# name,
# horizontalalignment="center",
# bbox=dict(alpha=0.5, edgecolor="w", facecolor="w"),
# )
# Reorder the labels to have colors matching the cluster results
# y = np.choose(y, [1, 2, 0]).astype(float)
# ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y, cmap=plt.cm.nipy_spectral, edgecolor="k")
# y = np.choose(y, [0, 1]).astype(float)
# convert y type to float
y = y.astype(float)
print(y)
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y,
cmap=plt.cm.nipy_spectral, edgecolor="k")
# ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y)
# show which color is which class label based on scatter plot
print("X")
print(X)
# write x axis as pc1 and y axis as pc2
ax.set_xlabel('PC1')
ax.set_ylabel('PC2')
ax.set_zlabel('PC3')
ax.w_xaxis.set_ticklabels([])
ax.w_yaxis.set_ticklabels([])
ax.w_zaxis.set_ticklabels([])
# plt.show()
plt.savefig(tmpdir + _id + '/pca' + '.png')
plt.close()
def plot_tsne_2d(tmpdir, _id, features, target):
"""Make tsne on 2D.
Parameters
----------
tmpdir: string
Temporary directory for saving 2d t-sne plot and json file
_id: string
Experiment ID in Aliro
features: np.ndarray/pd.DataFrame
Features in training dataset
target: np.ndarray/pd.DataFrame
Target in training dataset
Returns
-------
None
"""
tsne = TSNE(n_components=2, verbose=1, random_state=123)
X_2d = tsne.fit_transform(features)
# version 2
num_classes = len(set(target))
# generate the number of colors equal to the number of classes
colors = plt.cm.Set1(np.linspace(0, 1, num_classes))
plt.scatter(X_2d[:, 0], X_2d[:, 1], c=target,
cmap=mcolors.ListedColormap(colors))
# plot the legend where the colors are mapped to the classes
plt.legend(handles=[Patch(color=colors[i], label="class_"+str(i))
for i in range(num_classes)])
# label the x axis as comp-1 and the y axis as comp-2
plt.xlabel('comp-1')
plt.ylabel('comp-2')
# plt.show()
plt.savefig(tmpdir + _id + '/tsne' + '.png')
plt.close()
# save X and y to json file
# X_tsne is 2d array, which is 2d t-sne plot.
# y_tsne is 1d array, which is class label.
tsne_dict = {
'X_tsne': X_2d.tolist(),
'y_tsne': target.tolist()
}
file_name = 'tsne-json' + '.json'
# save json file
save_json_fmt(outdir=tmpdir, _id=_id,
fname=file_name, content=tsne_dict)
def plot_dot_plot(tmpdir, _id, features,
target,
top_features_name,
indices,
random_state,
mode):
"""Make dot plot for based on decision tree.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
features: np.ndarray/pd.DataFrame
Features in training dataset
target: np.ndarray/pd.DataFrame
Target in training dataset
top_features_name: list
Top feature names
indices: ndarray
Array of indices of top important features
random_state: int
Random seed for the decision tree
mode: string
'classification': Run classification analysis
'regression': Run regression analysis
Returns
-------
dtree_train_score: float
Training score from fitting a decision tree on the top important features
"""
# plot bar charts for top important features
import pydot
from sklearn.tree import export_graphviz
top_features = features[:, indices]
if mode == 'classification':
from sklearn.tree import DecisionTreeClassifier
dtree = DecisionTreeClassifier(random_state=random_state,
max_depth=DT_MAX_DEPTH)
scoring = SCORERS['balanced_accuracy']
else:
from sklearn.tree import DecisionTreeRegressor
dtree = DecisionTreeRegressor(random_state=random_state,
max_depth=DT_MAX_DEPTH)
scoring = SCORERS["neg_mean_squared_error"]
dtree.fit(top_features, target)
dtree_train_score = scoring(
dtree, top_features, target)
dot_file = '{0}{1}/dtree_{1}.dot'.format(tmpdir, _id)
png_file = '{0}{1}/dtree_{1}.png'.format(tmpdir, _id)
class_names = None
if mode == 'classification':
class_names = [str(i) for i in dtree.classes_]
export_graphviz(dtree, out_file=dot_file,
feature_names=top_features_name,
class_names=class_names,
filled=True, rounded=True,
special_characters=True)
(graph,) = pydot.graph_from_dot_file(dot_file)
graph.write_png(png_file)
return dtree_train_score
def plot_cv_pred(tmpdir, _id, X, y, cv_scores):
"""
Plot Cross-Validated Predictions and Residuals.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
X: np.ndarray/pd.DataFrame
Features in training dataset
y: np.ndarray/pd.DataFrame
Target in training dataset
cv_scores: dictionary
Return from sklearn.model_selection.cross_validate
Returns
-------
None
"""
pred_y = np.empty(y.shape)
resi_y = np.empty(y.shape)
cv = KFold(n_splits=10)
for cv_split, est in zip(cv.split(X, y), cv_scores['estimator']):
train, test = cv_split
pred_y[test] = est.predict(X[test])
resi_y[test] = pred_y[test] - y[test]
fig, ax = plt.subplots(figsize=(8, 6), dpi=300)
ax.set_title("10-Fold Cross-Validated Predictions")
ax.scatter(y, pred_y, edgecolors=(0, 0, 0))
ax.set_xlabel('Observed Values')
ax.set_ylabel('Predicted Values')
x = np.linspace(*ax.get_xlim())
ax.plot(x, x,
color="red",
linestyle='dashed')
plt.tight_layout()
plt.savefig(tmpdir + _id + '/reg_cv_pred' + '.png')
plt.close()
fig, ax = plt.subplots(1, 1, figsize=(8, 6), dpi=300)
ax.set_title("10-Fold Cross-Validated Residuals")
ax.scatter(pred_y, resi_y, edgecolors=(0, 0, 0))
ax.set_xlabel('Predicted Values')
ax.set_ylabel('Residuals')
ax.axhline(y=0.0,
color="red",
linestyle='dashed')
plt.tight_layout()
plt.savefig(tmpdir + _id + '/reg_cv_resi' + '.png')
plt.close()
fig, ax = plt.subplots(1, 1, figsize=(8, 6), dpi=300)
# add q-q plot for normalized CV residuals
from scipy.stats import probplot
z = (resi_y - np.mean(resi_y)) / np.std(resi_y)
series1 = probplot(z, dist="norm")
ax.scatter(series1[0][0], series1[0][1], edgecolors=(0, 0, 0))
ax.set_title("Q-Q Plot for Normalized Residuals")
x = np.linspace(*ax.get_xlim())
ax.plot(x, x,
color="red",
linestyle='dashed')
ax.set_xlabel('Theoretical Quantiles')
ax.set_ylabel('Ordered Normalized Residuals')
plt.tight_layout()
plt.savefig(tmpdir + _id + '/reg_cv_qq' + '.png')
plt.close()
# observed (y) vs predicted (pred_y)
# combine y, pred_y
# Cross-Validated Predictions
CVP_2d = list(map(list, zip(y.tolist(), pred_y.tolist())))
# dummy class array of the same length as pred_y (all zeros, a single class)
CVP_2d_class = np.zeros(len(pred_y))
# pred_y vs resi_y
# Cross-Validated Residuals
CVR_2d = list(map(list, zip(pred_y.tolist(), resi_y.tolist())))
CVR_2d_class = np.zeros(len(pred_y))
# series1[0][0] vs series1[0][1]
# Q-Q Plot for Normalized Residuals
QQNR_2d = list(map(list, zip(series1[0][0], series1[0][1])))
QQNR_2d_class = np.zeros(len(series1[0][0]))
# cvp
reg_cvp = {
'CVP_2d': CVP_2d,
'CVP_2d_class': CVP_2d_class.tolist()
}
cvp_file_name = "reg_cvp" + ".json"
save_json_fmt(outdir=tmpdir, _id=_id,
fname=cvp_file_name, content=reg_cvp)
# cvr
reg_cvr = {
'CVR_2d': CVR_2d,
'CVR_2d_class': CVR_2d_class.tolist()
}
cvr_file_name = "reg_cvr" + ".json"
save_json_fmt(outdir=tmpdir, _id=_id,
fname=cvr_file_name, content=reg_cvr)
# qqnr
reg_qqnr = {
'QQNR_2d': QQNR_2d,
'QQNR_2d_class': QQNR_2d_class.tolist()
}
qqnr_file_name = "reg_qqnr" + ".json"
save_json_fmt(outdir=tmpdir, _id=_id,
fname=qqnr_file_name, content=reg_qqnr)
def export_model(tmpdir,
_id,
model,
filename,
target_name,
mode="classification",
random_state=42):
"""export model as a pickle file and generate a scripts for using the pickled model.
Parameters
----------
tmpdir: string
Temporary directory for saving experiment results
_id: string
Experiment ID in Aliro
model: scikit-learn estimator
A fitted scikit-learn model
filename: list
File name(s) of input dataset
target_name: string
Target name in input data
mode: string
'classification': Run classification analysis
'regression': Run regression analysis
random_state: int
Random seed in model
Returns
-------
None
"""
pickle_file_name = 'model_{}.pkl'.format(_id)
pickle_file = '{0}{1}/model_{1}.pkl'.format(tmpdir, _id)
pickle_model = {}
pickle_model['model'] = model
pickle_model['data_filename'] = filename
joblib.dump(pickle_model, pickle_file)
pipeline_text1, pipeline_text2 = generate_export_codes(
pickle_file_name, model, filename, target_name, mode, random_state)
export_scripts = open(
"{0}{1}/scripts_reproduce_{1}.py".format(tmpdir, _id), "w")
export_scripts.write(pipeline_text1)
export_scripts.close()
export_scripts = open(
"{0}{1}/scripts_application_{1}.py".format(tmpdir, _id), "w")
export_scripts.write(pipeline_text2)
export_scripts.close()
def generate_export_codes(
pickle_file_name,
model,
filename,
target_name,
mode="classification",
random_state=42):
"""Generate all library import calls for use in stand alone python scripts.
Parameters
----------
pickle_file_name: string
Pickle file name for a fitted scikit-learn estimator
model: scikit-learn estimator
A fitted scikit-learn model
filename: list
File name(s) of input dataset
target_name: string
Target name in input data
mode: string
'classification': Run classification analysis
'regression': Run regression analysis
random_state: int
Random seed in model
Returns
-------
exported_codes_1: string
Python script for reproducing the experiment results in a stand-alone Python environment
exported_codes_2: string
Python script for applying the fitted model to a new dataset
"""
if mode == 'classification':
fold = "StratifiedKFold"
elif mode == 'regression':
fold = "KFold"
exported_codes_1 = """# Python version: {python_version}
# Results were generated with numpy v{numpy_version},
# pandas v{pandas_version} and scikit-learn v{skl_version}.
# random seed = {random_state}
# Training dataset filename = {dataset}
# Pickle filename = {pickle_file_name}
# Model in the pickle file: {model}
import numpy as np
import pandas as pd
import joblib
from sklearn.utils import check_X_y
from sklearn.metrics import make_scorer, confusion_matrix
from sklearn.model_selection import cross_validate, {fold}
# NOTE: Edit variables below with appropriate values
# path to your pickle file, below is the downloaded pickle file
pickle_file = '{pickle_file_name}'
# file path to the dataset
dataset = '{dataset}'
# target column name
target_column = '{target_name}'
seed = {random_state}
# load fitted model
pickle_model = joblib.load(pickle_file)
model = pickle_model['model']
# read input data
input_data = pd.read_csv(dataset, sep=None, engine='python')
""".format(
python_version=version.replace('\n', ''),
numpy_version=np.__version__,
pandas_version=pd.__version__,
skl_version=skl_version,
dataset=",".join(filename),
target_name=target_name,
pickle_file_name=pickle_file_name,
random_state=random_state,
model=str(model).replace('\n', '\n#'),
fold=fold
)
exported_codes_2 = exported_codes_1
if mode == "classification":
exported_codes_1 += """
# Balanced accuracy below was described in [Urbanowicz2015]:
# the average of sensitivity and specificity is computed for each class
# and then averaged over total number of classes.
# It is NOT the same as sklearn.metrics.balanced_accuracy_score,
# which is defined as the average of recall obtained on each class.
def balanced_accuracy(y_true, y_pred):
all_classes = list(set(np.append(y_true, y_pred)))
all_class_accuracies = []
for this_class in all_classes:
this_class_sensitivity = 0.
this_class_specificity = 0.
if sum(y_true == this_class) != 0:
this_class_sensitivity = \\
float(sum((y_pred == this_class) & (y_true == this_class))) /\\
float(sum((y_true == this_class)))
this_class_specificity = \\
float(sum((y_pred != this_class) & (y_true != this_class))) /\\
float(sum((y_true != this_class)))
this_class_accuracy = (this_class_sensitivity +
this_class_specificity) / 2.
all_class_accuracies.append(this_class_accuracy)
return np.mean(all_class_accuracies)
# reproducing training score and testing score from Aliro
features = input_data.drop(target_column, axis=1).values
target = input_data[target_column].values
# Checking dataset
features, target = check_X_y(features,
target,
dtype=None,
order="C",
force_all_finite=True)
scorer = make_scorer(balanced_accuracy)
# reproducing balanced accuracy scores
# computing cross-validated metrics
cv = StratifiedKFold(n_splits=10)
cv_scores = cross_validate(
estimator=model,
X=features,
y=target,
scoring=scorer,
cv=cv,
return_train_score=True,
return_estimator=True
)
train_score = cv_scores['train_score'].mean()
test_score = cv_scores['test_score'].mean()
print("Training score: ", train_score)
print("Testing score: ", test_score)
# reproducing confusion matrix
pred_cv_target = np.empty(target.shape)
for cv_split, est in zip(cv.split(features, target), cv_scores['estimator']):
train, test = cv_split
pred_cv_target[test] = est.predict(features[test])
cnf_matrix = confusion_matrix(
target, pred_cv_target, labels=model.classes_)
print("Confusion Matrix:", cnf_matrix)
"""
elif mode == "regression":
exported_codes_1 += """
# reproducing training score and testing score from Aliro
features = input_data.drop(target_column, axis=1).values
target = input_data[target_column].values
# Checking dataset
features, target = check_X_y(features,
target,
dtype=None,
order="C",
force_all_finite=True)
# reproducing r2 scores
# computing cross-validated metrics
cv_scores = cross_validate(
estimator=model,
X=features,
y=target,
scoring='r2',
cv=10,
return_train_score=True,
return_estimator=True
)
train_score = cv_scores['train_score'].mean()
test_score = cv_scores['test_score'].mean()
print("Training score: ", train_score)
print("Testing score: ", test_score)
"""
exported_codes_2 += """
# Application 1: cross validation of fitted model on a new dataset
testing_features = input_data.drop(target_column, axis=1).values
testing_target = input_data[target_column].values
# Get holdout score for fitted model
print("Holdout score: ", end="")
print(model.score(testing_features, testing_target))
# Application 2: predict outcome by fitted model
# In this application, the input dataset may not include target column
# Please comment this line below if there is no target column in input dataset
input_data.drop(target_column, axis=1, inplace=True)
predict_target = model.predict(input_data.values)
"""
return exported_codes_1, exported_codes_2