"""~This file is part of the Aliro library~
Copyright (C) 2023 Epistasis Lab,
Center for Artificial Intelligence Research and Education (CAIRE),
Department of Computational Biomedicine (CBM),
Cedars-Sinai Medical Center.
Aliro is maintained by:
- Hyunjun Choi (hyunjun.choi@cshs.org)
- Miguel Hernandez (miguel.e.hernandez@cshs.org)
- Nick Matsumoto (nicholas.matsumoto@cshs.org)
- Jay Moran (jay.moran@cshs.org)
- and many other generous open source contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
(Autogenerated header, do not modify)
"""
# Recommender system for Aliro.
import pandas as pd
from pandas.util import hash_pandas_object
import hashlib
import os
import gzip
import pickle
import copy
# import json
# import urllib.request, urllib.parse
from .base import BaseRecommender
#from ..metalearning import get_metafeatures
# from sklearn.preprocessing import RobustScaler
# from sklearn.pipeline import Pipeline
import numpy as np
from collections import defaultdict, OrderedDict
import pdb
from surprise import (Reader, Dataset, CoClustering, SlopeOne, KNNWithMeans,
KNNBasic, mySVD)
# import pyximport
# pyximport.install()
# from .svdedit import mySVD
import itertools as it
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(module)s: %(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
class SurpriseRecommender(BaseRecommender):
"""Class to support generic recommenders from the Surprise library.
Not intended to be used as a standalone class.
Parameters
----------
ml_type: str, 'classifier' or 'regressor'
Recommending classifiers or regressors. Used to determine ML options.
metric: str (default: accuracy for classifiers, mse for regressors)
The metric by which to assess performance on the datasets.
"""
def __init__(self,
ml_type='classifier',
metric=None,
ml_p=None,
random_state=None,
knowledgebase_results=None,
knowledgebase_metafeatures=None,
load_serialized_rec="if_exists",
serialized_rec_directory=None,
serialized_rec_filename=None):
""" set default recommender specific parameters; might be overwritten by loading serialized recommender"""
if self.__class__.__name__ == 'SurpriseRecommender':
raise RuntimeError('Do not instantiate the SurpriseRecommender class '
'directly; use one of the method-specific classes instead.')
self.set_algo()
self.random_state = random_state
if hasattr(self.algo, 'random_state'):
self.algo.random_state = self.random_state
# store results
self.results_df = pd.DataFrame()
self.first_fit = True
        # reader for translating between Aliro results and the Surprise
        # training set
self.reader = Reader(rating_scale=(0,1))
self.ml_type = ml_type
if metric is None:
            logger.warning('metric is None; setting a default based on ml_type')
self.metric='bal_accuracy' if self.ml_type=='classifier' else 'mse'
else:
self.metric = metric
assert(self.metric is not None)
        logger.info('initializing SurpriseRecommender')
logger.info('self.algo_name: '+self.algo_name)
logger.info('ml_type: '+self.ml_type)
logger.info('metric: '+self.metric)
self.min_epochs = 10
self.max_epochs = 100
"""Initialize recommendation system."""
super().__init__(
ml_type,
metric,
ml_p,
serialized_rec_directory=serialized_rec_directory,
serialized_rec_filename=serialized_rec_filename,
load_serialized_rec=load_serialized_rec,
knowledgebase_results=knowledgebase_results,
random_state=random_state)
@property
def algo_name(self):
        # fall back to this class's name if no algorithm has been set yet
        if getattr(self, 'algo', None) is None:
            return type(self).__name__
        return type(self.algo).__name__
def _reconstruct_training_data(self, results_data, results_mf=None,
source='pennai'):
"""Used for loading pickled recomenders to set results_df
without training.
:param results_data: DataFrame with columns corresponding to:
'dataset'
'algorithm'
'parameters'
self.metric
:param results_mf: metafeatures for the datasets in results_data
:param source: str, optional (default: 'pennai')
if 'pennai', will update tally of trained dataset models
"""
# update trained dataset models and hash table
super().update(results_data, results_mf, source)
        # updates self.results_df and self.trainset
        self._update_training_data(results_data, shuffle=True)
        # check whether the reconstructed training data matches the pickled
        # recommender's training data by comparing row hashes
        rowHashes = hash_pandas_object(self.results_df).values
        newHash = hashlib.sha256(rowHashes).hexdigest()
        logger.debug('results_df hash: ' + newHash)
        # temporary fix for pickled recommenders that do not store a hash
if hasattr(self, 'results_df_hash'):
if newHash == self.results_df_hash:
logger.info('results_df hashes match')
else:
error_msg = 'the results_df hash from the pickle is different'
logger.error(error_msg)
raise ValueError(error_msg)
del self.results_df_hash
    def load(self, filename=None, knowledgebase=None):
"""Load a saved recommender state."""
if knowledgebase is None:
logger.warning('A knowledgebase needs to be provided to load '
'Surprise Recommenders from file. Not loading.')
return
loaded = super().load(filename=filename)
if loaded:
logger.info('setting training data...')
self._reconstruct_training_data(knowledgebase,
source='knowledgebase')
    def update(self, results_data, results_mf=None, source='pennai'):
"""Update ML / Parameter recommendations based on overall performance in
results_data.
:param results_data: DataFrame with columns corresponding to:
'dataset'
'algorithm'
'parameters'
self.metric
        :param results_mf: metafeatures for the datasets in results_data
        :param source: str, optional (default: 'pennai')
            if 'pennai', will update the tally of trained dataset models
        """
# update trained dataset models and hash table
super().update(results_data, results_mf, source)
# update internal model
self._update_model(results_data)
def _update_training_data(self, results_data, shuffle=False):
"""Appends results_data to self.results_df. Sets the trainset for
the surprise recommender.
:param results_data: DataFrame with columns corresponding to:
'dataset'
'algorithm'
'parameters'
self.metric
:param shuffle: boolean, optional (default: False)
If true, results_data is shuffled before it is added to
self.results_df or self.trainset.
"""
if shuffle:
# shuffle the results data
logger.debug('shuffling results_data')
results_data = results_data.sample(frac=1,
random_state=self.random_state)
results_data.loc[:, 'algorithm-parameters'] = (
results_data['algorithm'].values + '|' +
results_data['parameter_hash'].values)
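        # the resulting key looks like, e.g., 'LogisticRegression|8f3a...'
        # (algorithm name and parameter hash joined by '|'; the values here
        # are illustrative only)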
results_data.rename(columns={self.metric:'score'},inplace=True)
logger.info('append and drop dupes')
        # DataFrame.append was removed in pandas 2.0, so use pd.concat
        self.results_df = pd.concat(
            [self.results_df,
             results_data[['algorithm-parameters', '_id', 'score']]]
        ).drop_duplicates()
logger.info('load_from_df')
data = Dataset.load_from_df(self.results_df[['_id',
'algorithm-parameters',
'score']],
self.reader)
# build training set from the data
self.trainset = data.build_full_trainset()
logger.debug('self.trainset # of ML-P combos: ' +
str(self.trainset.n_items))
logger.debug('self.trainset # of datasets: '
+ str(self.trainset.n_users))
def _update_model(self,results_data):
"""Stores new results and updates algo."""
logger.debug('updating '+self.algo_name+' model')
self._update_training_data(results_data, self.first_fit)
self.first_fit=False
logger.debug('fitting self.algo...')
self.algo.fit(self.trainset)
logger.debug('done.')
logger.debug('model '+self.algo_name+' updated')
    def recommend(self, dataset_id, n_recs=1, dataset_mf=None):
"""Return a model and parameter values expected to do best on dataset.
Parameters
----------
dataset_id: string
ID of the dataset for which the recommender is generating
recommendations.
        n_recs: int (default: 1), optional
            Return a list of length n_recs in order of estimators and
            parameters expected to do best.
        dataset_mf: DataFrame, optional
            Metafeatures of the dataset indicated by dataset_id.
        """
# dataset hash table
super().recommend(dataset_id, n_recs, dataset_mf)
# dataset_hash = self.dataset_id_to_hash[dataset_id]
try:
predictions = []
            filtered = 0
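            # entries in self.trained_dataset_models follow the same
            # 'dataset|algorithm|parameter_hash' pattern built in
            # _update_training_data, so combinations already evaluated on
            # this dataset are skipped rather than re-recommended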
for alg_params in self.mlp_combos:
if (dataset_id+'|'+alg_params not in
self.trained_dataset_models):
predictions.append(self.algo.predict(dataset_id, alg_params,
clip=False))
else:
                    filtered += 1
logger.debug('filtered '+ str(filtered) + ' recommendations')
logger.debug('getting top n predictions')
ml_rec, phash_rec, score_rec = self._get_top_n(predictions, n_recs)
logger.debug('returning ml recs')
except Exception as e:
            logger.error('error generating recommendations for '
                         + str(dataset_id))
raise e
# update the recommender's memory with the new algorithm-parameter combos
# that it recommended
self._update_trained_dataset_models_from_rec(dataset_id,
ml_rec, phash_rec)
p_rec = [self.hash_2_param[ph] for ph in phash_rec]
return ml_rec, p_rec, score_rec
    def _get_top_n(self, predictions, n=10):
        '''Return the top-n recommendations from a set of predictions.
        Args:
            predictions(list of Prediction objects): The list of predictions,
                as returned by the test method of an algorithm.
            n(int): The number of recommendations to output. Default is 10.
        Returns:
            ml recs, parameter recs, and their scores in three lists
        '''
# grabs the ml ids and their estimated scores for this dataset
top_n = []
ml_dist = {}
for uid, iid, true_r, est, _ in predictions:
top_n.append((iid, est))
ml = iid.split('|')[0]
if ml in ml_dist.keys():
ml_dist[ml] += 1.0
else:
ml_dist[ml] = 1.0
n_ml = len(ml_dist.keys())
        ######
        # Shuffle top_n to remove tied-algorithm bias when sorting.
        # Draw uniformly at random over the ML methods and then over their
        # parameter settings by weighting each prediction with probability
        # 1 / (total # of methods) / (# of predictions for that method).
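        # worked example (illustrative numbers): with two ML methods, one
        # appearing 4 times and the other once among the predictions,
        # n_ml = 2, so each item of the first method gets weight
        # 1/2/4 = 0.125 and the single item of the second gets 1/2/1 = 0.5;
        # the weights sum to 1 and each method is equally likely overall.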
inv_ml_dist = {k:1/n_ml/v for k,v in ml_dist.items()}
top_n_dist = np.array([inv_ml_dist[tn[0].split('|')[0]]
for tn in top_n])
top_n_idx = np.arange(len(top_n))
top_n_idx_s = np.random.choice(top_n_idx, len(top_n), replace=False,
p=top_n_dist)
top_n = [top_n[i] for i in top_n_idx_s]
#####
# sort top_n
top_n = sorted(top_n, key=lambda x: x[1], reverse=True)
top_n = top_n[:n]
logger.debug('filtered top_n:'+str(top_n))
        ml_rec = [tn[0].split('|')[0] for tn in top_n]
        p_rec = [tn[0].split('|')[1] for tn in top_n]
        score_rec = [tn[1] for tn in top_n]
return ml_rec, p_rec, score_rec
class CoClusteringRecommender(SurpriseRecommender):
"""Generates recommendations via CoClustering, see
https://surprise.readthedocs.io/en/stable/co_clustering.html
"""
def set_algo(self):
self.algo = CoClustering(n_cltr_u = 10)
# def __init__(self, ml_type='classifier', metric=None, ml_p=None,
# algo=None):
# super().__init__(ml_type, metric, ml_p, algo)
# # set n clusters for ML equal to # of ML methods
# self.
def _update_model(self,results_data):
"""Stores new results and updates algo."""
self.algo.n_cltr_i = self.ml_p.algorithm.nunique()
super()._update_model(results_data)
class KNNWithMeansRecommender(SurpriseRecommender):
"""Generates recommendations via KNNWithMeans, see
https://surprise.readthedocs.io/en/stable/knn_inspired.html
"""
def set_algo(self):
self.algo = KNNWithMeans()
class KNNDatasetRecommender(SurpriseRecommender):
"""Generates recommendations via KNN with clusters defined over datasets, see
https://surprise.readthedocs.io/en/stable/knn_inspired.html
"""
def set_algo(self):
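        # in this mapping, datasets are the Surprise 'users' (see
        # _update_training_data), so user_based=True measures similarity
        # between datasets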
self.algo = KNNBasic(sim_options={'user_based':True})
@property
def algo_name(self):
return 'KNN-Dataset'
class KNNMLRecommender(SurpriseRecommender):
"""Generates recommendations via KNN with clusters defined over algorithms, see
https://surprise.readthedocs.io/en/stable/knn_inspired.html
"""
def set_algo(self):
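        # algorithm-parameter combinations are the Surprise 'items', so
        # user_based=False measures similarity between ML configurations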
self.algo = KNNBasic(sim_options={'user_based':False})
@property
def algo_name(self):
return 'KNN-ML'
class SlopeOneRecommender(SurpriseRecommender):
"""Generates recommendations via SlopeOne, see
https://surprise.readthedocs.io/en/stable/slope_one.html
"""
def set_algo(self):
self.algo = SlopeOne()
class SVDRecommender(SurpriseRecommender):
"""SVD recommender.
see https://surprise.readthedocs.io/en/stable/matrix_factorization.html
Recommends machine learning algorithms and parameters using the SVD algorithm.
- stores ML + P and every dataset.
- learns a matrix factorization on the non-missing data.
- given a dataset, estimates the rankings of all ML+P and returns the top n_recs.
Note that we use a custom online version of SVD found here:
https://github.com/lacava/surprise
"""
def set_algo(self, surprise_kwargs={}):
alg_kwargs = {'n_factors':20,
'biased':True,
'init_mean':0,
'init_std_dev':.2,
'lr_all':.01,
'reg_all':.02,
'verbose':False}
alg_kwargs.update(surprise_kwargs)
self.algo = mySVD(**alg_kwargs)
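    # set_algo accepts overrides for the mySVD hyperparameters via
    # surprise_kwargs; the values below are purely illustrative. Note that
    # __init__ already calls set_algo() with the defaults, so calling it
    # again replaces self.algo, and any random_state assigned in __init__
    # would need to be set again:
    #
    #   rec = SVDRecommender()
    #   rec.set_algo(surprise_kwargs={'n_factors': 40, 'lr_all': 0.005})
    #   rec.algo.random_state = rec.random_state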
# def __init__(self, ml_type='classifier', metric=None, ml_p=None,
# filename=None, knowledgebase=None, random_state=None,
# surprise_kwargs={}):
# super().__init__(ml_type=ml_type, metric=metric, ml_p=ml_p,
# filename=filename, knowledgebase=knowledgebase,
# random_state=random_state)
def _update_model(self,results_data):
"""Stores new results and updates SVD."""
logger.info('updating SVD model')
# shuffle the results data the first time
if self.first_fit:
logger.debug('shuffling results_data')
results_data = results_data.sample(frac=1,
random_state=self.random_state)
self.first_fit=False
self._update_training_data(results_data)
# set the number of training iterations proportionally to the amount of
# results_data
logger.info('algo random_state: '+str(self.algo.random_state))
self.algo.n_epochs = min(len(results_data),self.max_epochs)
self.algo.n_epochs = max(self.algo.n_epochs,self.min_epochs)
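        # e.g. (illustrative): 3 new results -> 10 epochs (min_epochs),
        # 50 results -> 50 epochs, 250 results -> 100 epochs (max_epochs)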
logger.info('fitting self.algo...')
self.algo.partial_fit(self.trainset)
logger.info('done.')
logger.info('model SVD updated')