#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## globalconstraints.py
##
"""
Global constraints conveniently express non-primitive constraints.
Using global constraints
------------------------
Solvers can have specialised implementations for global constraints. CPMpy has :class:`~cpmpy.expressions.globalconstraints.GlobalConstraint`
expressions so that they can be passed to the solver as is when supported.
If a solver does not support a global constraint (see :ref:`Solver Interfaces <solver-interfaces>`) then it will be automatically
decomposed by calling its :func:`~cpmpy.expressions.globalconstraints.GlobalConstraint.decompose()` function.
The :func:`~cpmpy.expressions.globalconstraints.GlobalConstraint.decompose()` function returns two values:
- a list of simpler constraints replacing the global constraint
- if the decomposition introduces *new variables*, then the second argument has to be a list
of constraints that (totally) define those new variables
As a user you **should almost never subclass GlobalConstraint()** unless you know of a solver that
supports that specific global constraint, and that you will update its solver interface to support it.
For all other use cases, it suffices to write your own helper function that immediately returns the
decomposition, e.g.:
.. code-block:: python
def alldifferent_except0(args):
return [ ((var1!= 0) & (var2 != 0)).implies(var1 != var2) for var1, var2 in all_pairs(args)]
Numeric global constraints
--------------------------
CPMpy also implements `Numeric Global Constraints`. For these, the CPMpy :class:`~cpmpy.expressions.globalconstraints.GlobalConstraint` does not
exactly match what is implemented in the solver, but for good reason!!
For example solvers may implement the global constraint ``Minimum(iv1, iv2, iv3) == iv4`` through an API
call ``addMinimumEquals([iv1,iv2,iv3], iv4)``.
However, CPMpy also wishes to support the expressions ``Minimum(iv1, iv2, iv3) > iv4`` as well as
``iv4 + Minimum(iv1, iv2, iv3)``.
Hence, the CPMpy global constraint only captures the ``Minimum(iv1, iv2, iv3)`` part, whose return type
is numeric and can be used in any other CPMpy expression. Only at the time of transforming the CPMpy
model to the solver API, will the expressions be decomposed and auxiliary variables introduced as needed
such that the solver only receives ``Minimum(iv1, iv2, iv3) == ivX`` expressions.
This is the burden of the CPMpy framework, not of the user who wants to express a problem formulation.
Subclassing GlobalConstraint
----------------------------
If you do wish to add a :class:`~cpmpy.expressions.globalconstraints.GlobalConstraint`, because it is supported by solvers or because you will do
advanced analysis and rewriting on it, then preferably define it with a standard decomposition, e.g.:
.. code-block:: python
class my_global(GlobalConstraint):
def __init__(self, args):
super().__init__("my_global", args)
def decompose(self):
return [self.args[0] != self.args[1]] # your decomposition
..
You can also implement a `.negate()` method if the global constraint has a better way to negate it than negating the decomposition.
The expression returned by `.negate()` should be equivalent to the negation of the global constraint, and is not allowed to introduce explicit `not` operators.
Instead, use cpmpy.transformations.negation.recurse_negation to push down the negation if you want to negate an expression.
If it is a :class:`~cpmpy.expressions.globalfunctions.GlobalFunction` meaning that its return type is numeric (see :class:`~cpmpy.expressions.globalfunctions.Minimum` and :class:`~cpmpy.expressions.globalfunctions.Element`)
then set `is_bool=False` in the super() constructor and preferably implement `.value()` accordingly.
Alternative decompositions
--------------------------
For advanced use cases where you want to use another decomposition than the standard decomposition
of a :class:`~cpmpy.expressions.globalconstraints.GlobalConstraint` expression, you can overwrite the :func:`~cpmpy.expressions.globalconstraints.GlobalConstraint.decompose` function of the class, e.g.:
.. code-block:: python
def my_circuit_decomp(self):
return [self.args[0] == 1], [] # does not actually enforce circuit
circuit.decompose = my_circuit_decomp # attach it, no brackets!
vars = intvar(1,9, shape=10)
constr = circuit(vars)
Model(constr).solve()
The above will use ``my_circuit_decomp``, if the solver does not
natively support :class:`~cpmpy.expressions.globalconstraints.Circuit`.
===============
List of classes
===============
.. autosummary::
:nosignatures:
AllDifferent
AllDifferentExcept0
AllDifferentExceptN
AllEqual
AllEqualExceptN
Circuit
Inverse
Table
ShortTable
NegativeTable
Regular
MDD
IfThenElse
InDomain
Xor
Cumulative
CumulativeOptional
NoOverlap
NoOverlapOptional
Precedence
GlobalCardinalityCount
Increasing
Decreasing
IncreasingStrict
DecreasingStrict
LexLess
LexLessEq
LexChainLess
LexChainLessEq
DirectConstraint
"""
import copy
import warnings
from collections import defaultdict
from typing import cast, Literal, Optional, Iterable, Any, TYPE_CHECKING
import numpy as np
import cpmpy as cp
from ..exceptions import TypeError
from .core import Expression, BoolVal, ExprLike, BoolExprLike, ListLike
from .variables import cpm_array, intvar, boolvar, _BoolVarImpl, NDVarArray, _NumVarImpl
from .utils import all_pairs, is_bool, STAR, get_bounds, argvals, is_any_list, flatlist, is_num, is_boolexpr, implies, argval
if TYPE_CHECKING:
from cpmpy.solvers.solver_interface import SolverInterface
# Base class GlobalConstraint
[docs]
class GlobalConstraint(Expression):
    """
    Abstract superclass of GlobalConstraints

    Like all expressions it has a ``.name`` and ``.args`` property.
    Overwrites the ``.is_bool()`` method as all global constraints are Boolean.
    """

    def is_bool(self) -> bool:
        """
        Returns whether the global constraint is a Boolean (return type) Operator.

        Returns:
            bool: True, global constraints are Boolean
        """
        return True

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Returns a decomposition into (a conjunction of) smaller constraints.

        The decomposition might create auxiliary variables,
        it can also use other global constraints as long as
        it does not create a circular dependency.

        To ensure equivalence of decomposition, we split into constraints determining the value of the
        global constraint, and defining-constraints.
        Defining constraints (totally) define new auxiliary variables needed for the decomposition,
        and can always be enforced at top-level.

        The decomposition is not allowed to introduce explicit `not` operators.
        Instead, use cpmpy.transformations.negation.recurse_negation to push down the negation
        if you want to negate an expression.

        Tip: avoid creating auxiliary variables and use nested expressions instead!
        (especially, don't create Booleans but use (iv == v) expressions instead,
        better for common subexpression elimination!)

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints

        Raises:
            NotImplementedError: always in this base class; subclasses override this method
        """
        # Format one readable message; passing several positional arguments to the
        # exception would make it print as a tuple.
        raise NotImplementedError(f"Decomposition for {self} not available")

    def decompose_positive(self) -> tuple[list[Expression], list[Expression]]:
        """
        Positive decomposition of the global constraint, only valid when the constraint is posted
        toplevel or occurs in a positive nested context.

        Defaults to the standard decomposition, but subclasses can implement a better version.

        Returns:
            tuple[list[Expression], list[Expression]]: whatever :func:`decompose` returns
        """
        return self.decompose()

    def get_bounds(self) -> tuple[int, int]:
        """
        Returns the bounds of a Boolean global constraint.
        Numerical global constraints should reimplement this.

        Returns:
            tuple[int, int]: the pair (0, 1)
        """
        return 0, 1

    def negate(self) -> Expression:
        """
        Returns the negation of this global constraint.

        Defaults to ~self, but subclasses can implement a better version,
        > Fages, François, and Sylvain Soliman. Reifying global constraints. Diss. INRIA, 2012.

        Returns:
            Expression: the expression ``~self``
        """
        return ~self
# Global Constraints (with Boolean return type)
[docs]
def alldifferent(args):
    """
    .. deprecated:: 0.9.0
          Please use :class:`AllDifferent` instead.

    Arguments:
        args: iterable of expressions, unpacked as the individual arguments of :class:`AllDifferent`

    Returns:
        AllDifferent: the equivalent ``AllDifferent(*args)`` constraint
    """
    # stacklevel=2 attributes the warning to the caller's line instead of this wrapper,
    # so the user sees where the deprecated call is made.
    warnings.warn("Deprecated, use AllDifferent(v1,v2,...,vn) instead, will be removed in "
                  "stable version", DeprecationWarning, stacklevel=2)
    return AllDifferent(*args)  # unfold list as individual arguments
[docs]
class AllDifferent(GlobalConstraint):
    """
    Enforces that all arguments have a different (distinct) value
    """

    def __init__(self, *args: ExprLike|ListLike[ExprLike]):
        """
        Arguments:
            args (ExprLike|ListLike[ExprLike]): List of expressions or constants to be different from each other
        """
        super().__init__("alldifferent", tuple(flatlist(args)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the AllDifferent global constraint using pairwise disequality constraints.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        return [var1 != var2 for var1, var2 in all_pairs(self.args)], []

    def decompose_linear(self) -> tuple[list[Expression], list[Expression]]:
        """
        Linear-friendly decomposition using sums over (arg[i] == val) expressions (which will become
        Boolean variables): at most one integer variable can take each value in the domain.

        For use with integer linear programming and pb/sat solvers.

        Returns:
            tuple[list[Expression], list[Expression]]: one ``sum(...) <= 1`` constraint per value in the
            union of the argument bounds, and an empty list of defining constraints
        """
        if len(self.args) == 0:
            # No arguments: trivially satisfied, same as the (empty) pairwise decomposition.
            # Returning early also avoids taking min()/max() of empty bound lists below.
            return [], []

        lbs, ubs = get_bounds(self.args)
        lb, ub = min(lbs), max(ubs)
        return [cp.sum((arg_i == val) for arg_i in self.args) <= 1 for val in range(lb, ub + 1)], []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        vals = argvals(self.args)
        if any(v is None for v in vals):
            return None
        # all distinct <=> removing duplicates does not shrink the collection
        return len(set(vals)) == len(self.args)
[docs]
class AllDifferentExceptN(GlobalConstraint):
    """
    Enforces that all arguments, except those equal to a value in n, have a different (distinct) value.

    Arguments:
        arr (Sequence[Expression]): List of expressions to be different from each other, except those equal to a value in n
        n (int or list[int]): Value or list of values that are excluded from satisfying the alldifferent condition
    """

    def __init__(self, arr: ListLike[ExprLike], n: int|np.integer|list[int|np.integer]):
        """
        Arguments:
            arr (ListLike[ExprLike]): List of expressions or constants to be different from each other, except those equal to a value in n
            n (int | np.integer | list[int | np.integer]): Value or list of values that are excluded from the distinctness constraint
        """
        flatarr = flatlist(arr)
        if not is_any_list(n):
            n = cast(int, n)
            n = [n]  # ensure n is a list of ints
        super().__init__("alldifferent_except_n", (flatarr, n))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the AllDifferentExceptN global constraint using pairwise constraints.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        cons = []
        arr, n = self.args
        for x, y in all_pairs(arr):
            cond = (x == y)
            if is_bool(cond):
                # comparing two constants yields a plain bool; wrap it so .implies() is available
                cond = cp.BoolVal(cond)
            # equivalent to (x in n) | (y in n) | (x != y)
            cons.append(cond.implies(cp.any([x == a for a in n])))
        return cons, []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        vals, exclude_vals = argvals(self.args)
        if any(v is None for v in vals):
            return None
        # build the lookup set once instead of once per element
        excluded = frozenset(exclude_vals)
        remaining = [v for v in vals if v not in excluded]
        return len(set(remaining)) == len(remaining)
[docs]
class AllDifferentExcept0(AllDifferentExceptN):
    """
    AllDifferentExceptN specialised to the single excluded value 0:
    every argument that is not equal to 0 must take a distinct value.
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): Expressions or constants (possibly nested in lists) that must be
                pairwise different, except those equal to 0
        """
        flat_args = flatlist(args)
        super().__init__(flat_args, 0)
[docs]
def allequal(args):
    """
    .. deprecated:: 0.9.0
          Please use :class:`AllEqual` instead.

    Arguments:
        args: iterable of expressions, unpacked as the individual arguments of :class:`AllEqual`

    Returns:
        AllEqual: the equivalent ``AllEqual(*args)`` constraint
    """
    # stacklevel=2 attributes the warning to the caller's line instead of this wrapper,
    # so the user sees where the deprecated call is made.
    warnings.warn("Deprecated, use AllEqual(v1,v2,...,vn) instead, will be removed in stable version",
                  DeprecationWarning, stacklevel=2)
    return AllEqual(*args)  # unfold list as individual arguments
[docs]
class AllEqual(GlobalConstraint):
    """
    Enforces that all arguments have the same value
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): List of expressions or constants to have the same value
        """
        super().__init__("allequal", tuple(flatlist(args)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the AllEqual global constraint using cascaded equality constraints.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        # arg0 == arg1, arg1 == arg2, arg2 == arg3... no need to post n^2 equalities
        return [x == y for x, y in zip(self.args[:-1], self.args[1:])], []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        vals = argvals(self.args)
        if any(v is None for v in vals):
            return None
        # `<= 1` rather than `== 1`: with zero arguments the constraint holds vacuously,
        # which matches the empty conjunction produced by decompose().
        return len(set(vals)) <= 1
[docs]
class AllEqualExceptN(GlobalConstraint):
    """
    Enforces that all arguments, except those equal to a value in n, have the same value.
    """

    def __init__(self, arr: ListLike[ExprLike], n: int|np.integer|list[int|np.integer]):
        """
        Arguments:
            arr (ListLike[ExprLike]): List of expressions or constants to have the same value, except those equal to a value in n
            n (int | np.integer | list[int | np.integer]): Value or list of values that are excluded from the equality constraint
        """
        flatarr = flatlist(arr)
        if not is_any_list(n):
            n = cast(int, n)
            n = [n]  # ensure n is a list of ints
        super().__init__("allequal_except_n", (flatarr, n))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the AllEqualExceptN global constraint using pairwise constraints.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        arr, n = self.args
        constraints = []
        for x, y in all_pairs(arr):
            # x and y are equal, or one of them is equal to an excluded value
            constraints.append(cp.any(x == a for a in n) | (x == y) | cp.any(y == a for a in n))
        return constraints, []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        vals, exclude_vals = argvals(self.args)
        if any(v is None for v in vals):
            return None
        # build the lookup set once instead of once per element
        excluded = frozenset(exclude_vals)
        remaining = [v for v in vals if v not in excluded]
        # at most one distinct value may remain (none at all is also fine)
        return len(set(remaining)) <= 1
[docs]
def circuit(args):
    """
    .. deprecated:: 0.9.0
          Please use :class:`Circuit` instead.

    Arguments:
        args: iterable of expressions, unpacked as the individual arguments of :class:`Circuit`

    Returns:
        Circuit: the equivalent ``Circuit(*args)`` constraint
    """
    # stacklevel=2 attributes the warning to the caller's line instead of this wrapper,
    # so the user sees where the deprecated call is made.
    warnings.warn("Deprecated, use Circuit(v1,v2,...,vn) instead, will be removed in stable version",
                  DeprecationWarning, stacklevel=2)
    return Circuit(*args)  # unfold list as individual arguments
[docs]
class Circuit(GlobalConstraint):
    """
    Enforces that the sequence of variables form a circuit, where x[i] = j means that node j is
    the successor of node i.
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): Successor expressions or constants, one per node of the circuit

        Raises:
            ValueError: if fewer than 2 (flattened) arguments are given
        """
        successors = flatlist(args)
        if len(successors) < 2:
            raise ValueError('Circuit constraint must be given a minimum of 2 variables')
        super().__init__("circuit", tuple(successors))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Circuit global constraint: follow the successor chain from node 0 and
        require that the chain visits distinct nodes and ends back at node 0.

        The chain is built from nested element expressions, which can be partial; they are passed
        through the safening transformation, whose top-level output becomes the defining part.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        # in-function import, as in the rest of this module for the safening transformation
        from cpmpy.transformations.safening import _no_partial_functions

        succ = cp.cpm_array(self.args)

        # order[k] is the node reached after k+1 steps when starting from node 0
        order = [succ[0]]
        for _ in range(1, len(succ)):
            order.append(succ[order[-1]])

        # element constraints can be partial
        changed, safe_order, toplevel, nbc = _no_partial_functions(
            order, safen_toplevel=frozenset(), is_toplevel=False
        )
        if changed:
            order = safe_order  # continue with the safened chain

        value = [
            order[-1] == 0,      # the last step returns to the start node
            AllDifferent(order),  # no subcircuits
            AllDifferent(succ),   # redundant, strengthens the decomposition
        ]
        return value + nbc, toplevel

    def decompose_linear_positive(self) -> tuple[list[Expression], list[Expression]]:
        """
        Linear decomposition of the Circuit global constraint, inspired by Miller-Tucker-Zemlin
        formulation for TSPs.

        This linear decomposition is only valid in positive context.

        Returns:
            tuple[list[Expression], list[Expression]]: the constraining part and the part linking
            the successor variables to the auxiliary ``order`` variables
        """
        succ = self.args
        n = len(succ)
        order = cp.intvar(0, n - 1, shape=n)

        constraining: list[Expression] = [x >= 0 for x in succ]  # lower bound on successors
        constraining += [x < n for x in succ]  # upper bound on successors
        # each node i has exactly one predecessor
        constraining += [cp.sum(succ[j] == i for j in range(n)) == 1 for i in range(n)]
        constraining.append(cp.AllDifferent(order))  # redundant constraint
        constraining.append(order[0] == 0)
        # forbid self-loops
        constraining += [succ[i] != i for i in range(n)]

        # ensure no subtours: i -> j means order must increase along the edge
        # (can not loop back, except to j=0, hence j starts at 1)
        defining: list[Expression] = [
            (succ[i] == j) == (order[i] + 1 == order[j])
            for i in range(n)
            for j in range(1, n)
        ]
        return constraining, defining

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        arr = argvals(self.args)
        if any(a is None for a in arr):
            return None

        n_nodes = len(arr)
        seen = set()
        idx = 0
        # walk the successor chain from node 0 until a node repeats
        while idx not in seen:
            if idx < 0 or idx >= n_nodes:
                return False  # out of bounds
            seen.add(idx)
            idx = arr[idx]
        # a circuit visits every node and the first repeated node is the start
        return len(seen) == len(self.args) and idx == 0
[docs]
class Inverse(GlobalConstraint):
    """
    Enforces that the forward and reverse arrays represent the inverse function of one another.
    I.e., fwd[i] == x <==> rev[x] == i

    Also known as channeling / assignment constraint.
    """

    def __init__(self, fwd: ListLike[ExprLike], rev: ListLike[ExprLike]):
        """
        Arguments:
            fwd (ListLike[ExprLike]): Expressions or constants representing the forward function
            rev (ListLike[ExprLike]): Expressions or constants representing the reverse function

        Raises:
            ValueError: if `fwd` and `rev` differ in length
        """
        if len(fwd) != len(rev):
            raise ValueError("Length of fwd and rev must be equal for Inverse constraint")
        super().__init__("inverse", (fwd, rev))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Inverse global constraint: post ``rev[fwd[i]] == i`` for every
        position i, then safen the resulting (possibly partial) element expressions.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        # in-function import (cyclic dependency); decompose is typically only called once anyway
        from cpmpy.transformations.safening import _no_partial_functions

        fwd, rev = self.args
        rev_arr = cpm_array(rev)
        channeling = [rev_arr[x] == i for i, x in enumerate(fwd)]

        # Element constraints can be partial, so run safening transformation
        changed, safened, toplevel, nbc = _no_partial_functions(
            channeling, is_toplevel=False, safen_toplevel=frozenset()
        )
        if not changed:
            return channeling, toplevel
        return safened + nbc, toplevel

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        fwd, rev = argvals(self.args)
        if any(v is None for seq in (fwd, rev) for v in seq):
            return None

        # explicit check for partial element constraints: every value must be a valid index
        if not all(0 <= x < len(rev) for x in fwd):
            return False
        if not all(0 <= y < len(fwd) for y in rev):
            return False

        return all(rev[x] == i for i, x in enumerate(fwd))
[docs]
class Table(GlobalConstraint):
    """
    Enforces that the values of the variables in 'array' correspond to a row in 'table'.
    """

    def __init__(self, array: ListLike[Expression], table: ListLike[ListLike[int]] | np.ndarray):
        """
        Arguments:
            array (ListLike[Expression]): List of expressions representing the array of variables
            table (ListLike[ListLike[int]] | np.ndarray): List of lists of integers or 2D ndarray of
                ints representing the table.
        """
        if isinstance(array, NDVarArray):
            has_subexpr = array.has_subexpr()  # fast shortcut
            if array.ndim != 1:
                array = array.reshape(-1)  # flatten to 1D
        else:
            # a subexpression is any Expression that is neither a variable nor a Boolean constant
            has_subexpr = any(
                isinstance(x, Expression) and not isinstance(x, (_NumVarImpl, BoolVal))
                for x in array
            )

        # store the table as a numpy array of integers
        if isinstance(table, np.ndarray):
            if table.dtype.kind != 'i':
                table = table.astype(int, copy=False)
        else:
            table = np.array(table, dtype=int)

        assert table.ndim == 2, "Table's table must be a 2D array"
        assert table.shape[1] == len(array), f"Table width {table.shape[1]} != array length {len(array)}"

        # args: tuple[ListLike[Expression], np.ndarray]
        super().__init__("table", (array, table), has_subexpr=has_subexpr)

    @property
    def args(self) -> tuple[ListLike[Expression], np.ndarray]:
        """ READ-ONLY, the well-typed arguments of this global constraint
        """
        return self._args

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Table global constraint: the array must match at least one row,
        where matching a row is the conjunction of element-wise equalities.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        arr, tab = self.args
        row_matches = [cp.all([ai == ri for ai, ri in zip(arr, row)]) for row in tab]
        return [cp.any(row_matches)], []

    def decompose_positive(self) -> tuple[list[Expression], list[Expression]]:
        """
        Positive decomposition of the Table global constraint: one Boolean selector per row that
        implies the row is matched, with at least one selector required to be true.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        arr, tab = self.args
        row_selected = boolvar(shape=(len(tab),))
        # implication-only decomposition: a selected row must be matched
        defining = [
            row_selected[i].implies(cp.all([x == v for x, v in zip(arr, row)]))
            for i, row in enumerate(tab)
        ]
        return [cp.any(row_selected)], defining

    def _variable_ordering(self, heuristic:str="domain"):
        """
        Heuristically order the variables to obtain a better MDD. The columns of the table are
        ordered accordingly.

        Arguments:
            heuristic (str): Name of the heuristic to use for ordering the variables.
                Currently supported heuristic:
                - "domain" : order by domain size (small to large)

        Returns:
            tuple[ListLike[Expression], (ListLike[ListLike[int]] | np.ndarray)]: The ordered array
            and table arguments

        Raises:
            ValueError: if `heuristic` is not a supported name
        """
        arr, tab = self.args
        if len(arr) == 0:
            return arr, tab  # nothing to order

        if heuristic == "domain":
            lbs, ubs = get_bounds(arr)
            scores = [ub - lb + 1 for lb, ub in zip(lbs, ubs)]
        else:
            raise ValueError(f"Unsupported ordering heuristic: {heuristic}, chose from {['domain']}")

        # positions sorted by ascending score (stable for ties)
        ordering = sorted(range(len(arr)), key=lambda pos: scores[pos])
        return [arr[pos] for pos in ordering], tab[:, ordering]

    def decompose_linear(self, heuristic:str="domain") -> tuple[list[Expression], list[Expression]]:
        """
        Linear-friendly decomposition of the Table global constraint using an MDD, which is
        subsequently decomposed into linear flow constraints.

        Based on:
            Bierlee, H., Piessens, W., Stuckey, P., & Guns, T. (CP 2026).
            Table Constraints for Integer Programming.
            In Leibniz International Proceedings in Informatics. Schloss Dagstuhl -- Leibniz-Zentrum fuer Informatik.

        Arguments:
            heuristic (str): variable-ordering heuristic, forwarded to `_variable_ordering`

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        if len(self.args[1]) == 0:  # empty table, does not allow any assignments
            return [cp.BoolVal(False)], []

        arr, tab = self._variable_ordering(heuristic)

        ROOT = 0
        SINK = -1
        # node id -> {value -> next node id}; rows sharing a prefix share nodes
        mdd: dict[int, dict[int, int]] = {}
        next_id = 1  # id for the next fresh non-root, non-sink node

        for row in tab.tolist():  # converts to Python ints
            last = len(row) - 1
            node = ROOT
            for depth, val in enumerate(row):
                edges = mdd.setdefault(node, {})
                if depth == last:
                    # the final column always leads to the sink
                    edges[val] = SINK
                    node = SINK
                elif val in edges:
                    node = edges[val]
                else:
                    edges[val] = next_id
                    node = next_id
                    next_id += 1

        transitions = [(src, v, dst) for src, pairs in mdd.items() for v, dst in pairs.items()]
        return [MDD(arr, transitions, start=ROOT)], []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        arr, tab = self.args
        arrval = np.asarray(argvals(arr))
        # None values can only be present when numpy fell back to dtype object
        if arrval.dtype == object and any(x is None for x in arrval.flat):
            return None
        row_hits = np.all(tab == arrval, axis=1)
        return bool(np.any(row_hits))

    def negate(self) -> Expression:
        """
        Returns:
            Expression: a NegativeTable over the same array and table
        """
        arr, tab = self.args
        return NegativeTable(arr, tab)
[docs]
class ShortTable(GlobalConstraint):
    """
    Extension of the `Table` constraint where the `table` matrix may contain wildcards (STAR),
    meaning there are no restrictions for the corresponding variable in that tuple.
    """

    def __init__(self, array: ListLike[Expression], table: ListLike[ListLike[int|Literal["*"]]] | np.ndarray):
        """
        Arguments:
            array (ListLike[Expression]): List of expressions representing the array of variables
            table (ListLike[ListLike[int | '*']] | np.ndarray): List of lists or 2D ndarray; entries are
                integers or STAR ('*'). STAR represents a wildcard (corresponding variable can take any value).
        """
        if isinstance(array, NDVarArray):
            has_subexpr = array.has_subexpr()  # fast shortcut
            if array.ndim != 1:
                array = array.reshape(-1)  # flatten to 1D
        else:
            # a subexpression is any Expression that is neither a variable nor a Boolean constant
            has_subexpr = any(
                isinstance(x, Expression) and not isinstance(x, (_NumVarImpl, BoolVal))
                for x in array
            )

        if not isinstance(table, np.ndarray):
            # dtype object, otherwise np makes it all string
            table = np.array(table, dtype=object)

        assert table.ndim == 2, "ShortTable's table must be a 2D array"
        assert table.shape[1] == len(array), f"ShortTable width {table.shape[1]} != array length {len(array)}"

        # args: tuple[ListLike[Expression], np.ndarray]
        super().__init__("short_table", (array, table), has_subexpr=has_subexpr)

    @property
    def args(self) -> tuple[ListLike[Expression], np.ndarray]:
        """ READ-ONLY, the well-typed arguments of this global constraint
        """
        return self._args

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the ShortTable global constraint: the array must match at least one row,
        where wildcard (STAR) cells impose no equality.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        arr, tab = self.args
        row_matches = [
            cp.all([ai == ri for ai, ri in zip(arr, row) if ri != STAR])
            for row in tab
        ]
        return [cp.any(row_matches)], []

    def decompose_positive(self) -> tuple[list[Expression], list[Expression]]:
        """
        Positive decomposition of the ShortTable global constraint.

        Similar to `element` from Gleb's paper: "Improved Linearization of Constraint
        Programming Models"

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        arr, tab = self.args
        row_selected = boolvar(shape=(len(tab),))
        # implication-only decomposition: a selected row must be matched on its non-STAR cells
        defining = [
            row_selected[i].implies(cp.all([ai == ri for ai, ri in zip(arr, row) if ri != STAR]))
            for i, row in enumerate(tab)
        ]
        return [cp.sum(row_selected) == 1], defining

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        arr, tab = self.args
        arrval = np.asarray(argvals(arr))
        # None values can only be present when numpy fell back to dtype object
        if arrval.dtype == object and any(x is None for x in arrval.flat):
            return None

        arrval = arrval.astype(int, copy=False)
        non_star = (tab != STAR)
        # a row is matched when all of its non-wildcard cells equal the array's values
        return any(
            (row[mask].astype(int, copy=False) == arrval[mask]).all()
            for row, mask in zip(tab, non_star)
        )
