QGIS API Documentation 4.1.0-Master (376402f9aeb)
Loading...
Searching...
No Matches
qgsconnectionpool.h
Go to the documentation of this file.
1/***************************************************************************
2 qgsconnectionpool.h
3 ---------------------
4 begin : February 2014
5 copyright : (C) 2014 by Martin Dobias
6 email : wonder dot sk at gmail dot com
7 ***************************************************************************
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 ***************************************************************************/
15
16#ifndef QGSCONNECTIONPOOL_H
17#define QGSCONNECTIONPOOL_H
18
19
20#include "qgis.h"
21#include "qgsapplication.h"
22#include "qgsfeedback.h"
23#include "qgslogger.h"
24
25#include <QCoreApplication>
26#include <QElapsedTimer>
27#include <QMap>
28#include <QMutex>
29#include <QSemaphore>
30#include <QStack>
31#include <QString>
32#include <QThread>
33#include <QTime>
34#include <QTimer>
35
36#define SIP_NO_FILE
37
38using namespace Qt::StringLiterals;
39
40#define CONN_POOL_EXPIRATION_TIME 60 // in seconds
41#define CONN_POOL_SPARE_CONNECTIONS 2 // number of spare connections in case all the base connections are used but we have a nested request with the risk of a deadlock
42
43
67template<typename T> class QgsConnectionPoolGroup
68{
69 public:
// One pooled connection entry, as stored in the group's stack of idle connections.
70 struct Item
71 {
// The connection handle itself.
72 T c;
// NOTE(review): listing line 73 is absent from this extract; release() assigns a
// lastUsedTime member on Item, so it is presumably declared here — confirm against the real header.
74 };
75
79 QgsConnectionPoolGroup( const QString &ci )
80 : connInfo( ci )
81 , sem( QgsApplication::instance()->maxConcurrentConnectionsPerPool() + CONN_POOL_SPARE_CONNECTIONS )
82 {}
83
// NOTE(review): the signature line (listing line 84) is missing from this extract; judging by
// the log message this is the group's teardown code — confirm against the real header.
// Destroys every idle connection still stored in conns.
85 {
86 QgsDebugMsgLevel( u"Destroying connection pool group"_s, 2 );
87 for ( const Item &item : std::as_const( conns ) )
88 {
89 qgsConnectionPool_ConnectionDestroy( item.c );
90 }
91 }
92
95
103 T acquire( int timeout, bool requestMayBeNested )
104 {
105 QgsDebugMsgLevel( u"Trying to acquire connection"_s, 2 );
106 const int requiredFreeConnectionCount = requestMayBeNested ? 1 : 3;
107 // we are going to acquire a resource - if no resource is available, we will block here
108 if ( !sem.tryAcquire( requiredFreeConnectionCount, QDeadlineTimer( timeout ) ) )
109 {
110 QgsDebugMsgLevel( u"Failed to acquire semaphore"_s, 2 );
111 return nullptr;
112 }
113
114 sem.release( requiredFreeConnectionCount - 1 );
115
116 // quick (preferred) way - use cached connection
117 {
118 QMutexLocker locker( &connMutex );
119
120 if ( !conns.isEmpty() )
121 {
122 QgsDebugMsgLevel( u"Trying to use existing connection"_s, 2 );
123 Item i = conns.pop();
124 if ( !qgsConnectionPool_ConnectionIsValid( i.c ) )
125 {
126 QgsDebugMsgLevel( u"Connection is not valid, destroying"_s, 2 );
127 qgsConnectionPool_ConnectionDestroy( i.c );
128 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
129 qgsConnectionPool_ConnectionCreate( connInfo, i.c );
130 }
131
132
133 // no need to run if nothing can expire
134 if ( conns.isEmpty() )
135 {
136 // will call the slot directly or queue the call (if the object lives in a different thread)
137 QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" );
138 }
139
140 QgsDebugMsgLevel( u"Acquired connection"_s, 2 );
141 acquiredConns.append( i.c );
142
143 return i.c;
144 }
145 }
146
147 QgsDebugMsgLevel( u"Creating new connection"_s, 2 );
148 T c;
149 qgsConnectionPool_ConnectionCreate( connInfo, c );
150 if ( !c )
151 {
152 // we didn't get connection for some reason, so release the lock
153 sem.release();
154 QgsDebugMsgLevel( u"Failed to create new connection"_s, 2 );
155 return nullptr;
156 }
157
158 connMutex.lock();
159 QgsDebugMsgLevel( u"Acquired connection with name: %1"_s.arg( qgsConnectionPool_ConnectionToName( c ) ), 2 );
160 acquiredConns.append( c );
161 connMutex.unlock();
162 return c;
163 }
164
165 void release( T conn )
166 {
167 QgsDebugMsgLevel( u"Releasing connection"_s, 2 );
168 connMutex.lock();
169 acquiredConns.removeAll( conn );
170 if ( !qgsConnectionPool_ConnectionIsValid( conn ) )
171 {
172 QgsDebugMsgLevel( u"Destroying invalid connection"_s, 2 );
173 qgsConnectionPool_ConnectionDestroy( conn );
174 }
175 else
176 {
177 Item i;
178 i.c = conn;
179 i.lastUsedTime = QTime::currentTime();
180 conns.push( i );
181
182 if ( !expirationTimer->isActive() )
183 {
184 // will call the slot directly or queue the call (if the object lives in a different thread)
185 QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" );
186 }
187 }
188
189 connMutex.unlock();
190
191 sem.release(); // this can unlock a thread waiting in acquire()
192 }
193
// NOTE(review): the signature line (listing line 194) is missing from this extract; the pool
// class calls invalidateConnections() on its groups, which is presumably this body — confirm.
// Under connMutex: destroys and forgets every idle connection, then asks each connection
// that is currently handed out to be invalidated.
195 {
196 QgsDebugMsgLevel( u"Invalidating connections for group"_s, 2 );
197 connMutex.lock();
// idle connections are owned by the group, so they can be destroyed right away
198 for ( const Item &i : std::as_const( conns ) )
199 {
200 qgsConnectionPool_ConnectionDestroy( i.c );
201 }
202 conns.clear();
// connections in acquiredConns are not destroyed here, only passed to the invalidate hook
203 for ( T c : std::as_const( acquiredConns ) )
204 qgsConnectionPool_InvalidateConnection( c );
205 connMutex.unlock();
206 }
207
208 protected:
215 template<typename U> void initTimer( U *parent )
216 {
217 expirationTimer = new QTimer( parent );
218 expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 );
219 QObject::connect( expirationTimer, &QTimer::timeout, parent, &U::handleConnectionExpired );
220
221 // just to make sure the object belongs to main thread and thus will get events
222 if ( qApp )
223 parent->moveToThread( qApp->thread() );
224 }
225
// NOTE(review): the signature line (listing line 226) is missing from this extract — confirm
// the method name against the real header.
// Under connMutex: destroys every idle connection whose last use is at least
// CONN_POOL_EXPIRATION_TIME seconds old and stops the timer once nothing is left to expire.
227 {
228 connMutex.lock();
229
230 QTime now = QTime::currentTime();
231
232 // what connections have expired?
// NOTE(review): QTime carries no date, so for an entry stamped shortly before midnight
// secsTo( now ) turns negative after midnight and the entry is not seen as expired — confirm.
233 QList<int> toDelete;
234 for ( int i = 0; i < conns.count(); ++i )
235 {
236 if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME )
237 toDelete.append( i );
238 }
239
240 // delete expired connections
// walk the collected indices from highest to lowest so earlier removals do not shift later ones
241 for ( int j = toDelete.count() - 1; j >= 0; --j )
242 {
243 int index = toDelete[j];
244 qgsConnectionPool_ConnectionDestroy( conns[index].c );
245 conns.remove( index );
246 }
247
248 if ( conns.isEmpty() )
249 expirationTimer->stop();
250
251 connMutex.unlock();
252 }
253
254 protected:
255 QString connInfo;
256 QStack<Item> conns;
258 QMutex connMutex;
259 QSemaphore sem;
260 QTimer *expirationTimer = nullptr;
261};
262
263
281template<typename T, typename T_Group> class QgsConnectionPool
282{
283 public:
284 typedef QMap<QString, T_Group *> T_Groups;
285
// NOTE(review): the signature line (listing line 286) is missing from this extract; judging by
// the log message this is the pool's teardown code — confirm against the real header.
// Under mMutex: deletes every group registered in mGroups and empties the map.
287 {
288 QgsDebugMsgLevel( u"Destroying connection pool"_s, 2 );
289 mMutex.lock();
290 for ( auto it = mGroups.constBegin(); it != mGroups.constEnd(); ++it )
291 {
292 QgsDebugMsgLevel( u"Destroying connection pool group with key %1"_s.arg( it.key() ), 2 );
// the map stores raw T_Group pointers, so each group is deleted explicitly
293 delete it.value();
294 }
295 QgsDebugMsgLevel( u"Connection pool groups destroyed"_s, 2 );
296 mGroups.clear();
297 mMutex.unlock();
298 }
299
310 T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false, QgsFeedback *feedback = nullptr )
311 {
312 QgsDebugMsgLevel( u"Trying to acquire connection for %1"_s.arg( connInfo ), 2 );
313 mMutex.lock();
314 typename T_Groups::iterator it = mGroups.find( connInfo );
315 if ( it == mGroups.end() )
316 {
317 QgsDebugMsgLevel( u"Could not find existing group, adding new one"_s, 2 );
318 it = mGroups.insert( connInfo, new T_Group( connInfo ) );
319 }
320 else
321 {
322 QgsDebugMsgLevel( u"Found existing group"_s, 2 );
323 }
324 T_Group *group = *it;
325 mMutex.unlock();
326
327 if ( feedback )
328 {
329 QElapsedTimer timer;
330 timer.start();
331
332 while ( !feedback->isCanceled() )
333 {
334 if ( T conn = group->acquire( 300, requestMayBeNested ) )
335 return conn;
336
337 if ( timeout > 0 && timer.elapsed() >= timeout )
338 return nullptr;
339 }
340 return nullptr;
341 }
342 else
343 {
344 return group->acquire( timeout, requestMayBeNested );
345 }
346 }
347
349 void releaseConnection( T conn )
350 {
351 mMutex.lock();
352 const QString groupName = qgsConnectionPool_ConnectionToName( conn );
353 QgsDebugMsgLevel( u"Releasing connection for %1"_s.arg( groupName ), 2 );
354 typename T_Groups::iterator it = mGroups.find( groupName );
355 Q_ASSERT( it != mGroups.end() );
356 T_Group *group = *it;
357 mMutex.unlock();
358
359 QgsDebugMsgLevel( u"Group found, releasing..."_s, 2 );
360 group->release( conn );
361 }
362
370 void invalidateConnections( const QString &connInfo )
371 {
372 QgsDebugMsgLevel( u"Invalidating connections for %1"_s.arg( connInfo ), 2 );
373 mMutex.lock();
374
375 auto it = mGroups.constFind( connInfo );
376 if ( it != mGroups.constEnd() )
377 {
378 QgsDebugMsgLevel( u"Found group, invalidating..."_s, 2 );
379 it.value()->invalidateConnections();
380 }
381 else
382 {
383 QgsDebugMsgLevel( u"Could not find matching group!"_s, 2 );
384 }
385 mMutex.unlock();
386 }
387
388
389 protected:
391 QMutex mMutex;
392};
393
394
395#endif // QGSCONNECTIONPOOL_H
Extends QApplication to provide access to QGIS specific resources such as theme paths,...
QgsConnectionPoolGroup(const QgsConnectionPoolGroup &other)=delete
T acquire(int timeout, bool requestMayBeNested)
Try to acquire a connection for a maximum of timeout milliseconds.
void initTimer(U *parent)
Initializes the connection timeout handling.
QgsConnectionPoolGroup(const QString &ci)
Constructor for QgsConnectionPoolGroup, with the specified connection info.
QgsConnectionPoolGroup & operator=(const QgsConnectionPoolGroup &other)=delete
Template class responsible for keeping a pool of open connections.
QMap< QString, T_Group * > T_Groups
void invalidateConnections(const QString &connInfo)
Invalidates all connections to the specified resource.
T acquireConnection(const QString &connInfo, int timeout=-1, bool requestMayBeNested=false, QgsFeedback *feedback=nullptr)
Try to acquire a connection for a maximum of timeout milliseconds.
void releaseConnection(T conn)
Release an existing connection so it will get back into the pool and can be reused.
Base class for feedback objects to be used for cancellation of something running in a worker thread.
Definition qgsfeedback.h:44
As part of the API refactoring and improvements which landed in QGIS 3.0, the Processing API was substantially reworked from the 2.x version. This was done in order to allow much of the underlying Processing framework to be ported into C++.
#define CONN_POOL_SPARE_CONNECTIONS
#define CONN_POOL_EXPIRATION_TIME
#define QgsDebugMsgLevel(str, level)
Definition qgslogger.h:63