Parallelization¶
This tutorial covers advanced setups for parallelizing TPOT2 with Dask. If you just want to parallelize TPOT2 on a single computer with multiple processes, set the n_jobs parameter to the number of workers you want to use and skip this tutorial.
TPOT2 uses Dask for parallelization and defaults to a dask.distributed.LocalCluster for local parallelization. For advanced usage, you can pass in a custom Dask client or cluster. For example, multi-node parallelization is possible using the dask-jobqueue package.
TPOT2 can be easily parallelized on a local computer by setting the n_jobs and memory_limit parameters.
n_jobs
dictates how many Dask workers to launch. In TPOT2 this corresponds to the number of pipelines evaluated in parallel.
memory_limit
is the amount of RAM to use per worker.
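For instance, a minimal local setup is just a matter of passing these two parameters (a sketch; the search space and parameter values are illustrative):
import tpot2

est = tpot2.TPOTEstimator(
    scorers=["roc_auc_ovr"],
    scorers_weights=[1],
    classification=True,
    search_space=tpot2.config.get_search_space(["LogisticRegression"]),  # illustrative search space
    max_time_mins=10,      # illustrative time budget
    n_jobs=4,              # four Dask workers, i.e. four pipelines evaluated in parallel
    memory_limit="4GB",    # RAM per worker
)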
TPOT2 with Python Scripts¶
When running TPOT from a .py script, it is important to protect the code with if __name__=="__main__":
This is due to how parallelization is handled in Python: when Python spawns new processes, each new process reimports code from the relevant .py files, rerunning any top-level code. The block under if __name__=="__main__":
ensures that the code inside it is executed only by the main process, and only once. More info here.
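A minimal sketch of this pattern (main is just a placeholder name):
import tpot2

def main():
    # build and fit the TPOTEstimator here, as in the example below
    ...

if __name__ == "__main__":
    main()  # run only by the main process; spawned worker processes merely reimport this module
The examples in this tutorial come from a notebook, where the guard is not needed, so it is omitted below.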
import tpot2
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
verbose = 2,
n_jobs=16,
memory_limit="4GB"
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
Generation: 100%|██████████| 5/5 [00:11<00:00, 2.24s/it]
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/decomposition/_fastica.py:595: UserWarning: n_components is too large: it will be set to 8
  warnings.warn(
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/decomposition/_fastica.py:128: ConvergenceWarning: FastICA did not converge. Consider increasing tolerance or the maximum number of iterations.
  warnings.warn(
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:350: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
1.0
Manual Dask Clients and Dashboard¶
You can also manually initialize a Dask client. This is useful for gaining additional control over the parallelization, for debugging, and for viewing a dashboard of TPOT2's live performance.
You can find more details in the official documentation here.
Dask Python Tutorial
Dask Dashboard
Note that if a client is passed in manually, TPOT will ignore n_jobs and memory_limit. If no client is passed in, TPOT will ignore any global/existing client and create its own.
Option 1: Initializing a basic Dask local cluster
from dask.distributed import Client, LocalCluster
n_jobs = 4
memory_limit = "4GB"
cluster = LocalCluster(n_workers=n_jobs,  # create our own cluster to pass into TPOT rather than letting TPOT create one
                       threads_per_worker=1,
                       memory_limit=memory_limit)
client = Client(cluster)
Get the link to view the Dask dashboard.
client.dashboard_link
'http://127.0.0.1:8787/status'
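When working from a script rather than a notebook, one option is to open that link with the standard-library webbrowser module (a small sketch, not something TPOT2 requires):
import webbrowser

webbrowser.open(client.dashboard_link)  # open the Dask dashboard in the default browser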
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
client = client,
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
# Passing in this client is equivalent to not passing a client and instead setting
# n_jobs=4 and memory_limit="4GB", which would make TPOT create the same LocalCluster itself.
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
# It is good practice to close the client and cluster when you are done with them.
client.close()
cluster.close()
Generation: 100%|██████████| 5/5 [00:13<00:00, 2.62s/it]
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:350: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
1.0
Option 2: Using a context manager
You can initialize the cluster and client with a context manager that will automatically close them.
from dask.distributed import Client, LocalCluster
import tpot2
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
n_jobs = 4
memory_limit = "4GB"
with LocalCluster(
n_workers=n_jobs,
threads_per_worker=1,
memory_limit=memory_limit,
) as cluster, Client(cluster) as client:
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
client = client,
scorers = ["roc_auc_ovr"],
scorers_weights = [1],
classification = True,
cv = 5,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
Generation: 100%|██████████| 5/5 [00:16<00:00, 3.33s/it]
/home/ribeirop/miniconda3/envs/tpot2env/lib/python3.10/site-packages/sklearn/linear_model/_sag.py:350: ConvergenceWarning: The max_iter was reached which means the coef_ did not converge
  warnings.warn(
1.0
Dask multi node parallelization on HPC¶
Dask can parallelize across multiple nodes via job queueing systems. This is done using the Dask-Jobqueue package. More information can be found in the official documentation here.
To parallelize TPOT2 with Dask-Jobqueue, create a client backed by a Jobqueue cluster with the desired settings and pass it to the client parameter. Each job will evaluate a single pipeline.
Note that TPOT will ignore n_jobs and memory_limit as these should be set inside the Dask cluster.
The following example is specific to the Sun Grid Engine. Other supported clusters can be found in the Dask-Jobqueue documentation here.
from dask.distributed import Client
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import tpot2
from dask_jobqueue import SGECluster # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.
import os
if os.system("which qsub") != 0:
print("Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.")
else:
print("Sun Grid Engine is installed.")
cluster = SGECluster(
queue='all.q',
cores=2,
memory="50 GB"
)
cluster.adapt(minimum_jobs=10, maximum_jobs=100) # auto-scale between 10 and 100 jobs
client = Client(cluster)
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot2.search_spaces.pipelines.GraphSearchPipeline(
root_search_space= tpot2.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
leaf_search_space = tpot2.config.get_search_space("selectors"),
inner_search_space = tpot2.config.get_search_space(["transformers"]),
max_size = 10,
)
est = tpot2.TPOTEstimator(
client = client,
scorers = ["roc_auc"],
scorers_weights = [1],
classification = True,
cv = 10,
search_space = graph_search_space,
max_time_mins = 60,
early_stop=10,
verbose = 2,
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
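For reference, a hypothetical SLURM variant of the cluster setup above might look like the following sketch; the queue name, resources, and walltime are assumptions to replace with your cluster's actual partition and limits:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="general",       # partition name (assumption; use your cluster's partition)
    cores=2,               # cores per job
    memory="50 GB",        # memory per job
    walltime="01:00:00",   # per-job time limit (assumption)
)
cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs
client = Client(cluster)  # pass this client to tpot2.TPOTEstimator(client=client, ...)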