QGIS API Documentation 3.99.0-Master (2fe06baccd8)
Loading...
Searching...
No Matches
qgsconnectionpool.h
Go to the documentation of this file.
1/***************************************************************************
2 qgsconnectionpool.h
3 ---------------------
4 begin : February 2014
5 copyright : (C) 2014 by Martin Dobias
6 email : wonder dot sk at gmail dot com
7 ***************************************************************************
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 ***************************************************************************/
15
16#ifndef QGSCONNECTIONPOOL_H
17#define QGSCONNECTIONPOOL_H
18
19#define SIP_NO_FILE
20
21#include "qgis.h"
22#include "qgsapplication.h"
23#include "qgsfeedback.h"
24#include "qgslogger.h"
25
26#include <QCoreApplication>
27#include <QElapsedTimer>
28#include <QMap>
29#include <QMutex>
30#include <QSemaphore>
31#include <QStack>
32#include <QThread>
33#include <QTime>
34#include <QTimer>
35
36#define CONN_POOL_EXPIRATION_TIME 60 // in seconds
37#define CONN_POOL_SPARE_CONNECTIONS 2 // number of spare connections in case all the base connections are used but we have a nested request with the risk of a deadlock
38
39
63template <typename T>
65{
66 public:
67
68 struct Item
69 {
70 T c;
72 };
73
77 QgsConnectionPoolGroup( const QString &ci )
78 : connInfo( ci )
79 , sem( QgsApplication::instance()->maxConcurrentConnectionsPerPool() + CONN_POOL_SPARE_CONNECTIONS )
80 {
81 }
82
84 {
85 QgsDebugMsgLevel( QStringLiteral( "Destroying connection pool group" ), 2 );
86 for ( const Item &item : std::as_const( conns ) )
87 {
88 qgsConnectionPool_ConnectionDestroy( item.c );
89 }
90 }
91
94
102 T acquire( int timeout, bool requestMayBeNested )
103 {
104 QgsDebugMsgLevel( QStringLiteral( "Trying to acquire connection" ), 2 );
105 const int requiredFreeConnectionCount = requestMayBeNested ? 1 : 3;
106 // we are going to acquire a resource - if no resource is available, we will block here
107 if ( timeout >= 0 )
108 {
109 if ( !sem.tryAcquire( requiredFreeConnectionCount, timeout ) )
110 {
111 QgsDebugMsgLevel( QStringLiteral( "Failed to acquire semaphore" ), 2 );
112 return nullptr;
113 }
114 }
115 else
116 {
117 // we should still be able to use tryAcquire with a negative timeout here, but
118 // tryAcquire is broken on Qt > 5.8 with negative timeouts - see
119 // https://bugreports.qt.io/browse/QTBUG-64413
120 // https://lists.osgeo.org/pipermail/qgis-developer/2017-November/050456.html
121 sem.acquire( requiredFreeConnectionCount );
122 }
123 sem.release( requiredFreeConnectionCount - 1 );
124
125 // quick (preferred) way - use cached connection
126 {
127 QMutexLocker locker( &connMutex );
128
129 if ( !conns.isEmpty() )
130 {
131 QgsDebugMsgLevel( QStringLiteral( "Trying to use existing connection" ), 2 );
132 Item i = conns.pop();
133 if ( !qgsConnectionPool_ConnectionIsValid( i.c ) )
134 {
135 QgsDebugMsgLevel( QStringLiteral( "Connection is not valid, destroying" ), 2 );
136 qgsConnectionPool_ConnectionDestroy( i.c );
137 QgsDebugMsgLevel( QStringLiteral( "Creating new connection" ), 2 );
138 qgsConnectionPool_ConnectionCreate( connInfo, i.c );
139 }
140
141
142 // no need to run if nothing can expire
143 if ( conns.isEmpty() )
144 {
145 // will call the slot directly or queue the call (if the object lives in a different thread)
146 QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" );
147 }
148
149 QgsDebugMsgLevel( QStringLiteral( "Acquired connection" ), 2 );
150 acquiredConns.append( i.c );
151
152 return i.c;
153 }
154 }
155
156 QgsDebugMsgLevel( QStringLiteral( "Creating new connection" ), 2 );
157 T c;
158 qgsConnectionPool_ConnectionCreate( connInfo, c );
159 if ( !c )
160 {
161 // we didn't get connection for some reason, so release the lock
162 sem.release();
163 QgsDebugMsgLevel( QStringLiteral( "Failed to create new connection" ), 2 );
164 return nullptr;
165 }
166
167 connMutex.lock();
168 QgsDebugMsgLevel( QStringLiteral( "Acquired connection with name: %1" ).arg( qgsConnectionPool_ConnectionToName( c ) ), 2 );
169 acquiredConns.append( c );
170 connMutex.unlock();
171 return c;
172 }
173
174 void release( T conn )
175 {
176 QgsDebugMsgLevel( QStringLiteral( "Releasing connection" ), 2 );
177 connMutex.lock();
178 acquiredConns.removeAll( conn );
179 if ( !qgsConnectionPool_ConnectionIsValid( conn ) )
180 {
181 QgsDebugMsgLevel( QStringLiteral( "Destroying invalid connection" ), 2 );
182 qgsConnectionPool_ConnectionDestroy( conn );
183 }
184 else
185 {
186 Item i;
187 i.c = conn;
188 i.lastUsedTime = QTime::currentTime();
189 conns.push( i );
190
191 if ( !expirationTimer->isActive() )
192 {
193 // will call the slot directly or queue the call (if the object lives in a different thread)
194 QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" );
195 }
196 }
197
198 connMutex.unlock();
199
200 sem.release(); // this can unlock a thread waiting in acquire()
201 }
202
204 {
205 QgsDebugMsgLevel( QStringLiteral( "Invalidating connections for group" ), 2 );
206 connMutex.lock();
207 for ( const Item &i : std::as_const( conns ) )
208 {
209 qgsConnectionPool_ConnectionDestroy( i.c );
210 }
211 conns.clear();
212 for ( T c : std::as_const( acquiredConns ) )
213 qgsConnectionPool_InvalidateConnection( c );
214 connMutex.unlock();
215 }
216
217 protected:
218
225 template<typename U>
226 void initTimer( U *parent )
227 {
228 expirationTimer = new QTimer( parent );
229 expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 );
230 QObject::connect( expirationTimer, &QTimer::timeout, parent, &U::handleConnectionExpired );
231
232 // just to make sure the object belongs to main thread and thus will get events
233 if ( qApp )
234 parent->moveToThread( qApp->thread() );
235 }
236
238 {
239 connMutex.lock();
240
241 QTime now = QTime::currentTime();
242
243 // what connections have expired?
244 QList<int> toDelete;
245 for ( int i = 0; i < conns.count(); ++i )
246 {
247 if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME )
248 toDelete.append( i );
249 }
250
251 // delete expired connections
252 for ( int j = toDelete.count() - 1; j >= 0; --j )
253 {
254 int index = toDelete[j];
255 qgsConnectionPool_ConnectionDestroy( conns[index].c );
256 conns.remove( index );
257 }
258
259 if ( conns.isEmpty() )
260 expirationTimer->stop();
261
262 connMutex.unlock();
263 }
264
265 protected:
266
267 QString connInfo;
268 QStack<Item> conns;
270 QMutex connMutex;
271 QSemaphore sem;
272 QTimer *expirationTimer = nullptr;
273
274};
275
276
294template <typename T, typename T_Group>
296{
297 public:
298
299 typedef QMap<QString, T_Group *> T_Groups;
300
302 {
303 QgsDebugMsgLevel( QStringLiteral( "Destroying connection pool" ), 2 );
304 mMutex.lock();
305 for ( auto it = mGroups.constBegin(); it != mGroups.constEnd(); ++it )
306 {
307 QgsDebugMsgLevel( QStringLiteral( "Destroying connection pool group with key %1" ).arg( it.key() ), 2 );
308 delete it.value();
309 }
310 QgsDebugMsgLevel( QStringLiteral( "Connection pool groups destroyed" ), 2 );
311 mGroups.clear();
312 mMutex.unlock();
313 }
314
325 T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false, QgsFeedback *feedback = nullptr )
326 {
327 QgsDebugMsgLevel( QStringLiteral( "Trying to acquire connection for %1" ).arg( connInfo ), 2 );
328 mMutex.lock();
329 typename T_Groups::iterator it = mGroups.find( connInfo );
330 if ( it == mGroups.end() )
331 {
332 QgsDebugMsgLevel( QStringLiteral( "Could not find existing group, adding new one" ), 2 );
333 it = mGroups.insert( connInfo, new T_Group( connInfo ) );
334 }
335 else
336 {
337 QgsDebugMsgLevel( QStringLiteral( "Found existing group" ), 2 );
338 }
339 T_Group *group = *it;
340 mMutex.unlock();
341
342 if ( feedback )
343 {
344 QElapsedTimer timer;
345 timer.start();
346
347 while ( !feedback->isCanceled() )
348 {
349 if ( T conn = group->acquire( 300, requestMayBeNested ) )
350 return conn;
351
352 if ( timeout > 0 && timer.elapsed() >= timeout )
353 return nullptr;
354 }
355 return nullptr;
356 }
357 else
358 {
359 return group->acquire( timeout, requestMayBeNested );
360 }
361 }
362
364 void releaseConnection( T conn )
365 {
366 mMutex.lock();
367 const QString groupName = qgsConnectionPool_ConnectionToName( conn );
368 QgsDebugMsgLevel( QStringLiteral( "Releasing connection for %1" ).arg( groupName ), 2 );
369 typename T_Groups::iterator it = mGroups.find( groupName );
370 Q_ASSERT( it != mGroups.end() );
371 T_Group *group = *it;
372 mMutex.unlock();
373
374 QgsDebugMsgLevel( QStringLiteral( "Group found, releasing..." ), 2 );
375 group->release( conn );
376 }
377
385 void invalidateConnections( const QString &connInfo )
386 {
387 QgsDebugMsgLevel( QStringLiteral( "Invalidating connections for %1" ).arg( connInfo ), 2 );
388 mMutex.lock();
389
390 auto it = mGroups.constFind( connInfo );
391 if ( it != mGroups.constEnd() )
392 {
393 QgsDebugMsgLevel( QStringLiteral( "Found group, invalidating..." ), 2 );
394 it.value()->invalidateConnections();
395 }
396 else
397 {
398 QgsDebugMsgLevel( QStringLiteral( "Could not find matching group!" ), 2 );
399 }
400 mMutex.unlock();
401 }
402
403
404 protected:
406 QMutex mMutex;
407};
408
409
410#endif // QGSCONNECTIONPOOL_H
Extends QApplication to provide access to QGIS specific resources such as theme paths,...
QgsConnectionPoolGroup(const QgsConnectionPoolGroup &other)=delete
T acquire(int timeout, bool requestMayBeNested)
Try to acquire a connection for a maximum of timeout milliseconds.
void initTimer(U *parent)
Initializes the connection timeout handling.
QgsConnectionPoolGroup(const QString &ci)
Constructor for QgsConnectionPoolGroup, with the specified connection info.
QgsConnectionPoolGroup & operator=(const QgsConnectionPoolGroup &other)=delete
Template class responsible for keeping a pool of open connections.
QMap< QString, T_Group * > T_Groups
void invalidateConnections(const QString &connInfo)
Invalidates all connections to the specified resource.
T acquireConnection(const QString &connInfo, int timeout=-1, bool requestMayBeNested=false, QgsFeedback *feedback=nullptr)
Try to acquire a connection for a maximum of timeout milliseconds.
void releaseConnection(T conn)
Release an existing connection so it will get back into the pool and can be reused.
Base class for feedback objects to be used for cancellation of something running in a worker thread.
Definition qgsfeedback.h:44
As part of the API refactoring and improvements which landed in the Processing API was substantially reworked from the x version This was done in order to allow much of the underlying Processing framework to be ported into c
#define CONN_POOL_SPARE_CONNECTIONS
#define CONN_POOL_EXPIRATION_TIME
#define QgsDebugMsgLevel(str, level)
Definition qgslogger.h:61