QGIS API Documentation 4.0.0-Norrköping (1ddcee3d0e4)
Loading...
Searching...
No Matches
qgsconnectionpool.h
Go to the documentation of this file.
1/***************************************************************************
2 qgsconnectionpool.h
3 ---------------------
4 begin : February 2014
5 copyright : (C) 2014 by Martin Dobias
6 email : wonder dot sk at gmail dot com
7 ***************************************************************************
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 ***************************************************************************/
15
16#ifndef QGSCONNECTIONPOOL_H
17#define QGSCONNECTIONPOOL_H
18
19
20#include "qgis.h"
21#include "qgsapplication.h"
22#include "qgsfeedback.h"
23#include "qgslogger.h"
24
25#include <QCoreApplication>
26#include <QElapsedTimer>
27#include <QMap>
28#include <QMutex>
29#include <QSemaphore>
30#include <QStack>
31#include <QString>
32#include <QThread>
33#include <QTime>
34#include <QTimer>
35
36#define SIP_NO_FILE
37
38using namespace Qt::StringLiterals;
39
// Idle time after which a pooled connection is destroyed. The same value (multiplied by 1000)
// is used as the interval of the expiration timer, so an idle connection is checked once per period.
40#define CONN_POOL_EXPIRATION_TIME 60 // in seconds
// These slots are added on top of QgsApplication::maxConcurrentConnectionsPerPool() when the
// per-group semaphore is created.
41#define CONN_POOL_SPARE_CONNECTIONS 2 // number of spare connections in case all the base connections are used but we have a nested request with the risk of a deadlock
42
43
67template<typename T> class QgsConnectionPoolGroup
68{
69 public:
70 struct Item
71 {
72 T c;
74 };
75
79 QgsConnectionPoolGroup( const QString &ci )
80 : connInfo( ci )
81 , sem( QgsApplication::instance()->maxConcurrentConnectionsPerPool() + CONN_POOL_SPARE_CONNECTIONS )
82 {}
83
// NOTE(review): the declaration line for this body (listing line 84) is missing from this
// extract; judging by the log message below it is presumably the group's destructor — confirm
// against the real header.
85 {
86 QgsDebugMsgLevel( u"Destroying connection pool group"_s, 2 );
// Only the idle connections held in `conns` are destroyed here; connections that are
// currently handed out (tracked in `acquiredConns`) are not touched by this loop.
87 for ( const Item &item : std::as_const( conns ) )
88 {
89 qgsConnectionPool_ConnectionDestroy( item.c );
90 }
91 }
92
95
103 T acquire( int timeout, bool requestMayBeNested )
104 {
105 QgsDebugMsgLevel( u"Trying to acquire connection"_s, 2 );
106 const int requiredFreeConnectionCount = requestMayBeNested ? 1 : 3;
107 // we are going to acquire a resource - if no resource is available, we will block here
108 if ( timeout >= 0 )
109 {
110 if ( !sem.tryAcquire( requiredFreeConnectionCount, timeout ) )
111 {
112 QgsDebugMsgLevel( u"Failed to acquire semaphore"_s, 2 );
113 return nullptr;
114 }
115 }
116 else
117 {
118 // we should still be able to use tryAcquire with a negative timeout here, but
119 // tryAcquire is broken on Qt > 5.8 with negative timeouts - see
120 // https://bugreports.qt.io/browse/QTBUG-64413
121 // https://lists.osgeo.org/pipermail/qgis-developer/2017-November/050456.html
122 sem.acquire( requiredFreeConnectionCount );
123 }
124 sem.release( requiredFreeConnectionCount - 1 );
125
126 // quick (preferred) way - use cached connection
127 {
128 QMutexLocker locker( &connMutex );
129
130 if ( !conns.isEmpty() )
131 {
132 QgsDebugMsgLevel( u"Trying to use existing connection"_s, 2 );
133 Item i = conns.pop();
134 if ( !qgsConnectionPool_ConnectionIsValid( i.c ) )
135 {
136 QgsDebugMsgLevel( u"Connection is not valid, destroying"_s, 2 );
137 qgsConnectionPool_ConnectionDestroy( i.c );
138 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
139 qgsConnectionPool_ConnectionCreate( connInfo, i.c );
140 }
141
142
143 // no need to run if nothing can expire
144 if ( conns.isEmpty() )
145 {
146 // will call the slot directly or queue the call (if the object lives in a different thread)
147 QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" );
148 }
149
150 QgsDebugMsgLevel( u"Acquired connection"_s, 2 );
151 acquiredConns.append( i.c );
152
153 return i.c;
154 }
155 }
156
157 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
158 T c;
159 qgsConnectionPool_ConnectionCreate( connInfo, c );
160 if ( !c )
161 {
162 // we didn't get connection for some reason, so release the lock
163 sem.release();
164 QgsDebugMsgLevel( u"Failed to create new connection"_s, 2 );
165 return nullptr;
166 }
167
168 connMutex.lock();
169 QgsDebugMsgLevel( u"Acquired connection with name: %1"_s.arg( qgsConnectionPool_ConnectionToName( c ) ), 2 );
170 acquiredConns.append( c );
171 connMutex.unlock();
172 return c;
173 }
174
175 void release( T conn )
176 {
177 QgsDebugMsgLevel( u"Releasing connection"_s, 2 );
178 connMutex.lock();
179 acquiredConns.removeAll( conn );
180 if ( !qgsConnectionPool_ConnectionIsValid( conn ) )
181 {
182 QgsDebugMsgLevel( u"Destroying invalid connection"_s, 2 );
183 qgsConnectionPool_ConnectionDestroy( conn );
184 }
185 else
186 {
187 Item i;
188 i.c = conn;
189 i.lastUsedTime = QTime::currentTime();
190 conns.push( i );
191
192 if ( !expirationTimer->isActive() )
193 {
194 // will call the slot directly or queue the call (if the object lives in a different thread)
195 QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" );
196 }
197 }
198
199 connMutex.unlock();
200
201 sem.release(); // this can unlock a thread waiting in acquire()
202 }
203
// NOTE(review): the declaration line for this body (listing line 204) is missing from this
// extract; the log message and the call made by the pool's invalidateConnections() suggest it
// is the group's `invalidateConnections()` — confirm against the real header.
205 {
206 QgsDebugMsgLevel( u"Invalidating connections for group"_s, 2 );
207 connMutex.lock();
// Idle connections are destroyed outright and removed from the pool...
208 for ( const Item &i : std::as_const( conns ) )
209 {
210 qgsConnectionPool_ConnectionDestroy( i.c );
211 }
212 conns.clear();
// ...while connections that are currently handed out are only flagged through
// qgsConnectionPool_InvalidateConnection(); they stay in `acquiredConns`.
213 for ( T c : std::as_const( acquiredConns ) )
214 qgsConnectionPool_InvalidateConnection( c );
215 connMutex.unlock();
216 }
217
218 protected:
225 template<typename U> void initTimer( U *parent )
226 {
227 expirationTimer = new QTimer( parent );
228 expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 );
229 QObject::connect( expirationTimer, &QTimer::timeout, parent, &U::handleConnectionExpired );
230
231 // just to make sure the object belongs to main thread and thus will get events
232 if ( qApp )
233 parent->moveToThread( qApp->thread() );
234 }
235
// NOTE(review): the declaration line for this body (listing line 236) is missing from this
// extract, so the method name and return type cannot be confirmed from here.
// Destroys every idle connection that has been unused for at least CONN_POOL_EXPIRATION_TIME
// seconds and stops the expiration timer once no idle connection is left.
237 {
238 connMutex.lock();
239
// NOTE(review): QTime carries no date, so secsTo() below goes negative when `now` is on the
// other side of midnight from lastUsedTime; such a connection would only be picked up by a
// later timer tick — confirm whether this matters.
240 QTime now = QTime::currentTime();
241
242 // what connections have expired?
243 QList<int> toDelete;
244 for ( int i = 0; i < conns.count(); ++i )
245 {
246 if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME )
247 toDelete.append( i );
248 }
249
250 // delete expired connections
// iterate from the highest index down so that removing an entry does not shift the
// indices still waiting to be processed
251 for ( int j = toDelete.count() - 1; j >= 0; --j )
252 {
253 int index = toDelete[j];
254 qgsConnectionPool_ConnectionDestroy( conns[index].c );
255 conns.remove( index );
256 }
257
258 if ( conns.isEmpty() )
259 expirationTimer->stop();
260
261 connMutex.unlock();
262 }
263
264 protected:
265 QString connInfo;
266 QStack<Item> conns;
268 QMutex connMutex;
269 QSemaphore sem;
270 QTimer *expirationTimer = nullptr;
271};
272
273
291template<typename T, typename T_Group> class QgsConnectionPool
292{
293 public:
294 typedef QMap<QString, T_Group *> T_Groups;
295
// NOTE(review): the declaration line for this body (listing line 296) is missing from this
// extract; judging by the log message below it is presumably the pool's destructor — confirm
// against the real header.
297 {
298 QgsDebugMsgLevel( u"Destroying connection pool"_s, 2 );
299 mMutex.lock();
// The pool deletes every group stored in the map before emptying it.
300 for ( auto it = mGroups.constBegin(); it != mGroups.constEnd(); ++it )
301 {
302 QgsDebugMsgLevel( u"Destroying connection pool group with key %1"_s.arg( it.key() ), 2 );
303 delete it.value();
304 }
305 QgsDebugMsgLevel( u"Connection pool groups destroyed"_s, 2 );
306 mGroups.clear();
307 mMutex.unlock();
308 }
309
320 T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false, QgsFeedback *feedback = nullptr )
321 {
322 QgsDebugMsgLevel( u"Trying to acquire connection for %1"_s.arg( connInfo ), 2 );
323 mMutex.lock();
324 typename T_Groups::iterator it = mGroups.find( connInfo );
325 if ( it == mGroups.end() )
326 {
327 QgsDebugMsgLevel( u"Could not find existing group, adding new one"_s, 2 );
328 it = mGroups.insert( connInfo, new T_Group( connInfo ) );
329 }
330 else
331 {
332 QgsDebugMsgLevel( u"Found existing group"_s, 2 );
333 }
334 T_Group *group = *it;
335 mMutex.unlock();
336
337 if ( feedback )
338 {
339 QElapsedTimer timer;
340 timer.start();
341
342 while ( !feedback->isCanceled() )
343 {
344 if ( T conn = group->acquire( 300, requestMayBeNested ) )
345 return conn;
346
347 if ( timeout > 0 && timer.elapsed() >= timeout )
348 return nullptr;
349 }
350 return nullptr;
351 }
352 else
353 {
354 return group->acquire( timeout, requestMayBeNested );
355 }
356 }
357
359 void releaseConnection( T conn )
360 {
361 mMutex.lock();
362 const QString groupName = qgsConnectionPool_ConnectionToName( conn );
363 QgsDebugMsgLevel( u"Releasing connection for %1"_s.arg( groupName ), 2 );
364 typename T_Groups::iterator it = mGroups.find( groupName );
365 Q_ASSERT( it != mGroups.end() );
366 T_Group *group = *it;
367 mMutex.unlock();
368
369 QgsDebugMsgLevel( u"Group found, releasing..."_s, 2 );
370 group->release( conn );
371 }
372
380 void invalidateConnections( const QString &connInfo )
381 {
382 QgsDebugMsgLevel( u"Invalidating connections for %1"_s.arg( connInfo ), 2 );
383 mMutex.lock();
384
385 auto it = mGroups.constFind( connInfo );
386 if ( it != mGroups.constEnd() )
387 {
388 QgsDebugMsgLevel( u"Found group, invalidating..."_s, 2 );
389 it.value()->invalidateConnections();
390 }
391 else
392 {
393 QgsDebugMsgLevel( u"Could not find matching group!"_s, 2 );
394 }
395 mMutex.unlock();
396 }
397
398
399 protected:
401 QMutex mMutex;
402};
403
404
405#endif // QGSCONNECTIONPOOL_H
Extends QApplication to provide access to QGIS specific resources such as theme paths,...
QgsConnectionPoolGroup(const QgsConnectionPoolGroup &other)=delete
T acquire(int timeout, bool requestMayBeNested)
Try to acquire a connection for a maximum of timeout milliseconds.
void initTimer(U *parent)
Initializes the connection timeout handling.
QgsConnectionPoolGroup(const QString &ci)
Constructor for QgsConnectionPoolGroup, with the specified connection info.
QgsConnectionPoolGroup & operator=(const QgsConnectionPoolGroup &other)=delete
Template class responsible for keeping a pool of open connections.
QMap< QString, T_Group * > T_Groups
void invalidateConnections(const QString &connInfo)
Invalidates all connections to the specified resource.
T acquireConnection(const QString &connInfo, int timeout=-1, bool requestMayBeNested=false, QgsFeedback *feedback=nullptr)
Try to acquire a connection for a maximum of timeout milliseconds.
void releaseConnection(T conn)
Release an existing connection so it will get back into the pool and can be reused.
Base class for feedback objects to be used for cancellation of something running in a worker thread.
Definition qgsfeedback.h:44
As part of the API refactoring and improvements which landed in QGIS 3.0, the Processing API was substantially reworked from the 2.x version. This was done in order to allow much of the underlying Processing framework to be ported into C++.
#define CONN_POOL_SPARE_CONNECTIONS
#define CONN_POOL_EXPIRATION_TIME
#define QgsDebugMsgLevel(str, level)
Definition qgslogger.h:63