Source code for pypop7.optimizers.bo.lamcts

from typing import NamedTuple
from itertools import count
from collections import deque
import time

import numpy as np  # engine for numerical computing
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.svm import SVC

from pypop7.optimizers.bo.bo import BO  # abstract class of all Bayesian optimization (BO) classes
from pypop7.optimizers.es.cmaes import CMAES  # covariance matrix adaptation evolution strategy


class Sample(NamedTuple):  # basic data class for classification
    x: np.ndarray = np.empty(0)
    y: float = float('NaN')


def _remove_duplicate(x):  # for class Bag
    u, idx = np.unique(x, return_index=True, axis=0)
    return u.astype(dtype=float), idx


class Bag(object):  # basic data class for classification
    def __init__(self, x, y=np.empty(0)):
        self._ndim = x.shape[1] if isinstance(x, np.ndarray) else x
        if isinstance(x, np.ndarray):
            self.x, i = _remove_duplicate(x)
        else:
            self.x, i = np.empty((0, self._ndim)), None
        if y.size > 0:
            self.y = y.copy() if i is None else y[i]
        else:
            self.y = np.empty(0)
        self.best, self.mean = None, float('NaN')
        self._update_best()

    def __len__(self):
        return len(self.x)

    def _update_best(self):
        if self.y.size == 0:
            return
        self.mean, best = np.mean(self.y), np.argmin(self.y)
        if (self.best is None) or (self.y[best] < self.best.y):
            self.best = Sample(self.x[best], self.y[best])

    def extend(self, other):
        size_bak = len(self.x)
        self.x, idx = _remove_duplicate(np.concatenate((self.x, other.x)))
        if len(self.x) == size_bak:  # without duplicate
            return False
        if len(self.y) == size_bak:
            if other.y.size > 0:
                self.y = np.concatenate((self.y, other.y))[idx]
            elif len(self.y) > 0:
                self.y = np.concatenate((self.y, np.full((len(self.x) - len(self.y),), float('inf'))))
            self._update_best()
        return True


class KMSVM(object):  # classification class based on K-Means and SVM
    def __init__(self, lb, ub, rng):
        self._lb, self._ub = lb, ub
        self._min_lb, self._max_ub = np.min(lb), np.max(ub)
        self._diff = (self._max_ub - self._min_lb)/2.0
        self._center = (self._max_ub + self._min_lb)/2.0
        self._scaler = StandardScaler(with_mean=False, with_std=False)
        self._kmeans = KMeans(n_clusters=2, random_state=rng.integers(2**32))
        self._svm = SVC(kernel='rbf', gamma='auto', max_iter=100000,
                        random_state=rng.integers(2**32))
        self._is_fitted = False

    def _learn_labels(self, bag):
        std = bag.y.std()
        std = 1.0 if std == 0.0 else std
        y = ((bag.y - bag.y.mean())/std)*self._diff + self._center
        return self._kmeans.fit_predict(np.concatenate((bag.x, y.reshape(-1, 1)), axis=1))

    def classify(self, bag):
        labels = self._learn_labels(bag)
        if len(np.unique(labels)) <= 1:
            return np.array([], dtype=np.int)
        self._scaler.fit(bag.x)
        x = self._scaler.transform(bag.x)
        self._svm.fit(x, labels)
        self._is_fitted = self._svm.fit_status_ == 0
        if self._is_fitted:
            return (self._svm.predict(x) >= 0.5).astype(int)
        return np.array([], dtype=np.int)

    def predict(self, x):
        if self._is_fitted:
            return (self._svm.predict(self._scaler.transform(x)) >= 0.5).astype(int)
        else:
            return np.full(len(x), 1, dtype=int)


