Source code for cpmpy.solvers.cpo

"""
    Interface to CP Optimizer's Python API.

    CP Optimizer, also a feature of IBM ILOG Optimization Studio, is a software library of constraint programming tools 
    supporting constraint propagation, domain reduction, and highly optimized solution search.

    Always use :func:`cp.SolverLookup.get("cpo") <cpmpy.solvers.utils.SolverLookup.get>` to instantiate the solver object.

    ============
    Installation
    ============

    Requires that the 'docplex' python package is installed:

    .. code-block:: console
    
        $ pip install docplex

    docplex documentation:
    https://ibmdecisionoptimization.github.io/docplex-doc/

    You will also need to install CPLEX Optimization Studio from IBM's website,
    and add the location of the CP Optimizer binary to your path.
    There is a free community version available.
    https://www.ibm.com/products/ilog-cplex-optimization-studio

    See detailed installation instructions at:
    https://www.ibm.com/docs/en/icos/22.1.2?topic=2212-installing-cplex-optimization-studio

    Academic license:
    https://community.ibm.com/community/user/ai-datascience/blogs/xavier-nodet1/2020/07/09/cplex-free-for-students

    The rest of this documentation is for advanced users.

    ===============
    List of classes
    ===============

    .. autosummary::
        :nosignatures:

        CPM_cpo
"""

import shutil
import warnings

from .solver_interface import SolverInterface, SolverStatus, ExitStatus
from .. import DirectConstraint
from ..expressions.core import Expression, Comparison, Operator, BoolVal
from ..expressions.globalconstraints import GlobalConstraint
from ..expressions.globalfunctions import GlobalFunction
from ..expressions.variables import _BoolVarImpl, NegBoolView, _IntVarImpl, _NumVarImpl
from ..expressions.utils import is_num, is_any_list, eval_comparison, argval, argvals, get_bounds
from ..transformations.get_variables import get_variables
from ..transformations.normalize import toplevel_list
from ..transformations.decompose_global import decompose_in_tree
from ..transformations.safening import no_partial_functions



