"""~This file is part of the Aliro library~
Copyright (C) 2023 Epistasis Lab,
Center for Artificial Intelligence Research and Education (CAIRE),
Department of Computational Biomedicine (CBM),
Cedars-Sinai Medical Center.
Aliro is maintained by:
- Hyunjun Choi (hyunjun.choi@cshs.org)
- Miguel Hernandez (miguel.e.hernandez@cshs.org)
- Nick Matsumoto (nicholas.matsumoto@cshs.org)
- Jay Moran (jay.moran@cshs.org)
- and many other generous open source contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
(Autogenerated header, do not modify)
"""
"""
Recommender system for Aliro.
"""
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(module)s: %(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
import numpy as np
import os
import pdb
import pickle
import gzip
import random
import hashlib
import copy
from pandas.util import hash_pandas_object
import pandas as pd
# implement a metaclass __repr__ so that generated tests in
# test_recommender.py get more human-readable names
class MC(type):
def __repr__(self):
return self.__qualname__
class BaseRecommender(object, metaclass=MC):
"""Base recommender for Aliro
The BaseRecommender is not intended to be used directly; it is a skeleton
class defining the interface for future recommenders within the Aliro
project.
Parameters
----------
ml_type: str, 'classifier' or 'regressor'
Recommending classifiers or regressors. Used to determine ML options.
metric: str (default: 'bal_accuracy' for classifiers, 'mse' for regressors)
The metric by which to assess performance on the datasets.
ml_p: DataFrame (default: None)
Contains all valid ML parameter combos, with columns 'algorithm' and
'parameters'
random_state: int or None (default: None)
Seed used to initialize the random and numpy.random generators.
knowledgebase_results: Pandas DataFrame or None
Initial knowledgebase results data.
If not None and not loading a serialized recommender, the recommender
will initialize and train on this data.
If loading a serialized recommender, this is the knowledgebase that
accompanies it.
knowledgebase_metafeatures: Pandas DataFrame or None
Initial knowledgebase metafeatures data.
If loading a serialized recommender, this is the knowledgebase that
accompanies it.
serialized_rec_directory: string or None
Name of the directory to save/load a serialized recommender.
Default directory is "."
serialized_rec_filename: string or None
Name of the file to save/load a serialized recommender.
If the filename is not provided, a default filename is generated
based on the recommender type, metric, and knowledgebase used.
load_serialized_rec: str, "always", "never", "if_exists"
Whether to attempt to load a serialized recommender:
"if_exists" - If a serialized recomender exsists at the specified
path, load it.
"always" - Always load a serialized recommender. Throw an
exception if no serialized recommender exists.
"never" - Never load a serialized recommender.
"""
def __init__(self,
ml_type='classifier',
metric=None,
ml_p=None,
random_state=None,
knowledgebase_results=None,
knowledgebase_metafeatures=None,
load_serialized_rec="if_exists",
serialized_rec_directory=None,
serialized_rec_filename=None):
"""Initialize recommendation system."""
if ml_type not in ['classifier', 'regressor']:
raise ValueError('ml_type must be "classifier" or "regressor"')
if load_serialized_rec not in ["always", "never", "if_exists"]:
raise ValueError('load_serialized_rec must be "always", "never" or'
' "if_exists"')
self.random_state = random_state
if self.random_state is not None:
random.seed(self.random_state)
np.random.seed(self.random_state)
logger.info('self.random_state: ' + str(self.random_state))
self.ml_type = ml_type
if metric is None:
self.metric='bal_accuracy' if self.ml_type=='classifier' else 'mse'
else:
self.metric = metric
# maintain a set of dataset-algorithm-parameter combinations that have
# already been evaluated
self.trained_dataset_models = set()
# hash table for parameter options
self.hash_2_param = {}
# get ml+p combos (note: this triggers a property in base recommender)
# self.ml_htable = {}
self.ml_p = ml_p
# generate the serialized recommender path
self.serialized_rec_path = self._generate_serialized_rec_path(
serialized_rec_filename,
serialized_rec_directory
)
# train an empty recommender, either using the provided kb or
# loading a serialized rec from file
self._train_empty_rec(
ml_type,
metric,
ml_p,
random_state,
knowledgebase_results,
knowledgebase_metafeatures,
load_serialized_rec,
serialized_rec_directory,
serialized_rec_filename)
def _train_empty_rec(self,
ml_type,
metric,
ml_p,
random_state,
knowledgebase_results,
knowledgebase_metafeatures,
load_serialized_rec,
serialized_rec_directory,
serialized_rec_filename):
# load serialized rec, or initialize from the given
# knowledgebase
logger.info(f"load_serialized_rec='{load_serialized_rec}'")
logger.info(f"self.serialized_rec_path='{self.serialized_rec_path}'")
if load_serialized_rec == "always":
if not os.path.exists(self.serialized_rec_path):
raise ValueError(f"load_serialized_rec='{load_serialized_rec}'"
" but cannot load serialized recommender:"
f" '{self.serialized_rec_path}'")
self.load(self.serialized_rec_path, knowledgebase_results)
elif load_serialized_rec == "if_exists":
if os.path.exists(self.serialized_rec_path):
logger.info(f"Loading serialized recommender:"
f" '{self.serialized_rec_path}'")
self.load(self.serialized_rec_path, knowledgebase_results)
else:
logger.warn(f"Not loading serialized recommender, file does "
f"not exist: '{self.serialized_rec_path}'")
if knowledgebase_results is not None:
logger.info(f"Initializing new recommender from provided "
"knowledgebase")
self.update(knowledgebase_results,
knowledgebase_metafeatures, source='knowledgebase')
else:
logger.info(f"Not loading serialized recommender.")
if knowledgebase_results is not None:
logger.info(f"Initializing new recommender from provided "
"knowledgebase")
self.update(knowledgebase_results, knowledgebase_metafeatures,
source='knowledgebase')
def _default_serialized_rec_filename(self):
"""Generate the default name of the serialized instance of this
recommender
"""
# Hardcoding the informal kb descriptor for now; this should be changed.
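# e.g. a KNN-based classifier recommender scored with balanced accuracy
# would serialize to (illustrative filename):
#   'KNNMetaRecommender_classifier_bal_accuracy_pmlb_20200821.pkl.gz'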
return (
self.__class__.__name__
+ '_' + self.ml_type
+ '_' + self.metric
+ '_pmlb_20200821'
+'.pkl.gz')
def _generate_serialized_rec_path(self,
serialized_rec_filename=None,
serialized_rec_directory=None):
""" Generate the path to save/load serialized recommender
Parameters
----------
serialized_rec_filename
serialized_rec_directory
"""
# dynamic default values
serialized_rec_directory = serialized_rec_directory or "."
serialized_rec_filename = serialized_rec_filename or \
self._default_serialized_rec_filename()
return os.path.join(serialized_rec_directory, serialized_rec_filename)
def update(self, results_data, results_mf=None, source='pennai'):
"""Update ML / Parameter recommendations.
Parameters
----------
results_data: DataFrame
columns corresponding to:
'algorithm'
'parameters'
self.metric
results_mf: DataFrame, optional
columns corresponding to metafeatures of each dataset in
results_data.
source: string
if 'pennai', will update tally of trained dataset models
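Examples
--------
Illustrative only; ``new_results`` is a hypothetical DataFrame with one
row per finished experiment, including '_id', 'algorithm', 'parameters'
and metric columns:

>>> rec.update(new_results, source='pennai')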
"""
assert(results_data is not None), "results_data cannot be None"
if results_data.isna().values.any():
logger.warning('There are NaNs in results_data.')
#logger.warning(str(results_data))
logger.warning(results_data.head())
logger.error('Dropping NaN results.')
results_data.dropna(inplace=True)
# update parameter hash table
logger.info('updating hash_2_param...')
self.hash_2_param.update(
{self._hash_simple_dict(x):x
for x in results_data['parameters'].values})
param_2_hash = {frozenset(v.items()):k
for k,v in self.hash_2_param.items()}
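# illustrative shape of the two maps (values are hypothetical):
#   hash_2_param: {'9f86d081...': {'C': 1.0, 'kernel': 'rbf'}}
#   param_2_hash: {frozenset({('C', 1.0), ('kernel', 'rbf')}): '9f86d081...'}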
# store parameter_hash variable in results_data
logger.info('storing parameter hash...')
results_data['parameter_hash'] = results_data['parameters'].apply(
lambda x: param_2_hash[frozenset(x.items())])
# update results list
if source == 'pennai':
self._update_trained_dataset_models_from_df(results_data)
def _hash_simple_dict(self, x):
"""Provides sha256 hash for a dictionary with hashable items."""
hasher = hashlib.sha256()
hasher.update(repr(tuple(sorted(x.items()))).encode())
return hasher.hexdigest()
def recommend(self, dataset_id=None, n_recs=1, dataset_mf=None):
"""Return a model and parameter values expected to do best on dataset.
Parameters
----------
dataset_id: string
ID of the dataset for which the recommender is generating
recommendations.
n_recs: int (default: 1), optional
Return a list of length n_recs in order of estimators and parameters
expected to do best.
dataset_mf: DataFrame
metafeatures of the dataset represented by dataset_id
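Examples
--------
Illustrative only; the base class leaves the return value to the
subclass (concrete recommenders typically return parallel lists of
recommended algorithms, parameter hashes, and predicted scores):

>>> ml_rec, phash_rec, score_rec = rec.recommend('my_dataset',
...                                              n_recs=3,
...                                              dataset_mf=mf)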
"""
# self.dataset_id_to_hash.update(
# {dataset_id:dataset_mf['_id'].values[0]})
def load(self, filename=None, knowledgebase=None):
"""Load a saved recommender state.
:param filename: string or None
Name of file to load
:param knowledgebase: DataFrame or None
DataFrame with columns corresponding to:
'dataset'
'algorithm'
'parameters'
self.metric
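Example (illustrative; the filename is hypothetical):

>>> rec.load('my_saved_rec.pkl.gz')
True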
"""
if filename is None:
fn = self.serialized_rec_path
else:
fn = filename
if os.path.isfile(fn):
logger.info('loading recommender ' + fn + ' from file')
f = gzip.open(fn, 'rb')
tmp_dict = pickle.load(f)
f.close()
#logger.debug(f"rec keys: {tmp_dict.keys()}")
# check if parameters match, if not throw warning/error
for k,v in tmp_dict.items():
if k in self.__dict__.keys():
try:
if self.__dict__[k] != tmp_dict[k]:
logger.warning(k + ' changing from '
+ str(self.__dict__[k])[:20] + '... to '
+ str(tmp_dict[k])[:20] + '...')
except Exception:
# the comparison itself may fail for objects such as DataFrames;
# skip the warning in that case
pass
else:
logger.warning('adding ' + k + '=' + str(tmp_dict[k])[:20]
+ '...')
logger.info('updating internal state')
# check ml_p hashes
rowHashes = hash_pandas_object(self.ml_p.apply(str)).values
newHash = hashlib.sha256(rowHashes).hexdigest()
if 'ml_p_hash' in tmp_dict.keys():
if newHash == tmp_dict['ml_p_hash']:
logger.info('ml_p hashes match')
else:
error_msg = ('the ml_p hash from the pickle is different. '
'This likely means the algorithm configurations have '
'changed since this recommender was saved. You should '
'update and save a new one.')
logger.error(error_msg)
# debugging
if ('_ml_p' in tmp_dict):
pd.testing.assert_frame_equal(self.ml_p, tmp_dict['_ml_p'])
else:
logger.error(f"Pickle does not contain _ml_p for debugging.")
logger.error(f"Keys: {tmp_dict.keys()}")
raise ValueError(error_msg)
del tmp_dict['ml_p_hash']
# update self with loaded pickle
self.__dict__.update(tmp_dict)
return True
else:
logger.warning('Could not load filename '+ fn)
return False
def save(self, filename=None):
"""Save the current recommender.
:param filename: string or None
Name of file to save to
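Example (illustrative; the filename is hypothetical):

>>> rec.save('my_saved_rec.pkl.gz')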
"""
if filename is None:
fn = self.serialized_rec_path
else:
fn = filename
if os.path.isfile(fn):
logger.warning('overwriting ' + fn)
save_dict = copy.deepcopy(self.__dict__)
# remove results_df to save space. this gets loaded by load() fn.
if 'results_df' in save_dict.keys():
logger.debug('deleting save_dict[results_df]:'
+str(save_dict['results_df'].head()))
rowHashes = hash_pandas_object(save_dict['results_df']).values
save_dict['results_df_hash'] = hashlib.sha256(
rowHashes).hexdigest()
del save_dict['results_df']
# remove ml_p to save space
rowHashes = hash_pandas_object(save_dict['_ml_p'].apply(str)).values
save_dict['ml_p_hash'] = hashlib.sha256(rowHashes).hexdigest()
del save_dict['_ml_p']
del save_dict['mlp_combos']
logger.info('saving recommender as ' + fn)
f = gzip.open(fn, 'wb')
pickle.dump(save_dict, f, 2)
f.close()
def update_and_save(self, results_data, results_mf=None, source='pennai',
filename=None):
"""runs self.update() and self.save.
Parameters
----------
results_data: DataFrame
columns corresponding to:
'algorithm'
'parameters'
self.metric
results_mf: DataFrame, optional
columns corresponding to metafeatures of each dataset in
results_data.
source: string
if 'pennai', will update tally of trained dataset models
filename: string or None, optional
Path passed to self.save() for serializing the recommender.
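Examples
--------
Illustrative only; ``new_results`` is a hypothetical results DataFrame
and the filename is hypothetical:

>>> rec.update_and_save(new_results, source='pennai',
...                     filename='my_saved_rec.pkl.gz')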
"""
self.update(results_data, results_mf, source)
self.save(filename)
@property
def ml_p(self):
logger.debug('getting ml_p')
return self._ml_p
@ml_p.setter
def ml_p(self, value):
logger.debug('setting ml_p')
if value is not None:
#filter out SVC (temporary)
self._ml_p = value[['algorithm','parameters']]
logger.debug('setting hash table')
# maintain a parameter hash table for parameter settings
# if 'alg_name' not in value.columns:
# self._ml_p['alg_name'] = self._ml_p['algorithm']
self.hash_2_param = {
self._hash_simple_dict(x):x
for x in self._ml_p['parameters'].values}
param_2_hash = {frozenset(v.items()):k
for k,v in self.hash_2_param.items()}
# machine learning - parameter combinations
self.mlp_combos = (self._ml_p['algorithm']+'|'+
self._ml_p['parameters'].apply(lambda x:
param_2_hash[frozenset(x.items())]))
# filter out duplicates
self.mlp_combos = self.mlp_combos.drop_duplicates()
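# each entry is an 'algorithm|parameter_hash' string, e.g.
# (hypothetical): 'LogisticRegression|9f86d081884c7d65...'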
# # set ml_htable
# if 'alg_name' in value.columns:
# self.ml_htable = {
# k:v for v,k in zip(value['alg_name'].unique(),
# value['algorithm'].unique())
# }
else:
logger.warning('value of ml_p is None')
logger.debug('hash_2_param: {} objects'.format(len(self.hash_2_param)))
def _update_trained_dataset_models_from_df(self, results_data):
'''stores the trained_dataset_models to aid in filtering repeats.'''
results_data.loc[:, 'dataset-algorithm-parameters'] = (
results_data['_id'].values + '|' +
results_data['algorithm'].values + '|' +
results_data['parameter_hash'].values)
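# entries take the form '<dataset _id>|<algorithm>|<parameter_hash>',
# e.g. (hypothetical): '5d2b2c5e|DecisionTreeClassifier|9f86d081...'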
for i,phash in enumerate(results_data['parameter_hash'].values):
if phash not in self.hash_2_param.keys():
logger.error(phash
+' not in self.hash_2_param. parameter values: '
+ str(results_data['parameters'].values[i]))
# get unique dataset / parameter / classifier combos in results_data
d_ml_p = results_data['dataset-algorithm-parameters'].unique()
self.trained_dataset_models.update(d_ml_p)
def _update_trained_dataset_models_from_rec(self, dataset_id, ml_rec,
phash_rec):
'''update the recommender's memory with the new algorithm-parameter
combos that it recommended'''
if dataset_id is not None:
# datahash = self.dataset_id_to_hash[dataset_id]
self.trained_dataset_models.update(
['|'.join([dataset_id, ml, p])
for ml, p in zip(ml_rec, phash_rec)])