Parallelization¶
This tutorial covers advanced setups for parallelizing TPOT with Dask. If you just want to parallelize TPOT on a single computer with multiple processes, set the n_jobs parameter to the number of parallel workers you want to use and skip this tutorial.
TPOT uses Dask for parallelization and defaults to a dask.distributed.LocalCluster for local parallelization. A user can pass in a custom Dask client or cluster for advanced usage. For example, multi-node parallelization is possible using the dask-jobqueue package.
TPOT can be easily parallelized on a local computer by setting the n_jobs and memory_limit parameters.
n_jobs: dictates how many Dask workers to launch. In TPOT this corresponds to the number of pipelines evaluated in parallel.
memory_limit: the amount of RAM to use per worker.
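For example, a minimal sketch (search_space="linear" assumes TPOT's built-in linear search space; the other settings are illustrative):
import tpot

# 16 Dask workers, each capped at 4GB of RAM; up to 16 pipelines
# are evaluated in parallel.
est = tpot.TPOTEstimator(
    scorers=["roc_auc_ovr"],
    scorers_weights=[1],
    classification=True,
    search_space="linear",
    n_jobs=16,
    memory_limit="4GB",
)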
TPOT with Python Scripts¶
When running TPOT from a .py script, it is important to protect the code with if __name__=="__main__":
This is due to how parallelization is handled in Python. In short, when Python spawns new processes, each new process reimports code from the relevant .py files, rerunning any top-level code. The if __name__=="__main__": guard ensures the code under it is executed only by the main process, and only once. More info here.
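For example, a TPOT script should be structured as below (a minimal sketch; my_script.py and main() are hypothetical names, and search_space="linear" assumes TPOT's built-in linear search space):
# my_script.py
import tpot
import sklearn.datasets
import sklearn.model_selection

def main():
    X, y = sklearn.datasets.load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)

    est = tpot.TPOTEstimator(
        scorers=["roc_auc_ovr"],
        scorers_weights=[1],
        classification=True,
        search_space="linear",
        n_jobs=4,
        memory_limit="4GB",
    )
    est.fit(X_train, y_train)

if __name__ == "__main__":
    # Executed only by the main process; worker processes that reimport
    # this file will not re-run the TPOT search.
    main()
The notebook example below runs at the top level, which is fine inside Jupyter, where the guard is not required.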
import tpot
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)
graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
    root_search_space=tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
    leaf_search_space=tpot.config.get_search_space("selectors"),
    inner_search_space=tpot.config.get_search_space(["transformers"]),
    max_size=10,
)

est = tpot.TPOTEstimator(
    scorers=["roc_auc_ovr"],
    scorers_weights=[1],
    classification=True,
    cv=10,
    search_space=graph_search_space,
    max_time_mins=60,
    verbose=2,
    n_jobs=16,
    memory_limit="4GB",
)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
Generation: : 8it [01:00, 7.57s/it]
0.997905982905983
Manual Dask Clients and Dashboard¶
You can also manually initialize a Dask client. This is useful for gaining additional control over parallelization, for debugging, and for viewing a dashboard of TPOT's live performance.
You can find more details in the official documentation here.
Dask Python Tutorial
Dask Dashboard
Note that if a client is passed in manually, TPOT will ignore n_jobs and memory_limit. If no client is passed in, TPOT will ignore any global/existing client and create its own.
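For example, in the sketch below (the worker counts are illustrative, and search_space="linear" assumes TPOT's built-in linear search space), TPOT parallelizes across the 8 workers provided by the client rather than the 4 requested by n_jobs:
from dask.distributed import Client, LocalCluster
import tpot

# This cluster provides 8 single-threaded workers with 2GB of RAM each.
cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit="2GB")
client = Client(cluster)

est = tpot.TPOTEstimator(
    client=client,        # takes precedence
    n_jobs=4,             # ignored because a client was passed in
    memory_limit="4GB",   # ignored because a client was passed in
    scorers=["roc_auc_ovr"],
    scorers_weights=[1],
    classification=True,
    search_space="linear",
)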
Option 1: Initializing a basic Dask local cluster
from dask.distributed import Client, LocalCluster

n_jobs = 4
memory_limit = "4GB"

# Create our own local cluster and client to pass in to TPOT.
cluster = LocalCluster(
    n_workers=n_jobs,  # one pipeline is evaluated per worker
    threads_per_worker=1,
    memory_limit=memory_limit,
)
client = Client(cluster)
Get the link to view the Dask dashboard.
client.dashboard_link
'http://127.0.0.1:8787/status'
graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
    root_search_space=tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
    leaf_search_space=tpot.config.get_search_space("selectors"),
    inner_search_space=tpot.config.get_search_space(["transformers"]),
    max_size=10,
)

est = tpot.TPOTEstimator(
    client=client,
    scorers=["roc_auc_ovr"],
    scorers_weights=[1],
    classification=True,
    cv=10,
    search_space=graph_search_space,
    max_time_mins=60,
    early_stop=10,
    verbose=2,
)
# since the client above has 4 workers with 4GB each, this is equivalent to setting:
# n_jobs=4, memory_limit="4GB"
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))
# It is good practice to close the client and cluster when you are done with them
client.close()
cluster.close()
Generation: : 8it [01:01, 7.69s/it]
0.997905982905983
Option 2: Using a context manager
You can initialize the cluster and client with a context manager that will automatically close them.
from dask.distributed import Client, LocalCluster
import tpot
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import numpy as np

scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)

n_jobs = 4
memory_limit = "4GB"
with LocalCluster(
    n_workers=n_jobs,
    threads_per_worker=1,
    memory_limit=memory_limit,
) as cluster, Client(cluster) as client:

    graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
        root_search_space=tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
        leaf_search_space=tpot.config.get_search_space("selectors"),
        inner_search_space=tpot.config.get_search_space(["transformers"]),
        max_size=10,
    )

    est = tpot.TPOTEstimator(
        client=client,
        scorers=["roc_auc_ovr"],
        scorers_weights=[1],
        classification=True,
        cv=5,
        search_space=graph_search_space,
        max_time_mins=60,
        early_stop=10,
        verbose=2,
    )

    est.fit(X_train, y_train)
    print(scorer(est, X_test, y_test))
Generation: : 10it [01:00, 6.07s/it]
1.0
Dask multi-node parallelization on HPC¶
Dask can parallelize across multiple nodes via job queueing systems. This is done using the Dask-Jobqueue package. More information can be found in the official documentation here.
To parallelize TPOT with Dask-Jobqueue, simply pass a client backed by a Jobqueue cluster with the desired settings into the client parameter. Each job will evaluate a single pipeline.
Note that TPOT will ignore n_jobs and memory_limit as these should be set inside the Dask cluster.
The following example is specific to the Sun Grid Engine. Other supported clusters can be found in the Dask-Jobqueue documentation here.
from dask.distributed import Client
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import tpot
import os

from dask_jobqueue import SGECluster  # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.
if os.system("which qsub") != 0:
    print("Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.")
else:
    print("Sun Grid Engine is installed.")

    cluster = SGECluster(
        queue='all.q',
        cores=2,
        memory="50 GB",
    )

    cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs

    client = Client(cluster)

    scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
    X, y = sklearn.datasets.load_digits(return_X_y=True)
    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)

    graph_search_space = tpot.search_spaces.pipelines.GraphSearchPipeline(
        root_search_space=tpot.config.get_search_space(["KNeighborsClassifier", "LogisticRegression", "DecisionTreeClassifier"]),
        leaf_search_space=tpot.config.get_search_space("selectors"),
        inner_search_space=tpot.config.get_search_space(["transformers"]),
        max_size=10,
    )

    est = tpot.TPOTEstimator(
        client=client,
        scorers=["roc_auc_ovr"],  # digits is multiclass, so use the one-vs-rest variant
        scorers_weights=[1],
        classification=True,
        cv=10,
        search_space=graph_search_space,
        max_time_mins=60,
        early_stop=10,
        verbose=2,
    )

    est.fit(X_train, y_train)
    print(scorer(est, X_test, y_test))

    # It is good practice to close the client and cluster when you are done with them
    client.close()
    cluster.close()
Sun Grid Engine is not installed. This example requires Sun Grid Engine to be installed.