QGIS API Documentation 3.99.0-Master (a8f284845db)
Loading...
Searching...
No Matches
qgsconnectionpool.h
Go to the documentation of this file.
1/***************************************************************************
2 qgsconnectionpool.h
3 ---------------------
4 begin : February 2014
5 copyright : (C) 2014 by Martin Dobias
6 email : wonder dot sk at gmail dot com
7 ***************************************************************************
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 ***************************************************************************/
15
16#ifndef QGSCONNECTIONPOOL_H
17#define QGSCONNECTIONPOOL_H
18
19
20#include "qgis.h"
21#include "qgsapplication.h"
22#include "qgsfeedback.h"
23#include "qgslogger.h"
24
25#include <QCoreApplication>
26#include <QElapsedTimer>
27#include <QMap>
28#include <QMutex>
29#include <QSemaphore>
30#include <QStack>
31#include <QString>
32#include <QThread>
33#include <QTime>
34#include <QTimer>
35
36#define SIP_NO_FILE
37
38using namespace Qt::StringLiterals;
39
40#define CONN_POOL_EXPIRATION_TIME 60 // in seconds
41#define CONN_POOL_SPARE_CONNECTIONS 2 // number of spare connections in case all the base connections are used but we have a nested request with the risk of a deadlock
42
43
67template <typename T>
69{
70 public:
71
// One entry of the idle-connection stack kept by the pool group.
// NOTE(review): release() and the expiry check also read/write a `lastUsedTime`
// member of this struct; its declaration line is not present in this listing.
72 struct Item
73 {
// The pooled connection handle.
74 T c;
76 };
77
81 QgsConnectionPoolGroup( const QString &ci )
82 : connInfo( ci )
83 , sem( QgsApplication::instance()->maxConcurrentConnectionsPerPool() + CONN_POOL_SPARE_CONNECTIONS )
84 {
85 }
86
// NOTE(review): the declaration line for this body is missing from this listing;
// judging by the debug message it is the pool group's destructor.
// Destroys every idle connection still stored in `conns`.
88 {
89 QgsDebugMsgLevel( u"Destroying connection pool group"_s, 2 );
90 for ( const Item &item : std::as_const( conns ) )
91 {
92 qgsConnectionPool_ConnectionDestroy( item.c );
93 }
94 }
95
98
106 T acquire( int timeout, bool requestMayBeNested )
107 {
108 QgsDebugMsgLevel( u"Trying to acquire connection"_s, 2 );
109 const int requiredFreeConnectionCount = requestMayBeNested ? 1 : 3;
110 // we are going to acquire a resource - if no resource is available, we will block here
111 if ( timeout >= 0 )
112 {
113 if ( !sem.tryAcquire( requiredFreeConnectionCount, timeout ) )
114 {
115 QgsDebugMsgLevel( u"Failed to acquire semaphore"_s, 2 );
116 return nullptr;
117 }
118 }
119 else
120 {
121 // we should still be able to use tryAcquire with a negative timeout here, but
122 // tryAcquire is broken on Qt > 5.8 with negative timeouts - see
123 // https://bugreports.qt.io/browse/QTBUG-64413
124 // https://lists.osgeo.org/pipermail/qgis-developer/2017-November/050456.html
125 sem.acquire( requiredFreeConnectionCount );
126 }
127 sem.release( requiredFreeConnectionCount - 1 );
128
129 // quick (preferred) way - use cached connection
130 {
131 QMutexLocker locker( &connMutex );
132
133 if ( !conns.isEmpty() )
134 {
135 QgsDebugMsgLevel( u"Trying to use existing connection"_s, 2 );
136 Item i = conns.pop();
137 if ( !qgsConnectionPool_ConnectionIsValid( i.c ) )
138 {
139 QgsDebugMsgLevel( u"Connection is not valid, destroying"_s, 2 );
140 qgsConnectionPool_ConnectionDestroy( i.c );
141 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
142 qgsConnectionPool_ConnectionCreate( connInfo, i.c );
143 }
144
145
146 // no need to run if nothing can expire
147 if ( conns.isEmpty() )
148 {
149 // will call the slot directly or queue the call (if the object lives in a different thread)
150 QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" );
151 }
152
153 QgsDebugMsgLevel( u"Acquired connection"_s, 2 );
154 acquiredConns.append( i.c );
155
156 return i.c;
157 }
158 }
159
160 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
161 T c;
162 qgsConnectionPool_ConnectionCreate( connInfo, c );
163 if ( !c )
164 {
165 // we didn't get connection for some reason, so release the lock
166 sem.release();
167 QgsDebugMsgLevel( u"Failed to create new connection"_s, 2 );
168 return nullptr;
169 }
170
171 connMutex.lock();
172 QgsDebugMsgLevel( u"Acquired connection with name: %1"_s.arg( qgsConnectionPool_ConnectionToName( c ) ), 2 );
173 acquiredConns.append( c );
174 connMutex.unlock();
175 return c;
176 }
177
178 void release( T conn )
179 {
180 QgsDebugMsgLevel( u"Releasing connection"_s, 2 );
181 connMutex.lock();
182 acquiredConns.removeAll( conn );
183 if ( !qgsConnectionPool_ConnectionIsValid( conn ) )
184 {
185 QgsDebugMsgLevel( u"Destroying invalid connection"_s, 2 );
186 qgsConnectionPool_ConnectionDestroy( conn );
187 }
188 else
189 {
190 Item i;
191 i.c = conn;
192 i.lastUsedTime = QTime::currentTime();
193 conns.push( i );
194
195 if ( !expirationTimer->isActive() )
196 {
197 // will call the slot directly or queue the call (if the object lives in a different thread)
198 QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" );
199 }
200 }
201
202 connMutex.unlock();
203
204 sem.release(); // this can unlock a thread waiting in acquire()
205 }
206
// NOTE(review): the declaration line for this body is missing from this listing;
// judging by the debug message it is the group's invalidateConnections().
// While holding connMutex: destroys and drops every idle connection in `conns`,
// then calls qgsConnectionPool_InvalidateConnection() on each connection that is
// currently handed out (`acquiredConns`); those are not removed from the list here.
208 {
209 QgsDebugMsgLevel( u"Invalidating connections for group"_s, 2 );
210 connMutex.lock();
211 for ( const Item &i : std::as_const( conns ) )
212 {
213 qgsConnectionPool_ConnectionDestroy( i.c );
214 }
215 conns.clear();
216 for ( T c : std::as_const( acquiredConns ) )
217 qgsConnectionPool_InvalidateConnection( c );
218 connMutex.unlock();
219 }
220
221 protected:
222
229 template<typename U>
230 void initTimer( U *parent )
231 {
232 expirationTimer = new QTimer( parent );
233 expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 );
234 QObject::connect( expirationTimer, &QTimer::timeout, parent, &U::handleConnectionExpired );
235
236 // just to make sure the object belongs to main thread and thus will get events
237 if ( qApp )
238 parent->moveToThread( qApp->thread() );
239 }
240
// NOTE(review): the declaration line for this body is missing from this listing.
// Expiry sweep: while holding connMutex, destroys and removes every idle
// connection whose lastUsedTime is at least CONN_POOL_EXPIRATION_TIME seconds
// before now, and stops the expiration timer once no idle connection is left.
// NOTE(review): the timestamps are QTime (time of day) values, so secsTo() is
// negative when `now` has wrapped past midnight; an entry stamped shortly before
// midnight would then not be treated as expired by this check - confirm whether
// that matters.
242 {
243 connMutex.lock();
244
245 QTime now = QTime::currentTime();
246
247 // what connections have expired?
248 QList<int> toDelete;
249 for ( int i = 0; i < conns.count(); ++i )
250 {
251 if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME )
252 toDelete.append( i );
253 }
254
255 // delete expired connections
// (walk the index list backwards so removing an entry does not shift the
// positions of the entries that are still to be removed)
256 for ( int j = toDelete.count() - 1; j >= 0; --j )
257 {
258 int index = toDelete[j];
259 qgsConnectionPool_ConnectionDestroy( conns[index].c );
260 conns.remove( index );
261 }
262
263 if ( conns.isEmpty() )
264 expirationTimer->stop();
265
266 connMutex.unlock();
267 }
268
269 protected:
270
// Connection info string given to the constructor; passed to
// qgsConnectionPool_ConnectionCreate() whenever a connection is created.
271 QString connInfo;
// Idle connections available for reuse (pushed in release(), popped in acquire()).
272 QStack<Item> conns;
// Locked around every access to `conns` and `acquiredConns` in this class.
274 QMutex connMutex;
// Counts free connection slots; one slot is held per handed-out connection.
275 QSemaphore sem;
// Created by initTimer(); null until then.
276 QTimer *expirationTimer = nullptr;
277
278};
279
280
298template <typename T, typename T_Group>
300{
301 public:
302
303 typedef QMap<QString, T_Group *> T_Groups;
304
// NOTE(review): the declaration line for this body is missing from this listing;
// judging by the debug message it is the connection pool's destructor.
// While holding mMutex: deletes every group object stored in mGroups, then
// clears the map.
306 {
307 QgsDebugMsgLevel( u"Destroying connection pool"_s, 2 );
308 mMutex.lock();
309 for ( auto it = mGroups.constBegin(); it != mGroups.constEnd(); ++it )
310 {
311 QgsDebugMsgLevel( u"Destroying connection pool group with key %1"_s.arg( it.key() ), 2 );
312 delete it.value();
313 }
314 QgsDebugMsgLevel( u"Connection pool groups destroyed"_s, 2 );
315 mGroups.clear();
316 mMutex.unlock();
317 }
318
329 T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false, QgsFeedback *feedback = nullptr )
330 {
331 QgsDebugMsgLevel( u"Trying to acquire connection for %1"_s.arg( connInfo ), 2 );
332 mMutex.lock();
333 typename T_Groups::iterator it = mGroups.find( connInfo );
334 if ( it == mGroups.end() )
335 {
336 QgsDebugMsgLevel( u"Could not find existing group, adding new one"_s, 2 );
337 it = mGroups.insert( connInfo, new T_Group( connInfo ) );
338 }
339 else
340 {
341 QgsDebugMsgLevel( u"Found existing group"_s, 2 );
342 }
343 T_Group *group = *it;
344 mMutex.unlock();
345
346 if ( feedback )
347 {
348 QElapsedTimer timer;
349 timer.start();
350
351 while ( !feedback->isCanceled() )
352 {
353 if ( T conn = group->acquire( 300, requestMayBeNested ) )
354 return conn;
355
356 if ( timeout > 0 && timer.elapsed() >= timeout )
357 return nullptr;
358 }
359 return nullptr;
360 }
361 else
362 {
363 return group->acquire( timeout, requestMayBeNested );
364 }
365 }
366
368 void releaseConnection( T conn )
369 {
370 mMutex.lock();
371 const QString groupName = qgsConnectionPool_ConnectionToName( conn );
372 QgsDebugMsgLevel( u"Releasing connection for %1"_s.arg( groupName ), 2 );
373 typename T_Groups::iterator it = mGroups.find( groupName );
374 Q_ASSERT( it != mGroups.end() );
375 T_Group *group = *it;
376 mMutex.unlock();
377
378 QgsDebugMsgLevel( u"Group found, releasing..."_s, 2 );
379 group->release( conn );
380 }
381
389 void invalidateConnections( const QString &connInfo )
390 {
391 QgsDebugMsgLevel( u"Invalidating connections for %1"_s.arg( connInfo ), 2 );
392 mMutex.lock();
393
394 auto it = mGroups.constFind( connInfo );
395 if ( it != mGroups.constEnd() )
396 {
397 QgsDebugMsgLevel( u"Found group, invalidating..."_s, 2 );
398 it.value()->invalidateConnections();
399 }
400 else
401 {
402 QgsDebugMsgLevel( u"Could not find matching group!"_s, 2 );
403 }
404 mMutex.unlock();
405 }
406
407
408 protected:
410 QMutex mMutex;
411};
412
413
414#endif // QGSCONNECTIONPOOL_H
Extends QApplication to provide access to QGIS specific resources such as theme paths,...
QgsConnectionPoolGroup(const QgsConnectionPoolGroup &other)=delete
T acquire(int timeout, bool requestMayBeNested)
Try to acquire a connection for a maximum of timeout milliseconds.
void initTimer(U *parent)
Initializes the connection timeout handling.
QgsConnectionPoolGroup(const QString &ci)
Constructor for QgsConnectionPoolGroup, with the specified connection info.
QgsConnectionPoolGroup & operator=(const QgsConnectionPoolGroup &other)=delete
Template class responsible for keeping a pool of open connections.
QMap< QString, T_Group * > T_Groups
void invalidateConnections(const QString &connInfo)
Invalidates all connections to the specified resource.
T acquireConnection(const QString &connInfo, int timeout=-1, bool requestMayBeNested=false, QgsFeedback *feedback=nullptr)
Try to acquire a connection for a maximum of timeout milliseconds.
void releaseConnection(T conn)
Release an existing connection so it will get back into the pool and can be reused.
Base class for feedback objects to be used for cancellation of something running in a worker thread.
Definition qgsfeedback.h:44
As part of the API refactoring and improvements which landed in QGIS 3.0, the Processing API was substantially reworked from the 2.x version. This was done in order to allow much of the underlying Processing framework to be ported into C++.
#define CONN_POOL_SPARE_CONNECTIONS
#define CONN_POOL_EXPIRATION_TIME
#define QgsDebugMsgLevel(str, level)
Definition qgslogger.h:63