QGIS API Documentation 3.43.0-Master (3ee7834ace6)
qgsconnectionpool.h
Go to the documentation of this file.
1/***************************************************************************
2 qgsconnectionpool.h
3 ---------------------
4 begin : February 2014
5 copyright : (C) 2014 by Martin Dobias
6 email : wonder dot sk at gmail dot com
7 ***************************************************************************
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 ***************************************************************************/
15
16#ifndef QGSCONNECTIONPOOL_H
17#define QGSCONNECTIONPOOL_H
18
19#define SIP_NO_FILE
20
21#include "qgis.h"
22#include "qgsapplication.h"
23#include "qgsfeedback.h"
24#include "qgslogger.h"
25
26#include <QCoreApplication>
27#include <QMap>
28#include <QMutex>
29#include <QSemaphore>
30#include <QStack>
31#include <QTime>
32#include <QTimer>
33#include <QThread>
34#include <QElapsedTimer>
35
36#define CONN_POOL_EXPIRATION_TIME 60 // in seconds
37#define CONN_POOL_SPARE_CONNECTIONS 2 // number of spare connections in case all the base connections are used but we have a nested request with the risk of a deadlock
38
39
63template <typename T>
65{
66 public:
67
68 struct Item
69 {
70 T c;
72 };
73
/**
 * Constructor for QgsConnectionPoolGroup, with the specified connection info.
 *
 * The connection info \a ci is stored and later passed to
 * qgsConnectionPool_ConnectionCreate() whenever a new connection is opened.
 *
 * The semaphore limiting concurrently acquired connections is sized to the
 * application's maxConcurrentConnectionsPerPool() plus
 * CONN_POOL_SPARE_CONNECTIONS extra slots.
 */
77 QgsConnectionPoolGroup( const QString &ci )
78 : connInfo( ci )
79 , sem( QgsApplication::instance()->maxConcurrentConnectionsPerPool() + CONN_POOL_SPARE_CONNECTIONS )
80 {
81 }
82
84 {
85 QgsDebugMsgLevel( QStringLiteral( "Destroying connection pool group" ), 2 );
86 for ( const Item &item : std::as_const( conns ) )
87 {
88 qgsConnectionPool_ConnectionDestroy( item.c );
89 }
90 }
91
94
102 T acquire( int timeout, bool requestMayBeNested )
103 {
104 QgsDebugMsgLevel( QStringLiteral( "Trying to acquire connection" ), 2 );
105 const int requiredFreeConnectionCount = requestMayBeNested ? 1 : 3;
106 // we are going to acquire a resource - if no resource is available, we will block here
107 if ( timeout >= 0 )
108 {
109 if ( !sem.tryAcquire( requiredFreeConnectionCount, timeout ) )
110 {
111 QgsDebugMsgLevel( QStringLiteral( "Failed to acquire semaphore" ), 2 );
112 return nullptr;
113 }
114 }
115 else
116 {
117 // we should still be able to use tryAcquire with a negative timeout here, but
118 // tryAcquire is broken on Qt > 5.8 with negative timeouts - see
119 // https://bugreports.qt.io/browse/QTBUG-64413
120 // https://lists.osgeo.org/pipermail/qgis-developer/2017-November/050456.html
121 sem.acquire( requiredFreeConnectionCount );
122 }
123 sem.release( requiredFreeConnectionCount - 1 );
124
125 // quick (preferred) way - use cached connection
126 {
127 QMutexLocker locker( &connMutex );
128
129 if ( !conns.isEmpty() )
130 {
131 QgsDebugMsgLevel( QStringLiteral( "Trying to use existing connection" ), 2 );
132 Item i = conns.pop();
133 if ( !qgsConnectionPool_ConnectionIsValid( i.c ) )
134 {
135 QgsDebugMsgLevel( QStringLiteral( "Connection is not valid, destroying" ), 2 );
136 qgsConnectionPool_ConnectionDestroy( i.c );
137 QgsDebugMsgLevel( QStringLiteral( "Creating new connection" ), 2 );
138 qgsConnectionPool_ConnectionCreate( connInfo, i.c );
139 }
140
141
142 // no need to run if nothing can expire
143 if ( conns.isEmpty() )
144 {
145 // will call the slot directly or queue the call (if the object lives in a different thread)
146 QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" );
147 }
148
149 QgsDebugMsgLevel( QStringLiteral( "Acquired connection" ), 2 );
150 acquiredConns.append( i.c );
151
152 return i.c;
153 }
154 }
155
156 QgsDebugMsgLevel( QStringLiteral( "Creating new connection" ), 2 );
157 T c;
158 qgsConnectionPool_ConnectionCreate( connInfo, c );
159 if ( !c )
160 {
161 // we didn't get connection for some reason, so release the lock
162 sem.release();
163 QgsDebugMsgLevel( QStringLiteral( "Failed to create new connection" ), 2 );
164 return nullptr;
165 }
166
167 connMutex.lock();
168 QgsDebugMsgLevel( QStringLiteral( "Acquired connection with name: %1" ).arg( qgsConnectionPool_ConnectionToName( c ) ), 2 );
169 acquiredConns.append( c );
170 connMutex.unlock();
171 return c;
172 }
173
174 void release( T conn )
175 {
176 QgsDebugMsgLevel( QStringLiteral( "Releasing connection" ), 2 );
177 connMutex.lock();
178 acquiredConns.removeAll( conn );
179 if ( !qgsConnectionPool_ConnectionIsValid( conn ) )
180 {
181 QgsDebugMsgLevel( QStringLiteral( "Destroying invalid connection" ), 2 );
182 qgsConnectionPool_ConnectionDestroy( conn );
183 }
184 else
185 {
186 Item i;
187 i.c = conn;
188 i.lastUsedTime = QTime::currentTime();
189 conns.push( i );
190
191 if ( !expirationTimer->isActive() )
192 {
193 // will call the slot directly or queue the call (if the object lives in a different thread)
194 QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" );
195 }
196 }
197
198 connMutex.unlock();
199
200 sem.release(); // this can unlock a thread waiting in acquire()
201 }
202
204 {
205 QgsDebugMsgLevel( QStringLiteral( "Invalidating connections for group" ), 2 );
206 connMutex.lock();
207 for ( const Item &i : std::as_const( conns ) )
208 {
209 qgsConnectionPool_ConnectionDestroy( i.c );
210 }
211 conns.clear();
212 for ( T c : std::as_const( acquiredConns ) )
213 qgsConnectionPool_InvalidateConnection( c );
214 connMutex.unlock();
215 }
216
217 protected:
218
219 void initTimer( QObject *parent )
220 {
221 expirationTimer = new QTimer( parent );
222 expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 );
223 QObject::connect( expirationTimer, SIGNAL( timeout() ), parent, SLOT( handleConnectionExpired() ) );
224
225 // just to make sure the object belongs to main thread and thus will get events
226 if ( qApp )
227 parent->moveToThread( qApp->thread() );
228 }
229
231 {
232 connMutex.lock();
233
234 QTime now = QTime::currentTime();
235
236 // what connections have expired?
237 QList<int> toDelete;
238 for ( int i = 0; i < conns.count(); ++i )
239 {
240 if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME )
241 toDelete.append( i );
242 }
243
244 // delete expired connections
245 for ( int j = toDelete.count() - 1; j >= 0; --j )
246 {
247 int index = toDelete[j];
248 qgsConnectionPool_ConnectionDestroy( conns[index].c );
249 conns.remove( index );
250 }
251
252 if ( conns.isEmpty() )
253 expirationTimer->stop();
254
255 connMutex.unlock();
256 }
257
258 protected:
259
260 QString connInfo;
261 QStack<Item> conns;
263 QMutex connMutex;
264 QSemaphore sem;
265 QTimer *expirationTimer = nullptr;
266
267};
268
269
287template <typename T, typename T_Group>
289{
290 public:
291
292 typedef QMap<QString, T_Group *> T_Groups;
293
295 {
296 QgsDebugMsgLevel( QStringLiteral( "Destroying connection pool" ), 2 );
297 mMutex.lock();
298 for ( auto it = mGroups.constBegin(); it != mGroups.constEnd(); ++it )
299 {
300 QgsDebugMsgLevel( QStringLiteral( "Destroying connection pool group with key %1" ).arg( it.key() ), 2 );
301 delete it.value();
302 }
303 QgsDebugMsgLevel( QStringLiteral( "Connection pool groups destroyed" ), 2 );
304 mGroups.clear();
305 mMutex.unlock();
306 }
307
318 T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false, QgsFeedback *feedback = nullptr )
319 {
320 QgsDebugMsgLevel( QStringLiteral( "Trying to acquire connection for %1" ).arg( connInfo ), 2 );
321 mMutex.lock();
322 typename T_Groups::iterator it = mGroups.find( connInfo );
323 if ( it == mGroups.end() )
324 {
325 QgsDebugMsgLevel( QStringLiteral( "Could not find existing group, adding new one" ), 2 );
326 it = mGroups.insert( connInfo, new T_Group( connInfo ) );
327 }
328 else
329 {
330 QgsDebugMsgLevel( QStringLiteral( "Found existing group" ), 2 );
331 }
332 T_Group *group = *it;
333 mMutex.unlock();
334
335 if ( feedback )
336 {
337 QElapsedTimer timer;
338 timer.start();
339
340 while ( !feedback->isCanceled() )
341 {
342 if ( T conn = group->acquire( 300, requestMayBeNested ) )
343 return conn;
344
345 if ( timeout > 0 && timer.elapsed() >= timeout )
346 return nullptr;
347 }
348 return nullptr;
349 }
350 else
351 {
352 return group->acquire( timeout, requestMayBeNested );
353 }
354 }
355
357 void releaseConnection( T conn )
358 {
359 mMutex.lock();
360 const QString groupName = qgsConnectionPool_ConnectionToName( conn );
361 QgsDebugMsgLevel( QStringLiteral( "Releasing connection for %1" ).arg( groupName ), 2 );
362 typename T_Groups::iterator it = mGroups.find( groupName );
363 Q_ASSERT( it != mGroups.end() );
364 T_Group *group = *it;
365 mMutex.unlock();
366
367 QgsDebugMsgLevel( QStringLiteral( "Group found, releasing..." ), 2 );
368 group->release( conn );
369 }
370
378 void invalidateConnections( const QString &connInfo )
379 {
380 QgsDebugMsgLevel( QStringLiteral( "Invalidating connections for %1" ).arg( connInfo ), 2 );
381 mMutex.lock();
382
383 auto it = mGroups.constFind( connInfo );
384 if ( it != mGroups.constEnd() )
385 {
386 QgsDebugMsgLevel( QStringLiteral( "Found group, invalidating..." ), 2 );
387 it.value()->invalidateConnections();
388 }
389 else
390 {
391 QgsDebugMsgLevel( QStringLiteral( "Could not find matching group!" ), 2 );
392 }
393 mMutex.unlock();
394 }
395
396
397 protected:
399 QMutex mMutex;
400};
401
402
403#endif // QGSCONNECTIONPOOL_H
Extends QApplication to provide access to QGIS specific resources such as theme paths,...
Template that stores data related to a connection to a single server or datasource.
QgsConnectionPoolGroup(const QgsConnectionPoolGroup &other)=delete
T acquire(int timeout, bool requestMayBeNested)
Try to acquire a connection for a maximum of timeout milliseconds.
QgsConnectionPoolGroup(const QString &ci)
Constructor for QgsConnectionPoolGroup, with the specified connection info.
QgsConnectionPoolGroup & operator=(const QgsConnectionPoolGroup &other)=delete
void initTimer(QObject *parent)
Template class responsible for keeping a pool of open connections.
QMap< QString, T_Group * > T_Groups
void invalidateConnections(const QString &connInfo)
Invalidates all connections to the specified resource.
T acquireConnection(const QString &connInfo, int timeout=-1, bool requestMayBeNested=false, QgsFeedback *feedback=nullptr)
Try to acquire a connection for a maximum of timeout milliseconds.
void releaseConnection(T conn)
Release an existing connection so it will get back into the pool and can be reused.
Base class for feedback objects to be used for cancellation of something running in a worker thread.
Definition qgsfeedback.h:44
As part of the API refactoring and improvements which landed in 3.0, the Processing API was substantially reworked from the 2.x version. This was done in order to allow much of the underlying Processing framework to be ported into C++.
#define CONN_POOL_SPARE_CONNECTIONS
#define CONN_POOL_EXPIRATION_TIME
#define QgsDebugMsgLevel(str, level)
Definition qgslogger.h:41