[docs]
class NegativeTable(GlobalConstraint):
    """
    The values of the variables in 'array' do not correspond to any row in 'table'.
    """

    def __init__(self, array: ListLike[Expression], table: ListLike[ListLike[int]] | np.ndarray):
        """
        Arguments:
            array (ListLike[Expression]): List of expressions representing the array of variables
            table (ListLike[ListLike[int]] | np.ndarray): List of lists of integers or 2D ndarray of
                ints representing the table.
        """
        if isinstance(array, NDVarArray):
            has_subexpr = array.has_subexpr()  # fast shortcut
            if array.ndim != 1:
                array = array.reshape(-1)  # flatten to 1D
        else:
            # a subexpression is any Expression that is neither a variable nor a Boolean constant
            has_subexpr = any(
                isinstance(x, Expression) and not isinstance(x, (_NumVarImpl, BoolVal))
                for x in array
            )

        # store the table as a numpy array of integers
        if isinstance(table, np.ndarray):
            if table.dtype.kind != 'i':
                table = table.astype(int, copy=False)
        else:
            table = np.array(table, dtype=int)

        assert table.ndim == 2, "NegativeTable's table must be a 2D array"
        assert table.shape[1] == len(array), f"NegativeTable width {table.shape[1]} != array length {len(array)}"

        # args: tuple[ListLike[Expression], np.ndarray]
        super().__init__("negative_table", (array, table), has_subexpr=has_subexpr)

    @property
    def args(self) -> tuple[ListLike[Expression], np.ndarray]:
        """ READ-ONLY, the well-typed arguments of this global constraint
        """
        return self._args

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the NegativeTable global constraint.

        Enforces that the values of the variables in 'array' do not correspond to any row in 'table':
        for every row, at least one position must differ.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing
            the constraint value and the defining constraints
        """
        arr, tab = self.args
        row_differs = [cp.any([ai != ri for ai, ri in zip(arr, row)]) for row in tab]
        return [cp.all(row_differs)], []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise,
            or None if any argument is not assigned
        """
        arr, tab = self.args
        arrval = np.asarray(argvals(arr))
        # None values can only be present when numpy fell back to dtype object
        if arrval.dtype == object and any(x is None for x in arrval.flat):
            return None
        row_hits = np.all(tab == arrval, axis=1)
        return not bool(np.any(row_hits))

    def negate(self) -> Expression:
        """
        Returns:
            Expression: a Table over the same array and table
        """
        arr, tab = self.args
        return Table(arr, tab)
