You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

673 lines
28 KiB
Python

# Dual Annealing implementation.
# Copyright (c) 2018 Sylvain Gubian <sylvain.gubian@pmi.com>,
# Yang Xiang <yang.xiang@pmi.com>
# Author: Sylvain Gubian, Yang Xiang, PMP S.A.
"""
A Dual Annealing global optimization algorithm
"""
from __future__ import division, print_function, absolute_import
import numpy as np
from scipy.optimize import OptimizeResult
from scipy.optimize import minimize
from scipy.special import gammaln
from scipy._lib._util import check_random_state
__all__ = ['dual_annealing']
class VisitingDistribution(object):
"""
Class used to generate new coordinates based on the distorted
Cauchy-Lorentz distribution. Depending on the steps within the strategy
chain, the class implements the strategy for generating new location
changes.
Parameters
----------
lb : array_like
A 1-D numpy ndarray containing lower bounds of the generated
components. Neither NaN or inf are allowed.
ub : array_like
A 1-D numpy ndarray containing upper bounds for the generated
components. Neither NaN or inf are allowed.
visiting_param : float
Parameter for visiting distribution. Default value is 2.62.
Higher values give the visiting distribution a heavier tail, this
makes the algorithm jump to a more distant region.
The value range is (0, 3].
rand_state : RandomState object
A numpy.random.RandomState object for using the current state of the
created random generator container.
"""
TAIL_LIMIT = 1.e8
MIN_VISIT_BOUND = 1.e-10
def __init__(self, lb, ub, visiting_param, rand_state):
self.visiting_param = visiting_param
self.rand_state = rand_state
self.lower = lb
self.upper = ub
self.bound_range = ub - lb
def visiting(self, x, step, temperature):
""" Based on the step in the strategy chain, new coordinated are
generated by changing all components is the same time or only
one of them, the new values are computed with visit_fn method
"""
dim = x.size
if step < dim:
# Changing all coordinates with a new visting value
visits = np.array([self.visit_fn(
temperature) for _ in range(dim)])
upper_sample = self.rand_state.random_sample()
lower_sample = self.rand_state.random_sample()
visits[visits > self.TAIL_LIMIT] = self.TAIL_LIMIT * upper_sample
visits[visits < -self.TAIL_LIMIT] = -self.TAIL_LIMIT * lower_sample
x_visit = visits + x
a = x_visit - self.lower
b = np.fmod(a, self.bound_range) + self.bound_range
x_visit = np.fmod(b, self.bound_range) + self.lower
x_visit[np.fabs(
x_visit - self.lower) < self.MIN_VISIT_BOUND] += 1.e-10
else:
# Changing only one coordinate at a time based on strategy
# chain step
x_visit = np.copy(x)
visit = self.visit_fn(temperature)
if visit > self.TAIL_LIMIT:
visit = self.TAIL_LIMIT * self.rand_state.random_sample()
elif visit < -self.TAIL_LIMIT:
visit = -self.TAIL_LIMIT * self.rand_state.random_sample()
index = step - dim
x_visit[index] = visit + x[index]
a = x_visit[index] - self.lower[index]
b = np.fmod(a, self.bound_range[index]) + self.bound_range[index]
x_visit[index] = np.fmod(b, self.bound_range[
index]) + self.lower[index]
if np.fabs(x_visit[index] - self.lower[
index]) < self.MIN_VISIT_BOUND:
x_visit[index] += self.MIN_VISIT_BOUND
return x_visit
def visit_fn(self, temperature):
""" Formula Visita from p. 405 of reference [2] """
factor1 = np.exp(np.log(temperature) / (self.visiting_param - 1.0))
factor2 = np.exp((4.0 - self.visiting_param) * np.log(
self.visiting_param - 1.0))
factor3 = np.exp((2.0 - self.visiting_param) * np.log(2.0) / (
self.visiting_param - 1.0))
factor4 = np.sqrt(np.pi) * factor1 * factor2 / (factor3 * (
3.0 - self.visiting_param))
factor5 = 1.0 / (self.visiting_param - 1.0) - 0.5
d1 = 2.0 - factor5
factor6 = np.pi * (1.0 - factor5) / np.sin(
np.pi * (1.0 - factor5)) / np.exp(gammaln(d1))
sigmax = np.exp(-(self.visiting_param - 1.0) * np.log(
factor6 / factor4) / (3.0 - self.visiting_param))
x = sigmax * self.rand_state.normal()
y = self.rand_state.normal()
den = np.exp(
(self.visiting_param - 1.0) * np.log((np.fabs(y))) / (
3.0 - self.visiting_param))
return x / den
class EnergyState(object):
"""
Class used to record the energy state. At any time, it knows what is the
currently used coordinates and the most recent best location.
Parameters
----------
lower : array_like
A 1-D numpy ndarray containing lower bounds for generating an initial
random components in the `reset` method.
upper : array_like
A 1-D numpy ndarray containing upper bounds for generating an initial
random components in the `reset` method
components. Neither NaN or inf are allowed.
callback : callable, ``callback(x, f, context)``, optional
A callback function which will be called for all minima found.
``x`` and ``f`` are the coordinates and function value of the
latest minimum found, and `context` has value in [0, 1, 2]
"""
# Maximimum number of trials for generating a valid starting point
MAX_REINIT_COUNT = 1000
def __init__(self, lower, upper, callback=None):
self.ebest = None
self.current_energy = None
self.current_location = None
self.xbest = None
self.lower = lower
self.upper = upper
self.callback = callback
def reset(self, func_wrapper, rand_state, x0=None):
"""
Initialize current location is the search domain. If `x0` is not
provided, a random location within the bounds is generated.
"""
if x0 is None:
self.current_location = self.lower + rand_state.random_sample(
len(self.lower)) * (self.upper - self.lower)
else:
self.current_location = np.copy(x0)
init_error = True
reinit_counter = 0
while init_error:
self.current_energy = func_wrapper.fun(self.current_location)
if self.current_energy is None:
raise ValueError('Objective function is returning None')
if (not np.isfinite(self.current_energy) or np.isnan(
self.current_energy)):
if reinit_counter >= EnergyState.MAX_REINIT_COUNT:
init_error = False
message = (
'Stopping algorithm because function '
'create NaN or (+/-) infinity values even with '
'trying new random parameters'
)
raise ValueError(message)
self.current_location = self.lower + rand_state.random_sample(
self.lower.size) * (self.upper - self.lower)
reinit_counter += 1
else:
init_error = False
# If first time reset, initialize ebest and xbest
if self.ebest is None and self.xbest is None:
self.ebest = self.current_energy
self.xbest = np.copy(self.current_location)
# Otherwise, we keep them in case of reannealing reset
def update_best(self, e, x, context):
self.ebest = e
self.xbest = np.copy(x)
if self.callback is not None:
val = self.callback(x, e, context)
if val is not None:
if val:
return('Callback function requested to stop early by '
'returning True')
def update_current(self, e, x):
self.current_energy = e
self.current_location = np.copy(x)
class StrategyChain(object):
"""
Class that implements within a Markov chain the strategy for location
acceptance and local search decision making.
Parameters
----------
acceptance_param : float
Parameter for acceptance distribution. It is used to control the
probability of acceptance. The lower the acceptance parameter, the
smaller the probability of acceptance. Default value is -5.0 with
a range (-1e4, -5].
visit_dist : VisitingDistribution
Instance of `VisitingDistribution` class.
func_wrapper : ObjectiveFunWrapper
Instance of `ObjectiveFunWrapper` class.
minimizer_wrapper: LocalSearchWrapper
Instance of `LocalSearchWrapper` class.
rand_state : RandomState object
A numpy.random.RandomState object for using the current state of the
created random generator container.
energy_state: EnergyState
Instance of `EnergyState` class.
"""
def __init__(self, acceptance_param, visit_dist, func_wrapper,
minimizer_wrapper, rand_state, energy_state):
# Local strategy chain minimum energy and location
self.emin = energy_state.current_energy
self.xmin = np.array(energy_state.current_location)
# Global optimizer state
self.energy_state = energy_state
# Acceptance parameter
self.acceptance_param = acceptance_param
# Visiting distribution instance
self.visit_dist = visit_dist
# Wrapper to objective function
self.func_wrapper = func_wrapper
# Wrapper to the local minimizer
self.minimizer_wrapper = minimizer_wrapper
self.not_improved_idx = 0
self.not_improved_max_idx = 1000
self._rand_state = rand_state
self.temperature_step = 0
self.K = 100 * len(energy_state.current_location)
def accept_reject(self, j, e, x_visit):
r = self._rand_state.random_sample()
pqv_temp = (self.acceptance_param - 1.0) * (
e - self.energy_state.current_energy) / (
self.temperature_step + 1.)
if pqv_temp <= 0.:
pqv = 0.
else:
pqv = np.exp(np.log(pqv_temp) / (
1. - self.acceptance_param))
if r <= pqv:
# We accept the new location and update state
self.energy_state.update_current(e, x_visit)
self.xmin = np.copy(self.energy_state.current_location)
# No improvement for a long time
if self.not_improved_idx >= self.not_improved_max_idx:
if j == 0 or self.energy_state.current_energy < self.emin:
self.emin = self.energy_state.current_energy
self.xmin = np.copy(self.energy_state.current_location)
def run(self, step, temperature):
self.temperature_step = temperature / float(step + 1)
self.not_improved_idx += 1
for j in range(self.energy_state.current_location.size * 2):
if j == 0:
if step == 0:
self.energy_state_improved = True
else:
self.energy_state_improved = False
x_visit = self.visit_dist.visiting(
self.energy_state.current_location, j, temperature)
# Calling the objective function
e = self.func_wrapper.fun(x_visit)
if e < self.energy_state.current_energy:
# We have got a better energy value
self.energy_state.update_current(e, x_visit)
if e < self.energy_state.ebest:
val = self.energy_state.update_best(e, x_visit, 0)
if val is not None:
if val:
return val
self.energy_state_improved = True
self.not_improved_idx = 0
else:
# We have not improved but do we accept the new location?
self.accept_reject(j, e, x_visit)
if self.func_wrapper.nfev >= self.func_wrapper.maxfun:
return ('Maximum number of function call reached '
'during annealing')
# End of StrategyChain loop
def local_search(self):
# Decision making for performing a local search
# based on strategy chain results
# If energy has been improved or no improvement since too long,
# performing a local search with the best strategy chain location
if self.energy_state_improved:
# Global energy has improved, let's see if LS improves further
e, x = self.minimizer_wrapper.local_search(self.energy_state.xbest,
self.energy_state.ebest)
if e < self.energy_state.ebest:
self.not_improved_idx = 0
val = self.energy_state.update_best(e, x, 1)
if val is not None:
if val:
return val
self.energy_state.update_current(e, x)
if self.func_wrapper.nfev >= self.func_wrapper.maxfun:
return ('Maximum number of function call reached '
'during local search')
# Check probability of a need to perform a LS even if no improvement
do_ls = False
if self.K < 90 * len(self.energy_state.current_location):
pls = np.exp(self.K * (
self.energy_state.ebest - self.energy_state.current_energy
) / self.temperature_step)
if pls >= self._rand_state.random_sample():
do_ls = True
# Global energy not improved, let's see what LS gives
# on the best strategy chain location
if self.not_improved_idx >= self.not_improved_max_idx:
do_ls = True
if do_ls:
e, x = self.minimizer_wrapper.local_search(self.xmin, self.emin)
self.xmin = np.copy(x)
self.emin = e
self.not_improved_idx = 0
self.not_improved_max_idx = self.energy_state.current_location.size
if e < self.energy_state.ebest:
val = self.energy_state.update_best(
self.emin, self.xmin, 2)
if val is not None:
if val:
return val
self.energy_state.update_current(e, x)
if self.func_wrapper.nfev >= self.func_wrapper.maxfun:
return ('Maximum number of function call reached '
'during dual annealing')
class ObjectiveFunWrapper(object):
def __init__(self, func, maxfun=1e7, *args):
self.func = func
self.args = args
# Number of objective function evaluations
self.nfev = 0
# Number of gradient function evaluation if used
self.ngev = 0
# Number of hessian of the objective function if used
self.nhev = 0
self.maxfun = maxfun
def fun(self, x):
self.nfev += 1
return self.func(x, *self.args)
class LocalSearchWrapper(object):
"""
Class used to wrap around the minimizer used for local search
Default local minimizer is SciPy minimizer L-BFGS-B
"""
LS_MAXITER_RATIO = 6
LS_MAXITER_MIN = 100
LS_MAXITER_MAX = 1000
def __init__(self, bounds, func_wrapper, **kwargs):
self.func_wrapper = func_wrapper
self.kwargs = kwargs
self.minimizer = minimize
bounds_list = list(zip(*bounds))
self.lower = np.array(bounds_list[0])
self.upper = np.array(bounds_list[1])
# If no minimizer specified, use SciPy minimize with 'L-BFGS-B' method
if not self.kwargs:
n = len(self.lower)
ls_max_iter = min(max(n * self.LS_MAXITER_RATIO,
self.LS_MAXITER_MIN),
self.LS_MAXITER_MAX)
self.kwargs['method'] = 'L-BFGS-B'
self.kwargs['options'] = {
'maxiter': ls_max_iter,
}
self.kwargs['bounds'] = list(zip(self.lower, self.upper))
def local_search(self, x, e):
# Run local search from the given x location where energy value is e
x_tmp = np.copy(x)
mres = self.minimizer(self.func_wrapper.fun, x, **self.kwargs)
if 'njev' in mres.keys():
self.func_wrapper.ngev += mres.njev
if 'nhev' in mres.keys():
self.func_wrapper.nhev += mres.nhev
# Check if is valid value
is_finite = np.all(np.isfinite(mres.x)) and np.isfinite(mres.fun)
in_bounds = np.all(mres.x >= self.lower) and np.all(
mres.x <= self.upper)
is_valid = is_finite and in_bounds
# Use the new point only if it is valid and return a better results
if is_valid and mres.fun < e:
return mres.fun, mres.x
else:
return e, x_tmp
def dual_annealing(func, bounds, args=(), maxiter=1000,
local_search_options={}, initial_temp=5230.,
restart_temp_ratio=2.e-5, visit=2.62, accept=-5.0,
maxfun=1e7, seed=None, no_local_search=False,
callback=None, x0=None):
"""
Find the global minimum of a function using Dual Annealing.
Parameters
----------
func : callable
The objective function to be minimized. Must be in the form
``f(x, *args)``, where ``x`` is the argument in the form of a 1-D array
and ``args`` is a tuple of any additional fixed parameters needed to
completely specify the function.
bounds : sequence, shape (n, 2)
Bounds for variables. ``(min, max)`` pairs for each element in ``x``,
defining bounds for the objective function parameter.
args : tuple, optional
Any additional fixed parameters needed to completely specify the
objective function.
maxiter : int, optional
The maximum number of global search iterations. Default value is 1000.
local_search_options : dict, optional
Extra keyword arguments to be passed to the local minimizer
(`minimize`). Some important options could be:
``method`` for the minimizer method to use and ``args`` for
objective function additional arguments.
initial_temp : float, optional
The initial temperature, use higher values to facilitates a wider
search of the energy landscape, allowing dual_annealing to escape
local minima that it is trapped in. Default value is 5230. Range is
(0.01, 5.e4].
restart_temp_ratio : float, optional
During the annealing process, temperature is decreasing, when it
reaches ``initial_temp * restart_temp_ratio``, the reannealing process
is triggered. Default value of the ratio is 2e-5. Range is (0, 1).
visit : float, optional
Parameter for visiting distribution. Default value is 2.62. Higher
values give the visiting distribution a heavier tail, this makes
the algorithm jump to a more distant region. The value range is (0, 3].
accept : float, optional
Parameter for acceptance distribution. It is used to control the
probability of acceptance. The lower the acceptance parameter, the
smaller the probability of acceptance. Default value is -5.0 with
a range (-1e4, -5].
maxfun : int, optional
Soft limit for the number of objective function calls. If the
algorithm is in the middle of a local search, this number will be
exceeded, the algorithm will stop just after the local search is
done. Default value is 1e7.
seed : {int or `numpy.random.RandomState` instance}, optional
If `seed` is not specified the `numpy.random.RandomState` singleton is
used.
If `seed` is an int, a new ``RandomState`` instance is used,
seeded with `seed`.
If `seed` is already a ``RandomState`` instance, then that
instance is used.
Specify `seed` for repeatable minimizations. The random numbers
generated with this seed only affect the visiting distribution
function and new coordinates generation.
no_local_search : bool, optional
If `no_local_search` is set to True, a traditional Generalized
Simulated Annealing will be performed with no local search
strategy applied.
callback : callable, optional
A callback function with signature ``callback(x, f, context)``,
which will be called for all minima found.
``x`` and ``f`` are the coordinates and function value of the
latest minimum found, and ``context`` has value in [0, 1, 2], with the
following meaning:
- 0: minimum detected in the annealing process.
- 1: detection occured in the local search process.
- 2: detection done in the dual annealing process.
If the callback implementation returns True, the algorithm will stop.
x0 : ndarray, shape(n,), optional
Coordinates of a single n-dimensional starting point.
Returns
-------
res : OptimizeResult
The optimization result represented as a `OptimizeResult` object.
Important attributes are: ``x`` the solution array, ``fun`` the value
of the function at the solution, and ``message`` which describes the
cause of the termination.
See `OptimizeResult` for a description of other attributes.
Notes
-----
This function implements the Dual Annealing optimization. This stochastic
approach derived from [3]_ combines the generalization of CSA (Classical
Simulated Annealing) and FSA (Fast Simulated Annealing) [1]_ [2]_ coupled
to a strategy for applying a local search on accepted locations [4]_.
An alternative implementation of this same algorithm is described in [5]_
and benchmarks are presented in [6]_. This approach introduces an advanced
method to refine the solution found by the generalized annealing
process. This algorithm uses a distorted Cauchy-Lorentz visiting
distribution, with its shape controlled by the parameter :math:`q_{v}`
.. math::
g_{q_{v}}(\\Delta x(t)) \\propto \\frac{ \\
\\left[T_{q_{v}}(t) \\right]^{-\\frac{D}{3-q_{v}}}}{ \\
\\left[{1+(q_{v}-1)\\frac{(\\Delta x(t))^{2}} { \\
\\left[T_{q_{v}}(t)\\right]^{\\frac{2}{3-q_{v}}}}}\\right]^{ \\
\\frac{1}{q_{v}-1}+\\frac{D-1}{2}}}
Where :math:`t` is the artificial time. This visiting distribution is used
to generate a trial jump distance :math:`\\Delta x(t)` of variable
:math:`x(t)` under artificial temperature :math:`T_{q_{v}}(t)`.
From the starting point, after calling the visiting distribution
function, the acceptance probability is computed as follows:
.. math::
p_{q_{a}} = \\min{\\{1,\\left[1-(1-q_{a}) \\beta \\Delta E \\right]^{ \\
\\frac{1}{1-q_{a}}}\\}}
Where :math:`q_{a}` is a acceptance parameter. For :math:`q_{a}<1`, zero
acceptance probability is assigned to the cases where
.. math::
[1-(1-q_{a}) \\beta \\Delta E] < 0
The artificial temperature :math:`T_{q_{v}}(t)` is decreased according to
.. math::
T_{q_{v}}(t) = T_{q_{v}}(1) \\frac{2^{q_{v}-1}-1}{\\left( \\
1 + t\\right)^{q_{v}-1}-1}
Where :math:`q_{v}` is the visiting parameter.
.. versionadded:: 1.2.0
References
----------
.. [1] Tsallis C. Possible generalization of Boltzmann-Gibbs
statistics. Journal of Statistical Physics, 52, 479-487 (1998).
.. [2] Tsallis C, Stariolo DA. Generalized Simulated Annealing.
Physica A, 233, 395-406 (1996).
.. [3] Xiang Y, Sun DY, Fan W, Gong XG. Generalized Simulated
Annealing Algorithm and Its Application to the Thomson Model.
Physics Letters A, 233, 216-220 (1997).
.. [4] Xiang Y, Gong XG. Efficiency of Generalized Simulated
Annealing. Physical Review E, 62, 4473 (2000).
.. [5] Xiang Y, Gubian S, Suomela B, Hoeng J. Generalized
Simulated Annealing for Efficient Global Optimization: the GenSA
Package for R. The R Journal, Volume 5/1 (2013).
.. [6] Mullen, K. Continuous Global Optimization in R. Journal of
Statistical Software, 60(6), 1 - 45, (2014). DOI:10.18637/jss.v060.i06
Examples
--------
The following example is a 10-dimensional problem, with many local minima.
The function involved is called Rastrigin
(https://en.wikipedia.org/wiki/Rastrigin_function)
>>> from scipy.optimize import dual_annealing
>>> func = lambda x: np.sum(x*x - 10*np.cos(2*np.pi*x)) + 10*np.size(x)
>>> lw = [-5.12] * 10
>>> up = [5.12] * 10
>>> ret = dual_annealing(func, bounds=list(zip(lw, up)), seed=1234)
>>> print("global minimum: xmin = {0}, f(xmin) = {1:.6f}".format(
... ret.x, ret.fun))
global minimum: xmin = [-4.26437714e-09 -3.91699361e-09 -1.86149218e-09 -3.97165720e-09
-6.29151648e-09 -6.53145322e-09 -3.93616815e-09 -6.55623025e-09
-6.05775280e-09 -5.00668935e-09], f(xmin) = 0.000000
"""
if x0 is not None and not len(x0) == len(bounds):
raise ValueError('Bounds size does not match x0')
lu = list(zip(*bounds))
lower = np.array(lu[0])
upper = np.array(lu[1])
# Check that restart temperature ratio is correct
if restart_temp_ratio <= 0. or restart_temp_ratio >= 1.:
raise ValueError('Restart temperature ratio has to be in range (0, 1)')
# Checking bounds are valid
if (np.any(np.isinf(lower)) or np.any(np.isinf(upper)) or np.any(
np.isnan(lower)) or np.any(np.isnan(upper))):
raise ValueError('Some bounds values are inf values or nan values')
# Checking that bounds are consistent
if not np.all(lower < upper):
raise ValueError('Bounds are not consistent min < max')
# Checking that bounds are the same length
if not len(lower) == len(upper):
raise ValueError('Bounds do not have the same dimensions')
# Wrapper for the objective function
func_wrapper = ObjectiveFunWrapper(func, maxfun, *args)
# Wrapper fot the minimizer
minimizer_wrapper = LocalSearchWrapper(
bounds, func_wrapper, **local_search_options)
# Initialization of RandomState for reproducible runs if seed provided
rand_state = check_random_state(seed)
# Initialization of the energy state
energy_state = EnergyState(lower, upper, callback)
energy_state.reset(func_wrapper, rand_state, x0)
# Minimum value of annealing temperature reached to perform
# re-annealing
temperature_restart = initial_temp * restart_temp_ratio
# VisitingDistribution instance
visit_dist = VisitingDistribution(lower, upper, visit, rand_state)
# Strategy chain instance
strategy_chain = StrategyChain(accept, visit_dist, func_wrapper,
minimizer_wrapper, rand_state, energy_state)
# Run the search loop
need_to_stop = False
iteration = 0
message = []
t1 = np.exp((visit - 1) * np.log(2.0)) - 1.0
while(not need_to_stop):
for i in range(maxiter):
# Compute temperature for this step
s = float(i) + 2.0
t2 = np.exp((visit - 1) * np.log(s)) - 1.0
temperature = initial_temp * t1 / t2
if iteration >= maxiter:
message.append("Maximum number of iteration reached")
need_to_stop = True
break
# Need a re-annealing process?
if temperature < temperature_restart:
energy_state.reset(func_wrapper, rand_state)
break
# starting strategy chain
val = strategy_chain.run(i, temperature)
if val is not None:
message.append(val)
need_to_stop = True
break
# Possible local search at the end of the strategy chain
if not no_local_search:
val = strategy_chain.local_search()
if val is not None:
message.append(val)
need_to_stop = True
break
iteration += 1
# Return the OptimizeResult
res = OptimizeResult()
res.x = energy_state.xbest
res.fun = energy_state.ebest
res.nit = iteration
res.nfev = func_wrapper.nfev
res.njev = func_wrapper.ngev
res.nhev = func_wrapper.nhev
res.message = message
return res