Dynamicunion

This file is part of the TPOT library.

The current version of TPOT was developed at Cedars-Sinai by:
- Pedro Henrique Ribeiro (https://github.com/perib, https://www.linkedin.com/in/pedro-ribeiro/)
- Anil Saini (anil.saini@cshs.org)
- Jose Hernandez (jgh9094@gmail.com)
- Jay Moran (jay.moran@cshs.org)
- Nicholas Matsumoto (nicholas.matsumoto@cshs.org)
- Hyunjun Choi (hyunjun.choi@cshs.org)
- Gabriel Ketron (gabriel.ketron@cshs.org)
- Miguel E. Hernandez (miguel.e.hernandez@cshs.org)
- Jason Moore (moorejh28@gmail.com)

The original version of TPOT was primarily developed at the University of Pennsylvania by:
- Randal S. Olson (rso@randalolson.com)
- Weixuan Fu (weixuanf@upenn.edu)
- Daniel Angell (dpa34@drexel.edu)
- Jason Moore (moorejh28@gmail.com)
- and many more generous open-source contributors

TPOT is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

TPOT is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License along with TPOT. If not, see http://www.gnu.org/licenses/.

DynamicUnionPipeline

Bases: SearchSpace

Source code in tpot/search_spaces/pipelines/dynamicunion.py
class DynamicUnionPipeline(SearchSpace):
    def __init__(self, search_space : SearchSpace, max_estimators=None, allow_repeats=False ) -> None:
        """
        Takes in a list of search spaces. will produce a pipeline of Sequential length. Each step in the pipeline will correspond to the the search space provided in the same index.
        """

        self.search_space = search_space
        self.max_estimators = max_estimators
        self.allow_repeats = allow_repeats

    def generate(self, rng=None):
        rng = np.random.default_rng(rng)
        return DynamicUnionPipelineIndividual(self.search_space, max_estimators=self.max_estimators, allow_repeats=self.allow_repeats, rng=rng)
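A minimal usage sketch (not part of the source file). It assumes the tpot.config.get_search_space helper and these import paths are available; any other SearchSpace instance can serve as the inner search space.

import numpy as np
import tpot
from tpot.search_spaces.pipelines import DynamicUnionPipeline

# Inner search space for the union members; "transformers" is an assumed
# configuration name -- any SearchSpace instance can be used instead.
transformer_space = tpot.config.get_search_space("transformers")

# Search space over FeatureUnions containing at most three distinct steps.
union_space = DynamicUnionPipeline(transformer_space, max_estimators=3)

rng = np.random.default_rng(1)
individual = union_space.generate(rng)        # DynamicUnionPipelineIndividual
feature_union = individual.export_pipeline()  # sklearn.pipeline.FeatureUnion
print(feature_union)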

__init__(search_space, max_estimators=None, allow_repeats=False)

Takes in a single search space. Will produce a FeatureUnion of up to max_estimators steps, each generated from that search space. The outputs of all steps are concatenated together.

Source code in tpot/search_spaces/pipelines/dynamicunion.py
def __init__(self, search_space : SearchSpace, max_estimators=None, allow_repeats=False ) -> None:
    """
    Takes in a list of search spaces. will produce a pipeline of Sequential length. Each step in the pipeline will correspond to the the search space provided in the same index.
    """

    self.search_space = search_space
    self.max_estimators = max_estimators
    self.allow_repeats = allow_repeats
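For illustration, reusing the hypothetical transformer_space from the sketch above:

# max_estimators=None (the default) places no upper bound on the number of union
# steps; an explicit integer caps how many steps a generated FeatureUnion may contain.
unbounded_space = DynamicUnionPipeline(transformer_space)
bounded_space = DynamicUnionPipeline(transformer_space, max_estimators=2)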

DynamicUnionPipelineIndividual

Bases: SklearnIndividual

Takes in one search space. Will produce a FeatureUnion of up to max_estimators number of steps. The output of the FeatureUnion will be all of the steps concatenated together.

Source code in tpot/search_spaces/pipelines/dynamicunion.py
class DynamicUnionPipelineIndividual(SklearnIndividual):
    """
    Takes in one search space.
    Will produce a FeatureUnion of up to max_estimators number of steps.
    The output of the FeatureUnion will be all of the steps concatenated together.

    """

    def __init__(self, search_space : SearchSpace, max_estimators=None, allow_repeats=False, rng=None) -> None:
        super().__init__()
        self.search_space = search_space

        if max_estimators is None:
            self.max_estimators = np.inf
        else:
            self.max_estimators = max_estimators

        self.allow_repeats = allow_repeats

        self.union_dict = {}

        if self.max_estimators == np.inf:
            init_max = 3
        else:
            init_max = self.max_estimators

        rng = np.random.default_rng(rng)

        for _ in range(rng.integers(1, init_max)):
            self._mutate_add_step(rng)


    def mutate(self, rng=None):
        rng = np.random.default_rng(rng)
        mutation_funcs = [self._mutate_add_step, self._mutate_remove_step, self._mutate_replace_step, self._mutate_note]
        rng.shuffle(mutation_funcs)
        for mutation_func in mutation_funcs:
            if mutation_func(rng):
                return True

    def _mutate_add_step(self, rng):
        rng = np.random.default_rng(rng)
        max_attempts = 10
        if len(self.union_dict) < self.max_estimators:
            for _ in range(max_attempts):
                new_step = self.search_space.generate(rng)
                if new_step.unique_id() not in self.union_dict:
                    self.union_dict[new_step.unique_id()] = new_step
                    return True
        return False

    def _mutate_remove_step(self, rng):
        rng = np.random.default_rng(rng)
        if len(self.union_dict) > 1:
            self.union_dict.pop( rng.choice(list(self.union_dict.keys())))  
            return True
        return False

    def _mutate_replace_step(self, rng):
        rng = np.random.default_rng(rng)        
        changed = self._mutate_remove_step(rng) or self._mutate_add_step(rng)
        return changed

    #TODO mutate one step or multiple?
    def _mutate_note(self, rng):
        rng = np.random.default_rng(rng)
        changed = False
        values = list(self.union_dict.values())
        for step in values:
            if rng.random() < 0.5:
                changed = step.mutate(rng) or changed

        self.union_dict = {step.unique_id(): step for step in values}

        return changed


    def crossover(self, other, rng=None):
        rng = np.random.default_rng(rng)

        cx_funcs = [self._crossover_swap_multiple_nodes, self._crossover_node]
        rng.shuffle(cx_funcs)
        for cx_func in cx_funcs:
            if cx_func(other, rng):
                return True

        return False


    def _crossover_swap_multiple_nodes(self, other, rng):
        rng = np.random.default_rng(rng)
        self_values = list(self.union_dict.values())
        other_values = list(other.union_dict.values())

        rng.shuffle(self_values)
        rng.shuffle(other_values)

        self_idx = rng.integers(0,len(self_values))
        other_idx = rng.integers(0,len(other_values))

        # Note that this is not one-point crossover, since the sequence doesn't matter. This is just a quick way to swap multiple random items.
        self_values[:self_idx], other_values[:other_idx] = other_values[:other_idx], self_values[:self_idx]

        self.union_dict = {step.unique_id(): step for step in self_values}
        other.union_dict = {step.unique_id(): step for step in other_values}

        return True


    def _crossover_node(self, other, rng):
        rng = np.random.default_rng(rng)

        changed = False
        self_values = list(self.union_dict.values())
        other_values = list(other.union_dict.values())

        rng.shuffle(self_values)
        rng.shuffle(other_values)

        for self_step, other_step in zip(self_values, other_values):
            if rng.random() < 0.5:
                changed = self_step.crossover(other_step, rng) or changed

        self.union_dict = {step.unique_id(): step for step in self_values}
        other.union_dict = {step.unique_id(): step for step in other_values}

        return changed

    def export_pipeline(self, **kwargs):
        values = list(self.union_dict.values())
        return sklearn.pipeline.make_union(*[step.export_pipeline(**kwargs) for step in values])

    def unique_id(self):
        values = list(self.union_dict.values())
        l = [step.unique_id() for step in values]
        # if all items are strings, then sort them
        if all([isinstance(x, str) for x in l]):
            l.sort()
        l = ["FeatureUnion"] + l
        return TupleIndex(frozenset(l))
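A short sketch of how individuals are varied during the search, continuing from the hypothetical union_space and rng above:

# Generate two individuals from the same search space.
ind1 = union_space.generate(rng)
ind2 = union_space.generate(rng)

# mutate() tries add/remove/replace/per-step mutations in random order and
# returns True as soon as one of them succeeds.
ind1.mutate(rng)

# crossover() either swaps whole union members between the two individuals or
# recombines matching members in place.
ind1.crossover(ind2, rng)

# unique_id() is order-independent: a frozenset of member ids tagged "FeatureUnion",
# so two unions with the same members share the same id.
print(ind1.unique_id())

# Export to a scikit-learn FeatureUnion ready for fitting.
print(ind1.export_pipeline())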