[docs]
class Regular(GlobalConstraint):
    """
    Regular-constraint (or Automaton-constraint)

    Takes as input a sequence of variables and an automaton representation using a transition table.
    The constraint is satisfied if the sequence of variables corresponds to an accepting path in the automaton.

    The automaton is defined by a list of transitions, a starting node and a list of accepting nodes.
    The transitions are represented as a list of tuples, where each tuple is of the form (id1, value, id2).
    An id is an integer or string representing a state in the automaton, and value is an integer representing
    the value of the variable in the sequence.
    The starting node is an integer or string representing the starting state of the automaton.
    The accepting nodes are a list of integers or strings representing the accepting states of the automaton.

    Example: an automaton that accepts the language 0*100* (exactly one variable takes value 1,
    and it is followed by at least one 0) is defined as:

        cp.Regular(array = cp.intvar(0,1, shape=4),
                   transitions = [("A",0,"A"), ("A",1,"B"), ("B",0,"C"), ("C",0,"C")],
                   start = "A",
                   accepting = ["C"])
    """

    def __init__(self, array: ListLike[Expression], transitions: ListLike[tuple[int|str, int, int|str]], start: int|str, accepting: ListLike[int|str]):
        """
        Arguments:
            array (ListLike[Expression]): List of expressions representing the input sequence
            transitions (ListLike[tuple[int | str, int, int | str]]): List of transition triples (source, value, destination)
            start (int | str): Starting node id
            accepting (ListLike[int | str]): List of accepting node ids

        Raises:
            TypeError: if one of the arguments does not have the shape/types described above
        """
        array = flatlist(array)
        if not all(isinstance(x, Expression) for x in array):
            raise TypeError("The first argument of a regular constraint should only contain variables/expressions")

        if not is_any_list(transitions):
            raise TypeError("The second argument of a regular constraint should be a list of transitions")
        # all node ids have to be of the same type as the very first one
        _node_type = type(transitions[0][0])
        for s, v, e in transitions:
            if not isinstance(s, _node_type) or not isinstance(e, _node_type) or not isinstance(v, int):
                raise TypeError(f"The second argument of a regular constraint should be a list of transitions ({_node_type}, int, {_node_type})")
        if not isinstance(start, _node_type):
            raise TypeError("The third argument of a regular constraint should be a node id")
        if not (is_any_list(accepting) and all(isinstance(e, _node_type) for e in accepting)):
            raise TypeError("The fourth argument of a regular constraint should be a list of node ids")

        super().__init__("regular", (list(array), list(transitions), start, list(accepting)))

        node_set = set()
        self.trans_dict = {}  # (source node, value) -> destination node
        for s, v, e in transitions:
            node_set.update([s, e])
            self.trans_dict[(s, v)] = e
        self.nodes = sorted(node_set)
        # normalize node_ids to be 0..n-1, allows for smaller domains
        self.node_map = {n: i for i, n in enumerate(self.nodes)}

    def decompose_positive(self) -> tuple[list[Expression], list[Expression]]:
        """Decomposition without the sink node, see `decompose(complete=False)`."""
        return self.decompose(complete=False)

    def decompose(self, complete=True) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Regular global constraint.

        Encodes the automaton by encoding the transition table into `class:cpmpy.expressions.globalconstraints.Table` constraints.
        Then enforces that the last state is accepting.

        Arguments:
            complete (bool): if True, a non-accepting sink node is added that receives all undefined transitions,
                so that the state variables have a value for every assignment of the array.
                When the Regular constraint is in positive context, this is not needed.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        arr, transitions, start, accepting = self.args

        if len(arr) == 0:
            # no transition is taken: same verdict as value() gives for an empty sequence
            return [cp.BoolVal(start in accepting)], []

        node_map = self.node_map
        # accepting ids that occur in no transition can never be reached by a non-empty sequence
        accepting_ids = [node_map[e] for e in accepting if e in node_map]
        if start not in node_map or len(accepting_ids) == 0:
            return [cp.BoolVal(False)], []  # no reachable accepting state, cannot be satisfied

        lbs, ubs = get_bounds(arr)
        lb, ub = min(lbs), max(ubs)

        # work on local copies only: decompose() may be called repeatedly and must not alter the automaton
        table = [[node_map[n_in], v, node_map[n_out]] for n_in, v, n_out in transitions]
        n_states = len(self.nodes)
        if complete:
            # the sink gets the first unused normalized id, so it can never clash with a user-given node id
            sink = n_states
            n_states += 1
            domain = range(lb, ub + 1)
            table.extend([node_map[n], v, sink] for n in self.nodes for v in domain if (n, v) not in self.trans_dict)
            table.extend([sink, v, sink] for v in domain)  # once in the sink, stay there

        # keep track of current state when traversing the array
        # (tuple shape: a plain shape of 1 would yield a single variable instead of an array)
        state_vars = intvar(0, n_states - 1, shape=(len(arr),))
        id_start = node_map[start]

        # optimization: we know the entry node of the automaton, results in smaller table
        defining: list[Expression] = [Table([arr[0], state_vars[0]], [[v, e] for s, v, e in table if s == id_start])]
        # define the rest of the automaton using transition table
        defining.extend(Table([state_vars[i - 1], arr[i], state_vars[i]], table) for i in range(1, len(arr)))

        # constraint is satisfied iff last state is accepting
        return [InDomain(state_vars[-1], accepting_ids)], defining

    def decompose_linear_positive(self) -> tuple[list[Expression], list[Expression]]:
        """Linear decomposition without the sink node, see `decompose_linear(complete=False)`."""
        return self.decompose_linear(complete=False)

    def decompose_linear(self, complete=True) -> tuple[list[Expression], list[Expression]]:
        """
        Deterministic Finite Automata (DFA) MIP decomposition using flow constraints based on
        Côté et al. (2007): "Modeling the Regular Constraint with Integer Programming"

        Arguments:
            complete (bool): if True, a non-accepting sink node is added and every value in the bounds of
                the array gets a flow layer, so that the flow variables have a value for every assignment of the array.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        arr, transitions, start, accepting = self.args

        if len(arr) == 0:
            # no transition is taken: same verdict as value() gives for an empty sequence
            return [cp.BoolVal(start in accepting)], []

        # Collect all nodes and all transition values in the DFA
        nodes_set = set()
        values_set = set()
        for src, value, dst in transitions:
            nodes_set.update((src, dst))
            values_set.add(value)

        if start not in nodes_set or not any(a in nodes_set for a in accepting):
            return [cp.BoolVal(False)], []  # no reachable accepting state, cannot be satisfied

        if complete:
            # values that label no transition still need flow variables (leading to the sink),
            # otherwise the flow cannot be continued after such a value and the definition is not total
            lbs, ubs = get_bounds(arr)
            values_set.update(range(min(lbs), max(ubs) + 1))

        nodes = sorted(nodes_set)
        values = sorted(values_set)
        node_idx = {node: idx for idx, node in enumerate(nodes)}
        value_idx = {value: idx for idx, value in enumerate(values)}
        n_nodes = len(nodes)
        n_values = len(values)

        flow_in = defaultdict(list)
        flow_out = defaultdict(list)
        # Determine the flow in and out of each node based on the transitions
        for src, value, dst in transitions:
            id1 = node_idx[src]
            id2 = node_idx[dst]
            v = value_idx[value]
            flow_out[id1].append((v, id2))
            flow_in[id2].append((v, id1))

        if complete:
            # When no edge in a given node for a given value exists, add an edge to a new non-accepting sink node
            snk = n_nodes
            for id1 in range(n_nodes):
                existing_values = {val for (val, _) in flow_out[id1]}
                for v in range(n_values):
                    if v not in existing_values:
                        flow_out[id1].append((v, snk))
                        flow_in[snk].append((v, id1))
            # The non-accepting sink node loops back to itself for all values
            for v in range(n_values):
                flow_out[snk].append((v, snk))
                flow_in[snk].append((v, snk))
            n_nodes += 1

        defining = []
        constraining = []

        S = node_idx[start]
        # accepting nodes that occur in the automaton, without duplicates (one indicator variable each)
        E = list(dict.fromkeys(node_idx[a] for a in accepting if a in node_idx))
        sf_index = {q: k for k, q in enumerate(E)}

        # Variable s[i,j,q] is true iff at position i in the array, we take the transition from node q with value j
        s = cp.boolvar(shape=(len(arr), n_values, n_nodes))
        sf = cp.boolvar(shape=(len(E),))

        # Start node has one unit of flow
        for q in range(n_nodes):
            if q == S:
                constraining.append(cp.sum(s[0, j, S] for j, _ in flow_out[S]) == 1)
            else:
                defining.append(cp.sum(s[0, j, q] for j in range(n_values)) == 0)

        # Enforce flow constraints: flow in = flow out
        for i in range(1, len(arr)):
            for q in range(n_nodes):
                defining.append(cp.sum(s[i - 1, j, q_] for (j, q_) in flow_in[q]) == cp.sum(s[i, j, q] for (j, _) in flow_out[q]))
                defining.append(cp.sum(s[i - 1, j, q_] for (j, q_) in flow_in[q]) <= 1)  # redundant constraint
                defining.append(cp.sum(s[i, j, q] for (j, _) in flow_out[q]) <= 1)  # redundant constraint

        # Accepting end nodes have one unit of flow in total
        for q in range(n_nodes):
            if q in sf_index:
                defining.append(cp.sum(s[-1, j, q_] for (j, q_) in flow_in[q]) == sf[sf_index[q]])
            else:
                constraining.append(cp.sum(s[-1, j, q_] for (j, q_) in flow_in[q]) == 0)
        constraining.append(cp.sum(sf) == 1)

        # Channelling constraints between the flow variables and the direct encoding variables
        for i in range(len(arr)):
            for j in range(n_values):
                defining.append(cp.sum(s[i, j, q] for q in range(n_nodes)) == (arr[i] == values[j]))

        return constraining, defining

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        arr, transitions, start, accepting = self.args
        arrval = argvals(arr)
        if any(x is None for x in arrval):
            return None

        # walk through the automaton, following the transition of every value in the sequence
        curr_node = start
        for v in arrval:
            if (curr_node, v) not in self.trans_dict:
                return False  # undefined transition
            curr_node = self.trans_dict[curr_node, v]
        return curr_node in accepting
[docs]
class MDD(GlobalConstraint):
    """
    An MDD is a Multi-valued Decision Diagram, represented as an acyclic layered graph, with a single root node, and a single accepting sink node.

    The constraint takes as input an array of `n` integer variables and an MDD with `n+1` layers, represented through
    a table of "(from_node, value, to_node)" entries, one for every arc in the MDD.
    The MDD constraint is satisfied when the values in the array correspond to a path in the MDD starting from the root node,
    and where the first variable in the array takes the value of the first edge, the second from the second edge, etc.,
    ending in the accepting sink node.

    The transitions/edges are given by a `m x 3` matrix, or more precisely a list of `m` tuples `(node_id1, value, node_id2)`,
    one per arc. A node_id is an integer or string representing a state in the MDD, and value is an integer representing
    the value of the variable in the sequence.
    If not given explicitly, the root node is the node_id1 of the first entry in the transition table (i.e., transitions[0][0]).
    The root node is at level 0, the sink node is the only node on level n.

    Example: the following MDD accepts the solutions [1,1,1], [2,2,1] and [2,3,2] for the three variables x,y,z.
    The root node is "A" and the sink node is "F".

        cp.MDD(array = [x,y,z],
               transitions = [("A", 1, "B"),
                              ("A", 2, "C"),
                              ("B", 1, "D"),
                              ("C", 2, "D"),
                              ("C", 3, "E"),
                              ("D", 1, "F"),
                              ("E", 2, "F")])
    """

    def __init__(self, array: ListLike[Expression], transitions: ListLike[tuple[int|str, int, int|str]], start: Optional[int|str] = None, reduce: bool = True):
        """
        Arguments:
            array (ListLike[Expression]): List of expressions representing the input sequence
            transitions (ListLike[tuple[int | str, int, int | str]]): List of transition triples (node_id1, value, node_id2)
            start (Optional[int | str]): Root node_id, if None, the root node is assumed to be the first node in the transition table (i.e., transitions[0][0])
            reduce (bool, default=True): During decomposition, whether to reduce the MDD as a first decomposition step
                                         by merging nodes with equivalent suffixes, reducing the size of the MDD

        Raises:
            TypeError: if the array or the transitions do not have the types described above
            AssertionError: if there is not exactly one node on level n
        """
        array = flatlist(array)
        if not all(isinstance(x, Expression) for x in array):
            raise TypeError("The first argument of an MDD constraint should only contain variables/expressions")

        _node_type = type(transitions[0][0])
        for id1, v, id2 in transitions:
            if not isinstance(id1, _node_type) or not isinstance(v, int) or not isinstance(id2, _node_type):
                raise TypeError(
                    f"The second argument of an MDD constraint should be a list of transitions ({_node_type}, int, {_node_type})")

        super().__init__("mdd", (array,))

        self.root_node = transitions[0][0] if start is None else start
        self.mapping: dict[int | str, dict[int, int | str]] = defaultdict(dict)  # mapping from source node and transition value to destination node
        for id1, v, id2 in transitions:
            self.mapping[id1][v] = id2

        # Breadth-first traversal from the root, assigning a level to every reachable node
        self.levels = {self.root_node: 0}
        current_nodes = [self.root_node]
        for level in range(len(array)):
            # insertion-ordered set: a node reached over several edges is expanded only once,
            # otherwise the frontier grows with the number of paths instead of the number of nodes
            next_nodes: dict[int | str, None] = {}
            for id1 in current_nodes:
                for id2 in self.mapping[id1].values():
                    next_nodes[id2] = None
                    self.levels[id2] = level + 1
            current_nodes = list(next_nodes)

        # Check that there is exactly one sink node on level n (with n the number of integer variables)
        sink_nodes = [node for node, level in self.levels.items() if level == len(array)]
        assert len(sink_nodes) == 1
        self.sink_node = sink_nodes[0]

        # store whether the MDD should be reduced during decomposition
        self.reduce = reduce

    def _reduce(self):
        """
        Auxiliary function that reduces the original MDD by merging nodes with equivalent suffixes
        Alters the mapping (and the levels) in-place.
        """
        arr = self.args[0]
        substitutions = {}  # merged node -> representative node that replaces it

        # Loop backwards over MDD levels, from sink to root node
        for i in reversed(range(len(arr))):
            level_nodes = [n for (n, lvl) in self.levels.items() if lvl == i]

            # Mapping is redirected to representative (potentially merged) nodes of the next layer in the MDD
            for node in level_nodes:
                edges = self.mapping[node]
                for value, dst in edges.items():
                    edges[value] = substitutions.get(dst, dst)  # If no substitution, keep original destination node

            groups = defaultdict(list)  # All nodes with the same transition function are grouped together, and can be merged
            for node in level_nodes:
                # Ordered tuple of (value, destination node) pairs, serves as a unique signature for equivalent transition functions
                signature = tuple(sorted(self.mapping[node].items()))
                groups[signature].append(node)

            for equiv_nodes in groups.values():
                # First node chosen as representative node, others are merged with it
                rep = equiv_nodes[0]
                for node in equiv_nodes[1:]:
                    substitutions[node] = rep
                    self.mapping.pop(node, None)
                    self.levels.pop(node, None)

    def _complete_mdd(self):
        """
        Auxiliary function that extends the MDD such that every node has an outgoing edge for every value
        in the bounds of the variable on its level. The added edges are 'invalid' edges.

        An invalid edge leaving level i leads to a dedicated dead node on level i+1 (or to the sink node
        when i+1 is the last level). Dead nodes only have invalid outgoing edges. This way every assignment
        of the array corresponds to exactly one root-to-sink path of full length, which is needed for the
        flow variables of the decomposition to be defined for every assignment.

        Returns:
            tuple: the extended mapping, the set of invalid edges (source node, transition value),
                   and the node levels including those of the added dead nodes.
        """
        arr = self.args[0]
        n_vars = len(arr)

        levels = dict(self.levels)
        # nodes that are not reachable from the root have no level and can never be on a path
        extended = {node: dict(edges) for node, edges in self.mapping.items() if node in self.levels}
        invalid_edges = set()

        nodes_at = defaultdict(list)
        for node, level in self.levels.items():
            if level < n_vars:
                nodes_at[level].append(node)

        for level in range(n_vars):
            lb, ub = get_bounds(arr[level])
            if level + 1 == n_vars:
                target = self.sink_node
            else:
                target = ("__dead__", level + 1)  # a tuple cannot clash with user-given int/str node ids

            target_used = False
            for node in nodes_at[level]:
                edges = extended.setdefault(node, {})
                for v in range(lb, ub + 1):
                    if v not in edges:
                        edges[v] = target
                        invalid_edges.add((node, v))
                        target_used = True

            if target_used and target not in levels:
                # the dead node is processed as a regular node of the next level (all its edges are invalid)
                levels[target] = level + 1
                nodes_at[level + 1].append(target)

        return extended, invalid_edges, levels

    def _get_complete_mdd(self) -> tuple[dict[int | str, dict[int, int | str]], set[tuple[int | str, int]]]:
        """
        Auxiliary function that extends the MDD with invalid edges, see `_complete_mdd`.

        Returns:
            tuple[dict[int | str, dict[int, int | str]], set[tuple[int | str, int]]]:
                A tuple containing the extended mapping of the MDD and a set of invalid edges (source node, transition value) that are added to the MDD.
        """
        extended_mapping, invalid_edges, _ = self._complete_mdd()
        return extended_mapping, invalid_edges

    def decompose_positive(self) -> tuple[list[Expression], list[Expression]]:
        """Decomposition without invalid edges, see `decompose(complete=False)`."""
        return self.decompose(complete=False)

    def decompose(self, complete=True) -> tuple[list[Expression], list[Expression]]:
        """
        Flow decomposition of the MDD global constraint.

        Enforces that the condition is satisfied, by ensuring that the flow in equals the flow out for every node.
        To do this, we introduce auxiliary boolean variables for every edge in the MDD, use them in flow constraints,
        and link them to all variable assignments in the array.

        Arguments:
            complete (bool): if True, the MDD is first extended with invalid edges such that the edge variables
                have a value for every assignment of the array. Not needed in positive context.

        Returns:
            tuple[list[Expression], list[Expression]]:
                A tuple containing the constraints representing the constraint value and the defining constraints.
        """
        arr = self.args[0]

        # Reduce the MDD if requested
        if self.reduce:
            self._reduce()

        if complete:
            # MDD is extended with invalid edges, which lead (through dead nodes) to the sink node
            mapping, invalid_edges_set, levels = self._complete_mdd()
            invalid_edges = frozenset(invalid_edges_set)
        else:
            mapping = self.mapping
            invalid_edges = frozenset()
            levels = self.levels

        # Ingoing and outgoing flow for each node (key: node ID, value: list of edge variables)
        # The default is an empty list, representing no ingoing / outgoing flow.
        flow_in: dict[int | str, list[Expression]] = defaultdict(list)
        flow_out: dict[int | str, list[Expression]] = defaultdict(list)
        # Used to link edge variables to direct encoding variables in a later step
        edge_vars = defaultdict(list)
        invalid_edge_vars = []

        # Determine flow in and flow out for each node, and make a boolvar for each edge
        for id1, edges in mapping.items():
            if id1 not in levels:
                continue  # not reachable from the root, can never carry flow
            level = levels[id1]
            for value, id2 in edges.items():
                edge_var = cp.boolvar()
                flow_out[id1].append(edge_var)
                flow_in[id2].append(edge_var)
                edge_vars[(level, value)].append(edge_var)
                if (id1, value) in invalid_edges:
                    invalid_edge_vars.append(edge_var)

        defining = []
        constraining = []

        # Enforce flow constraints: flow in = flow out, at most one activated in/out edge
        for node, level in levels.items():
            incoming = flow_in[node]
            outgoing = flow_out[node]
            if level == 0:
                constraining.append(cp.sum(outgoing) == 1)  # root
            elif level == len(arr):
                defining.append(cp.sum(incoming) == 1)  # sink
            else:
                defining.append(cp.sum(incoming) == cp.sum(outgoing))  # enforce flow for internal nodes
                defining.append(cp.sum(incoming) <= 1)  # redundant constraint: at most one incoming edge
                defining.append(cp.sum(outgoing) <= 1)  # redundant constraint: at most one outgoing edge

        # Enforce that when arr[i] == v, exactly one of the edges at level i with label v is true, otherwise none can be true
        for (level, value), vars_ in edge_vars.items():
            defining.append(cp.sum(vars_) == (arr[level] == value))

        # Without invalid edges the condition is trivially true, no need to post it
        if invalid_edge_vars:
            constraining.append(cp.sum(invalid_edge_vars) == 0)

        # When the MDD is extended to a complete MDD by means of invalid edges, every assignment has exactly one
        # root-to-sink path. The only constraining constraints are therefore that the root flow is equal to 1,
        # and that no invalid edge has any flow.
        return constraining, defining

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        arr = self.args[0]
        vals = [argval(a) for a in arr]
        if any(v is None for v in vals):
            return None

        curr_node = self.root_node
        for curr_v in vals:
            # .get() on purpose: indexing the defaultdict would add an empty entry for unknown nodes
            edges = self.mapping.get(curr_node)
            if edges is None or curr_v not in edges:
                return False
            curr_node = edges[curr_v]
        return True  # can only have reached end node

    def __repr__(self) -> str:
        "Print the MDD and the internally stored table"
        table = [[id1, v, id2] for id1, edges in self.mapping.items() for v, id2 in edges.items()]
        return f"MDD({self.args[0]}, {table}, {self.root_node})"
# syntax of the form 'if b then x == 9 else x == 0' is not supported (no override possible)
# same semantic as CPLEX IfThenElse constraint
# https://www.ibm.com/docs/en/icos/12.9.0?topic=methods-ifthenelse-method
[docs]
class IfThenElse(GlobalConstraint):
    """
    Enforces a conditional expression of the form: if condition then if_true else if_false.

    `condition`, `if_true` and `if_false` are boolean expressions (or boolean constants).
    """

    def __init__(self, condition: BoolExprLike, if_true: BoolExprLike, if_false: BoolExprLike):
        """
        Arguments:
            condition (BoolExprLike): Boolean expression or constant
            if_true (BoolExprLike): Boolean expression or constant
            if_false (BoolExprLike): Boolean expression or constant

        Raises:
            TypeError: if one of the three arguments is not a boolean expression
        """
        if not all(is_boolexpr(part) for part in (condition, if_true, if_false)):
            raise TypeError(f"only boolean expression allowed in IfThenElse: Instead got "
                            f"{condition, if_true, if_false}")
        super().__init__("ite", (condition, if_true, if_false))

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        cond_val, true_val, false_val = argvals(self.args)
        if any(val is None for val in (cond_val, true_val, false_val)):
            return None
        # the value of the selected branch is the value of the whole constraint
        return true_val if cond_val else false_val

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the IfThenElse global constraint.

        Uses two implications: the condition implies `if_true`, and the negated condition implies `if_false`.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        from cpmpy.transformations.negation import recurse_negation

        condition, if_true, if_false = self.args
        if is_bool(condition):
            condition = cp.BoolVal(condition)  # ensure it is a CPMpy expression

        then_part = condition.implies(if_true)
        else_part = recurse_negation(condition).implies(if_false)
        return [then_part, else_part], []

    def __repr__(self) -> str:
        return "If {} Then {} Else {}".format(*self.args)
[docs]
class InDomain(GlobalConstraint):
    """
    Enforces the expression is assigned to a value in the given domain.
    """

    def __init__(self, expr: Expression, arr: Iterable[int|np.integer]):
        """
        Arguments:
            expr (Expression): Expression to be assigned to a value in the given domain
            arr (Iterable[int | np.integer]): Iterable of integer constants representing the domain
        """
        if not isinstance(arr, np.ndarray):
            # materialize first: np.array() cannot convert sets or generators directly
            arr = np.array(list(arr), dtype=int)
        assert arr.ndim == 1, "The second argument of an InDomain constraint should be a 1D array of integer constants"

        has_subexpr = expr.has_subexpr()
        # args: tuple[Expression, np.ndarray]
        super().__init__("InDomain", (expr, arr), has_subexpr=has_subexpr)

    @property
    def args(self) -> tuple[Expression, np.ndarray]:
        """ READ-ONLY, the well-typed arguments of this global constraint
        """
        return self._args

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the InDomain global constraint.

        Enforces that the expression is assigned to a value in the given domain,
        by forbidding every value within the bounds of the expression that is not in the domain.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        expr, arr = self.args
        lb, ub = expr.get_bounds()
        arr_set = frozenset(arr)
        return [expr != val for val in range(lb, ub + 1) if val not in arr_set], []

    def decompose_linear(self) -> tuple[list[Expression], list[Expression]]:
        """
        Linear decomposition of the InDomain global constraint.

        Avoids != constraints and instead decomposes into a large disjunction.
        If `expr` is a variable (the most common case), `cpmpy.transformations.linearize.linearize_reified_varvals` will then encode this variable with a direct encoding

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        expr, arr = self.args
        lb, ub = expr.get_bounds()
        # values outside the bounds can never be taken, keep them out of the disjunction
        candidates = sorted({int(val) for val in arr if lb <= val <= ub})
        if not candidates:
            return [cp.BoolVal(False)], []  # no value of the domain can be taken
        return [cp.any([expr == val for val in candidates])], []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        expr, arr = self.args
        exprval = expr.value()
        if exprval is None:
            return None
        return bool(np.any(arr == exprval))

    def __repr__(self) -> str:
        expr, arr = self.args
        return "{} in {}".format(expr, arr)

    def negate(self) -> Expression:
        """
        Returns:
            Expression: an InDomain constraint over the complement of the domain, within the bounds of the expression
        """
        expr, arr = self.args
        lb, ub = expr.get_bounds()
        # complement of arr
        arr_set = frozenset(arr)
        return InDomain(expr, [v for v in range(lb, ub + 1) if v not in arr_set])
