Source code for cpmpy.tools.tune_solver

"""
    This file implements parameter tuning for constraint solvers based on SMBO and using adaptive capping.
    Based on the following paper:
    Ignace Bleukx, Senne Berden, Lize Coenen, Nicholas Decleyre, Tias Guns (2022). Model-Based Algorithm
    Configuration with Adaptive Capping and Prior Distributions. In: Schaus, P. (eds) Integration of Constraint
    Programming, Artificial Intelligence, and Operations Research. CPAIOR 2022. Lecture Notes in Computer Science,
    vol 13292. Springer, Cham. https://doi.org/10.1007/978-3-031-08011-1_6

    - DOI: https://doi.org/10.1007/978-3-031-08011-1_6
    - Link to paper: https://rdcu.be/cQyWR
    - Link to original code of paper: https://github.com/ML-KULeuven/DeCaprio

    This code currently only implements the authors' 'Hamming' surrogate function.
    The parameter tuner iteratively finds better hyperparameters close to the current best configuration during the search.
    Searching and time-out start at the default configuration for a solver (if available in the solver class).
"""
import math
import time
from random import shuffle
import numpy as np
from ..solvers.utils import SolverLookup, param_combinations
from ..solvers.solver_interface import ExitStatus, SolverInterface, SolverStatus


class ParameterTuner:
    """
        Parameter tuner based on DeCaprio method [ref_to_decaprio]

        Candidate configurations are ranked by their Hamming distance to the best
        configuration found so far, and every trial is capped at the best runtime
        observed so far (adaptive capping).
    """

    def __init__(self, solvername, model, all_params=None, defaults=None):
        """
            :param solvername: Name of solver to tune
            :param model: CPMpy model to tune parameters on (a list of models is run through a `MultiSolver`)
            :param all_params: optional, dictionary with parameter names and values to tune. If None, use predefined parameter set.
            :param defaults: required ONLY IF all_params is set, dictionary with corresponding default parameter values.
        """
        self.solvername = solvername
        self.model = model
        if all_params is None:
            solver_cls = SolverLookup.lookup(solvername)
            self.all_params = solver_cls.tunable_params()
            self.best_params = solver_cls.default_params()
        else:
            self.all_params = all_params
            assert defaults is not None, "ParameterTuner: if 'all_params' is set then 'defaults' must too"
            self.best_params = defaults

        # Fixed column order used when encoding configurations as array rows
        self._param_order = list(self.all_params.keys())
        self._best_config = self._params_to_np([self.best_params])[0]

    def _new_solver(self):
        """ Build a fresh solver: a `MultiSolver` when the model is a list, a single solver otherwise. """
        if isinstance(self.model, list):
            return MultiSolver(self.solvername, self.model)
        return SolverLookup.get(self.solvername, self.model)

    def tune(self, time_limit=None, max_tries=None, fix_params=None, verbose=1):
        """
            :param time_limit: Time budget to run tuner in seconds. Solver will be interrupted when time budget is exceeded
            :param max_tries: Maximum number of configurations to test
            :param fix_params: Non-default parameters to run solvers with.
            :param verbose: how much information to print (0=none)
            :return: dictionary with the best parameters found, updated with `fix_params`
            :raises TimeoutError: if the initial run with the current best (default) parameters does not finish
        """
        # Avoid a shared mutable default argument
        fix_params = {} if fix_params is None else fix_params

        if time_limit is not None:
            start_time = time.time()

        # Baseline run: its runtime is the first cap for the trials
        if verbose >= 1:
            print(f"Running {self.solvername} with default parameters")
        solver = self._new_solver()
        solver.solve(**self.best_params, time_limit=time_limit)
        if not _has_finished(solver):
            raise TimeoutError("Time's up before solving init solver call")

        self.base_runtime = solver.status().runtime
        self.best_runtime = self.base_runtime
        # Anchor the surrogate on the configuration that was just run. Resetting it to an
        # empty dict would make every Hamming score meaningless and would return `{}`
        # whenever no trial beats the baseline.
        self._best_config = self._params_to_np([self.best_params])[0]

        if verbose >= 1:
            print(f" - took {self.best_runtime:.1f} seconds")

        # Get all possible hyperparameter configurations
        combos = list(param_combinations(self.all_params))
        combos_np = self._params_to_np(combos)

        # Ensure random start (ties in the score are broken by this order)
        np.random.shuffle(combos_np)

        i = 0
        if max_tries is None:
            max_tries = len(combos_np)
        while len(combos_np) and i < max_tries:
            # Make new solver
            solver = self._new_solver()

            # Pick the remaining configuration closest to the current best one
            scores = self._get_score(combos_np)
            best_idx = int(np.argmin(scores))
            params_np = combos_np[best_idx]
            # Remove the chosen configuration so it is never tried twice
            combos_np = np.delete(combos_np, best_idx, axis=0)

            # Convert array row back to dictionary and apply the fixed params
            params_dict = self._np_to_params(params_np)
            params_dict.update(fix_params)

            # Adaptive capping: never run longer than the best runtime or the remaining budget.
            # The clock is read once so the cap cannot become non-positive between two reads.
            timeout = self.best_runtime
            if time_limit is not None:
                remaining = time_limit - (time.time() - start_time)
                if remaining <= 0:
                    break
                timeout = min(timeout, remaining)

            if verbose >= 1:
                # The budget suffix is optional; the trial line itself is always printed
                msg = f"Starting trial {i+1}/{max_tries}, cap: {timeout:.1f}s -- remaining configs: {len(combos_np)}"
                if time_limit is not None:
                    msg += f" budget: {time_limit-(time.time()-start_time):.1f}s"
                print(msg)

            # run solver
            solver.solve(**params_dict, time_limit=timeout)
            if _has_finished(solver):
                self.best_runtime = solver.status().runtime
                # update surrogate
                self._best_config = params_np
                if verbose >= 1:
                    print(f" - new best runtime {self.best_runtime}")
                if verbose >= 2:
                    print(f" - new best params {self._np_to_params(self._best_config)}")
            i += 1

        self.best_params = self._np_to_params(self._best_config)
        self.best_params.update(fix_params)
        if verbose >= 2:
            print(f"Best runtime: {self.best_runtime} for params {self.best_params}")
        return self.best_params

    def _get_score(self, combos):
        """
            Return the hamming distance for each remaining configuration to the current best config.
            Lower score means better configuration, so exploit the current best configuration by only allowing small changes.
        """
        return np.count_nonzero(combos != self._best_config, axis=1)

    def _params_to_np(self, combos):
        """
            Encode a sequence of parameter dictionaries as a 2D array with one row per
            configuration and one column per entry of `self._param_order`.

            An object array is used so values keep their original Python type;
            letting numpy infer a dtype would e.g. turn `1` into `'1'` as soon as
            another parameter has string values.
        """
        combos = list(combos)
        arr = np.empty((len(combos), len(self._param_order)), dtype=object)
        for row, params in enumerate(combos):
            for col, key in enumerate(self._param_order):
                arr[row, col] = params[key]
        return arr

    def _np_to_params(self, arr):
        """ Decode one encoded configuration (a row of `_params_to_np`) back into a parameter dictionary. """
        return dict(zip(self._param_order, arr))
