#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## choco.py
##
"""
Interface to Choco solver's Python API.
Choco-solver is an open-source Java library for Constraint Programming (see https://choco-solver.org/).
It comes with many features such as various types of variables, various state-of-the-art constraints, various search strategies, etc.
Always use :func:`cp.SolverLookup.get("choco") <cpmpy.solvers.utils.SolverLookup.get>` to instantiate the solver object.
============
Installation
============
Requires that the 'pychoco' python package is installed:
.. code-block:: console
$ pip install pychoco
Detailed installation instructions available at:
- https://pypi.org/project/pychoco/
- https://pychoco.readthedocs.io/en/latest/
The rest of this documentation is for advanced users.
===============
List of classes
===============
.. autosummary::
:nosignatures:
CPM_choco
==============
Module details
==============
"""
import time
import numpy as np
import warnings
import pkg_resources
from pkg_resources import VersionConflict
from ..transformations.normalize import toplevel_list
from .solver_interface import SolverInterface, SolverStatus, ExitStatus
from ..expressions.core import Expression, Comparison, Operator, BoolVal
from ..expressions.globalconstraints import DirectConstraint
from ..expressions.variables import _NumVarImpl, _IntVarImpl, _BoolVarImpl, NegBoolView, intvar
from ..expressions.globalconstraints import GlobalConstraint
from ..expressions.utils import is_num, is_int, is_boolexpr, is_any_list, get_bounds, argval, argvals, STAR
from ..transformations.decompose_global import decompose_in_tree
from ..transformations.get_variables import get_variables
from ..transformations.flatten_model import flatten_constraint
from ..transformations.comparison import only_numexpr_equality
from ..transformations.linearize import canonical_comparison
from ..transformations.safening import no_partial_functions
from ..transformations.reification import reify_rewrite
from ..exceptions import ChocoBoundsException, NotSupportedError
[docs]class CPM_choco(SolverInterface):
"""
Interface to the Choco solver python API
Creates the following attributes (see parent constructor for more):
- ``chc_model`` : the pychoco.Model() created by _model()
- ``chc_solver`` : the choco Model().get_solver() instance used in solve()
Documentation of the solver's own Python API:
- https://pypi.org/project/pychoco/
- https://pychoco.readthedocs.io/en/latest/
"""
[docs] @staticmethod
def supported():
# try to import the package
try:
# check if pychoco is installed
import pychoco as chc
# check it's the correct version
# CPMPy uses features only available from 0.2.1
pkg_resources.require("pychoco>=0.2.1")
return True
except ModuleNotFoundError:
return False
except VersionConflict: # unsupported version of pychoco
warnings.warn(f"CPMpy uses features only available from Pychoco version 0.2.1, "
f"but you have version {pkg_resources.get_distribution('pychoco').version}.")
return False
except Exception as e:
raise e
def __init__(self, cpm_model=None, subsolver=None):
"""
Constructor of the native solver object
Requires a CPMpy model as input, and will create the corresponding
choco model and solver object (chc_model and chc_solver)
chc_model and chc_solver can both be modified externally before
calling solve(), a prime way to use more advanced solver features
Arguments:
cpm_model: Model(), a CPMpy Model() (optional)
subsolver: None
"""
if not self.supported():
raise Exception("CPM_choco: Install the python package 'pychoco' to use this solver interface.")
import pychoco as chc
assert (subsolver is None), "Choco does not support any subsolver"
# initialise the native solver objects
self.chc_model = chc.Model()
# for the objective
self.obj = None
self.minimize_obj = None
self.helper_var = None
# for solving with assumption variables, TO-CHECK
# initialise everything else and post the constraints/objective
super().__init__(name="choco", cpm_model=cpm_model)
@property
def native_model(self):
"""
Returns the solver's underlying native model (for direct solver access).
"""
return self.chc_model
[docs] def solve(self, time_limit=None, **kwargs):
"""
Call the Choco solver
Arguments:
time_limit (float, optional): maximum solve time in seconds
kwargs: any keyword argument, sets parameters of solver object
"""
# ensure all vars are known to solver
self.solver_vars(list(self.user_vars))
# call the solver, with parameters
self.chc_solver = self.chc_model.get_solver()
start = time.time()
# set time limit
if time_limit is not None:
if time_limit <= 0:
raise ValueError("Time limit must be positive")
self.chc_solver.limit_time(str(time_limit) + "s")
if self.has_objective():
sol = self.chc_solver.find_optimal_solution(maximize= not self.minimize_obj,
objective=self.solver_var(self.obj),
**kwargs)
else:
sol = self.chc_solver.find_solution()
end = time.time()
# new status, get runtime
self.cpm_status = SolverStatus(self.name)
self.cpm_status.runtime = end - start
# translate exit status
# A) Found a solution
if sol is not None:
# COP
if self.has_objective():
if time_limit is None or self.cpm_status.runtime < time_limit: # solved to optimality
self.cpm_status.exitstatus = ExitStatus.OPTIMAL
else: # solved, but optimality not proven
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
# CSP
else:
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
# B) Found unsat
elif time_limit is None or self.cpm_status.runtime < time_limit: # proven unsat
self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
# C) Timeout
else:
self.cpm_status.exitstatus = ExitStatus.UNKNOWN # can happen when timeout is reached...
# True/False depending on self.chc_status
has_sol = sol is not None
# translate solution values (of user specified variables only)
self.objective_value_ = None
if has_sol:
# fill in variable values
for cpm_var in self.user_vars:
value = sol.get_int_val(self.solver_var(cpm_var))
if isinstance(cpm_var, _BoolVarImpl):
cpm_var._value = bool(value)
else:
cpm_var._value = value
# translate objective
if self.has_objective():
self.objective_value_ = sol.get_int_val(self.solver_var(self.obj))
else: # clear values of variables
for cpm_var in self.user_vars:
cpm_var._value = None
return has_sol
[docs] def solveAll(self, display=None, time_limit=None, solution_limit=None, call_from_model=False, **kwargs):
"""
Compute all (optimal) solutions, map them to CPMpy and optionally display the solutions.
Arguments:
display: either a list of CPMpy expressions, OR a callback function, called with the variables after value-mapping
default/None: nothing displayed
solution_limit: stop after this many solutions (default: None)
time_limit (float, optional): maximum solve time in seconds
Returns:
number of solutions found
"""
# ensure all vars are known to solver
self.solver_vars(list(self.user_vars))
# create solver object
self.chc_solver = self.chc_model.get_solver()
# set time limit (if given)
if time_limit is not None:
self.chc_solver.limit_time(str(time_limit) + "s")
start = time.time()
if self.has_objective():
sols = self.chc_solver.find_all_optimal_solutions(maximize=not self.minimize_obj,
solution_limit=solution_limit,
objective=self.solver_var(self.obj),
**kwargs)
else:
sols = self.chc_solver.find_all_solutions(solution_limit=solution_limit)
end = time.time()
# new status, get runtime
self.cpm_status = SolverStatus(self.name)
self.cpm_status.runtime = end - start
if len(sols): # solutions found
if (len(sols) == solution_limit): # matched the set limit (if given)
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
elif (time_limit is None) or (self.cpm_status.runtime < time_limit): # found all solutions
self.cpm_status.exitstatus = ExitStatus.OPTIMAL
else: # reached timeout
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
else: # no solutions found
if (time_limit is None) or (self.cpm_status.runtime < time_limit): # unsat problem
self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
else: # timeout
self.cpm_status.exitstatus = ExitStatus.UNKNOWN
# if no solutions, clear values of variables
if len(sols) == 0:
for var in self.user_vars:
var._value = None
# display if needed
if display is not None:
for sol in sols:
# map the solution to user vars
for cpm_var in self.user_vars:
value = sol.get_int_val(self.solver_var(cpm_var))
if isinstance(cpm_var, _BoolVarImpl):
cpm_var._value = bool(value)
else:
cpm_var._value = value
# print the desired display
if isinstance(display, Expression):
print(argval(display))
elif isinstance(display, list):
print(argvals(display))
else:
display() # callback
return len(sols)
[docs] def solver_var(self, cpm_var):
"""
Creates solver variable for cpmpy variable
or returns from cache if previously created
"""
if is_num(cpm_var): # shortcut, eases posting constraints
if not is_int(cpm_var):
raise ValueError(f"Choco only accepts integer constants, got {cpm_var} of type {type(cpm_var)}")
if cpm_var < -2147483646 or cpm_var > 2147483646:
raise ChocoBoundsException(
"Choco does not accept integer literals with bounds outside of range (-2147483646..2147483646)")
return int(cpm_var)
# special case, negative-bool-view
# work directly on var inside the view
if isinstance(cpm_var, NegBoolView):
return self.chc_model.bool_not_view(self.solver_var(cpm_var._bv))
# create if it does not exist
if cpm_var not in self._varmap:
if isinstance(cpm_var, _BoolVarImpl):
revar = self.chc_model.boolvar(name=str(cpm_var.name))
elif isinstance(cpm_var, _IntVarImpl):
if cpm_var.lb < -2147483646 or cpm_var.ub > 2147483646:
raise ChocoBoundsException(
"Choco does not accept variables with bounds outside of range (-2147483646..2147483646)")
revar = self.chc_model.intvar(cpm_var.lb, cpm_var.ub, name=str(cpm_var.name))
else:
raise NotImplementedError("Not a known var {}".format(cpm_var))
self._varmap[cpm_var] = revar
return self._varmap[cpm_var]
[docs] def objective(self, expr, minimize):
"""
Post the given expression to the solver as objective to minimize/maximize
Arguments:
expr: Expression, the CPMpy expression that represents the objective function
minimize: Bool, whether it is a minimization problem (True) or maximization problem (False)
``objective()`` can be called multiple times, only the last one is stored
.. note::
technical side note: constraints created during conversion of the objective
are premanently posted to the solver. Choco accepts variables to maximize or minimize
so it is needed to post constraints and create auxiliary variables
"""
# make objective function non-nested
obj_var = intvar(*get_bounds(expr))
self += obj_var == expr
self.obj = obj_var
self.minimize_obj = minimize # Choco has as default to maximize
[docs] def has_objective(self):
return self.obj is not None
def _to_var(self, val):
from pychoco.variables.intvar import IntVar
if is_int(val):
# Choco accepts only int32, not int64
if val < -2147483646 or val > 2147483646:
raise ChocoBoundsException(
"Choco does not accept integer literals with bounds outside of range (-2147483646..2147483646)")
return self.chc_model.intvar(int(val), int(val)) # convert to "variable"
elif isinstance(val, _NumVarImpl):
return self.solver_var(val) # use variable
else:
raise ValueError(f"Cannot convert {val} of type {type(val)} to Choco variable, expected int or NumVarImpl")
# elif isinstance(val, IntVar):
# return val
# return None
def _to_vars(self, vals):
if is_any_list(vals):
return [self._to_vars(v) for v in vals]
return self._to_var(vals)
[docs] def add(self, cpm_expr):
"""
Eagerly add a constraint to the underlying solver.
Any CPMpy expression given is immediately transformed (through `transform()`)
and then posted to the solver in this function.
This can raise 'NotImplementedError' for any constraint not supported after transformation
The variables used in expressions given to add are stored as 'user variables'. Those are the only ones
the user knows and cares about (and will be populated with a value after solve). All other variables
are auxiliary variables created by transformations.
:param cpm_expr: CPMpy expression, or list thereof
:type cpm_expr: Expression or list of Expression
:return: self
"""
# add new user vars to the set
get_variables(cpm_expr, collect=self.user_vars)
# ensure all vars are known to solver
# transform and post the constraints
for con in self.transform(cpm_expr):
c = self._get_constraint(con)
if c is not None: # Reification constraints are not posted
c.post()
return self
__add__ = add # avoid redirect in superclass
def _get_constraint(self, cpm_expr):
"""
Get a solver's constraint by a supported CPMpy constraint
:param cpm_expr: CPMpy expression
:type cpm_expr: Expression
"""
# Operators: base (bool), lhs=numexpr, lhs|rhs=boolexpr (reified ->)
if isinstance(cpm_expr, Operator):
# 'and'/n, 'or'/n, '->'/2
if cpm_expr.name == 'and':
return self.chc_model.and_(self.solver_vars(cpm_expr.args))
elif cpm_expr.name == 'or':
return self.chc_model.or_(self.solver_vars(cpm_expr.args))
elif cpm_expr.name == "->":
cond, subexpr = cpm_expr.args
if isinstance(cond, _BoolVarImpl) and isinstance(subexpr, _BoolVarImpl):
return self.chc_model.or_(self.solver_vars([~cond, subexpr]))
elif isinstance(cond, _BoolVarImpl):
return self._get_constraint(subexpr).implied_by(self.solver_var(cond))
elif isinstance(subexpr, _BoolVarImpl):
return self._get_constraint(cond).implies(self.solver_var(subexpr))
else:
ValueError(f"Unexpected implication: {cpm_expr}")
else:
raise NotImplementedError("Not a known supported Choco Operator '{}' {}".format(
cpm_expr.name, cpm_expr))
# Comparisons: both numeric and boolean ones
# numexpr `comp` bvar|const
elif isinstance(cpm_expr, Comparison):
lhs, rhs = cpm_expr.args
op = "=" if cpm_expr.name == "==" else cpm_expr.name
if is_boolexpr(lhs) and is_boolexpr(rhs): #boolean equality -- Reification
if isinstance(lhs, _BoolVarImpl) and isinstance(lhs, _BoolVarImpl):
return self.chc_model.all_equal(self.solver_vars([lhs, rhs]))
elif isinstance(lhs, _BoolVarImpl):
return self._get_constraint(rhs).reify_with(self.solver_var(lhs))
elif isinstance(rhs, _BoolVarImpl):
return self._get_constraint(lhs).reify_with(self.solver_var(rhs))
else:
raise ValueError(f"Unexpected reification {cpm_expr}")
elif isinstance(lhs, _NumVarImpl):
return self.chc_model.arithm(self.solver_var(lhs), op, self.solver_var(rhs))
elif isinstance(lhs, Operator) and lhs.name in {'sum','wsum','sub'}:
if lhs.name == 'sum':
return self.chc_model.sum(self.solver_vars(lhs.args), op, self.solver_var(rhs))
elif lhs.name == "sub":
a, b = self.solver_vars(lhs.args)
return self.chc_model.arithm(a, "-", b, op, self.solver_var(rhs))
elif lhs.name == 'wsum':
wgt, x = lhs.args
w = np.array(wgt).tolist()
x = self.solver_vars(lhs.args[1])
return self.chc_model.scalar(x, w, op, self.solver_var(rhs))
elif cpm_expr.name == '==':
chc_rhs = self._to_var(rhs) # result is always var
all_vars = {"min", "max", "abs", "div", "mod", "element", "nvalue"}
if lhs.name in all_vars:
chc_args = self._to_vars(lhs.args)
if lhs.name == 'min': # min(vars) = var
return self.chc_model.min(chc_rhs, chc_args)
elif lhs.name == 'max': # max(vars) = var
return self.chc_model.max(chc_rhs, chc_args)
elif lhs.name == 'abs': # abs(var) = var
assert len(chc_args) == 1, f"Expected one argument of abs constraint, but got {chc_args}"
return self.chc_model.absolute(chc_rhs, chc_args[0])
elif lhs.name == "div": # var / var = var
dividend, divisor = chc_args
return self.chc_model.div(dividend, divisor, chc_rhs)
elif lhs.name == 'mod': # var % var = var
dividend, divisor = chc_args
return self.chc_model.mod(dividend, divisor, chc_rhs)
elif lhs.name == "element": # varsvar[var] = var
# TODO: actually, Choco also supports ints[var] = var, but no mix of var and int in array
arr, idx = chc_args
return self.chc_model.element(chc_rhs, arr, idx)
elif lhs.name == "nvalue": # nvalue(vars) = var
# TODO: should look into leaving nvalue <= arg so can post atmost_nvalues here
return self.chc_model.n_values(chc_args, chc_rhs)
elif lhs.name == 'count': # count(vars, var/int) = var
arr, val = lhs.args
return self.chc_model.count(self.solver_var(val), self._to_vars(arr), chc_rhs)
elif lhs.name == "among":
arr, vals = lhs.args
return self.chc_model.among(chc_rhs, self._to_vars(arr), vals)
elif lhs.name == 'mul': # var * var/int = var/int
a,b = self.solver_vars(lhs.args)
if isinstance(a, int):
a,b = b,a # int arg should always be second
return self.chc_model.times(a,b, self.solver_var(rhs))
elif lhs.name == 'pow': # var ^ int = var
chc_rhs = self._to_var(rhs)
return self.chc_model.pow(*self.solver_vars(lhs.args),chc_rhs)
raise NotImplementedError(
"Not a known supported Choco left-hand-side '{}' {}".format(lhs.name, cpm_expr))
# base (Boolean) global constraints
elif isinstance(cpm_expr, GlobalConstraint):
# many globals require all variables as arguments
if cpm_expr.name in {"alldifferent", "alldifferent_except0", "allequal", "circuit", "inverse","increasing","decreasing","strictly_increasing","strictly_decreasing","lex_lesseq","lex_less"}:
chc_args = self._to_vars(cpm_expr.args)
if cpm_expr.name == 'alldifferent':
return self.chc_model.all_different(chc_args)
elif cpm_expr.name == 'alldifferent_except0':
return self.chc_model.all_different_except_0(chc_args)
elif cpm_expr.name == 'allequal':
return self.chc_model.all_equal(chc_args)
elif cpm_expr.name == "circuit":
return self.chc_model.circuit(chc_args)
elif cpm_expr.name == "inverse":
return self.chc_model.inverse_channeling(*chc_args)
elif cpm_expr.name == "increasing":
return self.chc_model.increasing(chc_args,0)
elif cpm_expr.name == "decreasing":
return self.chc_model.decreasing(chc_args,0)
elif cpm_expr.name == "strictly_increasing":
return self.chc_model.increasing(chc_args,1)
elif cpm_expr.name == "strictly_decreasing":
return self.chc_model.decreasing(chc_args,1)
elif cpm_expr.name in ["lex_lesseq", "lex_less"]:
if cpm_expr.name == "lex_lesseq":
return self.chc_model.lex_less_eq(*chc_args)
return self.chc_model.lex_less(*chc_args)
# Ready for when it is fixed in pychoco (https://github.com/chocoteam/pychoco/issues/30)
# elif cpm_expr.name == "lex_chain_less":
# return self.chc_model.lex_chain_less(chc_args)
# but not all
elif cpm_expr.name == 'table':
assert (len(cpm_expr.args) == 2) # args = [array, table]
array, table = self.solver_vars(cpm_expr.args)
return self.chc_model.table(array, table)
elif cpm_expr.name == 'negative_table':
assert (len(cpm_expr.args) == 2) # args = [array, table]
array, table = self.solver_vars(cpm_expr.args)
return self.chc_model.table(array, table, False)
elif cpm_expr.name == 'short_table':
assert (len(cpm_expr.args) == 2) # args = [array, table]
array, table = cpm_expr.args
table = np.array(table)
table[table == STAR] = np.nan
table = table.astype(float) # nan's require float dtype
# Choco requires a wildcard value not present in dom of args,
# take value lower than anything else
chc_star = int(min(np.nanmin(table), *get_bounds(array)[0]) -1) # should be an int
chc_table = np.nan_to_num(table, nan=chc_star).astype(int).tolist()
return self.chc_model.table(self.solver_vars(array), chc_table, universal_value=chc_star, algo="STR2+")
elif cpm_expr.name == "regular":
from pychoco.objects.automaton.finite_automaton import FiniteAutomaton
array, transitions, start, accepting = cpm_expr.args
for i, (lb, ub) in enumerate(zip(*get_bounds(array))):
if lb < 0 or ub > 65535:
raise NotSupportedError(f"Choco regular only supports variables within domain 0..65535, got {array[i]} with bounds {lb}..{ub}")
# convert to Automaton Choco object
automaton = FiniteAutomaton()
for node, i in cpm_expr.node_map.items(): automaton.add_state()
for src, label, dst in transitions:
automaton.add_transition(cpm_expr.node_map[src], cpm_expr.node_map[dst], label)
automaton.set_initial_state(cpm_expr.node_map[start])
automaton.set_final(*[cpm_expr.node_map[a] for a in accepting])
return self.chc_model.regular(self._to_vars(array), automaton)
elif cpm_expr.name == 'InDomain':
assert len(cpm_expr.args) == 2 # args = [array, list of vals]
expr, table = self.solver_vars(cpm_expr.args)
return self.chc_model.member(expr, table)
elif cpm_expr.name == "cumulative":
start, dur, end, demand, cap = cpm_expr.args
# start, end, demand and cap should be var
start, end, demand, cap = self._to_vars([start, end, demand, cap])
# duration can be var or int
dur = self.solver_vars(dur)
# Create task variables. Choco can create them only one by one
tasks = [self.chc_model.task(s, d, e) for s, d, e in zip(start, dur, end)]
return self.chc_model.cumulative(tasks, demand, cap)
elif cpm_expr.name == "precedence":
return self.chc_model.int_value_precede_chain(self._to_vars(cpm_expr.args[0]), cpm_expr.args[1])
elif cpm_expr.name == "gcc":
vars, vals, occ = cpm_expr.args
return self.chc_model.global_cardinality(self._to_vars(vars), self.solver_vars(vals), self._to_vars(occ), cpm_expr.closed)
else:
raise NotImplementedError(f"Unknown global constraint {cpm_expr}, should be decomposed! If you reach this, please report on github.")
# unlikely base case: Boolean variable
elif isinstance(cpm_expr, _BoolVarImpl):
return self.chc_model.and_([self.solver_var(cpm_expr)])
# unlikely base case: True or False
elif isinstance(cpm_expr, BoolVal):
# Choco does not allow to post True or False. Post "certainly True or False" constraints instead
if cpm_expr.args[0] is True:
return None
else:
if self.helper_var is None:
self.helper_var = self.chc_model.intvar(0, 0)
return self.chc_model.arithm(self.helper_var, "<", 0)
# a direct constraint, pass to solver
elif isinstance(cpm_expr, DirectConstraint):
c = cpm_expr.callSolver(self, self.chc_model)
return c
# else
raise NotImplementedError(cpm_expr) # if you reach this... please report on github