You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

212 lines
6.1 KiB
Python

5 years ago
#! coding: utf-8
from .. import assert_raises
from .. import config
from .. import eq_
from .. import fixtures
from .. import ne_
from .. import provide_metadata
from ..config import requirements
from ..schema import Column
from ..schema import Table
from ... import exc
from ... import Integer
from ... import literal_column
from ... import select
from ... import String
from ...util import compat
class ExceptionTest(fixtures.TablesTest):
"""Test basic exception wrapping.
DBAPIs vary a lot in exception behavior so to actually anticipate
specific exceptions from real round trips, we need to be conservative.
"""
run_deletes = "each"
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"manual_pk",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
)
@requirements.duplicate_key_raises_integrity_error
def test_integrity_error(self):
with config.db.connect() as conn:
trans = conn.begin()
conn.execute(
self.tables.manual_pk.insert(), {"id": 1, "data": "d1"}
)
assert_raises(
exc.IntegrityError,
conn.execute,
self.tables.manual_pk.insert(),
{"id": 1, "data": "d1"},
)
trans.rollback()
def test_exception_with_non_ascii(self):
with config.db.connect() as conn:
try:
# try to create an error message that likely has non-ascii
# characters in the DBAPI's message string. unfortunately
# there's no way to make this happen with some drivers like
# mysqlclient, pymysql. this at least does produce a non-
# ascii error message for cx_oracle, psycopg2
conn.execute(select([literal_column(u"méil")]))
assert False
except exc.DBAPIError as err:
err_str = str(err)
assert str(err.orig) in str(err)
# test that we are actually getting string on Py2k, unicode
# on Py3k.
if compat.py2k:
assert isinstance(err_str, str)
else:
assert isinstance(err_str, str)
class IsolationLevelTest(fixtures.TestBase):
__backend__ = True
__requires__ = ("isolation_level",)
def _get_non_default_isolation_level(self):
levels = requirements.get_isolation_levels(config)
default = levels["default"]
supported = levels["supported"]
s = set(supported).difference(["AUTOCOMMIT", default])
if s:
return s.pop()
else:
config.skip_test("no non-default isolation level available")
def test_default_isolation_level(self):
eq_(
config.db.dialect.default_isolation_level,
requirements.get_isolation_levels(config)["default"],
)
def test_non_default_isolation_level(self):
non_default = self._get_non_default_isolation_level()
with config.db.connect() as conn:
existing = conn.get_isolation_level()
ne_(existing, non_default)
conn.execution_options(isolation_level=non_default)
eq_(conn.get_isolation_level(), non_default)
conn.dialect.reset_isolation_level(conn.connection)
eq_(conn.get_isolation_level(), existing)
class AutocommitTest(fixtures.TablesTest):
run_deletes = "each"
__requires__ = ("autocommit",)
__backend__ = True
@classmethod
def define_tables(cls, metadata):
Table(
"some_table",
metadata,
Column("id", Integer, primary_key=True, autoincrement=False),
Column("data", String(50)),
test_needs_acid=True,
)
def _test_conn_autocommits(self, conn, autocommit):
trans = conn.begin()
conn.execute(
self.tables.some_table.insert(), {"id": 1, "data": "some data"}
)
trans.rollback()
eq_(
conn.scalar(select([self.tables.some_table.c.id])),
1 if autocommit else None,
)
conn.execute(self.tables.some_table.delete())
def test_autocommit_on(self):
conn = config.db.connect()
c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
self._test_conn_autocommits(c2, True)
c2.dialect.reset_isolation_level(c2.connection)
self._test_conn_autocommits(conn, False)
def test_autocommit_off(self):
conn = config.db.connect()
self._test_conn_autocommits(conn, False)
def test_turn_autocommit_off_via_default_iso_level(self):
conn = config.db.connect()
conn.execution_options(isolation_level="AUTOCOMMIT")
self._test_conn_autocommits(conn, True)
conn.execution_options(
isolation_level=requirements.get_isolation_levels(config)[
"default"
]
)
self._test_conn_autocommits(conn, False)
class EscapingTest(fixtures.TestBase):
@provide_metadata
def test_percent_sign_round_trip(self):
"""test that the DBAPI accommodates for escaped / nonescaped
percent signs in a way that matches the compiler
"""
m = self.metadata
t = Table("t", m, Column("data", String(50)))
t.create(config.db)
with config.db.begin() as conn:
conn.execute(t.insert(), dict(data="some % value"))
conn.execute(t.insert(), dict(data="some %% other value"))
eq_(
conn.scalar(
select([t.c.data]).where(
t.c.data == literal_column("'some % value'")
)
),
"some % value",
)
eq_(
conn.scalar(
select([t.c.data]).where(
t.c.data == literal_column("'some %% other value'")
)
),
"some %% other value",
)