[docs]
class Xor(GlobalConstraint):
    """
    Enforces the exclusive-or relation of the arguments.

    Supports n-ary xor-constraints, which are treated as cascaded binary xor-constraints.
    Equivalent to `sum(args) % 2 == 1`
    """

    def __init__(self, arg_list: ListLike[BoolExprLike]):
        """
        Arguments:
            arg_list (ListLike[BoolExprLike]): List of expressions or constants, to be xor'ed

        Raises:
            TypeError: if one of the arguments is not Boolean
        """
        if any(not is_boolexpr(arg) for arg in arg_list):
            raise TypeError("Only Boolean arguments allowed in Xor global constraint: {}".format(arg_list))

        # convention for commutative binary operators: a constant on the right is moved to the left
        operands = list(arg_list)
        if len(operands) == 2 and is_num(operands[1]):
            operands.reverse()
        super().__init__("xor", tuple(operands))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Xor global constraint.

        Recursively decomposes the constraint into a chain of sums.
        E.g., xor(a,b,c) :: (((a + b) == 1) + c) == 1

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        from cpmpy.transformations.negation import recurse_negation

        # Simplify first by removing all constants:
        #   True Xor x :: ~x    and    False Xor x :: x
        # so each True constant flips the parity, False constants are simply dropped
        operands: list[Expression] = []
        flip = False
        for arg in self.args:
            if isinstance(arg, Expression) and not isinstance(arg, BoolVal):
                operands.append(arg)
            elif arg:
                flip = not flip

        if not operands:
            return [BoolVal(flip)], []

        if flip:
            # an odd number of True constants: negate exactly one operand, preferably a plain variable
            for pos, operand in enumerate(operands):
                if isinstance(operand, _BoolVarImpl):
                    operands[pos] = ~operand  # a variable, ok to be negated
                    break
            else:
                # no variable found; decompose cannot introduce negation, so push it down into the first operand
                operands[0] = recurse_negation(operands[0])

        # There are multiple decompositions possible,
        # recursively using sum allows it to be efficient for all solvers.
        chain: Expression = operands[0]
        for operand in operands[1:]:
            chain = (chain + operand == 1)  # pairwise xor of the chain so far and the next operand
        return [chain], []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if an odd number of arguments is true, False otherwise, or None if any argument is not assigned
        """
        vals = argvals(self.args)
        for val in vals:
            if val is None:
                return None
        return sum(vals) % 2 == 1

    def __repr__(self) -> str:
        if len(self.args) != 2:
            return "xor({})".format(self.args)
        return "{} xor {}".format(*self.args)

    def negate(self) -> Expression:
        """
        Returns:
            Expression: a Xor constraint in which exactly one argument is negated
        """
        from cpmpy.transformations.negation import recurse_negation

        # negate one of the arguments, ideally a variable
        flipped = list(self.args)  # shallow copy, the original arguments stay untouched
        for pos, arg in enumerate(self.args):
            if isinstance(arg, _BoolVarImpl):
                flipped[pos] = ~arg  # a variable, ok to be negated
                break
        else:
            # did not find a Boolean variable: .negate() cannot introduce negation, so push it down into the first argument
            flipped[0] = recurse_negation(flipped[0])
        return Xor(flipped)
[docs]
class Cumulative(GlobalConstraint):
    """
    Enforces that a set of tasks is scheduled such that the capacity of the resource is never exceeded and enforces:
        - duration >= 0
        - demand >= 0
        - start + duration == end

    Equivalent to :class:`~cpmpy.expressions.globalconstraints.NoOverlap` when demand and capacity are equal to 1.
    Supports both varying demand across tasks or equal demand for all jobs.
    """

    def __init__(self, start: ListLike[ExprLike], duration: ListLike[ExprLike], end: Optional[ListLike[ExprLike]] = None, demand: Optional[ListLike[ExprLike]|ExprLike] = None, capacity: Optional[ExprLike] = None):
        """
        Arguments:
            start (ListLike[ExprLike]): Start times of the tasks
            duration (ListLike[ExprLike]): Durations of the tasks
            end (ListLike[ExprLike] | None): Optional end times of the tasks
            demand (ListLike[ExprLike] | ExprLike): Per-task demands or a single constant demand, required
            capacity (ExprLike): Capacity of the resource, required

        Technical note: demand/capacity marked as Optional because it comes after an Optional argument

        Raises:
            TypeError: if start/duration/end is not a list, or if demand or capacity is missing
            ValueError: if the lists do not have the same length
        """
        if not is_any_list(start):
            raise TypeError("start should be a list")
        if not is_any_list(duration):
            raise TypeError("duration should be a list")
        if end is not None and not is_any_list(end):
            raise TypeError("end should be a list if it is provided")
        if demand is None:  # marked optional due to 'end' being optional and parameters after that must be optional too
            raise TypeError("demand should be provided but was None")
        if capacity is None:  # marked optional due to 'end' being optional and parameters after that must be optional too
            raise TypeError("capacity should be provided but was None")

        if len(start) != len(duration):
            raise ValueError("Start and duration should have equal length")
        if end is not None and len(start) != len(end):
            raise ValueError(f"Start and end should have equal length, but got {len(start)} and {len(end)}")

        if is_any_list(demand):
            demand_list = list(demand)
            if len(demand_list) != len(start):
                raise ValueError(f"Demand should be supplied for each task or be single constant, but got {len(demand_list)} and {len(start)}")
        else:  # constant demand
            demand_list = [demand] * len(start)

        # the arguments hold 4 entries without end times, and 5 entries with end times
        if end is None:
            super().__init__("cumulative", (list(start), list(duration), demand_list, capacity))
        else:
            super().__init__("cumulative", (list(start), list(duration), list(end), demand_list, capacity))

    def decompose(self, how:str="auto") -> tuple[list[Expression], list[Expression]]:
        """
        Decompose the Cumulative constraint

        Support time-based decomposition or task-based decomposition.
        By default, we heuristically select the best decomposition based on the number of tasks and the horizon.

        Arguments:
            how (str): how the cumulative constraint should be decomposed, can be "time", "task", or "auto" (default)

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints

        Raises:
            ValueError: if `how` is not one of "time", "task" or "auto"
        """
        if how not in ["time", "task", "auto"]:
            raise ValueError(f"how can only be time, task, or auto (default), but got {how}")

        start = self.args[0]
        if len(start) == 0:
            # no tasks: there is no horizon to compute, both decompositions are empty
            return self._task_decomposition()

        lbs, ubs = get_bounds(start)
        horizon = max(ubs) - min(lbs)
        # heuristic for "auto": one constraint per time step is only used if there are at most as many tasks as time steps
        if how == "time" or (how == "auto" and len(start) <= horizon):
            return self._time_decomposition()
        return self._task_decomposition()

    def _unpack_args(self):
        """
        Helper function returning (start, duration, end, demand, capacity).
        When no end times were given, `end` is the list of expressions start[i] + duration[i].
        """
        if len(self.args) == 4:
            start, duration, demand, capacity = self.args
            end = [s + d for s, d in zip(start, duration)]
        else:
            start, duration, end, demand, capacity = self.args
        return start, duration, end, demand, capacity

    def _consistency_constraints(self) -> list[Expression]:
        """
        Helper function to enforce consistency constraints, used in the decomposition.
        Consistency constraints enforce that:
            - duration >= 0
            - demand >= 0
            - start + duration == end
        """
        start, duration, end, demand, _ = self._unpack_args()
        cons = []
        if len(self.args) != 4:
            # only needed for explicitly given end times, otherwise end is start + duration by construction
            cons += [s + d == e for s, d, e in zip(start, duration, end)]
        cons += [d >= 0 for d in duration]  # enforce non-negative durations
        cons += [h >= 0 for h in demand]  # enforce non-negative demand
        return cons

    def _task_decomposition(self) -> tuple[list[Expression], list[Expression]]:
        """
        Task-based decomposition of the cumulative constraint.

        Schutt, Andreas, et al. "Why cumulative decomposition is not as bad as it sounds."
        International Conference on Principles and Practice of Constraint Programming. Springer, Berlin, Heidelberg, 2009.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        cons = self._consistency_constraints()
        start, duration, end, demand, capacity = self._unpack_args()

        # demand doesn't exceed capacity
        # tasks are uninterruptible, so we only need to check each starting point of each task
        # I.e., for each task, we check if it can be started, given the tasks that are already running.
        for t in range(len(start)):
            st = start[t]
            demand_at_start_of_t = []
            for j in range(len(start)):
                if t != j:
                    demand_at_start_of_t.append(demand[j] * ((start[j] <= st) & (end[j] > st)))
            cons.append((demand[t] + sum(demand_at_start_of_t)) <= capacity)

        return cons, []

    def _time_decomposition(self) -> tuple[list[Expression], list[Expression]]:
        """
        Time-resource decomposition of the cumulative constraint.

        Schutt, Andreas, et al. "Why cumulative decomposition is not as bad as it sounds."
        International Conference on Principles and Practice of Constraint Programming. Springer, Berlin, Heidelberg, 2009.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        cons = self._consistency_constraints()
        start, duration, end, demand, capacity = self._unpack_args()
        if len(start) == 0:
            return cons, []  # no tasks, no time steps to check

        # demand doesn't exceed capacity
        # for each time-step, we check if the running demand does not exceed the capacity
        lbs, ubs = get_bounds(start)
        lb, ub = min(lbs), max(ubs)
        for t in range(lb, ub + 1):
            cons += [cp.sum(d * ((s <= t) & (e > t)) for s, e, d in zip(start, end, demand)) <= capacity]

        return cons, []

    def value(self) -> Optional[bool]:
        """
        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        if len(self.args) == 4:
            start, duration, demand, capacity = argvals(self.args)
            if any(a is None for a in start + duration + demand + [capacity]):
                return None
            end = [s + d for s, d in zip(start, duration)]
        else:
            start, duration, end, demand, capacity = argvals(self.args)
            if any(a is None for a in start + duration + end + demand + [capacity]):
                return None

        if any(d < 0 for d in duration):
            return False
        if any(s + d != e for s, d, e in zip(start, duration, end)):
            return False
        if any(d < 0 for d in demand):
            return False

        if len(start) == 0:
            return True  # no tasks, the capacity cannot be exceeded

        # ensure demand doesn't exceed capacity
        lb, ub = min(start), max(end)
        np_start, np_end, np_demand = np.asanyarray(start), np.asanyarray(end), np.asanyarray(demand)  # eases check below
        for t in range(lb, ub + 1):
            running = (np_start <= t) & (np_end > t)  # tasks that occupy the resource at time t
            if capacity < np.sum(np_demand * running):
                return False

        return True