class GridSearchTuner(ParameterTuner):
    """
        Grid search parameter tuner that exhaustively tests all parameter combinations.

        Inherits from ParameterTuner but uses a simple grid search strategy
    """

    def __init__(self, solvername, model, all_params=None, defaults=None):
        """
            Initialize a grid search parameter tuner.

            :param solvername: Name of solver to tune
            :param model: CPMpy model to tune parameters on
            :param all_params: optional, dictionary with parameter names and values to tune. If None, use predefined parameter set.
            :param defaults: required ONLY IF all_params is set, dictionary with corresponding default parameter values.
        """
        super().__init__(solvername, model, all_params, defaults)

    def tune(self, time_limit=None, max_tries=None, fix_params=None, verbose=1):
        """
            :param time_limit: Time budget to run tuner in seconds. Solver will be interrupted when time budget is exceeded
            :param max_tries: Maximum number of configurations to test
            :param fix_params: Non-default parameters to run solvers with.
            :param verbose: how much information to print (0=none)
            :return: dictionary with the best parameters found
            :raises TimeoutError: if the initial run with the current best (default) parameters does not finish
        """
        # Avoid a shared mutable default argument
        fix_params = {} if fix_params is None else fix_params

        if time_limit is not None:
            start_time = time.time()

        # Baseline run: its runtime is the first cap for the trials
        if verbose >= 1:
            print(f"Running {self.solvername} with default parameters")
        if not isinstance(self.model, list):
            solver = SolverLookup.get(self.solvername, self.model)
        else:
            solver = MultiSolver(self.solvername, self.model)
        solver.solve(**self.best_params, time_limit=time_limit)
        if not _has_finished(solver):
            raise TimeoutError("Time's up before solving init solver call")

        self.base_runtime = solver.status().runtime
        self.best_runtime = self.base_runtime

        if verbose >= 1:
            print(f" - took {self.best_runtime:.1f} seconds")

        # Get all possible hyperparameter configurations
        combos = list(param_combinations(self.all_params))
        shuffle(combos)  # test in random order
        if max_tries is not None:
            combos = combos[:max_tries]

        for i, combo in enumerate(combos):
            # Make new solver
            if not isinstance(self.model, list):
                solver = SolverLookup.get(self.solvername, self.model)
            else:
                solver = MultiSolver(self.solvername, self.model)

            # Fixed params override the grid values; build a new dict instead of
            # mutating the combination in place
            params_dict = {**combo, **fix_params}

            # Cap the run at the best runtime so far and at the remaining budget.
            # The clock is read once so the cap cannot become non-positive between two reads.
            timeout = self.best_runtime
            if time_limit is not None:
                remaining = time_limit - (time.time() - start_time)
                if remaining <= 0:
                    break
                timeout = min(timeout, remaining)

            if verbose >= 1:
                # The budget suffix is optional; the trial line itself is always printed
                msg = f"Starting trial {i+1}/{len(combos)}, cap: {timeout:.1f}s"
                if time_limit is not None:
                    msg += f" budget: {time_limit-(time.time()-start_time):.1f}s"
                print(msg)

            # run solver
            solver.solve(**params_dict, time_limit=timeout)
            if _has_finished(solver):
                self.best_runtime = solver.status().runtime
                self.best_params = params_dict
                if verbose >= 1:
                    print(f" - new best runtime {self.best_runtime:.1f}")
                if verbose >= 2:
                    print(f" - new best params {self.best_params}")

            # Stop as soon as the overall budget is used up
            if time_limit is not None and (time.time() - start_time) >= time_limit:
                break

        if verbose >= 2:
            print(f"Best runtime: {self.best_runtime:.1f} for params {self.best_params}")
        return self.best_params
