Source code for pypop7.optimizers.bo.lamcts

from typing import NamedTuple
from itertools import count
from collections import deque
import time

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.svm import SVC

from pypop7.optimizers.bo.bo import BO
from pypop7.optimizers.es.cmaes import CMAES


class Sample(NamedTuple):  # basic data class for classification
    x: np.ndarray = np.empty(0)
    y: float = float('NaN')


def _remove_duplicate(x):  # for class Bag
    u, idx = np.unique(x, return_index=True, axis=0)
    return u.astype(dtype=float), idx


class Bag(object):  # basic data class for classification
    def __init__(self, x, y=np.empty(0)):
        self._ndim = x.shape[1] if isinstance(x, np.ndarray) else x
        if isinstance(x, np.ndarray):
            self.x, i = _remove_duplicate(x)
        else:
            self.x, i = np.empty((0, self._ndim)), None
        if y.size > 0:
            self.y = y.copy() if i is None else y[i]
        else:
            self.y = np.empty(0)
        self.best, self.mean = None, float('NaN')
        self._update_best()

    def __len__(self):
        return len(self.x)

    def _update_best(self):
        if self.y.size == 0:
            return
        self.mean, best = np.mean(self.y), np.argmin(self.y)
        if (self.best is None) or (self.y[best] < self.best.y):
            self.best = Sample(self.x[best], self.y[best])

    def extend(self, other):
        size_bak = len(self.x)
        self.x, idx = _remove_duplicate(np.concatenate((self.x, other.x)))
        if len(self.x) == size_bak:  # without duplicate
            return False
        if len(self.y) == size_bak:
            if other.y.size > 0:
                self.y = np.concatenate((self.y, other.y))[idx]
            elif len(self.y) > 0:
                self.y = np.concatenate((self.y, np.full((len(self.x) - len(self.y),), float('inf'))))
            self._update_best()
        return True


class KMSVM(object):  # classification class based on K-Means and SVM
    def __init__(self, lb, ub, rng):
        self._lb, self._ub = lb, ub
        self._min_lb, self._max_ub = np.min(lb), np.max(ub)
        self._diff = (self._max_ub - self._min_lb)/2.0
        self._center = (self._max_ub + self._min_lb)/2.0
        self._scaler = StandardScaler(with_mean=False, with_std=False)
        self._kmeans = KMeans(n_clusters=2, random_state=rng.integers(2**32))
        self._svm = SVC(kernel='rbf', gamma='auto', max_iter=100000,
                        random_state=rng.integers(2**32))
        self._is_fitted = False

    def _learn_labels(self, bag):
        std = bag.y.std()
        std = 1.0 if std == 0.0 else std
        y = ((bag.y - bag.y.mean())/std)*self._diff + self._center
        return self._kmeans.fit_predict(np.concatenate((bag.x, y.reshape(-1, 1)), axis=1))

    def classify(self, bag):
        labels = self._learn_labels(bag)
        if len(np.unique(labels)) <= 1:
            return np.array([], dtype=np.int)
        self._scaler.fit(bag.x)
        x = self._scaler.transform(bag.x)
        self._svm.fit(x, labels)
        self._is_fitted = self._svm.fit_status_ == 0
        if self._is_fitted:
            return (self._svm.predict(x) >= 0.5).astype(int)
        return np.array([], dtype=np.int)

    def predict(self, x):
        if self._is_fitted:
            return (self._svm.predict(self._scaler.transform(x)) >= 0.5).astype(int)
        else:
            return np.full(len(x), 1, dtype=int)