class CPM_cpo(SolverInterface):
    """
    Interface to CP Optimizer's Python API.

    Creates the following attributes (see parent constructor for more):

    - ``cpo_model``: object, CP Optimizers model object

    Documentation of the solver's own Python API: (all modeling functions)
    https://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.modeler.py.html#module-docplex.cp.modeler
    """

    _docp = None  # Static attribute to hold the docplex.cp module

    @classmethod
    def get_docp(cls):
        """
            Return the ``docplex.cp`` module.

            The module is imported on the first call and cached on the class,
            so later calls do not execute the import statement again.
        """
        if cls._docp is None:
            import docplex.cp as docp  # Import only once
            cls._docp = docp
        return cls._docp

    @staticmethod
    def supported():
        """
            Return True when the solver can be used: the python package is
            importable and the license/solve check succeeds.
        """
        return CPM_cpo.installed() and CPM_cpo.license_ok()

    @staticmethod
    def installed():
        """
            Return True if the ``docplex.cp`` python package can be imported, False otherwise.
        """
        # try to import the package
        try:
            import docplex.cp
            return True
        except ModuleNotFoundError:
            return False

    @staticmethod
    def license_ok():
        """
            Check that CP Optimizer can actually be run by solving an empty model.

            Emits a warning and returns False when the 'docplex' package is not installed.
            Otherwise returns True if the empty solve succeeds, and False if it raises
            any exception.
        """
        if not CPM_cpo.installed():
            warnings.warn("License check failed, python package 'docplex' is not installed! "
                          "Please check 'CPM_cpo.installed()' before attempting to check license.")
            return False

        try:
            from docplex.cp.model import CpoModel
            mdl = CpoModel()
            mdl.solve(LogVerbosity='Quiet')
            return True
        except Exception:
            # deliberately broad: any failure of the trial solve means the solver is unusable.
            # `Exception` (instead of a bare except) lets KeyboardInterrupt/SystemExit propagate.
            return False

    def __init__(self, cpm_model=None, subsolver=None):
        """
        Constructor of the native solver object

        Arguments:
            cpm_model: Model(), a CPMpy Model() (optional)
            subsolver: str, name of a subsolver (optional), must be None for this solver

        Raises:
            Exception: if the 'docplex' package is not installed, or the trial solve
                done by ``license_ok()`` fails
        """
        if not self.installed():
            raise Exception("CPM_cpo: Install the python package 'docplex'")

        if not self.license_ok():
            raise Exception("You need to install the CPLEX Optimization Studio to use this solver. "
                            "Also make sure that the binary is in your path")

        docp = self.get_docp()
        assert subsolver is None, "CPM_cpo does not support subsolvers"
        # the native model must exist before the parent constructor runs,
        # as the parent is handed `cpm_model` to process
        self.cpo_model = docp.model.CpoModel()
        super().__init__(name="cpo", cpm_model=cpm_model)

    def solve(self, time_limit=None, **kwargs):
        """
            Call the CP Optimizer solver

            Arguments:
                time_limit (float, optional):   maximum solve time in seconds
                kwargs:                         any keyword argument, sets parameters of solver object

            Arguments that correspond to solver parameters:

            =============================   ============
            Argument                        Description
            =============================   ============
            LogVerbosity                    Determines the verbosity of the search log. Choose a value from ['Quiet', 'Terse', 'Normal', 'Verbose']. Default value is 'Quiet'.
            OptimalityTolerance             This parameter sets an absolute tolerance on the objective value for optimization models. The value is a positive float. Default value is 1e-09.
            RelativeOptimalityTolerance     This parameter sets a relative tolerance on the objective value for optimization models. The optimality of a solution is proven if either of the two parameters' criteria is fulfilled.
            Presolve                        This parameter controls the presolve of the model to produce more compact formulations and to achieve more domain reduction. Possible values for this parameter are On (presolve is activated) and Off (presolve is deactivated). The value is a symbol in ['On', 'Off']. Default value is 'On'.
            Workers                         This parameter sets the number of workers to run in parallel to solve your model. The value is a positive integer. Default value is Auto. (Auto = use all available CPU cores)
            =============================   ============

            All solver parameters are documented here:
            https://ibmdecisionoptimization.github.io/docplex-doc/cp/docplex.cp.parameters.py.html#docplex.cp.parameters.CpoParameters

            Returns:
                the value of ``self._solve_return(self.cpm_status)`` (whether a solution was found)

            Raises:
                ValueError: if ``time_limit`` is given and not strictly positive
                NotImplementedError: if the solver reports a solve status not handled here
        """
        # ensure all vars are known to solver
        self.solver_vars(list(self.user_vars))

        # call the solver, with parameters
        if 'LogVerbosity' not in kwargs:
            kwargs['LogVerbosity'] = 'Quiet'

        # set time limit
        if time_limit is not None and time_limit <= 0:
            raise ValueError("Time limit must be positive")

        self.cpo_result = self.cpo_model.solve(TimeLimit=time_limit, **kwargs)

        # new status, translate runtime
        self.cpo_status = self.cpo_result.get_solve_status()
        self.cpm_status = SolverStatus(self.name)
        self.cpm_status.runtime = self.cpo_result.get_solve_time()  # wallclock time in (float) seconds

        # translate solver exit status to CPMpy exit status
        if self.cpo_status == "Feasible":
            self.cpm_status.exitstatus = ExitStatus.FEASIBLE
        elif self.cpo_status == "Infeasible":
            self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
        elif self.cpo_status == "Unknown":
            # can happen when timeout is reached...
            self.cpm_status.exitstatus = ExitStatus.UNKNOWN
        elif self.cpo_status == "Optimal":
            self.cpm_status.exitstatus = ExitStatus.OPTIMAL
        elif self.cpo_status == "JobFailed":
            self.cpm_status.exitstatus = ExitStatus.ERROR
        elif self.cpo_status == "JobAborted":
            self.cpm_status.exitstatus = ExitStatus.NOT_RUN
        else:  # another?
            raise NotImplementedError(self.cpo_status)  # a new status type was introduced, please report on GitHub

        # True/False depending on self.cpm_status
        has_sol = self._solve_return(self.cpm_status)

        # translate solution values (of user specified variables only)
        if has_sol:
            # fill in variable values
            for cpm_var in self.user_vars:
                sol_var = self.solver_var(cpm_var)
                if isinstance(cpm_var, _BoolVarImpl):
                    # because cp optimizer does not have boolean variables we use an integer var x with domain 0, 1
                    # and then replace a boolvar by x == 1
                    sol_var = sol_var.children[0]
                    cpm_var._value = bool(self.cpo_result.get_var_solution(sol_var).get_value())
                else:
                    cpm_var._value = self.cpo_result.get_var_solution(sol_var).get_value()

            # translate objective, for optimisation problems only
            if self.has_objective():
                self.objective_value_ = self.cpo_result.get_objective_value()

        else:  # clear values of variables
            for cpm_var in self.user_vars:
                cpm_var.clear()

        return has_sol

    def solveAll(self, display=None, time_limit=None, solution_limit=None, call_from_model=False, **kwargs):
        """
            A shorthand to (efficiently) compute all (optimal) solutions, map them to CPMpy and optionally display the solutions.

            If the problem is an optimization problem, returns only optimal solutions.

            Every solution found is excluded from the native model by posting a
            ``forbidden_assignments`` constraint; for optimization problems the objective
            is additionally fixed to the value found by the first solve. These constraints
            stay in ``self.cpo_model`` after the call.

            Arguments:
                display: either a list of CPMpy expressions, OR a callback function, called with the variables after value-mapping.
                         Default is None, meaning nothing is displayed.
                time_limit: Stop after this many seconds. This is a budget for the whole enumeration:
                            the runtime reported for each solve call is deducted from it. Default is None.
                solution_limit: Stop after this many solutions. Default is None.
                call_from_model: Whether the method is called from a CPMpy Model instance or not.
                **kwargs: Any other keyword arguments, passed on to ``solve()``.

            Returns:
                int: Number of solutions found.
        """
        docp = self.get_docp()

        solution_count = 0
        remaining_time = time_limit  # None means unlimited
        objective_fixed = False

        while solution_limit is None or solution_count < solution_limit:
            # an invalid (non-positive) user-supplied time_limit still raises inside solve()
            if not self.solve(time_limit=remaining_time, **kwargs):
                break
            solution_count += 1

            if self.has_objective() and not objective_fixed:
                # only find all optimal solutions; posting this once suffices
                # since the constraint stays in the model
                objective_expr = self.cpo_model.get_objective_expression().children[0]
                self.cpo_model.add(objective_expr == self.objective_value_)
                objective_fixed = True

            # forbid the solution that was just found
            solvars = []
            vals = []
            for cpm_var in self.user_vars:
                sol_var = self.solver_var(cpm_var)
                cpm_value = cpm_var._value
                if isinstance(cpm_var, _BoolVarImpl):
                    # because cp optimizer does not have boolean variables we use an integer var x with domain 0, 1
                    # and then replace a boolvar by x == 1
                    sol_var = sol_var.children[0]
                    cpm_value = int(cpm_value)
                solvars.append(sol_var)
                vals.append(cpm_value)
            self.cpo_model.add(docp.modeler.forbidden_assignments(solvars, [vals]))

            if display is not None:
                if isinstance(display, Expression):
                    print(argval(display))
                elif isinstance(display, list):
                    print(argvals(display))
                else:
                    display()  # callback

            # deduct the time used by this solve call from the overall budget
            if remaining_time is not None:
                remaining_time -= self.cpm_status.runtime
                if remaining_time <= 0:
                    break

        return solution_count

    def solver_var(self, cpm_var):
        """
            Creates solver variable for cpmpy variable
            or returns from cache if previously created

            Numeric constants are returned unchanged. A Boolean variable is represented
            by the expression ``x == 1`` over a binary variable ``x``, and a negated
            Boolean view by comparing that expression to 0.

            Raises:
                NotImplementedError: if ``cpm_var`` is neither a Boolean nor an integer variable
        """
        docp = self.get_docp()
        if is_num(cpm_var):  # shortcut, eases posting constraints
            return cpm_var

        # special case, negative-bool-view
        # work directly on var inside the view
        if isinstance(cpm_var, NegBoolView):
            return (self.solver_var(cpm_var._bv) == 0)

        # create if it does not exist
        if cpm_var not in self._varmap:
            if isinstance(cpm_var, _BoolVarImpl):
                # note that a binary var is an integer var with domain (0,1), you cannot do boolean operations on it.
                # we should add == 1 to turn it into a boolean expression
                revar = docp.expression.binary_var(str(cpm_var)) == 1
            elif isinstance(cpm_var, _IntVarImpl):
                revar = docp.expression.integer_var(min=cpm_var.lb, max=cpm_var.ub, name=str(cpm_var))
            else:
                raise NotImplementedError("Not a known var {}".format(cpm_var))
            self._varmap[cpm_var] = revar
            self.cpo_model.add(revar >= cpm_var.lb)  # ensure the model also has the variable

        # return from cache
        return self._varmap[cpm_var]

    def objective(self, expr, minimize=True):
        """
            Post the given expression to the solver as objective to minimize/maximize

            ``objective()`` can be called multiple times, only the last one is stored

            Arguments:
                expr: CPMpy expression to optimize
                minimize (bool): minimize if True (default), maximize otherwise

            .. note::
                technical side note: any constraints created during conversion of the objective
                are permanently posted to the solver
        """
        dom = self.get_docp().modeler
        if self.has_objective():
            # drop the previous objective so that only the last one is kept
            self.cpo_model.remove(self.cpo_model.get_objective_expression())
        expr = self._cpo_expr(expr)
        if minimize:
            self.cpo_model.add(dom.minimize(expr))
        else:
            self.cpo_model.add(dom.maximize(expr))

    def has_objective(self):
        """Return True if the native model currently has an objective."""
        return self.cpo_model.get_objective() is not None

    # `add()` first calls `transform()`
    def transform(self, cpm_expr):
        """
            Transform arbitrary CPMpy expressions to constraints the solver supports

            Implemented through chaining multiple solver-independent **transformation functions** from
            the `cpmpy/transformations/` directory.

            See the :ref:`Adding a new solver` docs on readthedocs for more information.

            :param cpm_expr: CPMpy expression, or list thereof
            :type cpm_expr: Expression or list of Expression

            :return: list of Expression
        """
        # apply transformations
        cpm_cons = toplevel_list(cpm_expr)
        cpm_cons = no_partial_functions(cpm_cons, safen_toplevel=frozenset({}))
        # count is only supported with a constant to be counted, so we decompose
        supported = {"alldifferent", 'inverse', 'nvalue', 'element', 'table', 'indomain',
                     "negative_table", "gcc", 'max', 'min', 'abs', 'cumulative', 'no_overlap'}
        supported_reified = {"alldifferent", 'table', 'indomain', "negative_table"}  # global functions by default here
        cpm_cons = decompose_in_tree(cpm_cons, supported=supported, supported_reified=supported_reified)
        # no flattening required
        return cpm_cons

    def add(self, cpm_expr):
        """
            Eagerly add a constraint to the underlying solver.

            Any CPMpy expression given is immediately transformed (through `transform()`)
            and then posted to the solver in this function.

            This can raise 'NotImplementedError' for any constraint not supported after transformation

            The variables used in expressions given to add are stored as 'user variables'. Those are the only ones
            the user knows and cares about (and will be populated with a value after solve). All other variables
            are auxiliary variables created by transformations.

            :param cpm_expr: CPMpy expression, or list thereof
            :type cpm_expr: Expression or list of Expression

            :return: self
        """
        # add new user vars to the set
        get_variables(cpm_expr, collect=self.user_vars)

        for cpm_con in self.transform(cpm_expr):
            # translate each expression tree, then post straight away
            cpo_con = self._cpo_expr(cpm_con)
            self.cpo_model.add(cpo_con)

        return self

    __add__ = add  # avoid redirect in superclass

    def _cpo_expr(self, cpm_con):
        """
            CP Optimizer supports nested expressions,
            so we recursively translate our expressions to theirs.

            Accepts single constraints or a list thereof, return type changes accordingly.

            Raises:
                NotImplementedError: for expressions that have no translation here
                ValueError: for a global constraint whose name is not a function of the docplex modeler
        """
        dom = self.get_docp().modeler

        if is_any_list(cpm_con):
            # arguments can be lists
            return [self._cpo_expr(con) for con in cpm_con]

        elif isinstance(cpm_con, BoolVal):
            return cpm_con.args[0]

        elif is_num(cpm_con):
            return cpm_con

        elif isinstance(cpm_con, _NumVarImpl):
            return self.solver_var(cpm_con)

        # Operators: base (bool), lhs=numexpr, lhs|rhs=boolexpr (reified ->)
        elif isinstance(cpm_con, Operator):
            arity, _ = Operator.allowed[cpm_con.name]
            # 'and'/n, 'or'/n, '->'/2
            if cpm_con.name == 'and':
                return dom.logical_and(self._cpo_expr(cpm_con.args))
            elif cpm_con.name == 'or':
                return dom.logical_or(self._cpo_expr(cpm_con.args))
            elif cpm_con.name == '->':
                return dom.if_then(*self._cpo_expr(cpm_con.args))
            elif cpm_con.name == 'not':
                return dom.logical_not(self._cpo_expr(cpm_con.args[0]))
            # 'sum'/n, 'wsum'/2
            elif cpm_con.name == 'sum':
                return dom.sum(self._cpo_expr(cpm_con.args))
            elif cpm_con.name == 'wsum':
                w = cpm_con.args[0]
                x = self._cpo_expr(cpm_con.args[1])
                return dom.scal_prod(w, x)
            # 'sub'/2, 'mul'/2, 'div'/2, 'pow'/2, 'mod'/2
            elif arity == 2 or cpm_con.name == "mul":
                assert len(cpm_con.args) == 2, "Currently only support multiplication with 2 vars"
                x, y = self._cpo_expr(cpm_con.args)
                if cpm_con.name == 'sub':
                    return x - y
                elif cpm_con.name == "mul":
                    return x * y
                elif cpm_con.name == "div":
                    return x // y
                elif cpm_con.name == "pow":
                    return x ** y
                elif cpm_con.name == "mod":
                    return x % y
                # any other binary operator falls through to the final NotImplementedError
            # '-'/1
            elif cpm_con.name == "-":
                return -self._cpo_expr(cpm_con.args[0])
            else:
                raise NotImplementedError(f"Operator {cpm_con} not (yet) implemented for CP Optimizer, "
                                          f"please report on github if you need it")

        # Comparisons (just translate the subexpressions and re-post)
        elif isinstance(cpm_con, Comparison):
            lhs, rhs = self._cpo_expr(cpm_con.args)
            # post the comparison
            return eval_comparison(cpm_con.name, lhs, rhs)

        # a direct constraint, make with cpo (will be posted to it by calling function)
        # checked as its own case so it is reached whether or not
        # DirectConstraint derives from GlobalConstraint
        elif isinstance(cpm_con, DirectConstraint):
            return cpm_con.callSolver(self, self.cpo_model)

        # rest: base (Boolean) global constraints
        elif isinstance(cpm_con, GlobalConstraint):
            if cpm_con.name == 'alldifferent':
                return dom.all_diff(self._cpo_expr(cpm_con.args))
            elif cpm_con.name == "gcc":
                cpo_vars, vals, occ = self._cpo_expr(cpm_con.args)
                cons = [dom.distribute(occ, cpo_vars, vals)]
                if cpm_con.closed:  # not supported by cpo, so post separately
                    cons += [dom.allowed_assignments(v, vals) for v in cpo_vars]
                return cons
            elif cpm_con.name == "inverse":
                x, y = self._cpo_expr(cpm_con.args)
                return dom.inverse(x, y)
            elif cpm_con.name == "table":
                arr, table = self._cpo_expr(cpm_con.args)
                return dom.allowed_assignments(arr, table)
            elif cpm_con.name == "indomain":
                expr, arr = self._cpo_expr(cpm_con.args)
                return dom.allowed_assignments(expr, arr)
            elif cpm_con.name == "negative_table":
                arr, table = self._cpo_expr(cpm_con.args)
                return dom.forbidden_assignments(arr, table)
            elif cpm_con.name == "cumulative":
                start, dur, end, height, capacity = cpm_con.args
                docp = self.get_docp()
                total_usage = []
                cons = []
                for s, d, e, h in zip(start, dur, end, height):
                    cpo_s, cpo_d, cpo_e, cpo_h = self.solver_vars([s, d, e, h])
                    # one interval variable per task, linked to the CPMpy start/duration/end by equalities
                    task = docp.expression.interval_var(start=get_bounds(s), size=get_bounds(d), end=get_bounds(e))
                    task_height = dom.pulse(task, get_bounds(h))
                    cons += [dom.start_of(task) == cpo_s, dom.size_of(task) == cpo_d, dom.end_of(task) == cpo_e]
                    cons += [cpo_h == dom.height_at_start(task, task_height)]
                    total_usage.append(task_height)
                cons += [dom.sum(total_usage) <= self.solver_var(capacity)]
                return cons
            elif cpm_con.name == "no_overlap":
                start, dur, end = cpm_con.args
                docp = self.get_docp()
                cons = []
                tasks = []
                for s, d, e in zip(start, dur, end):
                    cpo_s, cpo_d, cpo_e = self.solver_vars([s, d, e])
                    task = docp.expression.interval_var(start=get_bounds(s), size=get_bounds(d), end=get_bounds(e))
                    tasks.append(task)
                    cons += [dom.start_of(task) == cpo_s, dom.size_of(task) == cpo_d, dom.end_of(task) == cpo_e]
                return cons + [dom.no_overlap(tasks)]
            else:
                # works if our naming is the same; only the name lookup is guarded, so an
                # AttributeError raised while translating the arguments is not misreported
                try:
                    cpo_global = getattr(dom, cpm_con.name)
                except AttributeError:
                    raise ValueError(f"Global constraint {cpm_con} not known in CP Optimizer, "
                                     f"please report on github.") from None
                return cpo_global(self._cpo_expr(cpm_con.args))

        elif isinstance(cpm_con, GlobalFunction):
            if cpm_con.name == "element":
                return dom.element(*self._cpo_expr(cpm_con.args))
            elif cpm_con.name == "min":
                return dom.min(self._cpo_expr(cpm_con.args))
            elif cpm_con.name == "max":
                return dom.max(self._cpo_expr(cpm_con.args))
            elif cpm_con.name == "abs":
                return dom.abs(self._cpo_expr(cpm_con.args)[0])
            elif cpm_con.name == "nvalue":
                return dom.count_different(self._cpo_expr(cpm_con.args))

        raise NotImplementedError("CP Optimizer: constraint not (yet) supported", cpm_con)