class GraphPipeline(_BaseComposition):
    """Estimator that fits and predicts through a directed graph of estimators.

    Each node of ``graph`` stores an estimator under the ``"instance"`` key.
    The node that comes first in ``networkx.topological_sort`` is treated as
    the root: its ``predict`` / ``predict_proba`` / ``decision_function`` /
    ``transform`` produce the pipeline's output, and its inputs are assembled
    from the rest of the graph by ``get_inputs_to_node``.

    Parameters
    ----------
    graph : networkx.DiGraph
        A directed graph where the nodes are sklearn estimators and the edges
        are the inputs to those estimators. Must be acyclic.
    cross_val_predict_cv : int, cross-validation generator or an iterable, optional
        Determines the cross-validation splitting strategy used in inner
        classifiers or regressors. Forwarded unchanged to
        ``fit_sklearn_digraph``.
    method : str, optional
        The prediction method to use for the inner classifiers or regressors.
        If 'auto', it will try to use predict_proba, decision_function, or
        predict in that order. Forwarded unchanged to ``fit_sklearn_digraph``
        and ``get_inputs_to_node``.
    memory : str or object with the joblib.Memory interface, optional
        Forwarded unchanged to ``fit_sklearn_digraph``.
        NOTE(review): caching is marked TODO in the signature; confirm whether
        the helper actually uses this value.
    subset_column : int, optional
        Index of the column of X that holds the subset values. When not None,
        ``fit`` extracts ``X[:, subset_column]`` and passes it to
        ``fit_sklearn_digraph`` as ``subset_col``. How the values are
        interpreted is decided by that helper.
    drop_subset_column : bool, optional
        If True and ``subset_column`` is not None, the subset column is
        removed from X before X is passed to the graph (both when fitting and
        when predicting). If False, the column is kept in X.
    use_label_encoder : bool or sklearn.preprocessing.LabelEncoder, optional
        If falsy, labels are passed through untouched. If True, a fresh
        ``LabelEncoder`` is created and fitted on ``y`` in ``fit``. If a
        ``LabelEncoder`` instance is given, that encoder is used as-is and
        only ``transform`` is called on it in ``fit`` (so it is expected to
        be fitted already). Mainly useful for classifiers (XGBoost) that
        require labels to be ints from 0 to N.
    **kwargs
        Forwarded to the parent class constructor.

    Attributes
    ----------
    topo_sorted_nodes : list
        Nodes of ``graph`` in reversed topological order.
    root : node
        Last element of ``topo_sorted_nodes``.
    label_encoder : sklearn.preprocessing.LabelEncoder
        Only set when ``use_label_encoder`` is truthy.
    """

    def __init__(
        self,
        graph,
        cross_val_predict_cv=0,  # signature function(estimator, X, y=none)
        method='auto',
        memory=None,  # TODO memory caching like sklearn.pipeline
        subset_column=None,
        drop_subset_column=True,
        use_label_encoder=False,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.graph = graph
        self.cross_val_predict_cv = cross_val_predict_cv
        self.method = method
        self.memory = memory
        self.subset_column = subset_column
        self.drop_subset_column = drop_subset_column
        self.use_label_encoder = use_label_encoder

        setup_ordered_successors(graph)

        # Materialising the topological sort also validates the graph:
        # networkx raises if the graph contains a cycle, so no separate cycle
        # check is needed afterwards.
        self.topo_sorted_nodes = list(nx.topological_sort(self.graph))
        self.topo_sorted_nodes.reverse()
        # After the reversal the last entry is the first node of the
        # topological order; it is the node whose output the pipeline returns.
        self.root = self.topo_sorted_nodes[-1]

        if self.use_label_encoder:
            if isinstance(self.use_label_encoder, LabelEncoder):
                # Caller supplied a ready-made encoder: reuse it.
                self.label_encoder = self.use_label_encoder
            else:
                self.label_encoder = LabelEncoder()

    def __str__(self):
        """Return the edge view as text, or the node view if there are no edges."""
        edges = self.graph.edges
        if len(edges) > 0:
            return str(edges)
        return str(self.graph.nodes)

    def _drop_subset_column(self, X):
        """Return X without the subset column when configured to drop it."""
        if self.subset_column is not None and self.drop_subset_column:
            return np.delete(X, self.subset_column, axis=1)
        return X

    def _root_inputs(self, X):
        """Build the input passed to the root estimator from raw X."""
        return get_inputs_to_node(
            self.graph,
            self._drop_subset_column(X),
            self.root,
            method=self.method,
            topo_sort=self.topo_sorted_nodes,
        )

    def fit(self, X, y, subset_col=None):
        """Fit every estimator in the graph and return ``self``.

        Parameters
        ----------
        X : array-like
            Training data. When ``subset_column`` is set, X is indexed as
            ``X[:, subset_column]`` (assumes NumPy-style 2-D indexing).
        y : array-like
            Targets. Encoded with ``label_encoder`` first when
            ``use_label_encoder`` is truthy.
        subset_col : array-like, optional
            Subset values forwarded to ``fit_sklearn_digraph``. Ignored and
            replaced by ``X[:, subset_column]`` when ``subset_column`` is not
            None.

        Returns
        -------
        self
        """
        if self.use_label_encoder:
            if isinstance(self.use_label_encoder, LabelEncoder):
                # A user-supplied encoder is only applied, never refitted.
                y = self.label_encoder.transform(y)
            else:
                y = self.label_encoder.fit_transform(y)

        if self.subset_column is not None:
            # Extract the subset values before the column is (optionally)
            # removed from X.
            subset_col = X[:, self.subset_column]
            X = self._drop_subset_column(X)

        fit_sklearn_digraph(
            graph=self.graph,
            X=X,
            y=y,
            method=self.method,
            cross_val_predict_cv=self.cross_val_predict_cv,
            memory=self.memory,
            topo_sort=self.topo_sorted_nodes,
            subset_col=subset_col,
        )

        return self

    def plot(self, ):
        """Draw the graph using the module-level ``plot`` helper."""
        plot(graph=self.graph)

    def __sklearn_is_fitted__(self):
        '''Indicate whether pipeline has been fit.'''
        try:
            # check if the last step of the pipeline is fitted
            # we only check the last step since if the last step is fit, it
            # means the previous steps should also be fit. This is faster than
            # checking if every step of the pipeline is fit.
            sklearn.utils.validation.check_is_fitted(self.graph.nodes[self.root]["instance"])
            return True
        except sklearn.exceptions.NotFittedError:
            return False

    @available_if(_estimator_has('predict'))
    def predict(self, X, **predict_params):
        """Predict with the root estimator.

        Extra keyword arguments are forwarded to the root estimator's
        ``predict``. When ``use_label_encoder`` is truthy the predictions are
        mapped back through ``label_encoder.inverse_transform``.
        """
        this_X = self._root_inputs(X)
        preds = self.graph.nodes[self.root]["instance"].predict(this_X, **predict_params)

        if self.use_label_encoder:
            preds = self.label_encoder.inverse_transform(preds)

        return preds

    @available_if(_estimator_has('predict_proba'))
    def predict_proba(self, X, **predict_params):
        """Return the root estimator's ``predict_proba`` output for X."""
        this_X = self._root_inputs(X)
        return self.graph.nodes[self.root]["instance"].predict_proba(this_X, **predict_params)

    @available_if(_estimator_has('decision_function'))
    def decision_function(self, X, **predict_params):
        """Return the root estimator's ``decision_function`` output for X."""
        this_X = self._root_inputs(X)
        return self.graph.nodes[self.root]["instance"].decision_function(this_X, **predict_params)

    @available_if(_estimator_has('transform'))
    def transform(self, X, **predict_params):
        """Return the root estimator's ``transform`` output for X."""
        this_X = self._root_inputs(X)
        return self.graph.nodes[self.root]["instance"].transform(this_X, **predict_params)

    @property
    def classes_(self):
        """The classes labels. Only exist if the last step is a classifier."""
        if self.use_label_encoder:
            return self.label_encoder.classes_
        return self.graph.nodes[self.root]["instance"].classes_

    @property
    def _estimator_type(self):
        """Estimator type reported by the root estimator."""
        return self.graph.nodes[self.root]["instance"]._estimator_type