QGIS API Documentation 3.99.0-Master (d270888f95f)
Loading...
Searching...
No Matches
qgsconnectionpool.h
Go to the documentation of this file.
1/***************************************************************************
2 qgsconnectionpool.h
3 ---------------------
4 begin : February 2014
5 copyright : (C) 2014 by Martin Dobias
6 email : wonder dot sk at gmail dot com
7 ***************************************************************************
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 ***************************************************************************/
15
16#ifndef QGSCONNECTIONPOOL_H
17#define QGSCONNECTIONPOOL_H
18
19#define SIP_NO_FILE
20
21#include "qgis.h"
22#include "qgsapplication.h"
23#include "qgsfeedback.h"
24#include "qgslogger.h"
25
26#include <QCoreApplication>
27#include <QElapsedTimer>
28#include <QMap>
29#include <QMutex>
30#include <QSemaphore>
31#include <QStack>
32#include <QString>
33#include <QThread>
34#include <QTime>
35#include <QTimer>
36
37using namespace Qt::StringLiterals;
38
39#define CONN_POOL_EXPIRATION_TIME 60 // in seconds
40#define CONN_POOL_SPARE_CONNECTIONS 2 // number of spare connections in case all the base connections are used but we have a nested request with the risk of a deadlock
41
42
66template <typename T>
68{
69 public:
70
71 struct Item
72 {
73 T c;
75 };
76
80 QgsConnectionPoolGroup( const QString &ci )
81 : connInfo( ci )
82 , sem( QgsApplication::instance()->maxConcurrentConnectionsPerPool() + CONN_POOL_SPARE_CONNECTIONS )
83 {
84 }
85
87 {
88 QgsDebugMsgLevel( u"Destroying connection pool group"_s, 2 );
89 for ( const Item &item : std::as_const( conns ) )
90 {
91 qgsConnectionPool_ConnectionDestroy( item.c );
92 }
93 }
94
97
105 T acquire( int timeout, bool requestMayBeNested )
106 {
107 QgsDebugMsgLevel( u"Trying to acquire connection"_s, 2 );
108 const int requiredFreeConnectionCount = requestMayBeNested ? 1 : 3;
109 // we are going to acquire a resource - if no resource is available, we will block here
110 if ( timeout >= 0 )
111 {
112 if ( !sem.tryAcquire( requiredFreeConnectionCount, timeout ) )
113 {
114 QgsDebugMsgLevel( u"Failed to acquire semaphore"_s, 2 );
115 return nullptr;
116 }
117 }
118 else
119 {
120 // we should still be able to use tryAcquire with a negative timeout here, but
121 // tryAcquire is broken on Qt > 5.8 with negative timeouts - see
122 // https://bugreports.qt.io/browse/QTBUG-64413
123 // https://lists.osgeo.org/pipermail/qgis-developer/2017-November/050456.html
124 sem.acquire( requiredFreeConnectionCount );
125 }
126 sem.release( requiredFreeConnectionCount - 1 );
127
128 // quick (preferred) way - use cached connection
129 {
130 QMutexLocker locker( &connMutex );
131
132 if ( !conns.isEmpty() )
133 {
134 QgsDebugMsgLevel( u"Trying to use existing connection"_s, 2 );
135 Item i = conns.pop();
136 if ( !qgsConnectionPool_ConnectionIsValid( i.c ) )
137 {
138 QgsDebugMsgLevel( u"Connection is not valid, destroying"_s, 2 );
139 qgsConnectionPool_ConnectionDestroy( i.c );
140 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
141 qgsConnectionPool_ConnectionCreate( connInfo, i.c );
142 }
143
144
145 // no need to run if nothing can expire
146 if ( conns.isEmpty() )
147 {
148 // will call the slot directly or queue the call (if the object lives in a different thread)
149 QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" );
150 }
151
152 QgsDebugMsgLevel( u"Acquired connection"_s, 2 );
153 acquiredConns.append( i.c );
154
155 return i.c;
156 }
157 }
158
159 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
160 T c;
161 qgsConnectionPool_ConnectionCreate( connInfo, c );
162 if ( !c )
163 {
164 // we didn't get connection for some reason, so release the lock
165 sem.release();
166 QgsDebugMsgLevel( u"Failed to create new connection"_s, 2 );
167 return nullptr;
168 }
169
170 connMutex.lock();
171 QgsDebugMsgLevel( u"Acquired connection with name: %1"_s.arg( qgsConnectionPool_ConnectionToName( c ) ), 2 );
172 acquiredConns.append( c );
173 connMutex.unlock();
174 return c;
175 }
176
177 void release( T conn )
178 {
179 QgsDebugMsgLevel( u"Releasing connection"_s, 2 );
180 connMutex.lock();
181 acquiredConns.removeAll( conn );
182 if ( !qgsConnectionPool_ConnectionIsValid( conn ) )
183 {
184 QgsDebugMsgLevel( u"Destroying invalid connection"_s, 2 );
185 qgsConnectionPool_ConnectionDestroy( conn );
186 }
187 else
188 {
189 Item i;
190 i.c = conn;
191 i.lastUsedTime = QTime::currentTime();
192 conns.push( i );
193
194 if ( !expirationTimer->isActive() )
195 {
196 // will call the slot directly or queue the call (if the object lives in a different thread)
197 QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" );
198 }
199 }
200
201 connMutex.unlock();
202
203 sem.release(); // this can unlock a thread waiting in acquire()
204 }
205
207 {
208 QgsDebugMsgLevel( u"Invalidating connections for group"_s, 2 );
209 connMutex.lock();
210 for ( const Item &i : std::as_const( conns ) )
211 {
212 qgsConnectionPool_ConnectionDestroy( i.c );
213 }
214 conns.clear();
215 for ( T c : std::as_const( acquiredConns ) )
216 qgsConnectionPool_InvalidateConnection( c );
217 connMutex.unlock();
218 }
219
220 protected:
221
228 template<typename U>
229 void initTimer( U *parent )
230 {
231 expirationTimer = new QTimer( parent );
232 expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 );
233 QObject::connect( expirationTimer, &QTimer::timeout, parent, &U::handleConnectionExpired );
234
235 // just to make sure the object belongs to main thread and thus will get events
236 if ( qApp )
237 parent->moveToThread( qApp->thread() );
238 }
239
241 {
242 connMutex.lock();
243
244 QTime now = QTime::currentTime();
245
246 // what connections have expired?
247 QList<int> toDelete;
248 for ( int i = 0; i < conns.count(); ++i )
249 {
250 if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME )
251 toDelete.append( i );
252 }
253
254 // delete expired connections
255 for ( int j = toDelete.count() - 1; j >= 0; --j )
256 {
257 int index = toDelete[j];
258 qgsConnectionPool_ConnectionDestroy( conns[index].c );
259 conns.remove( index );
260 }
261
262 if ( conns.isEmpty() )
263 expirationTimer->stop();
264
265 connMutex.unlock();
266 }
267
268 protected:
269
270 QString connInfo;
271 QStack<Item> conns;
273 QMutex connMutex;
274 QSemaphore sem;
275 QTimer *expirationTimer = nullptr;
276
277};
278
279
297template <typename T, typename T_Group>
299{
300 public:
301
302 typedef QMap<QString, T_Group *> T_Groups;
303
305 {
306 QgsDebugMsgLevel( u"Destroying connection pool"_s, 2 );
307 mMutex.lock();
308 for ( auto it = mGroups.constBegin(); it != mGroups.constEnd(); ++it )
309 {
310 QgsDebugMsgLevel( u"Destroying connection pool group with key %1"_s.arg( it.key() ), 2 );
311 delete it.value();
312 }
313 QgsDebugMsgLevel( u"Connection pool groups destroyed"_s, 2 );
314 mGroups.clear();
315 mMutex.unlock();
316 }
317
328 T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false, QgsFeedback *feedback = nullptr )
329 {
330 QgsDebugMsgLevel( u"Trying to acquire connection for %1"_s.arg( connInfo ), 2 );
331 mMutex.lock();
332 typename T_Groups::iterator it = mGroups.find( connInfo );
333 if ( it == mGroups.end() )
334 {
335 QgsDebugMsgLevel( u"Could not find existing group, adding new one"_s, 2 );
336 it = mGroups.insert( connInfo, new T_Group( connInfo ) );
337 }
338 else
339 {
340 QgsDebugMsgLevel( u"Found existing group"_s, 2 );
341 }
342 T_Group *group = *it;
343 mMutex.unlock();
344
345 if ( feedback )
346 {
347 QElapsedTimer timer;
348 timer.start();
349
350 while ( !feedback->isCanceled() )
351 {
352 if ( T conn = group->acquire( 300, requestMayBeNested ) )
353 return conn;
354
355 if ( timeout > 0 && timer.elapsed() >= timeout )
356 return nullptr;
357 }
358 return nullptr;
359 }
360 else
361 {
362 return group->acquire( timeout, requestMayBeNested );
363 }
364 }
365
367 void releaseConnection( T conn )
368 {
369 mMutex.lock();
370 const QString groupName = qgsConnectionPool_ConnectionToName( conn );
371 QgsDebugMsgLevel( u"Releasing connection for %1"_s.arg( groupName ), 2 );
372 typename T_Groups::iterator it = mGroups.find( groupName );
373 Q_ASSERT( it != mGroups.end() );
374 T_Group *group = *it;
375 mMutex.unlock();
376
377 QgsDebugMsgLevel( u"Group found, releasing..."_s, 2 );
378 group->release( conn );
379 }
380
388 void invalidateConnections( const QString &connInfo )
389 {
390 QgsDebugMsgLevel( u"Invalidating connections for %1"_s.arg( connInfo ), 2 );
391 mMutex.lock();
392
393 auto it = mGroups.constFind( connInfo );
394 if ( it != mGroups.constEnd() )
395 {
396 QgsDebugMsgLevel( u"Found group, invalidating..."_s, 2 );
397 it.value()->invalidateConnections();
398 }
399 else
400 {
401 QgsDebugMsgLevel( u"Could not find matching group!"_s, 2 );
402 }
403 mMutex.unlock();
404 }
405
406
407 protected:
409 QMutex mMutex;
410};
411
412
413#endif // QGSCONNECTIONPOOL_H
Extends QApplication to provide access to QGIS specific resources such as theme paths,...
QgsConnectionPoolGroup(const QgsConnectionPoolGroup &other)=delete
T acquire(int timeout, bool requestMayBeNested)
Try to acquire a connection for a maximum of timeout milliseconds.
void initTimer(U *parent)
Initializes the connection timeout handling.
QgsConnectionPoolGroup(const QString &ci)
Constructor for QgsConnectionPoolGroup, with the specified connection info.
QgsConnectionPoolGroup & operator=(const QgsConnectionPoolGroup &other)=delete
Template class responsible for keeping a pool of open connections.
QMap< QString, T_Group * > T_Groups
void invalidateConnections(const QString &connInfo)
Invalidates all connections to the specified resource.
T acquireConnection(const QString &connInfo, int timeout=-1, bool requestMayBeNested=false, QgsFeedback *feedback=nullptr)
Try to acquire a connection for a maximum of timeout milliseconds.
void releaseConnection(T conn)
Release an existing connection so it will get back into the pool and can be reused.
Base class for feedback objects to be used for cancellation of something running in a worker thread.
Definition qgsfeedback.h:44
As part of the API refactoring and improvements which landed in the Processing API was substantially reworked from the x version This was done in order to allow much of the underlying Processing framework to be ported into c
#define CONN_POOL_SPARE_CONNECTIONS
#define CONN_POOL_EXPIRATION_TIME
#define QgsDebugMsgLevel(str, level)
Definition qgslogger.h:63