"""~This file is part of the Aliro library~
Copyright (C) 2023 Epistasis Lab,
Center for Artificial Intelligence Research and Education (CAIRE),
Department of Computational Biomedicine (CBM),
Cedars-Sinai Medical Center.
Aliro is maintained by:
- Hyunjun Choi (hyunjun.choi@cshs.org)
- Miguel Hernandez (miguel.e.hernandez@cshs.org)
- Nick Matsumoto (nicholas.matsumoto@cshs.org)
- Jay Moran (jay.moran@cshs.org)
- and many other generous open source contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
(Autogenerated header, do not modify)
"""
import numpy as np
import pandas as pd
import time
from datetime import datetime
import pickle
import os
import warnings
import logging
import sys
from ..knowledgebase_utils import load_knowledgebase
from ..metalearning import generate_metafeatures
from ..metalearning import Dataset
from ..metrics import SCORERS
from ..recommender import (
AverageRecommender,
RandomRecommender,
KNNMetaRecommender,
CoClusteringRecommender,
KNNWithMeansRecommender,
KNNDatasetRecommender,
KNNMLRecommender,
SlopeOneRecommender,
SVDRecommender)
from .config import classifier_config_dict, regressor_config_dict
from sklearn.model_selection import cross_val_score, ParameterGrid
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.ensemble import VotingClassifier, VotingRegressor
from sklearn.exceptions import ConvergenceWarning
from joblib import Parallel, delayed
# ignore ConvergenceWarning in SVR and SVC
warnings.filterwarnings("ignore", category=ConvergenceWarning)
logger = logging.getLogger(__name__)
GitHub_URL = ("https://github.com/EpistasisLab/Aliro/raw/"
"master/data/knowledgebases/")
class PennAI(BaseEstimator):
"""Aliro standalone sklearn wrapper.
Responsible for:
- checking for user requests for recommendations,
- checking for new results from experiments,
- calling the recommender system to generate experiment recommendations,
    - posting the recommendations to the API,
    - handling communication with the API.
:param rec_class: ai.BaseRecommender - recommender to use
    :param verbose: int, 0 quiet, 1 info, 2 debug
:param serialized_rec: string or None
Path of the file to save/load a serialized recommender.
        If the filename is not provided, a default filename is generated
        based on the recommender type, metric, and knowledgebase used.
:param scoring: str - scoring for evaluating recommendations
:param n_recs: int - number of recommendations to make for each iteration
    :param n_iters: int - total number of iterations
:param knowledgebase: file - input file for knowledgebase
    :param kb_metafeatures: input file for metafeatures
    :param config_dict: python dictionary - hyperparameter search space
        for all ML algorithms
    :param ensemble: if it is an integer N, Aliro will use
VotingClassifier/VotingRegressor to ensemble
top N best models into one model.
:param max_time_mins: maximum time in minutes that Aliro can run
:param stopping_criteria: int, optional
        A number of iterations without improvement in the best metric.
        Stop recommendations early if the best metric does not improve
        within this number of iterations.
:param random_state: random state for recommenders
:param n_jobs: int (default: 1) The number of cores to dedicate to
computing the scores with joblib. Assigning this parameter to -1
will dedicate as many cores as are available on your system.
"""
def __init__(self,
rec_class=None,
verbose=0,
serialized_rec=None,
scoring=None,
n_recs=10,
n_iters=10,
knowledgebase=None,
kb_metafeatures=None,
config_dict=None,
ensemble=None,
max_time_mins=None,
stopping_criteria=None,
random_state=None,
n_jobs=1):
"""Initializes AI managing agent."""
self.rec_class = rec_class
self.verbose = verbose
self.serialized_rec = serialized_rec
self.scoring = scoring
self.n_recs = n_recs
self.n_iters = n_iters
self.knowledgebase = knowledgebase
self.kb_metafeatures = kb_metafeatures
self.config_dict = config_dict
self.ensemble = ensemble
self.max_time_mins = max_time_mins
self.stopping_criteria = stopping_criteria
self.random_state = random_state
self.n_jobs = n_jobs
def _fit_init(self):
"""
        Fit initialization.
"""
# recommendation engines for different problem types
        # will be expanded as more types of problems are supported
# (classification, regression, unsupervised, etc.)
if self.scoring is not None:
self.scoring_ = self.scoring
# match scoring_ to metric in knowledgebase
metric_match = {
"accuracy": "accuracy",
"balanced_accuracy": "bal_accuracy",
"f1": "macrof1",
"f1_macro": "macrof1",
"r2": "r2_cv_mean",
"explained_variance": "explained_variance_cv_mean",
"neg_mean_squared_error": "neg_mean_squared_error_cv_mean"
}
self.metric_ = metric_match[self.scoring_]
        if self.verbose >= 2:
            logger_level = logging.DEBUG
        elif self.verbose == 1:
            logger_level = logging.INFO
        else:
            logger_level = logging.ERROR
logger.setLevel(logger_level)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(module)s: %(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
# Request manager settings
self.n_recs_ = self.n_recs if self.n_recs > 0 else 1
# local dataframe of datasets and their metafeatures
self.dataset_mf_cache = pd.DataFrame()
self._initilize_recommenders(self.rec_class) # set self.rec_engine
if self.stopping_criteria is not None:
if self.stopping_criteria < 0:
raise ValueError(
"stopping_criteria should be a positive number.")
self.best_score_init = -float("inf")
self.bad_iteration = 0
if self.max_time_mins is not None:
if self.max_time_mins < 0:
raise ValueError("max_time_mins should be a positive number.")
def _generate_metafeatures_from_X_y(self, X, y):
"""
Return meta_features based on input X and y in fit().
:param X: pd.DataFrame
:param y: pd.Series
"""
df = X.copy()
df['pennai_target'] = y
dataset = Dataset(df=df,
dependent_col="pennai_target",
prediction_type=self.mode
)
self.datasetId = dataset.m_data_hash()
meta_features = generate_metafeatures(dataset)
mf = [meta_features]
df = pd.DataFrame.from_records(mf, columns=meta_features.keys())
# include dataset name
df['dataset'] = self.datasetId
df.sort_index(axis=1, inplace=True)
return df
    def _valid_combo(self, combo, bad_combos):
        """Checks if a parameter combination is valid, i.e. does not match
        any invalid combination listed in bad_combos."""
        for bad_combo in bad_combos:
            # merge the list of single-entry dicts into one dict
            bc = {}
            for b in bad_combo:
                bc.update(b)
            # the combination is invalid only if it matches every
            # key/value pair of this bad combination
            bad = True
            for k, v in bc.items():
                if combo[k] != v:
                    bad = False
            if bad:
                return False
        return True
def _get_all_ml_p(self, categoryFilter=None):
"""
Returns a list of ml and parameter options based on config dictionary
:returns: pd.DataFrame - unique ml algorithm and parameter combinations
with columns 'alg_name', 'category', 'alg_name', 'parameters'
'parameters' is a dictionary of parameters
"""
if self.config_dict is not None:
self.config_dict_ = self.config_dict
result = [] # returned value
self.algorithms = []
for k, v in self.config_dict_.items():
k_split = k.split('.')
model_name = k_split[-1]
algo = {}
algo['name'] = model_name
algo['path'] = ".".join(k_split[:-1])
logger.debug('Checking ML: ' + model_name)
# get a dictionary of hyperparameters and their values
hyperparam_dict = v['params']
if "static_parameters" in v.keys():
self.static_parameters[model_name] = v["static_parameters"]
else:
self.static_parameters[model_name] = {}
all_hyperparam_combos = list(ParameterGrid(hyperparam_dict))
#print('\thyperparams: ',hyperparam_dict)
logger.debug(
'{} hyperparameter combinations for {}'.format(
len(all_hyperparam_combos), model_name)
)
# print(all_hyperparam_combos)
for ahc in all_hyperparam_combos:
if 'invalid_params_comb' in v.keys():
if not self._valid_combo(
ahc, v['invalid_params_comb']):
continue
result.append({'algorithm': model_name,
'category': self.mode,
'parameters': ahc,
'alg_name': model_name})
self.algorithms.append(algo)
# convert to dataframe, making sure there are no duplicates
all_ml_p = pd.DataFrame(result)
tmp = all_ml_p.copy()
tmp['parameters'] = tmp['parameters'].apply(str)
assert (len(all_ml_p) == len(tmp.drop_duplicates()))
if (len(all_ml_p) > 0):
logger.info(str(len(all_ml_p)) + ' ml-parameter options loaded')
logger.info('_get_all_ml_p() algorithms:' +
str(all_ml_p.algorithm.unique()))
else:
logger.error('_get_all_ml_p() parsed no results')
return all_ml_p
# -----------------
# Init methods
# -----------------
def _initilize_recommenders(self, rec_class):
"""
        Initialize the recommender.
"""
# default supervised learning recommender settings
self.REC_ARGS = {'metric': self.metric_,
'ml_type': self.ml_type,
'random_state': self.random_state}
        # add static_parameters for each ML method
self.static_parameters = {}
# set the registered ml parameters in the recommenders
ml_p = self._get_all_ml_p()
self.REC_ARGS['ml_p'] = ml_p
        if self.knowledgebase and self.kb_metafeatures:  # both provided
            self.kb_ = self.knowledgebase
            self.mf_ = self.kb_metafeatures
        elif self.knowledgebase or self.kb_metafeatures:  # only one provided
            raise ValueError(
                "please provide both knowledgebase and kb_metafeatures")
resultsData = self._load_kb()
logger.info('Knowledgebase loaded')
if self.serialized_rec:
head_tail = os.path.split(self.serialized_rec)
self.REC_ARGS['serialized_rec_filename'] = head_tail[1]
self.REC_ARGS['serialized_rec_directory'] = head_tail[0]
self.REC_ARGS['load_serialized_rec'] = "always"
self.REC_ARGS['knowledgebase_results'] = resultsData
# Create supervised learning recommenders
if self.rec_class is not None:
self.rec_engine = self.rec_class(
**self.REC_ARGS)
else:
self.rec_engine = SVDRecommender(
**self.REC_ARGS)
if not self.serialized_rec:
self.rec_engine.update(
resultsData, self.dataset_mf_cache, source='pennai')
logger.debug("recomendation engines initilized。 ")
def _load_kb(self):
"""Bootstrap the recommenders with the knowledgebase."""
logger.info('loading pmlb knowledgebase')
kb = load_knowledgebase(
resultsFiles=[self.kb_],
metafeaturesFiles=[self.mf_]
)
all_df_mf = kb['metafeaturesData'].set_index('_id', drop=False)
# keep only metafeatures with results
df = all_df_mf.loc[kb['resultsData'][self.mode]['_id'].unique()]
        # DataFrame.append was removed in pandas 2.0; use pd.concat instead
        self.dataset_mf_cache = pd.concat([self.dataset_mf_cache, df])
return kb['resultsData'][self.mode]
# -----------------
# Utility methods
# -----------------
    # TODO: not fully working yet
def _get_results_metafeatures(self):
"""
Return a pandas dataframe of metafeatures
Retireves metafeatures from self.dataset_mf_cache if they exist,
otherwise queries the api and updates the cache.
:param results_data: experiment results with associated datasets
"""
d = self.datasetId
df = self.meta_features
df['dataset'] = d
df.set_index('dataset', inplace=True)
        self.dataset_mf_cache = pd.concat([self.dataset_mf_cache, df])
return df
def _update_recommender(self, new_results_df):
"""Update recommender models based on new experiment results in
new_results_df.
"""
if len(new_results_df) >= 1:
new_mf = self._get_results_metafeatures()
self.rec_engine.update(new_results_df, new_mf)
logger.debug(time.strftime("%Y %I:%M:%S %p %Z", time.localtime()) +
': recommender updated')
# -----------------
    # Synchronous actions an AI request can take
# -----------------
def _generate_recommendations(self):
"""
:returns list of maps that represent request payload objects
"""
logger.debug(
"_generate_recommendations({},{})".format(
self.datasetId, self.n_recs_))
recommendations = []
ml, p, ai_scores = self.rec_engine.recommend(
dataset_id=self.datasetId,
n_recs=self.n_recs_,
dataset_mf=self.meta_features)
for alg, params, score in zip(ml, p, ai_scores):
recommendations.append({'dataset_id': self.datasetId,
'algorithm': alg,
'parameters': params,
'ai_score': score,
})
return recommendations
def _stop_by_max_time_mins(self):
"""Stop optimization process once maximum minutes have elapsed."""
if self.max_time_mins:
total_mins_elapsed = (
datetime.now() - self._start_datetime).total_seconds() / 60.
return total_mins_elapsed >= self.max_time_mins
else:
return False
def _stop_by_stopping_criteria(self):
"""Stop optimization process once stopping_criteria have reached."""
if self.stopping_criteria is not None:
            if self.best_score_iter > self.best_score_init:
                # a new best score was found
                self.best_score_init = self.best_score_iter
                # reset the count of iterations without improvement
                self.bad_iteration = 0
else:
self.bad_iteration += 1
if self.bad_iteration >= self.stopping_criteria:
return True
else:
return False
else:
return False
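    # Example: with stopping_criteria=3, fit() below breaks out of its
    # iteration loop once three consecutive iterations pass without an
    # improvement in the best score.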
    def fit(self, X, y):
"""Trains Aliro on X,y.
Parameters
----------
X: array-like {n_samples, n_features}
Feature matrix of the training set
y : ndarray of shape (n_samples,)
Target of the training set
Returns
-------
self : object
"""
self._fit_init()
        # generate datasetId based on input X, y
        # make a pd.DataFrame based on X, y
        if isinstance(X, np.ndarray):
            columns = ["Feature_{}".format(i) for i in range(X.shape[1])]
            features = pd.DataFrame(X, columns=columns)
        else:
            features = X.copy()
if "pennai_target" in features.columns:
raise ValueError(
'The column name "pennai_target" is not allowed in X, '
'please check your dataset and remove/rename that column')
# get meta_features based on X, y
self.meta_features = self._generate_metafeatures_from_X_y(features, y)
# save all results
self.recomms = []
        for x in self.algorithms:
            logger.debug('Importing ML methods: ' + str(x['name']))
            # import the scikit-learn class from its module path string
            exec('from {} import {}'.format(x['path'], x['name']))
self._start_datetime = datetime.now()
for i in range(self.n_iters):
            # stop by max_time_mins, if set
if self._stop_by_max_time_mins():
logger.info(
"Stop optimization process since"
" {} minutes have elapsed.".format(
self.max_time_mins))
break
logger.info("Start iteration #{}".format(i + 1))
recommendations = self._generate_recommendations()
new_results = []
ests = []
ress = []
for r in recommendations:
logger.debug(r)
                # evaluate each recommendation
# convert string to scikit-learn obj
est = eval(r['algorithm'])()
# convert str to bool/none
params = r['parameters']
for k, v in params.items():
if isinstance(v, str):
new_v = _bool_or_none(v)
params[k] = new_v
                # add static parameters
params.update(self.static_parameters[r['algorithm']])
avail_params = est.get_params()
                if ('random_state' in avail_params
                        and self.random_state is not None):
params['random_state'] = self.random_state
est.set_params(**params)
                # initialize a result
res = {
'_id': self.datasetId,
'algorithm': r['algorithm'],
'parameters': params,
}
ests.append(est)
ress.append(res)
            # Parallel computing step: score each estimator with 10-fold CV
scores_list = Parallel(n_jobs=self.n_jobs)(delayed(
cross_val_score)(estimator=est,
X=X,
y=y,
cv=10,
scoring=self.scoring_)
for est in ests)
            # summarize the results
for res, scores in zip(ress, scores_list):
res[self.metric_] = np.mean(scores)
new_results.append(res)
self.recomms += new_results
new_results_df = pd.DataFrame(new_results)
# get best score in each iteration
self.best_score_iter = new_results_df[self.metric_].max()
# update recommender each iteration
self._update_recommender(new_results_df)
            # stop early if the stopping criteria are reached
if self._stop_by_stopping_criteria():
logger.info(
"Stop optimization process since recommendations"
" did not imporve over {} iterations.".format(
self.stopping_criteria))
break
        # convert the final results to a pandas.DataFrame
self.recomms = pd.DataFrame(self.recomms)
self.recomms.sort_values(
by=self.metric_,
ascending=False,
inplace=True
)
self.best_result_score = self.recomms[self.metric_].values[0]
self.best_result = self.recomms.iloc[0]
self.best_algorithm = self.best_result['algorithm']
self.best_params = self.best_result['parameters']
if not self.ensemble:
self.estimator = eval(self.best_algorithm)()
self.estimator.set_params(**self.best_params)
else:
ensemble_ests = self.recomms['algorithm'].values[:self.ensemble]
ests_params = self.recomms['parameters'].values[:self.ensemble]
estimators = []
for est, params in zip(ensemble_ests, ests_params):
estimator = eval(est)()
estimator.set_params(**params)
est_name = 'clf' + str(len(estimators))
estimators.append((est_name, estimator))
if self.mode == "classification":
self.estimator = VotingClassifier(estimators=estimators,
voting='hard',
n_jobs=self.n_jobs)
else:
self.estimator = VotingRegressor(estimators=estimators,
n_jobs=self.n_jobs)
self.estimator.fit(X, y)
logger.info("Best model: {}".format(self.estimator))
return self
    def predict(self, X):
"""
Predictions for X.
Parameters
----------
X: array-like {n_samples, n_features}
Feature matrix of the testing set
Returns
-------
y : ndarray of shape (n_samples,)
The predicted target.
"""
if not hasattr(self, 'estimator'):
raise RuntimeError(
                'An estimator has not yet been optimized.'
' Please call fit() first.'
)
return self.estimator.predict(X)
    def score(self, X, y):
"""Return the score on the given testing data using the user-specified
scoring function.
Parameters
----------
X: array-like {n_samples, n_features}
Feature matrix of the testing set
y : ndarray of shape (n_samples,)
Target of the testing set
Returns
-------
        score: float
            The estimated test set score using the user-specified
            scoring function
"""
if not hasattr(self, 'estimator'):
raise RuntimeError(
                'An estimator has not yet been optimized.'
' Please call fit() first.'
)
scorer = SCORERS[self.scoring_]
score = scorer(
self.estimator,
X,
y
)
return score
#def save(self, filename):
#"""save pickled recommender.
#Parameters
#----------
#filename: string
#Filename for saving pickled recommender.
#Returns
#-------
#None
#"""
#self.rec_engine.save(filename)
def _bool_or_none(val):
"""Convert string to boolean type/None.
Parameters
----------
val: string
Value of a parameter in string type
Returns
-------
    converted: boolean, None, or str
        The converted value; the original string is returned unchanged
        if it does not match 'true', 'false', or 'none'
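
    Examples
    --------
    >>> _bool_or_none('True')
    True
    >>> _bool_or_none('NONE') is None
    True
    >>> _bool_or_none('0.5')
    '0.5'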
"""
if (val.lower() == 'true'):
return True
elif (val.lower() == 'false'):
return False
elif (val.lower() == 'none'):
return None
else:
return val
class PennAIClassifier(PennAI, ClassifierMixin):
"""Aliro engine for classification tasks.
Read more in the :ref:`userguide_sklearn_api`.
Parameters
----------
rec_class: ai.recommender.base.BaseRecommender or None
Recommender to use in the Aliro engine.
if it is None, Aliro will use SVDRecommender by default.
verbose: int
        0 quiet, 1 info, 2 debug
serialized_rec: string or None
Path of the file to save/load a serialized recommender.
        If the filename is not provided, a default filename is generated
        based on the recommender type, metric, and knowledgebase used.
scoring: str
scoring for evaluating recommendations. It could be "accuracy",
"balanced_accuracy", "f1", "f1_macro"
n_recs: int
number of recommendations to make for each iteration
n_iters: int
total number of iterations
knowledgebase: str
input file for knowledgebase
kb_metafeatures: str
input file for metafeature
config_dict: python dictionary
dictionary for hyperparameter search space for all ML algorithms
ensemble: int
        if it is an integer N, Aliro will use VotingClassifier/VotingRegressor
to ensemble top N best models into one model.
    max_time_mins: int or None
        maximum time in minutes that Aliro can run
stopping_criteria: int
        A number of iterations without improvement in the best metric.
        Stop recommendations early if the best metric does not improve
        within this number of iterations.
random_state: int
random state for recommenders
n_jobs: int
The number of cores to dedicate to computing the scores with joblib.
Assigning this parameter to -1 will dedicate as many cores as
are available on your system.
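
    Examples
    --------
    A minimal usage sketch, assuming an internet connection for the
    default knowledgebase download (the dataset choice is illustrative)::

        from sklearn.datasets import load_breast_cancer
        from sklearn.model_selection import train_test_split

        X, y = load_breast_cancer(return_X_y=True)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, random_state=42)

        pennai = PennAIClassifier(n_recs=5, n_iters=2, random_state=42)
        pennai.fit(X_train, y_train)
        print(pennai.score(X_test, y_test))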
"""
mode = "classification"
scoring_ = "accuracy"
ml_type = "classifier"
config_dict_ = classifier_config_dict
kb_ = GitHub_URL + "sklearn-benchmark-data-knowledgebase-r6.tsv.gz"
mf_ = GitHub_URL + "pmlb_classification_metafeatures.csv.gz"
class PennAIRegressor(PennAI, RegressorMixin):
"""Aliro engine for regression tasks.
Read more in the :ref:`userguide_sklearn_api`.
Parameters
----------
rec_class: ai.recommender.base.BaseRecommender or None
Recommender to use in the Aliro engine.
if it is None, Aliro will use SVDRecommender by default.
verbose: int
        0 quiet, 1 info, 2 debug
serialized_rec: string or None
Path of the file to save/load a serialized recommender.
        If the filename is not provided, a default filename is generated
        based on the recommender type, metric, and knowledgebase used.
scoring: str
scoring for evaluating recommendations. It could be "r2",
"explained_variance", "neg_mean_squared_error"
n_recs: int
number of recommendations to make for each iteration
n_iters: int
total number of iterations
knowledgebase: str
input file for knowledgebase
kb_metafeatures: str
input file for metafeature
config_dict: python dictionary
dictionary for hyperparameter search space for all ML algorithms
ensemble: int
        if it is an integer N, Aliro will use VotingClassifier/VotingRegressor
to ensemble top N best models into one model.
    max_time_mins: int or None
        maximum time in minutes that Aliro can run
stopping_criteria: int
        A number of iterations without improvement in the best metric.
        Stop recommendations early if the best metric does not improve
        within this number of iterations.
random_state: int
random state for recommenders
n_jobs: int
The number of cores to dedicate to computing the scores with joblib.
Assigning this parameter to -1 will dedicate as many cores as
are available on your system.
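
    Examples
    --------
    A minimal usage sketch, assuming an internet connection for the
    default knowledgebase download (the dataset choice is illustrative)::

        from sklearn.datasets import load_diabetes
        from sklearn.model_selection import train_test_split

        X, y = load_diabetes(return_X_y=True)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, random_state=42)

        pennai = PennAIRegressor(n_recs=5, n_iters=2, random_state=42)
        pennai.fit(X_train, y_train)
        print(pennai.score(X_test, y_test))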
"""
mode = "regression"
scoring_ = "neg_mean_squared_error"
ml_type = "regressor"
config_dict_ = regressor_config_dict
kb_ = GitHub_URL + "pmlb_regression_results.tsv.gz"
mf_ = GitHub_URL + "pmlb_regression_metafeatures.csv.gz"