You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
335 lines
8.9 KiB
Python
335 lines
8.9 KiB
Python
from __future__ import with_statement
|
|
|
|
import operator
|
|
from nose import SkipTest
|
|
from ..util import decorator
|
|
from . import config
|
|
from .. import util
|
|
import contextlib
|
|
|
|
|
|
class skip_if(object):
|
|
def __init__(self, predicate, reason=None):
|
|
self.predicate = _as_predicate(predicate)
|
|
self.reason = reason
|
|
|
|
_fails_on = None
|
|
|
|
@property
|
|
def enabled(self):
|
|
return not self.predicate()
|
|
|
|
@contextlib.contextmanager
|
|
def fail_if(self, name='block'):
|
|
try:
|
|
yield
|
|
except Exception, ex:
|
|
if self.predicate():
|
|
print ("%s failed as expected (%s): %s " % (
|
|
name, self.predicate, str(ex)))
|
|
else:
|
|
raise
|
|
else:
|
|
if self.predicate():
|
|
raise AssertionError(
|
|
"Unexpected success for '%s' (%s)" %
|
|
(name, self.predicate))
|
|
|
|
def __call__(self, fn):
|
|
@decorator
|
|
def decorate(fn, *args, **kw):
|
|
if self.predicate():
|
|
if self.reason:
|
|
msg = "'%s' : %s" % (
|
|
fn.__name__,
|
|
self.reason
|
|
)
|
|
else:
|
|
msg = "'%s': %s" % (
|
|
fn.__name__, self.predicate
|
|
)
|
|
raise SkipTest(msg)
|
|
else:
|
|
if self._fails_on:
|
|
with self._fails_on.fail_if(name=fn.__name__):
|
|
return fn(*args, **kw)
|
|
else:
|
|
return fn(*args, **kw)
|
|
return decorate(fn)
|
|
|
|
def fails_on(self, other, reason=None):
|
|
self._fails_on = skip_if(other, reason)
|
|
return self
|
|
|
|
|
|
class fails_if(skip_if):
|
|
def __call__(self, fn):
|
|
@decorator
|
|
def decorate(fn, *args, **kw):
|
|
with self.fail_if(name=fn.__name__):
|
|
return fn(*args, **kw)
|
|
return decorate(fn)
|
|
|
|
|
|
def only_if(predicate, reason=None):
|
|
predicate = _as_predicate(predicate)
|
|
return skip_if(NotPredicate(predicate), reason)
|
|
|
|
|
|
def succeeds_if(predicate, reason=None):
|
|
predicate = _as_predicate(predicate)
|
|
return fails_if(NotPredicate(predicate), reason)
|
|
|
|
|
|
class Predicate(object):
|
|
@classmethod
|
|
def as_predicate(cls, predicate):
|
|
if isinstance(predicate, skip_if):
|
|
return predicate.predicate
|
|
elif isinstance(predicate, Predicate):
|
|
return predicate
|
|
elif isinstance(predicate, list):
|
|
return OrPredicate([cls.as_predicate(pred) for pred in predicate])
|
|
elif isinstance(predicate, tuple):
|
|
return SpecPredicate(*predicate)
|
|
elif isinstance(predicate, basestring):
|
|
return SpecPredicate(predicate, None, None)
|
|
elif util.callable(predicate):
|
|
return LambdaPredicate(predicate)
|
|
else:
|
|
assert False, "unknown predicate type: %s" % predicate
|
|
|
|
|
|
class BooleanPredicate(Predicate):
|
|
def __init__(self, value, description=None):
|
|
self.value = value
|
|
self.description = description or "boolean %s" % value
|
|
|
|
def __call__(self):
|
|
return self.value
|
|
|
|
def _as_string(self, negate=False):
|
|
if negate:
|
|
return "not " + self.description
|
|
else:
|
|
return self.description
|
|
|
|
def __str__(self):
|
|
return self._as_string()
|
|
|
|
|
|
class SpecPredicate(Predicate):
|
|
def __init__(self, db, op=None, spec=None, description=None):
|
|
self.db = db
|
|
self.op = op
|
|
self.spec = spec
|
|
self.description = description
|
|
|
|
_ops = {
|
|
'<': operator.lt,
|
|
'>': operator.gt,
|
|
'==': operator.eq,
|
|
'!=': operator.ne,
|
|
'<=': operator.le,
|
|
'>=': operator.ge,
|
|
'in': operator.contains,
|
|
'between': lambda val, pair: val >= pair[0] and val <= pair[1],
|
|
}
|
|
|
|
def __call__(self, engine=None):
|
|
if engine is None:
|
|
engine = config.db
|
|
|
|
if "+" in self.db:
|
|
dialect, driver = self.db.split('+')
|
|
else:
|
|
dialect, driver = self.db, None
|
|
|
|
if dialect and engine.name != dialect:
|
|
return False
|
|
if driver is not None and engine.driver != driver:
|
|
return False
|
|
|
|
if self.op is not None:
|
|
assert driver is None, "DBAPI version specs not supported yet"
|
|
|
|
version = _server_version(engine)
|
|
oper = hasattr(self.op, '__call__') and self.op \
|
|
or self._ops[self.op]
|
|
return oper(version, self.spec)
|
|
else:
|
|
return True
|
|
|
|
def _as_string(self, negate=False):
|
|
if self.description is not None:
|
|
return self.description
|
|
elif self.op is None:
|
|
if negate:
|
|
return "not %s" % self.db
|
|
else:
|
|
return "%s" % self.db
|
|
else:
|
|
if negate:
|
|
return "not %s %s %s" % (
|
|
self.db,
|
|
self.op,
|
|
self.spec
|
|
)
|
|
else:
|
|
return "%s %s %s" % (
|
|
self.db,
|
|
self.op,
|
|
self.spec
|
|
)
|
|
|
|
def __str__(self):
|
|
return self._as_string()
|
|
|
|
|
|
class LambdaPredicate(Predicate):
|
|
def __init__(self, lambda_, description=None, args=None, kw=None):
|
|
self.lambda_ = lambda_
|
|
self.args = args or ()
|
|
self.kw = kw or {}
|
|
if description:
|
|
self.description = description
|
|
elif lambda_.__doc__:
|
|
self.description = lambda_.__doc__
|
|
else:
|
|
self.description = "custom function"
|
|
|
|
def __call__(self):
|
|
return self.lambda_(*self.args, **self.kw)
|
|
|
|
def _as_string(self, negate=False):
|
|
if negate:
|
|
return "not " + self.description
|
|
else:
|
|
return self.description
|
|
|
|
def __str__(self):
|
|
return self._as_string()
|
|
|
|
|
|
class NotPredicate(Predicate):
|
|
def __init__(self, predicate):
|
|
self.predicate = predicate
|
|
|
|
def __call__(self, *arg, **kw):
|
|
return not self.predicate(*arg, **kw)
|
|
|
|
def __str__(self):
|
|
return self.predicate._as_string(True)
|
|
|
|
|
|
class OrPredicate(Predicate):
|
|
def __init__(self, predicates, description=None):
|
|
self.predicates = predicates
|
|
self.description = description
|
|
|
|
def __call__(self, *arg, **kw):
|
|
for pred in self.predicates:
|
|
if pred(*arg, **kw):
|
|
self._str = pred
|
|
return True
|
|
return False
|
|
|
|
_str = None
|
|
|
|
def _eval_str(self, negate=False):
|
|
if self._str is None:
|
|
if negate:
|
|
conjunction = " and "
|
|
else:
|
|
conjunction = " or "
|
|
return conjunction.join(p._as_string(negate=negate)
|
|
for p in self.predicates)
|
|
else:
|
|
return self._str._as_string(negate=negate)
|
|
|
|
def _negation_str(self):
|
|
if self.description is not None:
|
|
return "Not " + (self.description % {"spec": self._str})
|
|
else:
|
|
return self._eval_str(negate=True)
|
|
|
|
def _as_string(self, negate=False):
|
|
if negate:
|
|
return self._negation_str()
|
|
else:
|
|
if self.description is not None:
|
|
return self.description % {"spec": self._str}
|
|
else:
|
|
return self._eval_str()
|
|
|
|
def __str__(self):
|
|
return self._as_string()
|
|
|
|
_as_predicate = Predicate.as_predicate
|
|
|
|
|
|
def _is_excluded(db, op, spec):
|
|
return SpecPredicate(db, op, spec)()
|
|
|
|
|
|
def _server_version(engine):
|
|
"""Return a server_version_info tuple."""
|
|
|
|
# force metadata to be retrieved
|
|
conn = engine.connect()
|
|
version = getattr(engine.dialect, 'server_version_info', ())
|
|
conn.close()
|
|
return version
|
|
|
|
|
|
def db_spec(*dbs):
|
|
return OrPredicate(
|
|
Predicate.as_predicate(db) for db in dbs
|
|
)
|
|
|
|
|
|
def open():
|
|
return skip_if(BooleanPredicate(False, "mark as execute"))
|
|
|
|
|
|
def closed():
|
|
return skip_if(BooleanPredicate(True, "marked as skip"))
|
|
|
|
|
|
@decorator
|
|
def future(fn, *args, **kw):
|
|
return fails_if(LambdaPredicate(fn, *args, **kw), "Future feature")
|
|
|
|
|
|
def fails_on(db, reason=None):
|
|
return fails_if(SpecPredicate(db), reason)
|
|
|
|
|
|
def fails_on_everything_except(*dbs):
|
|
return succeeds_if(
|
|
OrPredicate([
|
|
SpecPredicate(db) for db in dbs
|
|
])
|
|
)
|
|
|
|
|
|
def skip(db, reason=None):
|
|
return skip_if(SpecPredicate(db), reason)
|
|
|
|
|
|
def only_on(dbs, reason=None):
|
|
return only_if(
|
|
OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
|
|
)
|
|
|
|
|
|
def exclude(db, op, spec, reason=None):
|
|
return skip_if(SpecPredicate(db, op, spec), reason)
|
|
|
|
|
|
def against(*queries):
|
|
return OrPredicate([
|
|
Predicate.as_predicate(query)
|
|
for query in queries
|
|
])()
|