Skip to content

simplify pool recycle logic #2985

@sqlalchemy-bot

Description

@sqlalchemy-bot

Migrated issue, originally created by Michael Bayer (@zzzeek)

Using a simple invalidation time, we can do away with all of the "pool replacement" logic. The current logic is subject to a pretty obvious race condition: when many connections all hit a disconnect wall at once, all of the Connection objects hosting them will simultaneously call self.engine.dispose(). This means we could have N pools generated and immediately discarded within a single disconnect cycle.

The patch below removes all of that and replaces it with a simple invalidation timestamp on the pool, which incurs no overhead and has no race conditions. The only difference is that the "bad" connections hang around until they are invalidated on checkout.

diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index 888a15f..20b5227 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -1084,9 +1084,7 @@ class Connection(Connectable):
                 del self._is_disconnect
                 dbapi_conn_wrapper = self.connection
                 self.invalidate(e)
-                if not hasattr(dbapi_conn_wrapper, '_pool') or \
-                        dbapi_conn_wrapper._pool is self.engine.pool:
-                    self.engine.dispose()
+                self.engine.pool._invalidate(dbapi_conn_wrapper)
             if self.should_close_with_result:
                 self.close()
 
@@ -1496,7 +1494,7 @@ class Engine(Connectable, log.Identified):
         the engine are not affected.
 
         """
-        self.pool = self.pool._replace()
+        self.pool.dispose()
 
     def _execute_default(self, default):
         with self.contextual_connect() as conn:
diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py
index 473b665..4a07e78 100644
--- a/lib/sqlalchemy/orm/strategies.py
+++ b/lib/sqlalchemy/orm/strategies.py
@@ -528,7 +528,6 @@ class LazyLoader(AbstractRelationshipLoader):
     def _emit_lazyload(self, strategy_options, session, state, ident_key, passive):
         q = session.query(self.mapper)._adapt_all_clauses()
 
-
         if self.parent_property.secondary is not None:
             q = q.select_from(self.mapper, self.parent_property.secondary)
 
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index af9b8fc..f78825e 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -210,6 +210,7 @@ class Pool(log.Identified):
         self._threadconns = threading.local()
         self._creator = creator
         self._recycle = recycle
+        self._invalidate_time = 0
         self._use_threadlocal = use_threadlocal
         if reset_on_return in ('rollback', True, reset_rollback):
             self._reset_on_return = reset_rollback
@@ -276,6 +277,22 @@ class Pool(log.Identified):
 
         return _ConnectionRecord(self)
 
+    def _invalidate(self, connection):
+        """Mark all connections established within the generation
+        of the given connection as invalidated.
+
+        If this pool's last invalidate time is before when the given
+        connection was created, update the timestamp til now.  Otherwise,
+        no action is performed.
+
+        Connections with a start time prior to this pool's invalidation
+        time will be recycled upon next checkout.
+        """
+        rec = getattr(connection, "_connection_record", None)
+        if not rec or self._invalidate_time < rec.starttime:
+            self._invalidate_time = time.time()
+
+
     def recreate(self):
         """Return a new :class:`.Pool`, of the same class as this one
         and configured with identical creation arguments.
@@ -301,17 +318,6 @@ class Pool(log.Identified):
 
         raise NotImplementedError()
 