class Node:  # basic class for recursive decomposition
    _next_id, _all_nodes, _all_leaves = count(), {}, set()

    @staticmethod
    def build_tree(root):
        queue = deque()
        queue.append(root)
        while len(queue) > 0:
            node = queue.popleft()
            for child in node.children:
                if child.is_leaf:
                    Node._all_leaves.remove(child._id)
                else:
                    queue.append(child)
                del Node._all_nodes[child._id]
            node.clear_children()
        queue = deque()
        queue.append(root)
        while len(queue) > 0:
            node = queue.popleft()
            Node._all_nodes[node._id] = node
            children = node.split()
            if len(children) == 0:
                Node._all_leaves.add(node._id)
            else:
                for child in children:
                    queue.append(child)
        return Node._all_nodes[root._id]

    @property
    def best(self):
        return self._bag.best

    @property
    def children(self):
        return [Node._all_nodes[n] for n in self._children]

    @property
    def is_leaf(self):
        return len(self._children) == 0

    @property
    def mean(self):
        return self._bag.mean

    def n_d(self):
        c = len(self._children)
        for n in self._children:
            c += Node._all_nodes[n].n_d()
        return c

    @property
    def parent(self):
        return None if self._parent < 0 else Node._all_nodes[self._parent]

    def _update_cb(self):
        self.cb = self._bag.best.y
        if self._parent >= 0:
            parent = Node._all_nodes[self._parent]
            n_p, n_j = len(parent._bag.y), len(self._bag.y)
            self.cb -= 2.0*self.c_e*np.sqrt(2.0*np.power(n_p, 0.5)/n_j)

    def __init__(self, ndim, leaf_size, c_e, bag, label=-1, parent=-1, lb=None, ub=None, rng=None):
        self._ndim, self._leaf_size, self.c_e = ndim, leaf_size, c_e
        self._lb, self._ub = lb, ub
        self._label, self._parent = label, parent
        self._classifier, self.rng = KMSVM(lb, ub, rng), rng
        self._children = []
        self._bag = bag
        self.cb = float('NaN')
        self._update_cb()
        self._id = next(Node._next_id)
        Node._all_nodes[self._id] = self

    def add_bag(self, bag):
        if not self._bag.extend(bag):
            return
        self._update_cb()
        if self._parent >= 0:
            Node._all_nodes[self._parent].add_bag(bag)

    def classify(self):
        s = []
        if len(self._bag) >= self._leaf_size:
            labels = self._classifier.classify(self._bag)
            unique_labels = np.unique(labels)
            if len(unique_labels) > 1:
                for label in unique_labels:
                    choice = label == labels
                    if choice.sum() < 3:
                        s.clear()
                        break
                    s.append((label, choice))
        return s

    def clear_children(self):
        self._children.clear()

    def path_from_root(self):
        nodes, node = [self], self.parent
        while node is not None:
            nodes.append(node)
            node = node.parent
        nodes.reverse()
        return nodes

    def split(self, s=None):
        self._children.clear()
        s = self.classify() if s is None else s
        children = []
        for label, choice in s:
            bag = Bag(self._bag.x[choice], self._bag.y[choice])
            child = Node(self._ndim, self._leaf_size, self.c_e, bag, label, self._id,
                         self._lb, self._ub, self.rng)
            children.append(child)
            self._children.append(child._id)
        return children

    def sort_leaves(self, nodes):
        if self.is_leaf:
            nodes.append(self)
        else:
            s = sorted(self.children, key=lambda c: (c.cb, c.mean, c.best.y), reverse=False)
            for node in s:
                node.sort_leaves(nodes)


class Path:  # basic class for recursive decomposition
    def __init__(self, nodes):
        self._path = nodes

    def __len__(self):
        return len(self._path)

    def __getitem__(self, i):
        return self._path[i]

    def expand(self):
        last = self._path[-1]
        while not last.is_leaf:
            last = sorted(last.children, key=lambda c: (c.cb, c.mean, c.best.y), reverse=False)[0]
            self._path.append(last)


class Sampler(object):  # for sampling in nodes
    def __init__(self, problem, func, rng, ev):
        self.problem, self._ndim = problem, problem['ndim_problem']
        self.problem['fitness_function'] = func
        self.lb, self.ub = problem['lower_boundary'], problem['upper_boundary']
        self.cmaes, self.rng, self.fitness = None, rng, []
        self.ev = ev

    def sample(self, n_samples, path=None):
        if path is None or len(path) < 2:
            x = (self.ub + self.lb)/2.0
            sigma = np.max(self.ub - self.lb)/6.0
        else:
            bag = path[-1]._bag
            x, x_std = np.mean(bag.x, axis=0), np.std(bag.x, axis=0) 
            sigma = (np.mean(x_std) + 3.0*np.std(x_std))/3.0
            sigma = np.max(self.ub - self.lb)/6.0 if sigma == 0.0 else sigma
        bags = Bag(self._ndim)
        options = {'mean': x, 'sigma': sigma, 'n_individuals': 4,
                   'seed_rng': self.rng.integers(np.iinfo(np.int64).max),
                   'saving_fitness': 1, 'verbose': False, 'max_function_evaluations':
                       self.ev.max_function_evaluations - self.ev.n_function_evaluations}
        self.cmaes = CMAES(self.problem, options)
        self.cmaes.start_time = time.time()
        x, mean, p_s, p_c, cm, e_ve, e_va, y, d = self.cmaes.initialize()
        while len(bags) < n_samples:
            x, y, d = self.cmaes.iterate(x, mean, e_ve, e_va, y, d)
            self.fitness.extend(y)
            if self.ev._check_terminations():
                return bags
            self.cmaes._n_generations += 1
            mean, p_s, p_c, cm, e_ve, e_va = self.cmaes.update_distribution(
                x, p_s, p_c, cm, e_ve, e_va, y, d)
            bags.extend(Bag(np.copy(x), np.copy(y)))
        return bags