def _has_finished(solver):
    """
    Tell whether a solver reached its target outcome.

    Parameters
    ----------
    solver : SolverInterface

    Returns
    -------
    bool
        For a `MultiSolver`, the result of its own `has_finished()` method.
        Otherwise True when the exit status is UNSATISFIABLE, or OPTIMAL for a
        problem with an objective, or FEASIBLE for a problem without one;
        False in every other case.
    """
    # A MultiSolver aggregates several solvers and decides for itself
    if isinstance(solver, MultiSolver):
        return solver.has_finished()

    # The status that counts as "done" depends on whether there is an objective
    wanted = ExitStatus.OPTIMAL if solver.has_objective() else ExitStatus.FEASIBLE
    outcome = solver.status().exitstatus
    return outcome == wanted or outcome == ExitStatus.UNSATISFIABLE
class MultiSolver(SolverInterface):
    """
    Class that manages multiple solver instances.

    Attributes
    ----------
    name : str
        Name of the solver used for all instances.
    solvers : list of SolverInterface
        The solver instances corresponding to each model.
    cpm_status : SolverStatus
        Aggregated solver status, created by `solve()`. Tracks the total runtime
        and a list with one exit status per solver.
    """

    def __init__(self, solvername, models):
        """
        Initialize a MultiSolver with one solver instance per model.

        Parameters
        ----------
        solvername : str
            Name of the solver backend (e.g., "ortools", "gurobi").
        models : list of Model
            The models to create solver instances for.
        """
        self.name = solvername
        self.solvers = [SolverLookup.get(solvername, mdl) for mdl in models]

    def solve(self, time_limit=None, **kwargs):
        """
        Solve the models sequentially using the solvers.

        Parameters
        ----------
        time_limit :
            Global time limit in seconds for all solvers combined. Each solver
            gets whatever is left of it; once it is used up, the remaining
            solvers are not run.
        **kwargs : dict
            Additional arguments passed to each solve method.

        Returns
        -------
        bool
            True if all solvers were run and returned a solution, False otherwise.
        """
        self.cpm_status = SolverStatus(self.name)
        # Solvers skipped because the budget ran out keep NOT_RUN
        self.cpm_status.exitstatus = [ExitStatus.NOT_RUN] * len(self.solvers)

        all_has_sol = True
        init_start = time.time()
        for i, s in enumerate(self.solvers):
            # call solver
            start = time.time()
            has_sol = s.solve(time_limit=time_limit, **kwargs)
            # update only the current solver's exitstatus
            self.cpm_status.exitstatus[i] = s.status().exitstatus
            # Record this solver's outcome BEFORE a possible early exit, otherwise a
            # failed last run would be silently ignored
            all_has_sol = all_has_sol and has_sol

            if time_limit is not None:
                time_limit -= time.time() - start
                if time_limit <= 0:
                    # Solvers that never got to run cannot have a solution
                    if i + 1 < len(self.solvers):
                        all_has_sol = False
                    break

        # update runtime
        self.cpm_status.runtime = time.time() - init_start
        return all_has_sol

    @staticmethod
    def _solver_finished(s):
        """ True if solver `s` is UNSATISFIABLE, OPTIMAL with an objective, or FEASIBLE without one. """
        wanted = ExitStatus.OPTIMAL if s.has_objective() else ExitStatus.FEASIBLE
        outcome = s.status().exitstatus
        return outcome == wanted or outcome == ExitStatus.UNSATISFIABLE

    def has_finished(self):
        """
        Check whether all solvers in the MultiSolver have finished.

        A solver is considered finished if:
        - It has an objective and reached OPTIMAL, or
        - It has no objective and reached FEASIBLE, or
        - It reached UNSATISFIABLE.

        Returns
        -------
        bool
            True if all solvers have finished, False otherwise.
        """
        return all(self._solver_finished(s) for s in self.solvers)