class Node:  # basic class for recursive decomposition
    _next_id, _all_nodes, _all_leaves = count(), {}, set()

    @staticmethod
    def build_tree(root):
        queue = deque()
        queue.append(root)
        while len(queue) > 0:
            node = queue.popleft()
            for child in node.children:
                if child.is_leaf:
                    Node._all_leaves.remove(child._id)
                else:
                    queue.append(child)
                del Node._all_nodes[child._id]
            node.clear_children()
        queue = deque()
        queue.append(root)
        while len(queue) > 0:
            node = queue.popleft()
            Node._all_nodes[node._id] = node
            children = node.split()
            if len(children) == 0:
                Node._all_leaves.add(node._id)
            else:
                for child in children:
                    queue.append(child)
        return Node._all_nodes[root._id]

    @property
    def best(self):
        return self._bag.best

    @property
    def children(self):
        return [Node._all_nodes[n] for n in self._children]

    @property
    def is_leaf(self):
        return len(self._children) == 0

    @property
    def mean(self):
        return self._bag.mean

    def n_d(self):
        c = len(self._children)
        for n in self._children:
            c += Node._all_nodes[n].n_d()
        return c

    @property
    def parent(self):
        return None if self._parent < 0 else Node._all_nodes[self._parent]

    def _update_cb(self):
        self.cb = self._bag.best.y
        if self._parent >= 0:
            parent = Node._all_nodes[self._parent]
            n_p, n_j = len(parent._bag.y), len(self._bag.y)
            self.cb -= 2.0*self.c_e*np.sqrt(2.0*np.power(n_p, 0.5)/n_j)

    def __init__(self, ndim, leaf_size, c_e, bag, label=-1, parent=-1, lb=None, ub=None, rng=None):
        self._ndim, self._leaf_size, self.c_e = ndim, leaf_size, c_e
        self._lb, self._ub = lb, ub
        self._label, self._parent = label, parent
        self._classifier, self.rng = KMSVM(lb, ub, rng), rng
        self._children = []
        self._bag = bag
        self.cb = float('NaN')
        self._update_cb()
        self._id = next(Node._next_id)
        Node._all_nodes[self._id] = self

    def add_bag(self, bag):
        if not self._bag.extend(bag):
            return
        self._update_cb()
        if self._parent >= 0:
            Node._all_nodes[self._parent].add_bag(bag)

    def classify(self):
        s = []
        if len(self._bag) >= self._leaf_size:
            labels = self._classifier.classify(self._bag)
            unique_labels = np.unique(labels)
            if len(unique_labels) > 1:
                for label in unique_labels:
                    choice = label == labels
                    if choice.sum() < 3:
                        s.clear()
                        break
                    s.append((label, choice))
        return s

    def clear_children(self):
        self._children.clear()

    def path_from_root(self):
        nodes, node = [self], self.parent
        while node is not None:
            nodes.append(node)
            node = node.parent
        nodes.reverse()
        return nodes

    def split(self, s=None):
        self._children.clear()
        s = self.classify() if s is None else s
        children = []
        for label, choice in s:
            bag = Bag(self._bag.x[choice], self._bag.y[choice])
            child = Node(self._ndim, self._leaf_size, self.c_e, bag, label, self._id,
                         self._lb, self._ub, self.rng)
            children.append(child)
            self._children.append(child._id)
        return children

    def sort_leaves(self, nodes):
        if self.is_leaf:
            nodes.append(self)
        else:
            s = sorted(self.children, key=lambda c: (c.cb, c.mean, c.best.y), reverse=False)
            for node in s:
                node.sort_leaves(nodes)


class Path:  # basic class for recursive decomposition
    def __init__(self, nodes):
        self._path = nodes

    def __len__(self):
        return len(self._path)

    def __getitem__(self, i):
        return self._path[i]

    def expand(self):
        last = self._path[-1]
        while not last.is_leaf:
            last = sorted(last.children, key=lambda c: (c.cb, c.mean, c.best.y), reverse=False)[0]
            self._path.append(last)


class Sampler(object):  # for sampling in nodes
    def __init__(self, problem, func, rng, ev):
        self.problem, self._ndim = problem, problem['ndim_problem']
        self.problem['fitness_function'] = func
        self.lb, self.ub = problem['lower_boundary'], problem['upper_boundary']
        self.cmaes, self.rng, self.fitness = None, rng, []
        self.ev = ev

    def sample(self, n_samples, path=None):
        if path is None or len(path) < 2:
            x = (self.ub + self.lb)/2.0
            sigma = np.max(self.ub - self.lb)/6.0
        else:
            bag = path[-1]._bag
            x, x_std = np.mean(bag.x, axis=0), np.std(bag.x, axis=0) 
            sigma = (np.mean(x_std) + 3.0*np.std(x_std))/3.0
            sigma = np.max(self.ub - self.lb)/6.0 if sigma == 0.0 else sigma
        bags = Bag(self._ndim)
        options = {'mean': x, 'sigma': sigma, 'n_individuals': 4,
                   'seed_rng': self.rng.integers(np.iinfo(np.int64).max),
                   'saving_fitness': 1, 'verbose': False, 'max_function_evaluations':
                       self.ev.max_function_evaluations - self.ev.n_function_evaluations}
        self.cmaes = CMAES(self.problem, options)
        self.cmaes.start_time = time.time()
        x, mean, p_s, p_c, cm, e_ve, e_va, y, d = self.cmaes.initialize()
        while len(bags) < n_samples:
            x, y, d = self.cmaes.iterate(x, mean, e_ve, e_va, y, d)
            self.fitness.extend(y)
            if self.ev._check_terminations():
                return bags
            self.cmaes._n_generations += 1
            mean, p_s, p_c, cm, e_ve, e_va = self.cmaes.update_distribution(
                x, p_s, p_c, cm, e_ve, e_va, y, d)
            bags.extend(Bag(np.copy(x), np.copy(y)))
        return bags


