"""
Decompose global constraints not supported by the solver.
"""
import copy
import warnings # for deprecation warning
from .normalize import toplevel_list
from ..expressions.globalconstraints import GlobalConstraint
from ..expressions.globalfunctions import GlobalFunction
from ..expressions.core import Expression, Comparison, Operator
from ..expressions.variables import intvar, cpm_array, NDVarArray
from ..expressions.utils import is_any_list, eval_comparison
from ..expressions.python_builtins import all
from .flatten_model import flatten_constraint, normalized_numexpr
[docs]def decompose_in_tree(lst_of_expr, supported=set(), supported_reified=set(), _toplevel=None, nested=False, csemap=None):
"""
Decomposes any global constraint not supported by the solver.
Accepts a list of CPMpy expressions as input and returns a list of CPMpy expressions.
:param supported: a set of supported global constraints or global functions
:param supported_reified: a set of supported reified global constraints (globals with Boolean return type only)
:param _toplevel: a list of constraints that should be added toplevel, carried as pass by reference to recursive calls
:param nested: if True, new constraints will have been added to the `_toplevel` list too
Special care taken for unsupported global constraints in reified contexts and for numeric global constraints
in a comparison.
Supported numerical global functions remain in the expression tree as is. They can be rewritten using
:func:`cpmpy.transformations.reification.reify_rewrite`
The following ``bv -> NumExpr <comp> Var/Const`` can be rewritten as ``[bv -> IV0 <comp> Var/Const, NumExpr == IV0]``.
So even if numerical constraints are not supported in reified context, we can rewrite them to non-reified versions if they are total.
"""
if _toplevel is None:
_toplevel = []
# swap the arguments of a comparison while maintaining its semantics
flipmap = {"==": "==", "!=": "!=", "<": ">", "<=": ">=", ">": "<", ">=": "<="}
newlist = [] # decomposed constraints will go here
for expr in lst_of_expr:
if is_any_list(expr):
assert nested is True, "Cannot have nested lists without passing trough an expression, make sure to run " \
"cpmpy.transformations.normalize.toplevel_list first. "
if isinstance(expr, NDVarArray): # NDVarArray is also an expression,
# so we can call has_subexpr on it for a possible early-exit
if expr.has_subexpr():
newexpr = decompose_in_tree(expr, supported, supported_reified, _toplevel, nested=True, csemap=csemap)
newlist.append(cpm_array(newexpr))
else:
newlist.append(expr)
else: # a normal list-like (list, tuple, np.ndarray), must be called recursively and check all elements
newexpr = decompose_in_tree(expr, supported, supported_reified, _toplevel, nested=True, csemap=csemap)
newlist.append(newexpr)
continue
elif isinstance(expr, Operator):
if not expr.has_subexpr(): # Only recurse if there are nested expressions
newlist.append(expr)
continue
if any(isinstance(a,GlobalFunction) for a in expr.args):
expr, base_con = normalized_numexpr(expr, csemap=csemap)
_toplevel.extend(base_con) # should be added toplevel
# recurse into arguments, recreate through constructor (we know it stores no other state)
args = decompose_in_tree(expr.args, supported, supported_reified, _toplevel, nested=True, csemap=csemap)
newlist.append(Operator(expr.name, args))
elif isinstance(expr, GlobalConstraint) or isinstance(expr, GlobalFunction):
# Can't early-exit here, need to check if constraint in itself is even supported
if nested and expr.is_bool():
# special case: reified (Boolean) global
is_supported = (expr.name in supported_reified)
else:
is_supported = (expr.name in supported)
if is_supported:
# If no nested expressions, don't recurse the arguments
if not expr.has_subexpr():
newlist.append(expr)
continue
# Recursively decompose the subexpression arguments
else:
expr = copy.copy(expr)
expr.update_args(decompose_in_tree(expr.args, supported, supported_reified, _toplevel, nested=True, csemap=csemap))
newlist.append(expr)
else:
if expr.is_bool():
assert isinstance(expr, GlobalConstraint)
# boolean global constraints
dec = expr.decompose()
if not isinstance(dec, tuple):
warnings.warn(f"Decomposing an old-style global ({expr}) that does not return a tuple, which is "
"deprecated. Support for old-style globals will be removed in stable version",
DeprecationWarning)
dec = (dec, [])
decomposed, define = dec
_toplevel.extend(define) # definitions should be added toplevel
# the `decomposed` expression might contain other global constraints, check it
decomposed = decompose_in_tree(decomposed, supported, supported_reified, _toplevel, nested=nested, csemap=csemap)
newlist.append(all(decomposed))
else:
# global function, replace by a fresh variable and decompose the equality to this
assert isinstance(expr, GlobalFunction)
lb,ub = expr.get_bounds()
if csemap is not None and expr in csemap:
aux = csemap[expr]
else:
aux = intvar(lb, ub)
if csemap is not None:
csemap[expr] = aux
# NOTE Do we need to decompose here (the expr's args)? As done in the Comparison's section?
dec = expr.decompose_comparison("==", aux)
if not isinstance(dec, tuple):
warnings.warn(f"Decomposing an old-style global ({expr}) that does not return a tuple, which is "
"deprecated. Support for old-style globals will be removed in stable version",
DeprecationWarning)
dec = (dec, [])
auxdef, otherdef = dec
_toplevel.extend(auxdef + otherdef) # all definitions should be added toplevel
newlist.append(aux) # replace original expression by aux
elif isinstance(expr, Comparison):
if not expr.has_subexpr(): # Only recurse if there are nested expressions
newlist.append(expr)
continue
# if one of the two children is a (numeric) global constraint, we can decompose the comparison directly
# otherwise e.g., min(x,y,z) == a would become `min(x,y,z).decompose_comparison('==',aux) + [aux == a]`
lhs, rhs = expr.args
exprname = expr.name # can change when rhs needs decomp
decomp_lhs = isinstance(lhs, GlobalFunction) and not lhs.name in supported
decomp_rhs = isinstance(rhs, GlobalFunction) and not rhs.name in supported
if not decomp_lhs:
if not decomp_rhs:
# nothing special, create a fresh version and recurse into arguments
expr = copy.copy(expr)
expr.update_args(decompose_in_tree(expr.args, supported, supported_reified, _toplevel, nested=True, csemap=csemap))
newlist.append(expr)
else:
# only rhs needs decomposition, so flip comparison to make lhs needing the decomposition
exprname = flipmap[expr.name]
lhs, rhs = rhs, lhs
decomp_lhs, decomp_rhs = True, False # continue into next 'if'
if decomp_lhs:
if lhs.has_subexpr():
# recurse into lhs args and decompose nested subexpressions
lhs = copy.copy(lhs)
lhs.update_args(decompose_in_tree(lhs.args, supported, supported_reified, _toplevel, nested=True, csemap=csemap))
# decompose comparison of lhs and rhs
dec = lhs.decompose_comparison(exprname, rhs)
if not isinstance(dec, tuple):
warnings.warn(f"Decomposing an old-style global ({lhs}) that does not return a tuple, which is "
f"deprecated. Support for old-style globals will be removed in stable version",
DeprecationWarning)
dec = (dec, [])
decomposed, define = dec
_toplevel.extend(define) # definitions should be added toplevel
# the `decomposed` expression (and rhs) might contain other global constraints, check it
decomposed = decompose_in_tree(decomposed, supported, supported_reified, _toplevel, nested=True, csemap=csemap)
newlist.append(all(decomposed))
else: # constants, variables, direct constraints
newlist.append(expr)
if nested:
return newlist
if len(_toplevel) == 0:
return toplevel_list(newlist)
else:
# we are toplevel and some new constraints are introduced, decompose new constraints!
return toplevel_list(newlist) + decompose_in_tree(toplevel_list(_toplevel), supported, supported_reified, nested=False, csemap=csemap)
# DEPRECATED!
# old way of doing decompositions post-flatten
# will be removed in any future version!
[docs]def decompose_global(lst_of_expr, supported=set(), supported_reif=set()):
"""
.. deprecated:: 0.9.16
Please use :func:`decompose_in_tree()` instead.
"""
warnings.warn("Deprecated, use `decompose_in_tree()` instead, will be removed in stable version", DeprecationWarning)
"""
DEPRECATED!!! USE `decompose_in_tree()` instead!
Decomposes any global constraint not supported by the solver
Accepts a list of flat constraints as input
Returns a list of flat constraints
- supported: a set of supported global constraints or global functions
- supported_reified: a set of supported global constraints within a reification
Unsupported global constraints are decomposed into equivalent simpler constraints.
Numerical global constraints are left reified even if not part of the `supported_reified` set
as they can be rewritten using `cpmpy.transformations.reification.reify_rewrite`...
... reified partial functions are decomposed when unsupported by `supported_reified`
The following `bv -> NumExpr <comp> Var/Const` can always be rewritten as ...
[bv -> IV0 <comp> Var/Const, NumExpr == IV0].
So even if numerical constraints are not supported in reified context, we can rewrite them to non-reified versions.
TODO: decide if we want to do the rewrite of unsupported globals here or leave it to `reify_rewrite`.
Currently, we left it for `reify_rewrite`
"""
def _is_supported(cpm_expr, reified):
if isinstance(cpm_expr, GlobalConstraint):
if reified and cpm_expr.name not in supported_reif:
return False
if not reified and cpm_expr.name not in supported:
return False
if isinstance(cpm_expr, Comparison) and isinstance(cpm_expr.args[0], GlobalFunction):
if not cpm_expr.args[0].name in supported:
# reified numerical global constraints can be rewritten to non-reified versions
# so only have to check for 'supported' set
return False
if reified and not cpm_expr.args[0].is_total() and cpm_expr.args[0].name not in supported_reif:
# edge case for partial functions as they cannot be rewritten to non-reified versions
# have to decompose to total functions
# ASSUMPTION: the decomposition for partial global functions is total! (for now only Element)
return False
return True
assert supported_reif <= supported, "`supported_reif` set is assumed to be a subset of the `supported` set"
if not is_any_list(lst_of_expr):
lst_of_expr= [lst_of_expr]
newlist = []
for cpm_expr in lst_of_expr:
assert isinstance(cpm_expr, Expression), f"Expected CPMpy expression but got {cpm_expr}, run 'cpmpy.transformations.normalize.toplevel_list' first"
decomp_idx = None
if hasattr(cpm_expr, "decompose") and not _is_supported(cpm_expr, reified=False):
cpm_expr = cpm_expr.decompose() # base boolean global constraints
elif isinstance(cpm_expr, Comparison):
lhs, rhs = cpm_expr.args
if cpm_expr.name == "==" and not _is_supported(lhs, reified=True): # rhs will always be const/var
decomp_idx = 0 # this means it is an unsupported reified boolean global
if not _is_supported(cpm_expr, reified=False):
cpm_expr = do_decompose(cpm_expr) # unsupported numerical constraint in comparison
decomp_idx = None
elif isinstance(cpm_expr, Operator) and cpm_expr.name == "->":
lhs, rhs = cpm_expr.args # BV -> Expr or Expr -> BV as flat normal form is required
if not _is_supported(rhs, reified=True):
decomp_idx = 1 # reified (numerical) global constraint, probably most frequent case
elif not _is_supported(lhs, reified=True):
decomp_idx = 0
if decomp_idx is not None:
cpm_expr = copy.deepcopy(cpm_expr) # need deepcopy as we are changing args of list inplace
cpm_expr.args[decomp_idx] = all(do_decompose(cpm_expr.args[decomp_idx]))
cpm_expr = [cpm_expr]
if isinstance(cpm_expr, list): # some decomposition happened, have to run again as decomp can contain new global
flat = flatten_constraint(cpm_expr) # most of the time will already be flat... do check here?
newlist.extend(decompose_global(flat, supported=supported))
else:
# default
newlist.append(cpm_expr)
return newlist
[docs]def do_decompose(cpm_expr):
"""
.. deprecated:: 0.9.13
Please use :func:`decompose_in_tree()` instead.
"""
warnings.warn("Deprecated, never meant to be used outside this transformation; will be removed in stable version",
DeprecationWarning)
"""
DEPRECATED
Helper function for decomposing global constraints
- cpm_expr: Global constraint or comparison containing a global constraint on lhs
"""
if isinstance(cpm_expr, Comparison):
lhs, rhs = cpm_expr.args
if lhs.is_bool():
return [eval_comparison(cpm_expr.name, all(lhs.decompose()), rhs)]
else:
dec = lhs.decompose_comparison(cpm_expr.name, rhs)
# new style is a tuple of size 2, old style is a single list (which captures less cases)
if isinstance(dec, tuple):
dec = dec[0]+dec[1]
return dec
dec = cpm_expr.decompose()
# new style is a tuple of size 2, old style is a single list (which captures less cases)
if isinstance(dec, tuple):
dec = dec[0]+dec[1]
return dec