#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## pumpkin.py
##
"""
Interface to Pumpkin's API
Pumpkin is a combinatorial optimisation solver developed by the ConSol Lab at TU Delft.
It is based on the (lazy clause generation) constraint programming paradigm.
(see https://github.com/consol-lab/pumpkin)
Always use :func:`cp.SolverLookup.get("pumpkin") <cpmpy.solvers.utils.SolverLookup.get>` to instantiate the solver object.
===============
Installation
===============
Requires that the 'pumpkin-solver' python package is installed:
.. code-block:: console
$ pip install pumpkin-solver
The rest of this documentation is for advanced users
===============
List of classes
===============
.. autosummary::
:nosignatures:
CPM_pumpkin
==============
Module details
==============
"""
import warnings
from typing import Optional, List
from os.path import join
import numpy as np
from packaging.version import Version
from cpmpy.exceptions import NotSupportedError
from .solver_interface import SolverInterface, SolverStatus, ExitStatus
from ..expressions.core import Expression, Comparison, Operator, BoolVal
from ..expressions.globalconstraints import Cumulative, GlobalConstraint
from ..expressions.variables import _BoolVarImpl, NegBoolView, _IntVarImpl, _NumVarImpl, intvar, boolvar
from ..expressions.utils import is_num, is_any_list, get_bounds
from ..transformations.get_variables import get_variables
from ..transformations.linearize import canonical_comparison
from ..transformations.normalize import toplevel_list
from ..transformations.decompose_global import decompose_in_tree, decompose_objective
from ..transformations.flatten_model import flatten_constraint, get_or_make_var
from ..transformations.comparison import only_numexpr_equality
from ..transformations.reification import reify_rewrite, only_bv_reifies, only_implies
from ..transformations.safening import no_partial_functions, safen_objective
import time
[docs]
class CPM_pumpkin(SolverInterface):
"""
Interface to Pumpkin's API
Creates the following attributes (see parent constructor for more):
- ``pum_solver``: the pumpkin.Model() object
"""
supported_global_constraints = frozenset({"alldifferent", "cumulative", "no_overlap", "table", "negative_table", "InDomain",
"min", "max", "abs", "div", "element"})
supported_reified_global_constraints = frozenset()
[docs]
@staticmethod
def supported():
# try to import the package
try:
import pumpkin_solver as psp
pum_version = CPM_pumpkin.version()
if Version(pum_version) < Version("0.2.2"):
warnings.warn(f"CPMpy uses features only available from Pumpkin version >=0.2.2 "
f"but you have version {pum_version}")
return False
return True
except ModuleNotFoundError:
return False
except Exception as e:
raise e
[docs]
@staticmethod
def version() -> Optional[str]:
"""
Returns the installed version of the solver's Python API.
"""
from importlib.metadata import version, PackageNotFoundError
try:
return version('pumpkin-solver')
except PackageNotFoundError:
return None
def __init__(self, cpm_model=None, subsolver=None):
"""
Constructor of the native solver object
Arguments:
cpm_model: Model(), a CPMpy Model() (optional)
subsolver: None, not used
"""
if not self.supported():
raise ModuleNotFoundError("CPM_pumpkin: Install the python package 'cpmpy[pumpkin]' to use this solver interface.")
from pumpkin_solver import Model
assert subsolver is None
# initialise the native solver object
self.pum_solver = Model()
self.predicate_map = {} # cache predicates for reuse
# for objective
self._objective = None
self.objective_is_min = True
# initialise everything else and post the constraints/objective
super().__init__(name="pumpkin", cpm_model=cpm_model)
@property
def native_model(self):
"""
Returns the solver's underlying native model (for direct solver access).
"""
return self.pum_solver
[docs]
def solve(self, time_limit:Optional[float]=None, prove=False, proof_name="proof.drcp", proof_location=".", assumptions:Optional[List[_BoolVarImpl]]=None):
"""
Call the Pumpkin solver
Arguments:
time_limit (float, optional): maximum solve time in seconds
prove: whether to produce a DRCP proof (.lits file and .drcp proof file).
proof_name: name for the the proof files.
proof_location: location for the proof files (default to current working directory).
assumptions: CPMpy Boolean variables (or their negation) that are assumed to be true.
For repeated solving, and/or for use with :func:`s.get_core() <cpmpy.solvers.pumpkin.CPM_pumpkin.get_core>`: if the model is UNSAT,
`get_core()` returns a small subset of assumption variables that are unsat together.
"""
# Again, I don't know why this is necessary, but the PyO3 modules seem to be a bit wonky.
from pumpkin_solver import BoolExpression as PumpkinBool, IntExpression as PumpkinInt
from pumpkin_solver import SatisfactionResult, SatisfactionUnderAssumptionsResult
from pumpkin_solver.optimisation import OptimisationResult, Direction
# ensure all vars are known to solver
self.solver_vars(list(self.user_vars))
# parse and dispatch the arguments
if time_limit is not None and time_limit < 0:
raise ValueError("Time limit cannot be negative, but got {time_limit}")
kwargs = dict(timeout=time_limit)
if self.has_objective():
assert assumptions is None, "Optimization under assumptions is not supported"
solve_func = self.pum_solver.optimise
kwargs.update(proof=join(proof_location, proof_name) if prove else None,
objective=self.solver_var(self._objective),
direction=Direction.Minimise if self.objective_is_min else Direction.Maximise)
elif assumptions is not None:
assert not prove, "Proof-logging under assumptions is not supported"
pum_assumptions = [self.to_predicate(a, tag=None) for a in assumptions]
self.assump_map = dict(zip(pum_assumptions, assumptions))
solve_func = self.pum_solver.satisfy_under_assumptions
kwargs.update(assumptions=pum_assumptions)
else:
solve_func = self.pum_solver.satisfy
kwargs.update(proof=join(proof_location, proof_name) if prove else None)
self._pum_core = None
start_time = time.time() # when did solving start
# call the solver, with parameters
result = solve_func(**kwargs)
# new status, translate runtime
self.cpm_status = SolverStatus(self.name)
self.cpm_status.runtime = time.time() - start_time
# translate solver exit status to CPMpy exit status
if self.has_objective(): # check result after optimisation
if isinstance(result, OptimisationResult.Optimal):
self.cpm_status.exitstatus = ExitStatus.OPTIMAL
elif isinstance(result, OptimisationResult.Satisfiable):
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
elif isinstance(result, OptimisationResult.Unsatisfiable):
self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
elif isinstance(result, OptimisationResult.Unknown):
self.cpm_status.exitstatus = ExitStatus.UNKNOWN
else:
raise ValueError(f"Unknown Pumpkin-result: {result} of type {type(result)}, please report on github...")
elif assumptions is not None: # check result under assumptions
if isinstance(result, SatisfactionUnderAssumptionsResult.Satisfiable):
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
elif isinstance(result, SatisfactionUnderAssumptionsResult.Unsatisfiable):
self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
self._pum_core = [] # empty core, no required assumptions to prove UNSAT
elif isinstance(result, SatisfactionUnderAssumptionsResult.UnsatisfiableUnderAssumptions):
self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
self._pum_core = result._0
elif isinstance(result, SatisfactionUnderAssumptionsResult.Unknown):
self.cpm_status.exitstatus = ExitStatus.UNKNOWN
else:
raise ValueError(f"Unknown Pumpkin-result: {result} of type {type(result)}, please report on github...")
else: # satisfaction result without assumptions
if isinstance(result, SatisfactionResult.Satisfiable):
self.cpm_status.exitstatus = ExitStatus.FEASIBLE
elif isinstance(result, SatisfactionResult.Unsatisfiable):
self.cpm_status.exitstatus = ExitStatus.UNSATISFIABLE
elif isinstance(result, SatisfactionResult.Unknown):
self.cpm_status.exitstatus = ExitStatus.UNKNOWN
else:
raise ValueError(f"Unknown Pumpkin-result: {result} of type {type(result)}, please report on github...")
# translate solution (of user vars only)
has_sol = self._solve_return(self.cpm_status)
if has_sol:
solution = result._0
# fill in variable values
for cpm_var in self.user_vars:
sol_var = self.solver_var(cpm_var)
if isinstance(sol_var, PumpkinInt):
cpm_var._value = solution.int_value(sol_var)
elif isinstance(sol_var, PumpkinBool):
cpm_var._value = solution.bool_value(sol_var)
else:
raise NotSupportedError("Only boolean and integer variables are supported.")
# translate solution values
if self.has_objective():
self.objective_value_ = solution.int_value(self.solver_var(self._objective))
else: # wipe results
for cpm_var in self.user_vars:
cpm_var._value = None
return has_sol
[docs]
def solver_var(self, cpm_var):
"""
Creates solver variable for cpmpy variable
or returns from cache if previously created
"""
if is_num(cpm_var):
return cpm_var
# special case, negative-bool-view
# work directly on var inside the view
if isinstance(cpm_var, NegBoolView):
return self.solver_var(cpm_var._bv).negate()
# create if it does not exist
if isinstance(cpm_var, _NumVarImpl):
if cpm_var not in self._varmap:
if isinstance(cpm_var, _BoolVarImpl):
revar = self.pum_solver.new_boolean_variable(name=str(cpm_var))
elif isinstance(cpm_var, _IntVarImpl):
revar = self.pum_solver.new_integer_variable(cpm_var.lb, cpm_var.ub, name=str(cpm_var))
else:
raise NotImplementedError("Not a known var {}".format(cpm_var))
self._varmap[cpm_var] = revar
# return from cache
return self._varmap[cpm_var]
raise ValueError(f"Not a known var {cpm_var}")
[docs]
def objective(self, expr, minimize=True):
"""
Post the given expression to the solver as objective to minimize/maximize
Arguments:
expr: Expression, the CPMpy expression that represents the objective function
minimize: Bool, whether it is a minimization problem (True) or maximization problem (False)
'objective()' can be called multiple times, only the last one is stored
.. note::
technical side note: any constraints created during conversion of the objective
are premanently posted to the solver
"""
# save user variables
get_variables(expr, self.user_vars)
# transform objective
obj, safe_cons = safen_objective(expr)
obj, decomp_cons = decompose_objective(obj,
supported=self.supported_global_constraints,
supported_reified=self.supported_reified_global_constraints,
csemap=self._csemap)
obj_var, obj_cons = get_or_make_var(obj) # do not pass csemap here, we will still transform obj_var == obj...
if expr.is_bool():
ivar = intvar(0,1)
obj_cons += [ivar == obj_var]
obj_var = ivar
self.add(decomp_cons + obj_cons)
# save objective function
self._objective = obj_var
self.objective_is_min = minimize
[docs]
def has_objective(self):
return self._objective is not None
# `__add__()` first calls `transform()`
[docs]
def to_predicate(self, cpm_expr, tag=None):
"""
Convert a CPMpy expression to a Pumpkin predicate (comparison with constant)
"""
from pumpkin_solver import Comparator, Predicate
if isinstance(cpm_expr, _BoolVarImpl):
if isinstance(cpm_expr, NegBoolView):
lhs, comp, rhs = cpm_expr._bv, Comparator.LessThanOrEqual, 0
else:
lhs, comp, rhs = cpm_expr, Comparator.GreaterThanOrEqual, 1
elif isinstance(cpm_expr, Comparison):
lhs, rhs = cpm_expr.args
assert is_num(rhs), "rhs of comparison must be a constant to be a predicate"
if isinstance(lhs, Operator): # can be sum with single arg
if lhs.name == "sum" and len(lhs.args) == 1:
lhs = lhs.args[0]
elif lhs.name == "wsum" and len(lhs.args[0]) == 1:
lhs = lhs.args[0][0] * lhs.args[1][0]
elif lhs.name == "mul" and is_num(lhs.args[0]):
lhs = lhs.args[0] * lhs.args[1]
else:
raise ValueError(f"Lhs of predicate should be a sum with 1 argument, wsum with 1 arg, or mul with const, but got {lhs}")
if cpm_expr.name == "==": comp = Comparator.Equal
if cpm_expr.name == "<=": comp = Comparator.LessThanOrEqual
if cpm_expr.name == ">=": comp = Comparator.GreaterThanOrEqual
if cpm_expr.name == "!=": comp = Comparator.NotEqual
if cpm_expr.name == "<": comp, rhs = Comparator.LessThanOrEqual, rhs - 1
if cpm_expr.name == ">": comp, rhs = Comparator.GreaterThanOrEqual, rhs + 1
else:
raise ValueError(f"Cannot convert CPMpy expression {cpm_expr} to a predicate")
# do we already have this predicate?
# (actually, cse might already catch these...)
if (lhs, comp, rhs) not in self.predicate_map:
if tag is None:
tag = self.pum_solver.new_constraint_tag()
pred = Predicate(self.to_pum_ivar(lhs, tag=tag), comp, rhs)
self.predicate_map[(lhs, comp, rhs)] = pred
return self.predicate_map[(lhs, comp, rhs)]
[docs]
def to_pum_ivar(self, cpm_var, tag=None):
"""
Helper function to convert (boolean) variables and constants to Pumpkin integer expressions
"""
if is_any_list(cpm_var):
return [self.to_pum_ivar(v, tag=tag) for v in cpm_var]
elif isinstance(cpm_var, _BoolVarImpl):
return self.pum_solver.boolean_as_integer(self.solver_var(cpm_var), tag=tag)
elif is_num(cpm_var):
return self.solver_var(intvar(cpm_var, cpm_var))
# can also be a scaled variable (multiplication view)
elif isinstance(cpm_var, Operator) and cpm_var.name == "mul":
const, cpm_var = cpm_var.args
if not is_num(const):
raise ValueError(f"Cannot create view from non-constant multiplier {const} * {cpm_var}")
return self.to_pum_ivar(cpm_var, tag=tag).scaled(const)
else:
return self.solver_var(cpm_var)
def _sum_args(self, expr, negate=False, tag=None):
"""
Helper function to convert CPMpy sum-like operators into pumpkin-compatible arguments.
expr is expected to be a `sum`, `wsum` or `sub` operator.
:return: Returns a list of Pumpkin integer expressions
"""
args = []
if tag is None: raise ValueError("Expected tag to be provided but got None")
if isinstance(expr, Operator) and expr.name == "sum":
for cpm_var in expr.args:
pum_var = self.solver_var(cpm_var)
if cpm_var.is_bool(): # have convert to integer
pum_var = self.pum_solver.boolean_as_integer(pum_var, tag=tag)
args.append(pum_var.scaled(-1 if negate else 1))
elif isinstance(expr, Operator) and expr.name == "wsum":
for w, cpm_var in zip(*expr.args):
if w == 0: continue # exclude
pum_var = self.solver_var(cpm_var)
if cpm_var.is_bool(): # have convert to integer
pum_var = self.pum_solver.boolean_as_integer(pum_var, tag=tag)
args.append(pum_var.scaled(-w if negate else w))
elif isinstance(expr, Operator) and expr.name == "sub":
x, y = self.solver_vars(expr.args)
if expr.args[0].is_bool():
x = self.pum_solver.boolean_as_integer(x, tag=tag)
if expr.args[1].is_bool():
y = self.pum_solver.boolean_as_integer(y, tag=tag)
args = [x.scaled(-1 if negate else 1), y.scaled(1 if negate else -1)]
else:
raise ValueError(f"Unknown expression to convert in sum-arguments: {expr}")
return args
def _is_predicate(self, cpm_expr):
"""
Check if a CPMpy expression can be converted to a Pumpkin predicate
"""
if isinstance(cpm_expr, _NumVarImpl):
return True
elif isinstance(cpm_expr, Operator):
if cpm_expr.name == "sum" and len(cpm_expr.args) == 1:
return True
if cpm_expr.name == "wsum" and len(cpm_expr.args[0]) == 1:
return True
if cpm_expr.name == "mul" and is_num(cpm_expr.args[0]):
return True
return False
def _get_constraint(self, cpm_expr, tag=None):
"""
Convert a CPMpy expression into a Pumpkin constraint
Expects a transformed CPMpy expression, this logic is implemented as a separate function so we can support reification in `add()`
"""
from pumpkin_solver import constraints
if tag is None:
tag = self.pum_solver.new_constraint_tag()
if isinstance(cpm_expr, _BoolVarImpl):
# base case, just var or ~var, post as clause
return [constraints.Clause([self.solver_var(cpm_expr)], constraint_tag=tag)]
elif isinstance(cpm_expr, Operator):
if cpm_expr.name == "or":
return [constraints.Clause(self.solver_vars(cpm_expr.args), constraint_tag=tag)]
raise NotImplementedError("Pumpkin: operator not (yet) supported", cpm_expr)
elif isinstance(cpm_expr, Comparison):
lhs, rhs = cpm_expr.args
assert isinstance(lhs, Expression), f"Expected a CPMpy expression on lhs but got {lhs} of type {type(lhs)}"
if self._is_predicate(lhs):
pred = self.to_predicate(cpm_expr, tag=tag)
return [constraints.Clause([self.pum_solver.predicate_as_boolean(pred, tag=tag)], constraint_tag=tag)]
if cpm_expr.name == "==":
if "sum" in lhs.name or lhs.name == "sub":
return [constraints.Equals(self._sum_args(lhs, tag=tag), rhs, constraint_tag=tag)]
pum_rhs = self.to_pum_ivar(rhs, tag=tag) # other operators require IntExpression
if lhs.name == "div":
return [constraints.Division(*self.to_pum_ivar(lhs.args, tag=tag), pum_rhs, constraint_tag=tag)]
elif lhs.name == "mul":
return [constraints.Times(*self.to_pum_ivar(lhs.args, tag=tag), pum_rhs, constraint_tag=tag)]
elif lhs.name == "abs":
return [constraints.Absolute(self.to_pum_ivar(lhs.args[0], tag=tag), pum_rhs, constraint_tag=tag)]
elif lhs.name == "min":
return [constraints.Minimum(self.to_pum_ivar(lhs.args, tag=tag), pum_rhs, constraint_tag=tag)]
elif lhs.name == "max":
return [constraints.Maximum(self.to_pum_ivar(lhs.args, tag=tag), pum_rhs, constraint_tag=tag)]
elif lhs.name == "element":
arr, idx = lhs.args
return [constraints.Element(self.to_pum_ivar(idx, tag=tag),
self.to_pum_ivar(arr, tag=tag),
pum_rhs, constraint_tag=tag)]
else:
raise NotImplementedError("Unknown lhs of comparison", cpm_expr)
elif cpm_expr.name == "<=":
return [constraints.LessThanOrEquals(self._sum_args(lhs, tag=tag), rhs, constraint_tag=tag)]
elif cpm_expr.name == "<":
return [constraints.LessThanOrEquals(self._sum_args(lhs, tag=tag), rhs-1,constraint_tag=tag)]
elif cpm_expr.name == ">=":
return [constraints.LessThanOrEquals(self._sum_args(lhs, negate=True, tag=tag), -rhs, constraint_tag=tag)]
elif cpm_expr.name == ">":
return [constraints.LessThanOrEquals(self._sum_args(lhs, negate=True, tag=tag), -rhs-1, constraint_tag=tag)]
elif cpm_expr.name == "!=":
return [constraints.NotEquals(self._sum_args(lhs, tag=tag), rhs, constraint_tag=tag)]
raise ValueError("Unknown comparison", cpm_expr)
elif isinstance(cpm_expr, GlobalConstraint):
if cpm_expr.name == "alldifferent":
return [constraints.AllDifferent(self.solver_vars(cpm_expr.args), constraint_tag=tag)]
elif cpm_expr.name == "cumulative":
start, dur, end, demand, cap = cpm_expr.args
assert all(is_num(d) for d in dur), "Pumpkin only accepts Cumulative with fixed durations"
assert all(is_num(d) for d in demand), "Pumpkin only accepts Cumulative with fixed demand"
assert is_num(cap), "Pumpkin only accepts Cumulative with fixed capacity"
pum_cons = [constraints.Cumulative(self.solver_vars(start),dur, demand, cap, constraint_tag=tag)]
if end is not None:
pum_cons += [self._get_constraint(c, tag=tag)[0] for c in self.transform([s + d == e for s,d,e in zip(start, dur, end)])]
return pum_cons
elif cpm_expr.name == "no_overlap":
start, dur, end = cpm_expr.args
return self._get_constraint(Cumulative(start, dur, end, demand=1, capacity=1), tag=tag)
elif cpm_expr.name == "table":
arr, table = cpm_expr.args
return [constraints.Table(self.to_pum_ivar(arr, tag=tag),
np.array(table).tolist(), # ensure Python list
constraint_tag=tag)
]
elif cpm_expr.name == "negative_table":
arr, table = cpm_expr.args
return [constraints.NegativeTable(self.to_pum_ivar(arr, tag=tag),
np.array(table).tolist(),# ensure Python list
constraint_tag=tag)
]
elif cpm_expr.name == "InDomain":
val, domain = cpm_expr.args
return [constraints.Table(self.to_pum_ivar([val], tag=tag),
[[d] for d in domain], # each domain value is its own row
constraint_tag=tag)
]
else:
raise NotImplementedError(f"Unknown global constraint {cpm_expr}")
elif isinstance(cpm_expr, BoolVal): # unlikely base case
a = boolvar() # dummy variable
if cpm_expr.value() is True:
return self._get_constraint(Operator("sum", [a]) >= 1, tag=tag)
else:
return self._get_constraint(Operator("sum", [a]) <= -1, tag=tag)
else:
raise ValueError("Unexpected constraint:", cpm_expr)
[docs]
def add(self, cpm_expr_orig):
"""
Eagerly add a constraint to the underlying solver.
Any CPMpy expression given is immediately transformed (through `transform()`)
and then posted to the solver in this function.
This can raise 'NotImplementedError' for any constraint not supported after transformation
The variables used in expressions given to add are stored as 'user variables'. Those are the only ones
the user knows and cares about (and will be populated with a value after solve). All other variables
are auxiliary variables created by transformations.
:param cpm_expr: CPMpy expression, or list thereof
:type cpm_expr: Expression or list of Expression
:return: self
"""
# add new user vars to the set
get_variables(cpm_expr_orig, collect=self.user_vars)
for cpm_expr in self.transform(cpm_expr_orig):
if isinstance(cpm_expr, Operator) and cpm_expr.name == "->": # found implication
bv, subexpr = cpm_expr.args
for pum_cons in self._get_constraint(subexpr):
self.pum_solver.add_implication(pum_cons, self.solver_var(bv))
else:
for pum_cons in self._get_constraint(cpm_expr):
self.pum_solver.add_constraint(pum_cons)
return self
__add__ = add # avoid redirect in superclass
[docs]
def get_core(self):
"""
For use with :func:`s.solve(assumptions=[...]) <cpmpy.solvers.pumpkin.CPM_pumpkin.solve>`. Only meaningful if the solver returned UNSAT. In that case, get_core() returns a small subset of assumption variables that are unsat together.
CPMpy will return only those variables that are False (in the UNSAT core)
.. note::
Note that there is no guarantee that the core is minimal, though this interface does open up the possibility to add more advanced Minimal Unsatisfiabile Subset algorithms on top. All contributions welcome!
"""
assert self._pum_core is not None, "Can only get core if the last solve-call was unsatisfiable under assumptions"
return [self.assump_map[pred] for pred in self._pum_core]