[docs]class LAMCTS(BO): """Latent Action Monte Carlo Tree Search (LAMCTS). Parameters ---------- problem : dict problem arguments with the following common settings (`keys`): * 'fitness_function' - objective function to be **minimized** (`func`), * 'ndim_problem' - number of dimensionality (`int`), * 'upper_boundary' - upper boundary of search range (`array_like`), * 'lower_boundary' - lower boundary of search range (`array_like`). options : dict optimizer options with the following common settings (`keys`): * 'max_function_evaluations' - maximum of function evaluations (`int`, default: `np.inf`), * 'max_runtime' - maximal runtime to be allowed (`float`, default: `np.inf`), * 'seed_rng' - seed for random number generation needed to be *explicitly* set (`int`); and with the following particular settings (`keys`): * 'n_individuals' - number of individuals/samples (`int`, default: `100`), * 'c_e' - factor to control exploration (`float`, default: `0.01`), * 'leaf_size' - leaf size (`int`, default: 40). Examples -------- Use the black-box optimizer `LAMCTS` to minimize the well-known test function `Rosenbrock <http://en.wikipedia.org/wiki/Rosenbrock_function>`_: .. code-block:: python :linenos: >>> import numpy >>> from pypop7.benchmarks.base_functions import rosenbrock # function to be minimized >>> from pypop7.optimizers.bo.lamcts import LAMCTS >>> problem = {'fitness_function': rosenbrock, # to define problem arguments ... 'ndim_problem': 2, ... 'lower_boundary': -5.0*numpy.ones((2,)), ... 'upper_boundary': 5.0*numpy.ones((2,))} >>> options = {'max_function_evaluations': 5000, # to set optimizer options ... 'seed_rng': 1} >>> lamcts = LAMCTS(problem, options) # to initialize the optimizer class >>> results = lamcts.optimize() # to run the optimization process >>> print(f"LAMCTS: {results['n_function_evaluations']}, {results['best_so_far_y']}") LAMCTS: 5000, 0.0001 For its correctness checking of coding, refer to `this code-based repeatability report <https://tinyurl.com/5f827dwh>`_ for more details. Attributes ---------- c_e : float factor to control exploration. init_individuals : int number of initial individuals. leaf_size : int leaf size. n_individuals : int number of individuals/samples. References ---------- Wang, L., Fonseca, R. and Tian, Y., 2020. Learning search space partition for black-box optimization using monte carlo tree search. Advances in Neural Information Processing Systems, 33, pp.19511-19522. https://arxiv.org/abs/2007.00708 (an updated version) https://proceedings.neurips.cc/paper/2020/hash/e2ce14e81dba66dbff9cbc35ecfdb704-Abstract.html (the original version) https://github.com/facebookresearch/LA-MCTS (an updated version) https://github.com/facebookresearch/LaMCTS (the original version) """ def __init__(self, problem, options): BO.__init__(self, problem, options) self.n_individuals = options.get('n_individuals', 100) # number of individuals/samples self.c_e = options.get('c_e', 0.01) # factor to control exploration self.leaf_size = options.get('leaf_size', 40) # leaf size self.init_individuals = options.get('init_individuals', 100) # number of initial individuals self._sampler, self._root, self._mcts = None, None, None self._n_generations = 0 self.problem = problem @property def stats(self): if self._mcts is not None: return self._mcts return self._stats() def _stats(self): if self._root is not None: leaves, sizes = [], [] self._root.sort_leaves(leaves) for leaf in leaves: sizes.append(len(leaf._bag)) sizes = np.array(sizes) return self._root.n_d() + 1, len(sizes), np.mean(sizes), np.median(sizes) else: return 0, 0, 0.0, 0.0 def initialize(self, args=None): def _func(x): return self._evaluate_fitness(x, args) self._sampler = Sampler(self.problem, _func, self.rng_initialization, self) samples = self._sampler.sample(self.init_individuals) self._root = Node(self.ndim_problem, self.leaf_size, self.c_e*samples.best.y, samples, lb=self.lower_boundary, ub=self.upper_boundary, rng=self.rng_initialization) self._root = Node.build_tree(self._root) self._mcts = self._stats() def iterate(self): all_leaves = [] self._root.sort_leaves(all_leaves) bags = None for sample_node in all_leaves: if self._check_terminations(): return path = Path(sample_node.path_from_root()) if bags is None: bags = self._sampler.sample(self.n_individuals, path) else: bags.extend(self._sampler.sample(self.n_individuals, path)) if len(bags) >= self.n_individuals: break if bags is None or len(bags) == 0: bags = self._sampler.sample(self.n_individuals) self._root.add_bag(bags) self._root.c_e = self.c_e*self._root._bag.best.y self._root = Node.build_tree(self._root) self._mcts = self._stats() self._n_generations += 1 def _collect(self, fitness, y=None): fitness.extend(y) results = BO._collect(self, fitness) results['_n_generations'] = self._n_generations return results def optimize(self, fitness_function=None, args=None): fitness = BO.optimize(self, fitness_function) self.initialize(args) while not self._check_terminations(): self.iterate() if self.verbose: print(f'n_function_evaluations {self.n_function_evaluations}: ' + f'best_so_far_y {self.best_so_far_y:.5}') return self._collect(fitness, self._sampler.fitness)