[docs]
class CumulativeOptional(GlobalConstraint):
    """
    Generalization of the Cumulative constraint which allows for optional tasks.
    A task is only scheduled if the corresponding is_present variable is set to True.

    If the task is present, the constraint enforces that:
        - duration >= 0
        - demand >= 0
        - start + duration == end
    If the task is not present, the constraint does not enforce any of the above.

    Equivalent to :class:`~cpmpy.expressions.globalconstraints.NoOverlapOptional` when demand and capacity are equal to 1.
    Supports both varying demand across tasks or equal demand for all jobs.
    """

    def __init__(self, start: ListLike[ExprLike],
                 duration: ListLike[ExprLike],
                 end: Optional[ListLike[ExprLike]] = None,
                 demand: Optional[ListLike[ExprLike]|ExprLike] = None,
                 capacity: Optional[ExprLike] = None,
                 is_present: Optional[ListLike[BoolExprLike]] = None):
        """
        Arguments:
            start (ListLike[ExprLike]): Start times of the tasks
            duration (ListLike[ExprLike]): Durations of the tasks
            end (ListLike[ExprLike] | None): Optional end times of the tasks
            demand (ListLike[ExprLike] | ExprLike): Per-task demands or a single constant demand, required
            capacity (ExprLike): Capacity of the resource, required
            is_present (ListLike[BoolExprLike]): Presence of the tasks, required

        Technical note: demand/capacity/is_present are typed as Optional only because they
        come after the optional `end` argument; passing None for any of them raises a TypeError.
        """
        # type checks
        if not is_any_list(start):
            raise TypeError("start should be a list")
        if not is_any_list(duration):
            raise TypeError("duration should be a list")
        if end is not None and not is_any_list(end):
            raise TypeError("end should be a list if it is provided")

        # arguments that are required despite their None default
        if demand is None:
            raise TypeError("demand should be provided but was None")
        if capacity is None:
            raise TypeError("capacity should be provided but was None")
        if is_present is None:
            raise TypeError("is_present should be provided but was None")

        # length checks
        n_tasks = len(start)
        if n_tasks != len(duration):
            raise ValueError("Start and duration should have equal length")
        if n_tasks != len(is_present):
            raise ValueError("Start and is_present should have equal length")
        if end is not None and n_tasks != len(end):
            raise ValueError(f"Start and end should have equal length, but got {len(start)} and {len(end)}")

        if is_any_list(demand):
            demand_list = list(demand)
            if len(demand_list) != n_tasks:
                raise ValueError(f"Demand should be supplied for each task or be single constant, but got {len(demand_list)} and {len(start)}")
        else:
            # constant demand: every task gets the same one
            demand_list = [demand] * n_tasks

        # `end` is only stored when given, so the constraint has either 5 or 6 arguments
        arguments = [list(start), list(duration)]
        if end is not None:
            arguments.append(list(end))
        arguments += [demand_list, capacity, list(is_present)]
        super().__init__("cumulative_optional", tuple(arguments))

    def _split_args(self):
        """
        Unpack `self.args` into a fixed six-element layout.

        Returns:
            tuple: (start, duration, end, demand, capacity, is_present), where `end` is None
            when the constraint was created without end times
        """
        if len(self.args) == 5:
            start, duration, demand, capacity, is_present = self.args
            return start, duration, None, demand, capacity, is_present
        start, duration, end, demand, capacity, is_present = self.args
        return start, duration, end, demand, capacity, is_present

    def decompose(self, how:str="auto") -> tuple[list[Expression], list[Expression]]:
        """
        Decompose the CumulativeOptional constraint.

        Supports a time-based and a task-based decomposition.
        With "auto", the time-based decomposition is used when the number of tasks is at most
        the span of the start-time bounds (largest upper bound minus smallest lower bound),
        and the task-based decomposition otherwise.

        Arguments:
            how (str): how the cumulative constraint should be decomposed, can be "time", "task", or "auto" (default)

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        if how not in ("time", "task", "auto"):
            raise ValueError(f"how can only be time, task, or auto (default), but got {how}")

        start = self.args[0]
        lbs, ubs = get_bounds(start)
        horizon = max(ubs) - min(lbs)

        time_based = how == "time" or (how == "auto" and len(start) <= horizon)
        if time_based:
            return self._time_decomposition()
        # how == "task", or "auto" with more tasks than time points
        return self._task_decomposition()

    def _consistency_constraints(self) -> list[Expression]:
        """
        Helper function to build the consistency constraints, used in the decompositions.

        Consistency constraints enforce that:
            - duration >= 0 if the task is present
            - demand >= 0 if the task is present
            - start + duration == end if the task is present (only when end times were given)
        """
        start, duration, end, demand, _, is_present = self._split_args()

        cons = []
        if end is not None:
            cons.extend(implies(p, s + d == e) for s, d, e, p in zip(start, duration, end, is_present))
        # enforce non-negative durations when present
        cons.extend(implies(p, d >= 0) for d, p in zip(duration, is_present))
        # enforce non-negative demand when present
        cons.extend(implies(p, h >= 0) for h, p in zip(demand, is_present))
        return cons

    def _task_decomposition(self) -> tuple[list[Expression], list[Expression]]:
        """
        Task-based decomposition of the cumulative constraint.

        Schutt, Andreas, et al. "Why cumulative decomposition is not as bad as it sounds."
        International Conference on Principles and Practice of Constraint Programming. Springer, Berlin, Heidelberg, 2009.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        cons = self._consistency_constraints()
        start, duration, end, demand, capacity, is_present = self._split_args()
        if end is None:
            end = [s + d for s, d in zip(start, duration)]

        # demand of tasks that are present doesn't exceed capacity
        # tasks are uninterruptible, so we only need to check the starting point of each task:
        # for each present task, its own demand plus the demand of the other present tasks
        # running at its start time must fit in the capacity
        n_tasks = len(start)
        for t, st in enumerate(start):
            running_demand = [demand[j] * (is_present[j] & (start[j] <= st) & (end[j] > st))
                              for j in range(n_tasks) if j != t]
            cons.append(implies(is_present[t], (demand[t] + sum(running_demand)) <= capacity))
        return cons, []

    def _time_decomposition(self) -> tuple[list[Expression], list[Expression]]:
        """
        Time-resource decomposition of the cumulative constraint.

        Schutt, Andreas, et al. "Why cumulative decomposition is not as bad as it sounds."
        International Conference on Principles and Practice of Constraint Programming. Springer, Berlin, Heidelberg, 2009.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        cons = self._consistency_constraints()
        start, duration, end, demand, capacity, is_present = self._split_args()
        if end is None:
            end = [s + d for s, d in zip(start, duration)]

        # demand of tasks that are present doesn't exceed capacity
        # for each time-step within the bounds of the start times,
        # we check if the running demand does not exceed the capacity
        lbs, ubs = get_bounds(start)
        for t in range(min(lbs), max(ubs) + 1):
            cons.append(cp.sum(d * (present & (s <= t) & (e > t))
                               for s, e, d, present in zip(start, end, demand, is_present)) <= capacity)
        return cons, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        vals = argvals(self.args)
        end_given = len(self.args) != 5
        if end_given:
            start, duration, end, demand, capacity, is_present = vals
            needed = start + duration + end + demand + [capacity] + is_present
        else:
            start, duration, demand, capacity, is_present = vals
            needed = start + duration + demand + [capacity] + is_present

        for val in needed:
            if val is None:
                return None

        if not end_given:
            end = [s + d for s, d in zip(start, duration)]

        # consistency is only required of the tasks that are present
        for dur, present in zip(duration, is_present):
            if present and dur < 0:
                return False
        for s, dur, e, present in zip(start, duration, end, is_present):
            if present and s + dur != e:
                return False
        for dem, present in zip(demand, is_present):
            if present and dem < 0:
                return False

        # ensure the demand of the present tasks doesn't exceed capacity at any time point
        first, last = min(start), max(end)
        np_start, np_end, np_present = np.asanyarray(start), np.asanyarray(end), np.asanyarray(is_present)  # eases check below
        for t in range(first, last + 1):
            active = np_present & (np_start <= t) & (np_end > t)
            if sum(demand * active) > capacity:
                return False
        return True
[docs]
class NoOverlap(GlobalConstraint):
    """
    Enforces that a set of tasks are scheduled without overlapping, and enforces:
        - duration >= 0
        - start + duration == end
    """

    def __init__(self, start: ListLike[ExprLike], duration: ListLike[ExprLike], end: Optional[ListLike[ExprLike]] = None):
        """
        Arguments:
            start (ListLike[ExprLike]): Start times of the tasks
            duration (ListLike[ExprLike]): Durations of the tasks
            end (ListLike[ExprLike] | None): Optional end times of the tasks

        Raises:
            TypeError: if start, duration or (when given) end is not a list
            ValueError: if the lists do not all have the same length
        """
        if not is_any_list(start):
            raise TypeError("start should be a list")
        if not is_any_list(duration):
            raise TypeError("duration should be a list")
        if end is not None and not is_any_list(end):
            raise TypeError("end should be a list if it is provided")

        if len(start) != len(duration):
            raise ValueError("Start and duration should have equal length")
        if end is not None and len(start) != len(end):
            raise ValueError(f"Start and end should have equal length, but got {len(start)} and {len(end)}")

        # `end` is only stored when given, so the constraint has either 2 or 3 arguments
        if end is None:
            super().__init__("no_overlap", (list(start), list(duration)))
        else:
            super().__init__("no_overlap", (list(start), list(duration), list(end)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the NoOverlap constraint, using pairwise no-overlap constraints.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        cons = []
        if len(self.args) == 2:
            start, duration = self.args
            end = [s + d for s, d in zip(start, duration)]
        else:
            start, duration, end = self.args
            # explicit end times must agree with start + duration
            cons += [s + d == e for s, d, e in zip(start, duration, end)]

        cons += [d >= 0 for d in duration]

        # for every pair of tasks, one of them has to finish before the other starts
        for (s1, e1), (s2, e2) in all_pairs(zip(start, end)):
            cons.append((e1 <= s2) | (e2 <= s1))
        return cons, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        if len(self.args) == 2:
            start, duration = argvals(self.args)
            if any(a is None for a in start + duration):
                return None
            end = [s + d for s, d in zip(start, duration)]
        else:
            start, duration, end = argvals(self.args)
            # check the individual values, not the three lists themselves:
            # an unassigned variable shows up as a None element inside a list
            if any(a is None for a in start + duration + end):
                return None

        if any(d < 0 for d in duration):
            return False
        if any(s + d != e for s, d, e in zip(start, duration, end)):
            return False

        # two tasks overlap when each one ends strictly after the other one starts
        for (s1, d1), (s2, d2) in all_pairs(zip(start, duration)):
            if s1 + d1 > s2 and s2 + d2 > s1:
                return False
        return True
[docs]
class NoOverlapOptional(GlobalConstraint):
    """
    Generalization of the NoOverlap constraint which allows for optional tasks.
    A task is only scheduled if the corresponding is_present variable is set to True.

    The constraint enforces that all present tasks are scheduled without overlapping, and for each present task, the constraint enforces that:
        - duration >= 0
        - start + duration == end
    If the task is not present, it does not enforce any of the above.
    """

    def __init__(self, start: ListLike[ExprLike], duration: ListLike[ExprLike], end: Optional[ListLike[ExprLike]] = None, is_present: Optional[ListLike[BoolExprLike]] = None):
        """
        Arguments:
            start (ListLike[ExprLike]): List of Expression objects representing the start times of the tasks
            duration (ListLike[ExprLike]): List of Expression objects representing the durations of the tasks
            end (ListLike[ExprLike] | None): optional, list of Expression objects representing the end times of the tasks
            is_present (ListLike[BoolExprLike]): List of Boolean Expression objects representing the presence of the tasks, required

        Raises:
            TypeError: if start, duration or (when given) end is not a list
            ValueError: if is_present is missing or not a list, or if the lists do not all have the same length
        """
        if not is_any_list(start):
            raise TypeError("start should be a list")
        if not is_any_list(duration):
            raise TypeError("duration should be a list")
        if end is not None and not is_any_list(end):
            raise TypeError("end should be a list if it is provided")
        if is_present is None or not is_any_list(is_present):
            raise ValueError("is_present should be provided and should be a list")

        if len(start) != len(duration):
            raise ValueError("Start and duration should have equal length")
        if len(start) != len(is_present):
            raise ValueError("Start and is_present should have equal length")
        if end is not None and len(start) != len(end):
            raise ValueError(f"Start and end should have equal length, but got {len(start)} and {len(end)}")

        # `end` is only stored when given, so the constraint has either 3 or 4 arguments
        if end is None:
            super().__init__("no_overlap_optional", (list(start), list(duration), list(is_present)))
        else:
            super().__init__("no_overlap_optional", (list(start), list(duration), list(end), list(is_present)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the NoOverlapOptional constraint, using pairwise no-overlap constraints
        that only apply when both tasks of the pair are present.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        cons = []
        if len(self.args) == 3:
            start, duration, is_present = self.args
            end = [s + d for s, d in zip(start, duration)]
        else:
            start, duration, end, is_present = self.args
            # explicit end times must agree with start + duration for present tasks
            cons += [implies(p, s + d == e) for s, d, e, p in zip(start, duration, end, is_present)]

        # present tasks need a non-negative duration; value() rejects assignments that violate
        # this, so the decomposition has to enforce it as well
        cons += [implies(p, d >= 0) for d, p in zip(duration, is_present)]

        # for every pair of present tasks, one of them has to finish before the other starts
        for (s1, e1, p1), (s2, e2, p2) in all_pairs(zip(start, end, is_present)):
            cons += [implies(p1 & p2, (e1 <= s2) | (e2 <= s1))]
        return cons, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        if len(self.args) == 3:
            start, duration, is_present = argvals(self.args)
            if any(a is None for a in start + duration + is_present):
                return None
            end = [s + d for s, d in zip(start, duration)]
        else:
            start, duration, end, is_present = argvals(self.args)
            if any(a is None for a in start + duration + end + is_present):
                return None

        # consistency is only required of the tasks that are present
        if any(p and d < 0 for d, p in zip(duration, is_present)):
            return False
        if any(p and s + d != e for s, d, e, p in zip(start, duration, end, is_present)):
            return False

        # two present tasks overlap when each one ends strictly after the other one starts
        for (s1, d1, p1), (s2, d2, p2) in all_pairs(zip(start, duration, is_present)):
            if p1 and p2 and (s1 + d1 > s2) and (s2 + d2 > s1):
                return False
        return True
[docs]
class Precedence(GlobalConstraint):
    """
    Enforces a precedence relationship between a set of variables.

    Given an array of variables X and a list of values P, values in P must appear in X in the order specified by P.
    I.e., if X[i] = P[j+1], then there exists a X[i'] = P[j] with i' < i

    Examples:
        - X = [1,2,1,3] satisfies the precedence [1,2,3].
        - X = [4,1,2,1,3] also satisfies the precedence, as values not appearing in P can appear in any order.
        - X = [2,1,3] does not satisfy the precedence, as 1 does not appear before 2.
    """

    def __init__(self, vars: ListLike[ExprLike], precedence: ListLike[int|np.integer]):
        """
        Arguments:
            vars (ListLike[ExprLike]): List of expressions or constants representing the variables
            precedence (ListLike[int | np.integer]): List of integer precedence values

        Raises:
            TypeError: if vars is not a list, or precedence is not a list of numbers
        """
        if not is_any_list(vars):
            raise TypeError("Precedence expects a list of variables as first argument, but got", vars)
        if not is_any_list(precedence) or not all(is_num(p) for p in precedence):
            raise TypeError("Precedence expects a list of values as second argument, but got", precedence)
        super().__init__("precedence", (list(vars), list(precedence)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition based on:
        Law, Yat Chiu, and Jimmy HM Lee. "Global constraints for integer and set value precedence."
        Principles and Practice of Constraint Programming–CP 2004: 10th International Conference, CP 2004

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        args, precedence = self.args
        if len(args) == 0:
            # no variables: nothing can violate the precedence (and args[0] below would not exist)
            return [], []

        args = cpm_array(args)
        constraints = []
        for s, t in zip(precedence[:-1], precedence[1:]):
            # constraint 1 from paper: t can not be the very first value
            constraints.append(args[0] != t)
            # constraint 2 from paper: if position j takes t, some earlier position takes s
            for j in range(1, len(args)):
                lhs = args[j] == t
                if is_bool(lhs):  # args[j] and t could both be constants
                    lhs = BoolVal(lhs)
                constraints.append(lhs.implies(cp.any(args[:j] == s)))
        return constraints, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        args, precedence = self.args
        vals = argvals(args)
        if any(v is None for v in vals):
            return None

        for s, t in zip(precedence[:-1], precedence[1:]):
            # single left-to-right pass: t may only occur once s has been seen strictly earlier
            seen_s = False
            for v in vals:
                if v == t and not seen_s:
                    return False
                if v == s:
                    seen_s = True
        return True
[docs]
class GlobalCardinalityCount(GlobalConstraint):
    """
    Enforces that the number of occurrences of each value `vals[i]` in the list of variables `vars` is equal to `occ[i]`.
    """

    def __init__(self, vars: ListLike[ExprLike], vals: ListLike[int|np.integer], occ: ListLike[ExprLike], closed: bool = False):
        """
        Arguments:
            vars (ListLike[ExprLike]): List of expressions or constants representing the variables
            vals (ListLike[int | np.integer]): List of integer values
            occ (ListLike[ExprLike]): List of expressions or constants representing the number of occurrences of each value
            closed (bool): Whether the constraint is closed, if true, `vars` can only take values in `vals`

        Raises:
            TypeError: if vars or occ is not a list, or vals is not a list of numbers
            ValueError: if vals and occ have different lengths
        """
        if not is_any_list(vars):
            raise TypeError("GlobalCardinalityCount expects a list of variables, but got", vars)
        if not is_any_list(vals) or not all(is_num(v) for v in vals):
            raise TypeError("GlobalCardinalityCount expects a list of values, but got", vals)
        if not is_any_list(occ):
            raise TypeError("GlobalCardinalityCount expects a list of variables as occurrences, but got", occ)
        if len(vals) != len(occ):
            raise ValueError(f"Number of values and occurrences must be equal, but got {len(vals)} and {len(occ)}")

        super().__init__("gcc", (list(vars), list(vals), list(occ)))
        # `closed` is kept as an attribute, it is not part of the expression arguments
        self.closed = closed

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the GlobalCardinalityCount constraint.
        Uses a conjunction of Count global function constraints.

        Same as the one MiniZinc uses:
        https://github.com/MiniZinc/libminizinc/blob/master/share/minizinc/std/fzn_global_cardinality.mzn

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        vars, vals, occ = self.args
        counts = [cp.Count(vars, v) for v in vals]
        constraints = [cnt == o for cnt, o in zip(counts, occ)]

        if self.closed:
            constraints.extend([InDomain(v, vals) for v in vars])
            # redundant constraint: every variable takes one of the counted values
            constraints.append(cp.sum(counts) == len(vars))
        else:
            # redundant constraint: the counted values can not occur more often than there are variables
            constraints.append(cp.sum(counts) <= len(vars))
        return constraints, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        vars, vals, occ = self.args
        vars, occ = argvals([vars, occ])
        if any(x is None for x in vars + occ):
            return None

        vals = np.array(vals)
        for val, cnt in zip(vals, occ):
            # `val` is a numpy scalar here, so comparing it with the list of values is element-wise
            if sum(vars == val) != cnt:
                return False

        if self.closed:
            allowed = frozenset(vals)  # built once, instead of once per variable
            if any(v not in allowed for v in vars):
                return False
        return True
[docs]
class Increasing(GlobalConstraint):
    """
    Enforces that the expressions are assigned to (non-strictly) increasing values.
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): List of expressions or constants to be assigned to increasing values
        """
        flat_args = tuple(flatlist(args))
        super().__init__("increasing", flat_args)

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Increasing constraint: each argument is at most its successor.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        exprs = self.args
        pairwise_cons = [left <= right for left, right in zip(exprs, exprs[1:])]
        return pairwise_cons, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        vals = argvals(self.args)
        for val in vals:
            if val is None:
                return None
        return all(left <= right for left, right in zip(vals, vals[1:]))
[docs]
class Decreasing(GlobalConstraint):
    """
    Enforces that the expressions are assigned to (non-strictly) decreasing values.
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): List of expressions or constants to be assigned to decreasing values
        """
        flat_args = tuple(flatlist(args))
        super().__init__("decreasing", flat_args)

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the Decreasing constraint: each argument is at least its successor.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        exprs = self.args
        pairwise_cons = [left >= right for left, right in zip(exprs, exprs[1:])]
        return pairwise_cons, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        vals = argvals(self.args)
        for val in vals:
            if val is None:
                return None
        return all(left >= right for left, right in zip(vals, vals[1:]))
[docs]
class IncreasingStrict(GlobalConstraint):
    """
    Enforces that the expressions are assigned to strictly increasing values.
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): List of expressions or constants to be assigned to strictly increasing values
        """
        super().__init__("strictly_increasing", tuple(flatlist(args)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the IncreasingStrict constraint: each argument is smaller than its successor.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        args = self.args
        return [args[i] < args[i+1] for i in range(len(args)-1)], []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        # evaluate the arguments once and reuse the result for both checks
        args = argvals(self.args)
        if any(x is None for x in args):
            return None
        return all(args[i] < args[i+1] for i in range(len(args)-1))
[docs]
class DecreasingStrict(GlobalConstraint):
    """
    Enforces that the expressions are assigned to strictly decreasing values.
    """

    def __init__(self, *args: ExprLike | ListLike[ExprLike]):
        """
        Arguments:
            args (ListLike[ExprLike]): List of expressions or constants to be assigned to strictly decreasing values
        """
        super().__init__("strictly_decreasing", tuple(flatlist(args)))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decomposition of the DecreasingStrict constraint: each argument is larger than its successor.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        args = self.args
        return [(args[i] > args[i+1]) for i in range(len(args)-1)], []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        # evaluate the arguments once and reuse the result for both checks
        args = argvals(self.args)
        if any(x is None for x in args):
            return None
        return all(args[i] > args[i+1] for i in range(len(args)-1))
[docs]
class LexLess(GlobalConstraint):
    """
    Enforces that the first list is lexicographically smaller than the second list.
    """

    def __init__(self, list1: ListLike[ExprLike], list2: ListLike[ExprLike]):
        """
        Arguments:
            list1 (ListLike[ExprLike]): First List of expressions or constants to be compared lexicographically
            list2 (ListLike[ExprLike]): Second List of expressions or constants to be compared lexicographically

        Raises:
            ValueError: if the two lists do not have the same length
        """
        if len(list1) != len(list2):
            raise ValueError(f"The 2 lists given in LexLess must have the same size: list1 length is {len(list1)} and list2 length is {len(list2)}")
        super().__init__("lex_less", (list1, list2))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Implementation inspired by Hakan Kjellerstrand (http://hakank.org/cpmpy/cpmpy_hakank.py)

        The decomposition creates auxiliary Boolean variables and constraints that
        collectively ensure X is lexicographically less than Y.
        The auxiliary Boolean variables represent whether the lists are lexicographically ordered
        from the given index onwards.

        The constraining part requires the first Boolean variable to be true. The defining part
        states, for every index, that X is less than or equal to Y at that index and, if it is not
        strictly less there, that the next Boolean variable holds. The last Boolean variable is
        tied to a strict inequality at the last index, so X has to be strictly less in at least one index.

        The auxiliary Boolean variables are used so that the ordering at each position is linked
        to the ordering of all subsequent positions.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        X, Y = cpm_array(self.args)

        if len(X) == 0 == len(Y):
            return [cp.BoolVal(False)], []  # an empty list is not strictly smaller than an empty list

        bvar = boolvar(shape=(len(X) + 1))

        # bvar[i] holds iff X[i] <= Y[i] and (X[i] < Y[i] or bvar[i+1] holds)
        # bvar has one entry more than X and Y, so slice it to line both sides up one-to-one
        defining = []
        defining.extend(bvar[:-1] == ((X <= Y) & ((X < Y) | bvar[1:])))  # vectorized expression, treat as list
        # enforce the last element to be true iff (X[-1] < Y[-1]), enforcing strict lexicographic order
        defining.append(bvar[-1] == (X[-1] < Y[-1]))
        constraining = [bvar[0]]

        return constraining, defining

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        X, Y = argvals(self.args)
        if any(val is None for val in X + Y):
            return None

        # the first position where the lists differ decides the lexicographic order
        for x, y in zip(X, Y):
            if x < y:
                return True
            if x > y:
                return False
        return False  # all positions equal (or both lists empty): not strictly smaller
[docs]
class LexLessEq(GlobalConstraint):
    """
    Enforces that the first list is lexicographically smaller than or equal to the second list.
    """

    def __init__(self, list1: ListLike[ExprLike], list2: ListLike[ExprLike]):
        """
        Arguments:
            list1 (ListLike[ExprLike]): First List of expressions or constants to be compared lexicographically
            list2 (ListLike[ExprLike]): Second List of expressions or constants to be compared lexicographically

        Raises:
            ValueError: if the two lists do not have the same length
        """
        if len(list1) != len(list2):
            raise ValueError(f"The 2 lists given in LexLessEq must have the same size: list1 length is {len(list1)} and list2 length is {len(list2)}")
        super().__init__("lex_lesseq", (list1, list2))

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Implementation inspired by Hakan Kjellerstrand (http://hakank.org/cpmpy/cpmpy_hakank.py)

        The decomposition creates auxiliary Boolean variables and constraints that
        collectively ensure X is lexicographically less than or equal to Y.
        The auxiliary Boolean variables represent whether the lists are lexicographically ordered
        (less or equal) from the given index onwards.

        The constraining part requires the first Boolean variable to be true. The defining part
        states, for every index, that X is less than or equal to Y at that index and, if it is not
        strictly less there, that the next Boolean variable holds. The last Boolean variable is
        tied to a non-strict inequality at the last index, so fully equal lists are accepted.

        The auxiliary Boolean variables are used so that the ordering at each position is linked
        to the ordering of all subsequent positions.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        X, Y = cpm_array(self.args)

        if len(X) == 0 == len(Y):
            # two empty lists are equal, hence "less than or equal" holds; this matches value()
            return [cp.BoolVal(True)], []

        bvar = boolvar(shape=(len(X) + 1))

        # bvar[i] holds iff X[i] <= Y[i] and (X[i] < Y[i] or bvar[i+1] holds)
        # bvar has one entry more than X and Y, so slice it to line both sides up one-to-one
        defining = []
        defining.extend(bvar[:-1] == ((X <= Y) & ((X < Y) | bvar[1:])))  # vectorized expression, treat as list
        # the last element is true iff (X[-1] <= Y[-1]), which allows the lists to be equal
        defining.append(bvar[-1] == (X[-1] <= Y[-1]))
        constraining = [bvar[0]]

        return constraining, defining

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        X, Y = argvals(self.args)
        if any(val is None for val in X + Y):
            return None

        # the first position where the lists differ decides the lexicographic order
        for x, y in zip(X, Y):
            if x < y:
                return True
            if x > y:
                return False
        return True  # all positions equal (or both lists empty)
[docs]
class LexChainLess(GlobalConstraint):
    """
    Enforces that all rows of the matrix are lexicographically ordered:
    every row is lexicographically smaller than the row after it.
    """

    def __init__(self, X: ListLike[ListLike[ExprLike]]):
        """
        Arguments:
            X (ListLike[ListLike[ExprLike]]): Matrix (List of lists) of expressions or constants to be compared lexicographically

        Raises:
            ValueError: if X is not two-dimensional
        """
        Xarr = np.array(X)  # also checks length of each row is equal
        if Xarr.ndim != 2:
            raise ValueError(f"The matrix given in LexChainLess must be 2D, but got {Xarr.ndim} dimensions")
        rows = tuple(Xarr.tolist())  # one argument per row
        super().__init__("lex_chain_less", rows)

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decompose to a series of LexLess constraints between subsequent rows.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        rows = self.args
        chain = []
        for upper, lower in zip(rows, rows[1:]):
            chain.append(LexLess(upper, lower))
        return chain, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        rows = argvals(self.args)
        for val in flatlist(rows):
            if val is None:
                return None
        # reuse LexLess.value() on each pair of subsequent rows
        return all(LexLess(upper, lower).value() for upper, lower in zip(rows, rows[1:]))
[docs]
class LexChainLessEq(GlobalConstraint):
    """
    Enforces that all rows of the matrix are lexicographically ordered (less or equal):
    every row is lexicographically smaller than or equal to the row after it.
    """

    def __init__(self, X: ListLike[ListLike[ExprLike]]):
        """
        Arguments:
            X (ListLike[ListLike[ExprLike]]): Matrix (List of lists) of expressions or constants to be compared lexicographically

        Raises:
            ValueError: if X is not two-dimensional
        """
        Xarr = np.array(X)  # also checks length of each row is equal
        if Xarr.ndim != 2:
            raise ValueError(f"The matrix given in LexChainLessEq must be 2D, but got {Xarr.ndim} dimensions")
        rows = tuple(Xarr.tolist())  # one argument per row
        super().__init__("lex_chain_lesseq", rows)

    def decompose(self) -> tuple[list[Expression], list[Expression]]:
        """
        Decompose to a series of LexLessEq constraints between subsequent rows.

        Returns:
            tuple[list[Expression], list[Expression]]: A tuple containing the constraints representing the constraint value and the defining constraints
        """
        rows = self.args
        chain = []
        for upper, lower in zip(rows, rows[1:]):
            chain.append(LexLessEq(upper, lower))
        return chain, []

    def value(self) -> Optional[bool]:
        """
        Check the constraint against the values currently assigned to its arguments.

        Returns:
            Optional[bool]: True if the global constraint is satisfied, False otherwise, or None if any argument is not assigned
        """
        rows = argvals(self.args)
        for val in flatlist(rows):
            if val is None:
                return None
        # reuse LexLessEq.value() on each pair of subsequent rows
        return all(LexLessEq(upper, lower).value() for upper, lower in zip(rows, rows[1:]))
[docs]
class DirectConstraint(Expression):
    """
    A ``DirectConstraint`` will directly call a function of the underlying solver when added to a CPMpy solver

    It can not be reified, it is not flattened, it can not contain other CPMpy expressions than variables.
    When added to a CPMpy solver, it will literally just directly call a function on the underlying solver,
    replacing CPMpy variables by solver variables along the way.

    See the documentation of the solver (constructor) for details on how that solver handles them.

    If you want/need to use what the solver returns (e.g. an identifier for use in other constraints),
    then use :func:`~cpmpy.expressions.variables.directvar` instead, or access the solver object from the solver interface directly.
    """

    def __init__(self, name: str, arguments: tuple[Any, ...], novar: Optional[ListLike[int]] = None):
        """
        Arguments:
            name (str): Name of the solver function that you wish to call
            arguments (tuple[Any, ...]): Tuple of arguments to pass to the solver function with name `name`;
                                         anything that is not a tuple is wrapped into a one-element tuple
            novar (Optional[ListLike[int]]): Optional List of indices (offset 0) of arguments in `arguments` that contain no variables,
                                             that can be passed 'as is' without scanning for variables
        """
        if isinstance(arguments, tuple):
            as_tuple = arguments
        else:
            as_tuple = (arguments,)  # force tuple
        super().__init__(name, as_tuple)
        self.novar = novar

    def is_bool(self) -> bool:
        """ is it a Boolean (return type) Operator? Always True for a DirectConstraint.
        """
        return True

    def callSolver(self, CPMpy_solver: "SolverInterface", Native_solver: Any) -> Any:
        """
        Call the function named `self.name` of the native solver,
        with stored arguments replacing CPMpy variables with solver variables as needed.

        SolverInterfaces will call this function when this constraint is added.

        :param CPMpy_solver: a CPM_solver object, that has `solver_var()` and `solver_vars()` functions
        :param Native_solver: the python interface to some specific solver
        :return: the response of the solver when calling the function
        """
        # get the solver function, will raise an AttributeError if it does not exist
        solver_function = getattr(Native_solver, self.name)

        solver_args = list(self.args)  # takes a copy, entries are replaced in place below
        for idx, arg in enumerate(solver_args):
            if self.novar is not None and idx in self.novar:
                continue  # flagged as variable-free: pass it on untouched
            # it may contain variables, replace them by their solver counterparts
            if is_any_list(arg):
                solver_args[idx] = CPMpy_solver.solver_vars(arg)
            else:
                solver_args[idx] = CPMpy_solver.solver_var(arg)

        # the number of stored arguments should match the number of arguments of the solver function
        return solver_function(*solver_args)