[docs]class LAMCTS(BO): """Latent Action Monte Carlo Tree Search (LAMCTS). Parameters ---------- problem : dict problem arguments with the following common settings (`keys`): * 'fitness_function' - objective function to be **minimized** (`func`), * 'ndim_problem' - number of dimensionality (`int`), * 'upper_boundary' - upper boundary of search range (`array_like`), * 'lower_boundary' - lower boundary of search range (`array_like`). options : dict optimizer options with the following common settings (`keys`): * 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.Inf`), * 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.Inf`), * 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`); and with the following particular settings (`keys`): * 'n_individuals' - number of individuals/samples (`int`, default: `100`), * 'c_e' - factor to control exploration (`float`, default: `0.01`), * 'leaf_size' - leaf size (`int`, default: 40). Examples -------- Use the optimizer to minimize the well-known test function `Rosenbrock <http://en.wikipedia.org/wiki/Rosenbrock_function>`_: .. code-block:: python :linenos: >>> import numpy >>> from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized >>> from pypop7.optimizers.bo.lamcts import LAMCTS >>> problem = {'fitness_function': rosenbrock, # define problem arguments ... 'ndim_problem': 2, ... 'lower_boundary': -5*numpy.ones((2,)), ... 'upper_boundary': 5*numpy.ones((2,))} >>> options = {'max_function_evaluations': 5000, # set optimizer options ... 'seed_rng': 1} >>> lamcts = LAMCTS(problem, options) # initialize the optimizer class >>> results = lamcts.optimize() # run the optimization process >>> # return the number of used function evaluations and found best-so-far fitness >>> print(f"LAMCTS: {results['n_function_evaluations']}, {results['best_so_far_y']}") LAMCTS: 5000, 0.00011439953866179555 For its correctness checking of coding, refer to `this code-based repeatability report <https://tinyurl.com/5f827dwh>`_ for more details. Attributes ---------- c_e : float factor to control exploration. init_individuals : int number of initial individuals. leaf_size : int leaf size. n_individuals : int number of individuals/samples. References ---------- Wang, L., Fonseca, R. and Tian, Y., 2020. Learning search space partition for black-box optimization using monte carlo tree search. Advances in Neural Information Processing Systems, 33, pp.19511-19522. https://arxiv.org/abs/2007.00708 (an updated version) https://proceedings.neurips.cc/paper/2020/hash/e2ce14e81dba66dbff9cbc35ecfdb704-Abstract.html (the original version) https://github.com/facebookresearch/LA-MCTS (an updated version) https://github.com/facebookresearch/LaMCTS (the original version) """ def __init__(self, problem, options): BO.__init__(self, problem, options) self.n_individuals = options.get('n_individuals', 100) # number of individuals/samples self.c_e = options.get('c_e', 0.01) # factor to control exploration self.leaf_size = options.get('leaf_size', 40) # leaf size self.init_individuals = options.get('init_individuals', 100) # number of initial individuals self._sampler, self._root, self._mcts = None, None, None self._n_generations = 0 self.problem = problem @property def stats(self): if self._mcts is not None: return self._mcts return self._stats() def _stats(self): if self._root is not None: leaves, sizes = [], [] self._root.sort_leaves(leaves) for leaf in leaves: sizes.append(len(leaf._bag)) sizes = np.array(sizes) return self._root.n_d() + 1, len(sizes), np.mean(sizes), np.median(sizes) else: return 0, 0, 0.0, 0.0 def initialize(self, args=None): def _func(x): return self._evaluate_fitness(x, args) self._sampler = Sampler(self.problem, _func, self.rng_initialization, self) samples = self._sampler.sample(self.init_individuals) self._root = Node(self.ndim_problem, self.leaf_size, self.c_e*samples.best.y, samples, lb=self.lower_boundary, ub=self.upper_boundary, rng=self.rng_initialization) self._root = Node.build_tree(self._root) self._mcts = self._stats() def iterate(self): all_leaves = [] self._root.sort_leaves(all_leaves) bags = None for sample_node in all_leaves: if self._check_terminations(): return path = Path(sample_node.path_from_root()) if bags is None: bags = self._sampler.sample(self.n_individuals, path) else: bags.extend(self._sampler.sample(self.n_individuals, path)) if len(bags) >= self.n_individuals: break if bags is None or len(bags) == 0: bags = self._sampler.sample(self.n_individuals) self._root.add_bag(bags) self._root.c_e = self.c_e*self._root._bag.best.y self._root = Node.build_tree(self._root) self._mcts = self._stats() self._n_generations += 1 def _collect(self, fitness, y=None): fitness.extend(y) results = BO._collect(self, fitness) results['_n_generations'] = self._n_generations return results def optimize(self, fitness_function=None, args=None): fitness = BO.optimize(self, fitness_function) self.initialize(args) while not self._check_terminations(): self.iterate() if self.verbose: print(f'n_function_evaluations {self.n_function_evaluations}: ' + f'best_so_far_y {self.best_so_far_y:.5}') return self._collect(fitness, self._sampler.fitness)