-    def _replace(self):
-        """Dispose + recreate this pool.
-
-        Subclasses may employ special logic to
-        move threads waiting on this pool to the
-        new one.
-
-        """
-        self.dispose()
-        return self.recreate()
-
     def connect(self):
         """Return a DBAPI connection from the pool.
 
@@ -483,6 +489,7 @@ class _ConnectionRecord(object):
         self.connection = None
 
     def get_connection(self):
+        recycle = False
         if self.connection is None:
             self.connection = self.__connect()
             self.info.clear()
@@ -493,6 +500,15 @@ class _ConnectionRecord(object):
             self.__pool.logger.info(
                     "Connection %r exceeded timeout; recycling",
                     self.connection)
+            recycle = True
+        elif self.__pool._invalidate_time > self.starttime:
+            self.__pool.logger.info(
+                    "Connection %r invalidated due to pool invalidation; recycling",
+                    self.connection
+                    )
+            recycle = True
+
+        if recycle:
             self.__close()
             self.connection = self.__connect()
             self.info.clear()
@@ -911,8 +927,6 @@ class QueuePool(Pool):
         try:
             wait = use_overflow and self._overflow >= self._max_overflow
             return self._pool.get(wait, self._timeout)
-        except sqla_queue.SAAbort as aborted:
-            return aborted.context._do_get()
         except sqla_queue.Empty:
             if use_overflow and self._overflow >= self._max_overflow:
                 if not wait:
@@ -974,12 +988,6 @@ class QueuePool(Pool):
         self._overflow = 0 - self.size()
         self.logger.info("Pool disposed. %s", self.status())
 
-    def _replace(self):
-        self.dispose()
-        np = self.recreate()
-        self._pool.abort(np)
-        return np
-
     def status(self):
         return "Pool size: %d  Connections in pool: %d "\
                 "Current Overflow: %d Current Checked out "\
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index fc6f3dc..cde19b3 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -1069,7 +1069,8 @@ class QueuePoolTest(PoolTestBase):
                 # inside the queue, before we invalidate the other
                 # two conns
                 time.sleep(.2)
-                p2 = p._replace()
+                p._invalidate(c2)
+                c2.invalidate()
 
                 for t in threads:
                     t.join(join_timeout)
@@ -1079,19 +1080,18 @@ class QueuePoolTest(PoolTestBase):
     @testing.requires.threading_with_mock
     def test_notify_waiters(self):
         dbapi = MockDBAPI()
+
         canary = []
-        def creator1():
+        def creator():
             canary.append(1)
             return dbapi.connect()
-        def creator2():
-            canary.append(2)
-            return dbapi.connect()
-        p1 = pool.QueuePool(creator=creator1,
+        p1 = pool.QueuePool(creator=creator,
                            pool_size=1, timeout=None,
                            max_overflow=0)
-        p2 = pool.NullPool(creator=creator2)
+        #p2 = pool.NullPool(creator=creator2)
         def waiter(p):
             conn = p.connect()
+            canary.append(2)
             time.sleep(.5)
             conn.close()
 
@@ -1104,12 +1104,14 @@ class QueuePoolTest(PoolTestBase):
             threads.append(t)
         time.sleep(.5)
         eq_(canary, [1])
-        p1._pool.abort(p2)
+
+        c1.invalidate()
+        p1._invalidate(c1)
 
         for t in threads:
             t.join(join_timeout)
 
-        eq_(canary, [1, 2, 2, 2, 2, 2])
+        eq_(canary, [1, 1, 2, 2, 2, 2, 2])
 
     def test_dispose_closes_pooled(self):
         dbapi = MockDBAPI()
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index ba336a1..a3ad9c5 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -146,16 +146,20 @@ class MockReconnectTest(fixtures.TestBase):
         # close shouldnt break
 
         conn.close()
-        is_not_(self.db.pool, db_pool)
-
-        # ensure all connections closed (pool was recycled)
 
+        # ensure one connection closed...
         eq_(
             [c.close.mock_calls for c in self.dbapi.connections],
-            [[call()], [call()]]
+            [[call()], []]
         )
 
         conn = self.db.connect()
+
+        eq_(
+            [c.close.mock_calls for c in self.dbapi.connections],
+            [[call()], [call()], []]
+        )
+
         conn.execute(select([1]))
         conn.close()
 
@@ -534,8 +538,6 @@ class RealReconnectTest(fixtures.TestBase):
         # invalidate() also doesn't screw up
         assert_raises(exc.DBAPIError, engine.connect)
 
-        # pool was recreated
-        assert engine.pool is not p1
 
     def test_null_pool(self):
         engine = \

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions