# postgresql/psycopg2.py
# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""
.. dialect:: postgresql+psycopg2
    :name: psycopg2
    :dbapi: psycopg2
    :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
    :url: http://pypi.python.org/pypi/psycopg2/

psycopg2 Connect Arguments
-----------------------------------

psycopg2-specific keyword arguments which are accepted by
:func:`.create_engine()` are:

* ``server_side_cursors``: Enable the usage of "server side cursors" for SQL
  statements which support this feature. What this essentially means from a
  psycopg2 point of view is that the cursor is created using a name, e.g.
  ``connection.cursor('some name')``, which has the effect that result rows
  are not immediately pre-fetched and buffered after statement execution, but
  are instead left on the server and only retrieved as needed. SQLAlchemy's
  :class:`~sqlalchemy.engine.ResultProxy` uses special row-buffering
  behavior when this feature is enabled, such that groups of 100 rows at a
  time are fetched over the wire to reduce conversational overhead.
  Note that the ``stream_results=True`` execution option is a more targeted
  way of enabling this mode on a per-execution basis.
* ``use_native_unicode``: Enable the usage of Psycopg2 "native unicode" mode
  per connection.  True by default.
* ``isolation_level``: This option, available for all Postgresql dialects,
  includes the ``AUTOCOMMIT`` isolation level when using the psycopg2
  dialect.  See :ref:`psycopg2_isolation_level`.

Unix Domain Connections
------------------------

psycopg2 supports connecting via Unix domain connections.
When the ``host`` portion of the URL is omitted, SQLAlchemy passes ``None``
to psycopg2, which specifies Unix-domain communication rather than TCP/IP
communication::

    create_engine("postgresql+psycopg2://user:password@/dbname")

By default, psycopg2 connects to a Unix-domain socket in ``/tmp``, or
whatever socket directory was specified when PostgreSQL was built.  This
value can be overridden by passing a pathname to psycopg2, using ``host``
as an additional keyword argument::

    create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql")

See also: ``PQconnectdbParams`` in the libpq documentation.

Per-Statement/Connection Execution Options
-------------------------------------------

The following DBAPI-specific options are respected when used with
:meth:`.Connection.execution_options`, :meth:`.Executable.execution_options`,
:meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:

* isolation_level - Set the transaction isolation level for the lifespan of a
  :class:`.Connection` (can only be set on a connection, not a statement
  or query).  See :ref:`psycopg2_isolation_level`.

* stream_results - Enable or disable usage of psycopg2 server side cursors -
  this feature makes use of "named" cursors in combination with special
  result handling methods so that result rows are not fully buffered.
  If ``None`` or not set, the ``server_side_cursors`` option of the
  :class:`.Engine` is used.

Unicode
-------

By default, the psycopg2 driver uses the ``psycopg2.extensions.UNICODE``
extension, such that the DBAPI receives and returns all strings as Python
Unicode objects directly - SQLAlchemy passes these values through without
change.  Psycopg2 here will encode/decode string values based on the
current "client encoding" setting; by default this is the value in
the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
Typically, this can be changed to ``utf-8``, as a more useful default::

    #client_encoding = sql_ascii # actually, defaults to database
                                 # encoding
    client_encoding = utf8

A second way to affect the client encoding is to set it within Psycopg2
locally.  SQLAlchemy will call psycopg2's ``set_client_encoding()``
method (see: http://initd.org/psycopg/docs/connection.html#connection.set_client_encoding)
on all new connections based on the value passed to
:func:`.create_engine` using the ``client_encoding`` parameter::

    engine = create_engine("postgresql://user:pass@host/dbname", client_encoding='utf8')

This overrides the encoding specified in the Postgresql client configuration.

.. versionadded:: 0.7.3
    The psycopg2-specific ``client_encoding`` parameter to
    :func:`.create_engine`.

SQLAlchemy can also be instructed to skip the usage of the psycopg2
``UNICODE`` extension and to instead utilize its own unicode encode/decode
services, which are normally reserved only for those DBAPIs that don't
fully support unicode directly.  Passing ``use_native_unicode=False`` to
:func:`.create_engine` will disable usage of ``psycopg2.extensions.UNICODE``.
SQLAlchemy will instead encode data itself into Python bytestrings on the way
in and coerce from bytes on the way back,
using the value of the :func:`.create_engine` ``encoding`` parameter, which
defaults to ``utf-8``.
SQLAlchemy's own unicode encode/decode functionality is steadily becoming
obsolete as more DBAPIs support unicode fully along with the approach of
Python 3; in modern usage psycopg2 should be relied upon to handle unicode.

Transactions
------------

The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.

.. _psycopg2_isolation_level:

Psycopg2 Transaction Isolation Level
-------------------------------------

As discussed in :ref:`postgresql_isolation_level`,
all Postgresql dialects support setting of transaction isolation level
both via the ``isolation_level`` parameter passed to :func:`.create_engine`,
as well as the ``isolation_level`` argument used by
:meth:`.Connection.execution_options`.  When using the psycopg2 dialect, these
options make use of psycopg2's ``set_isolation_level()`` connection method,
rather than emitting a Postgresql directive; this is because psycopg2's
API-level setting is always emitted at the start of each transaction in any
case.

The psycopg2 dialect supports these constants for isolation level:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. versionadded:: 0.8.2 support for AUTOCOMMIT isolation level when using
   psycopg2.


NOTICE logging
---------------

The psycopg2 dialect will log Postgresql NOTICE messages via the
``sqlalchemy.dialects.postgresql`` logger::

    import logging
    logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)

HSTORE type
------------

The psycopg2 dialect will make use of the
``psycopg2.extensions.register_hstore()`` extension when using the HSTORE
type.  This replaces SQLAlchemy's pure-Python HSTORE coercion which takes
effect for other DBAPIs.

"""
from __future__ import absolute_import
import re
import logging

from ... import util, exc
import decimal
from ... import processors
from ...engine import result as _result
from ...sql import expression
from ... import types as sqltypes
from .base import PGDialect, PGCompiler, \
    PGIdentifierPreparer, PGExecutionContext, \
    ENUM, ARRAY, _DECIMAL_TYPES, _FLOAT_TYPES,\
    _INT_TYPES
from .hstore import HSTORE


logger = logging.getLogger('sqlalchemy.dialects.postgresql')


class _PGNumeric(sqltypes.Numeric):
    def bind_processor(self, dialect):
        return None

    def result_processor(self, dialect, coltype):
        if self.asdecimal:
            if coltype in _FLOAT_TYPES:
                return processors.to_decimal_processor_factory(decimal.Decimal)
            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
                # psycopg2 returns Decimal natively for 1700
                return None
            else:
                raise exc.InvalidRequestError(
                    "Unknown PG numeric type: %d" % coltype)
        else:
            if coltype in _FLOAT_TYPES:
                # psycopg2 returns float natively for 701
                return None
            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
                return processors.to_float
            else:
                raise exc.InvalidRequestError(
                    "Unknown PG numeric type: %d" % coltype)


class _PGEnum(ENUM):
    def __init__(self, *arg, **kw):
        super(_PGEnum, self).__init__(*arg, **kw)
        # Py2K
        if self.convert_unicode:
            self.convert_unicode = "force"
        # end Py2K


class _PGArray(ARRAY):
    def __init__(self, *arg, **kw):
        super(_PGArray, self).__init__(*arg, **kw)
        # Py2K
        # FIXME: this check won't work for setups that
        # have convert_unicode only on their create_engine().
        if isinstance(self.item_type, sqltypes.String) and \
                self.item_type.convert_unicode:
            self.item_type.convert_unicode = "force"
        # end Py2K


class _PGHStore(HSTORE):
    def bind_processor(self, dialect):
        if dialect._has_native_hstore:
            return None
        else:
            return super(_PGHStore, self).bind_processor(dialect)

    def result_processor(self, dialect, coltype):
        if dialect._has_native_hstore:
            return None
        else:
            return super(_PGHStore, self).result_processor(dialect, coltype)

# When we're handed literal SQL, ensure it's a SELECT-query. Since
# 8.3, combining cursors and "FOR UPDATE" has been fine.
SERVER_SIDE_CURSOR_RE = re.compile(
    r'\s*SELECT',
    re.I | re.UNICODE)

_server_side_id = util.counter()


class PGExecutionContext_psycopg2(PGExecutionContext):
    def create_cursor(self):
        # TODO: coverage for server side cursors + select.for_update()

        if self.dialect.server_side_cursors:
            is_server_side = \
                self.execution_options.get('stream_results', True) and (
                    (self.compiled and isinstance(self.compiled.statement,
                                                  expression.Selectable) \
                    or \
                    (
                        (not self.compiled or
                         isinstance(self.compiled.statement,
                                    expression.TextClause))
                        and self.statement and
                        SERVER_SIDE_CURSOR_RE.match(self.statement))
                    )
                )
        else:
            is_server_side = \
                self.execution_options.get('stream_results', False)

        self.__is_server_side = is_server_side
        if is_server_side:
            # use server-side cursors:
            # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
            ident = "c_%s_%s" % (hex(id(self))[2:],
                                 hex(_server_side_id())[2:])
            return self._dbapi_connection.cursor(ident)
        else:
            return self._dbapi_connection.cursor()

    def get_result_proxy(self):
        # TODO: ouch
        if logger.isEnabledFor(logging.INFO):
            self._log_notices(self.cursor)

        if self.__is_server_side:
            return _result.BufferedRowResultProxy(self)
        else:
            return _result.ResultProxy(self)

    def _log_notices(self, cursor):
        for notice in cursor.connection.notices:
            # NOTICE messages have a
            # newline character at the end
            logger.info(notice.rstrip())

        cursor.connection.notices[:] = []


class PGCompiler_psycopg2(PGCompiler):
    def visit_mod_binary(self, binary, operator, **kw):
        return self.process(binary.left, **kw) + " %% " + \
            self.process(binary.right, **kw)

    def post_process_text(self, text):
        return text.replace('%', '%%')


class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
    def _escape_identifier(self, value):
        value = value.replace(self.escape_quote, self.escape_to_quote)
        return value.replace('%', '%%')


class PGDialect_psycopg2(PGDialect):
    driver = 'psycopg2'
    # Py2K
    supports_unicode_statements = False
    # end Py2K
    default_paramstyle = 'pyformat'
    supports_sane_multi_rowcount = False
    execution_ctx_cls = PGExecutionContext_psycopg2
    statement_compiler = PGCompiler_psycopg2
    preparer = PGIdentifierPreparer_psycopg2
    psycopg2_version = (0, 0)

    _has_native_hstore = False

    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumeric,
            ENUM: _PGEnum,  # needs force_unicode
            sqltypes.Enum: _PGEnum,  # needs force_unicode
            ARRAY: _PGArray,  # needs force_unicode
            HSTORE: _PGHStore,
        }
    )

    def __init__(self, server_side_cursors=False, use_native_unicode=True,
                 client_encoding=None,
                 use_native_hstore=True,
                 **kwargs):
        PGDialect.__init__(self, **kwargs)
        self.server_side_cursors = server_side_cursors
        self.use_native_unicode = use_native_unicode
        self.use_native_hstore = use_native_hstore
        self.supports_unicode_binds = use_native_unicode
        self.client_encoding = client_encoding
        if self.dbapi and hasattr(self.dbapi, '__version__'):
            m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?',
                         self.dbapi.__version__)
            if m:
                self.psycopg2_version = tuple(
                    int(x)
                    for x in m.group(1, 2, 3)
                    if x is not None)

    def initialize(self, connection):
        super(PGDialect_psycopg2, self).initialize(connection)
        self._has_native_hstore = self.use_native_hstore and \
            self._hstore_oids(connection.connection) \
            is not None

    @classmethod
    def dbapi(cls):
        import psycopg2
        return psycopg2

    @util.memoized_property
    def _isolation_lookup(self):
        extensions = __import__('psycopg2.extensions').extensions
        return {
            'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
            'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
        }

    def set_isolation_level(self, connection, level):
        try:
            level = self._isolation_lookup[level.replace('_', ' ')]
        except KeyError:
            raise exc.ArgumentError(
                "Invalid value '%s' for isolation_level. "
                "Valid isolation levels for %s are %s" %
                (level, self.name, ", ".join(self._isolation_lookup))
            )

        connection.set_isolation_level(level)

    def on_connect(self):
        from psycopg2 import extras, extensions

        fns = []
        if self.client_encoding is not None:
            def on_connect(conn):
                conn.set_client_encoding(self.client_encoding)
            fns.append(on_connect)

        if self.isolation_level is not None:
            def on_connect(conn):
                self.set_isolation_level(conn, self.isolation_level)
            fns.append(on_connect)

        if self.dbapi and self.use_native_unicode:
            def on_connect(conn):
                extensions.register_type(extensions.UNICODE, conn)
            fns.append(on_connect)

        if self.dbapi and self.use_native_hstore:
            def on_connect(conn):
                hstore_oids = self._hstore_oids(conn)
                if hstore_oids is not None:
                    oid, array_oid = hstore_oids
                    if util.py2k:
                        extras.register_hstore(conn, oid=oid,
                                               array_oid=array_oid,
                                               unicode=True)
                    else:
                        extras.register_hstore(conn, oid=oid,
                                               array_oid=array_oid)
            fns.append(on_connect)

        if fns:
            def on_connect(conn):
                for fn in fns:
                    fn(conn)
            return on_connect
        else:
            return None

    @util.memoized_instancemethod
    def _hstore_oids(self, conn):
        if self.psycopg2_version >= (2, 4):
            from psycopg2 import extras
            oids = extras.HstoreAdapter.get_oids(conn)
            if oids is not None and oids[0]:
                return oids[0:2]
        return None

    def create_connect_args(self, url):
        opts = url.translate_connect_args(username='user')
        if 'port' in opts:
            opts['port'] = int(opts['port'])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        if isinstance(e, self.dbapi.Error):
            str_e = str(e).partition("\n")[0]
            for msg in [
                # these error messages from libpq: interfaces/libpq/fe-misc.c
                # and interfaces/libpq/fe-secure.c.
                # TODO: these are sent through gettext in libpq and we can't
                # check within other locales - consider using connection.closed
                'terminating connection',
                'closed the connection',
                'connection not open',
                'could not receive data from server',
                # psycopg2 client errors, psycopg2/connection.h,
                # psycopg2/cursor.h
                'connection already closed',
                'cursor already closed',
                # not sure where this path is originally from, it may
                # be obsolete.  It really says "losed", not "closed".
                'losed the connection unexpectedly'
            ]:
                idx = str_e.find(msg)
                if idx >= 0 and '"' not in str_e[:idx]:
                    return True
        return False

dialect = PGDialect_psycopg2