|
MythTV
0.26-pre
|
00001 /* -*- Mode: c++ -*- 00002 * 00003 * Class MThreadPool 00004 * 00005 * Copyright (C) Daniel Kristjansson 2011 00006 * 00007 * This program is free software; you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation; either version 2 of the License, or 00010 * (at your option) any later version. 00011 * 00012 * This program is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with this program; if not, write to the Free Software 00019 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00020 */ 00021 00022 // C++ headers 00023 #include <algorithm> 00024 using namespace std; 00025 00026 // Qt headers 00027 #include <QCoreApplication> 00028 #include <QWaitCondition> 00029 #include <QMutexLocker> 00030 #include <QRunnable> 00031 #include <QMutex> 00032 #include <QList> 00033 #include <QPair> 00034 #include <QMap> 00035 #include <QSet> 00036 00037 // MythTV headers 00038 #include "mthreadpool.h" 00039 #include "mythlogging.h" 00040 #include "mythtimer.h" 00041 #include "logging.h" 00042 #include "mthread.h" 00043 #include "mythdb.h" 00044 00045 typedef QPair<QRunnable*,QString> MPoolEntry; 00046 typedef QList<MPoolEntry> MPoolQueue; 00047 typedef QMap<int, MPoolQueue> MPoolQueues; 00048 00049 class MPoolThread : public MThread 00050 { 00051 public: 00052 MPoolThread(MThreadPool &pool, int timeout) : 00053 MThread("PT"), m_pool(pool), m_expiry_timeout(timeout), 00054 m_do_run(true), m_reserved(false) 00055 { 00056 QMutexLocker locker(&s_lock); 00057 setObjectName(QString("PT%1").arg(s_thread_num)); 00058 s_thread_num++; 00059 } 00060 00061 void run(void) 00062 { 00063 RunProlog(); 00064 00065 MythTimer t; 00066 t.start(); 00067 QMutexLocker locker(&m_lock); 00068 while (true) 00069 { 00070 if (m_do_run && !m_runnable) 00071 m_wait.wait(locker.mutex(), m_expiry_timeout+1); 00072 00073 if (!m_runnable) 00074 { 00075 m_do_run = false; 00076 00077 locker.unlock(); 00078 m_pool.NotifyDone(this); 00079 locker.relock(); 00080 break; 00081 } 00082 00083 if (!m_runnable_name.isEmpty()) 00084 loggingRegisterThread(m_runnable_name); 00085 00086 bool autodelete = m_runnable->autoDelete(); 00087 m_runnable->run(); 00088 if (autodelete) 00089 delete m_runnable; 00090 if (m_reserved) 00091 m_pool.ReleaseThread(); 00092 m_reserved = false; 00093 m_runnable = NULL; 00094 00095 loggingDeregisterThread(); 00096 loggingRegisterThread(objectName()); 00097 00098 GetMythDB()->GetDBManager()->PurgeIdleConnections(false); 00099 QCoreApplication::processEvents(); 00100 00101 t.start(); 00102 00103 if (m_do_run) 00104 { 00105 locker.unlock(); 00106 m_pool.NotifyAvailable(this); 00107 locker.relock(); 00108 } 00109 else 00110 { 00111 locker.unlock(); 00112 m_pool.NotifyDone(this); 00113 locker.relock(); 00114 break; 00115 } 00116 } 00117 00118 RunEpilog(); 00119 } 00120 00121 bool SetRunnable(QRunnable *runnable, QString runnableName, 00122 bool reserved) 00123 { 00124 QMutexLocker locker(&m_lock); 00125 if (m_do_run && (m_runnable == NULL)) 00126 { 00127 m_runnable = runnable; 00128 m_runnable_name = runnableName; 00129 m_reserved = reserved; 00130 m_wait.wakeAll(); 00131 return true; 00132 } 00133 return false; 00134 } 00135 00136 void Shutdown(void) 00137 { 00138 QMutexLocker locker(&m_lock); 00139 m_do_run = false; 00140 m_wait.wakeAll(); 00141 } 00142 00143 QMutex m_lock; 00144 QWaitCondition m_wait; 00145 MThreadPool &m_pool; 00146 int m_expiry_timeout; 00147 bool m_do_run; 00148 QString m_runnable_name; 00149 bool m_reserved; 00150 00151 static QMutex s_lock; 00152 static uint s_thread_num; 00153 }; 00154 QMutex MPoolThread::s_lock; 00155 uint MPoolThread::s_thread_num = 0; 00156 00158 00159 class MThreadPoolPrivate 00160 { 00161 public: 00162 MThreadPoolPrivate(const QString &name) : 00163 m_name(name), 00164 m_running(true), 00165 m_expiry_timeout(120 * 1000), 00166 m_max_thread_count(QThread::idealThreadCount()), 00167 m_reserve_thread(0) 00168 { 00169 } 00170 00171 int GetRealMaxThread(void) 00172 { 00173 return max(m_max_thread_count,1) + m_reserve_thread; 00174 } 00175 00176 mutable QMutex m_lock; 00177 QString m_name; 00178 QWaitCondition m_wait; 00179 bool m_running; 00180 int m_expiry_timeout; 00181 int m_max_thread_count; 00182 int m_reserve_thread; 00183 00184 MPoolQueues m_run_queues; 00185 QSet<MPoolThread*> m_avail_threads; 00186 QSet<MPoolThread*> m_running_threads; 00187 QList<MPoolThread*> m_delete_threads; 00188 00189 static QMutex s_pool_lock; 00190 static MThreadPool *s_pool; 00191 static QList<MThreadPool*> s_all_pools; 00192 }; 00193 00194 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive); 00195 MThreadPool *MThreadPoolPrivate::s_pool = NULL; 00196 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools; 00197 00199 00200 MThreadPool::MThreadPool(const QString &name) : 00201 m_priv(new MThreadPoolPrivate(name)) 00202 { 00203 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock); 00204 MThreadPoolPrivate::s_all_pools.push_back(this); 00205 } 00206 00207 MThreadPool::~MThreadPool() 00208 { 00209 Stop(); 00210 DeletePoolThreads(); 00211 { 00212 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock); 00213 MThreadPoolPrivate::s_all_pools.removeAll(this); 00214 } 00215 delete m_priv; 00216 m_priv = NULL; 00217 } 00218 00219 void MThreadPool::Stop(void) 00220 { 00221 QMutexLocker locker(&m_priv->m_lock); 00222 m_priv->m_running = false; 00223 QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin(); 00224 for (; it != m_priv->m_avail_threads.end(); ++it) 00225 (*it)->Shutdown(); 00226 it = m_priv->m_running_threads.begin(); 00227 for (; it != m_priv->m_running_threads.end(); ++it) 00228 (*it)->Shutdown(); 00229 m_priv->m_wait.wakeAll(); 00230 } 00231 00232 void MThreadPool::DeletePoolThreads(void) 00233 { 00234 waitForDone(); 00235 00236 QMutexLocker locker(&m_priv->m_lock); 00237 QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin(); 00238 for (; it != m_priv->m_avail_threads.end(); ++it) 00239 { 00240 m_priv->m_delete_threads.push_front(*it); 00241 } 00242 m_priv->m_avail_threads.clear(); 00243 00244 while (!m_priv->m_delete_threads.empty()) 00245 { 00246 MPoolThread *thread = m_priv->m_delete_threads.back(); 00247 locker.unlock(); 00248 00249 thread->wait(); 00250 00251 locker.relock(); 00252 delete thread; 00253 if (m_priv->m_delete_threads.back() == thread) 00254 m_priv->m_delete_threads.pop_back(); 00255 else 00256 m_priv->m_delete_threads.removeAll(thread); 00257 } 00258 } 00259 00260 MThreadPool *MThreadPool::globalInstance(void) 00261 { 00262 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock); 00263 if (!MThreadPoolPrivate::s_pool) 00264 MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool"); 00265 return MThreadPoolPrivate::s_pool; 00266 } 00267 00268 void MThreadPool::StopAllPools(void) 00269 { 00270 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock); 00271 QList<MThreadPool*>::iterator it; 00272 for (it = MThreadPoolPrivate::s_all_pools.begin(); 00273 it != MThreadPoolPrivate::s_all_pools.end(); ++it) 00274 { 00275 (*it)->Stop(); 00276 } 00277 } 00278 00279 void MThreadPool::ShutdownAllPools(void) 00280 { 00281 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock); 00282 QList<MThreadPool*>::iterator it; 00283 for (it = MThreadPoolPrivate::s_all_pools.begin(); 00284 it != MThreadPoolPrivate::s_all_pools.end(); ++it) 00285 { 00286 (*it)->Stop(); 00287 } 00288 for (it = MThreadPoolPrivate::s_all_pools.begin(); 00289 it != MThreadPoolPrivate::s_all_pools.end(); ++it) 00290 { 00291 (*it)->DeletePoolThreads(); 00292 } 00293 } 00294 00295 void MThreadPool::start(QRunnable *runnable, QString debugName, int priority) 00296 { 00297 QMutexLocker locker(&m_priv->m_lock); 00298 if (TryStartInternal(runnable, debugName, false)) 00299 return; 00300 00301 MPoolQueues::iterator it = m_priv->m_run_queues.find(priority); 00302 if (it != m_priv->m_run_queues.end()) 00303 { 00304 (*it).push_back(MPoolEntry(runnable,debugName)); 00305 } 00306 else 00307 { 00308 MPoolQueue list; 00309 list.push_back(MPoolEntry(runnable,debugName)); 00310 m_priv->m_run_queues[priority] = list; 00311 } 00312 } 00313 00314 void MThreadPool::startReserved( 00315 QRunnable *runnable, QString debugName, int waitForAvailMS) 00316 { 00317 QMutexLocker locker(&m_priv->m_lock); 00318 if (waitForAvailMS > 0 && m_priv->m_avail_threads.empty() && 00319 m_priv->m_running_threads.size() >= m_priv->m_max_thread_count) 00320 { 00321 MythTimer t; 00322 t.start(); 00323 int left = waitForAvailMS - t.elapsed(); 00324 while (left > 0 && m_priv->m_avail_threads.empty() && 00325 m_priv->m_running_threads.size() >= m_priv->m_max_thread_count) 00326 { 00327 m_priv->m_wait.wait(locker.mutex(), left); 00328 left = waitForAvailMS - t.elapsed(); 00329 } 00330 } 00331 TryStartInternal(runnable, debugName, true); 00332 } 00333 00334 00335 bool MThreadPool::tryStart(QRunnable *runnable, QString debugName) 00336 { 00337 QMutexLocker locker(&m_priv->m_lock); 00338 return TryStartInternal(runnable, debugName, false); 00339 } 00340 00341 bool MThreadPool::TryStartInternal( 00342 QRunnable *runnable, QString debugName, bool reserved) 00343 { 00344 if (!m_priv->m_running) 00345 return false; 00346 00347 while (!m_priv->m_delete_threads.empty()) 00348 { 00349 m_priv->m_delete_threads.back()->wait(); 00350 delete m_priv->m_delete_threads.back(); 00351 m_priv->m_delete_threads.pop_back(); 00352 } 00353 00354 while (m_priv->m_avail_threads.begin() != m_priv->m_avail_threads.end()) 00355 { 00356 MPoolThread *thread = *m_priv->m_avail_threads.begin(); 00357 m_priv->m_avail_threads.erase(m_priv->m_avail_threads.begin()); 00358 m_priv->m_running_threads.insert(thread); 00359 if (reserved) 00360 m_priv->m_reserve_thread++; 00361 if (thread->SetRunnable(runnable, debugName, reserved)) 00362 { 00363 return true; 00364 } 00365 else 00366 { 00367 if (reserved) 00368 m_priv->m_reserve_thread--; 00369 thread->Shutdown(); 00370 m_priv->m_running_threads.remove(thread); 00371 m_priv->m_delete_threads.push_front(thread); 00372 } 00373 } 00374 00375 if (reserved || 00376 m_priv->m_running_threads.size() < m_priv->GetRealMaxThread()) 00377 { 00378 if (reserved) 00379 m_priv->m_reserve_thread++; 00380 MPoolThread *thread = new MPoolThread(*this, m_priv->m_expiry_timeout); 00381 m_priv->m_running_threads.insert(thread); 00382 thread->SetRunnable(runnable, debugName, reserved); 00383 thread->start(); 00384 return true; 00385 } 00386 00387 return false; 00388 } 00389 00390 void MThreadPool::NotifyAvailable(MPoolThread *thread) 00391 { 00392 QMutexLocker locker(&m_priv->m_lock); 00393 00394 if (!m_priv->m_running) 00395 { 00396 m_priv->m_running_threads.remove(thread); 00397 thread->Shutdown(); 00398 m_priv->m_delete_threads.push_front(thread); 00399 m_priv->m_wait.wakeAll(); 00400 return; 00401 } 00402 00403 MPoolQueues::iterator it = m_priv->m_run_queues.begin(); 00404 if (it == m_priv->m_run_queues.end()) 00405 { 00406 m_priv->m_running_threads.remove(thread); 00407 m_priv->m_avail_threads.insert(thread); 00408 m_priv->m_wait.wakeAll(); 00409 return; 00410 } 00411 00412 MPoolEntry e = (*it).front(); 00413 if (!thread->SetRunnable(e.first, e.second, false)) 00414 { 00415 m_priv->m_running_threads.remove(thread); 00416 m_priv->m_wait.wakeAll(); 00417 if (!TryStartInternal(e.first, e.second, false)) 00418 { 00419 thread->Shutdown(); 00420 m_priv->m_delete_threads.push_front(thread); 00421 return; 00422 } 00423 thread->Shutdown(); 00424 m_priv->m_delete_threads.push_front(thread); 00425 } 00426 00427 (*it).pop_front(); 00428 if ((*it).empty()) 00429 m_priv->m_run_queues.erase(it); 00430 } 00431 00432 void MThreadPool::NotifyDone(MPoolThread *thread) 00433 { 00434 QMutexLocker locker(&m_priv->m_lock); 00435 m_priv->m_running_threads.remove(thread); 00436 m_priv->m_avail_threads.remove(thread); 00437 if (!m_priv->m_delete_threads.contains(thread)) 00438 m_priv->m_delete_threads.push_front(thread); 00439 m_priv->m_wait.wakeAll(); 00440 } 00441 00442 int MThreadPool::expiryTimeout(void) const 00443 { 00444 QMutexLocker locker(&m_priv->m_lock); 00445 return m_priv->m_expiry_timeout; 00446 } 00447 00448 void MThreadPool::setExpiryTimeout(int expiryTimeout) 00449 { 00450 QMutexLocker locker(&m_priv->m_lock); 00451 m_priv->m_expiry_timeout = expiryTimeout; 00452 } 00453 00454 int MThreadPool::maxThreadCount(void) const 00455 { 00456 QMutexLocker locker(&m_priv->m_lock); 00457 return m_priv->m_max_thread_count; 00458 } 00459 00460 void MThreadPool::setMaxThreadCount(int maxThreadCount) 00461 { 00462 QMutexLocker locker(&m_priv->m_lock); 00463 m_priv->m_max_thread_count = maxThreadCount; 00464 } 00465 00466 int MThreadPool::activeThreadCount(void) const 00467 { 00468 QMutexLocker locker(&m_priv->m_lock); 00469 return m_priv->m_avail_threads.size() + m_priv->m_running_threads.size(); 00470 } 00471 00472 /* 00473 void MThreadPool::reserveThread(void) 00474 { 00475 QMutexLocker locker(&m_priv->m_lock); 00476 m_priv->m_reserve_thread++; 00477 } 00478 00479 void MThreadPool::releaseThread(void) 00480 { 00481 QMutexLocker locker(&m_priv->m_lock); 00482 if (m_priv->m_reserve_thread > 0) 00483 m_priv->m_reserve_thread--; 00484 } 00485 */ 00486 00487 void MThreadPool::ReleaseThread(void) 00488 { 00489 QMutexLocker locker(&m_priv->m_lock); 00490 if (m_priv->m_reserve_thread > 0) 00491 m_priv->m_reserve_thread--; 00492 } 00493 00494 #if 0 00495 static void print_set(QString title, QSet<MPoolThread*> set) 00496 { 00497 LOG(VB_GENERAL, LOG_INFO, title); 00498 QSet<MPoolThread*>::iterator it = set.begin(); 00499 for (; it != set.end(); ++it) 00500 { 00501 LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1") 00502 .arg((quint64)(*it),0,16)); 00503 } 00504 LOG(VB_GENERAL, LOG_INFO, ""); 00505 } 00506 #endif 00507 00508 void MThreadPool::waitForDone(void) 00509 { 00510 QMutexLocker locker(&m_priv->m_lock); 00511 while (true) 00512 { 00513 while (!m_priv->m_delete_threads.empty()) 00514 { 00515 m_priv->m_delete_threads.back()->wait(); 00516 delete m_priv->m_delete_threads.back(); 00517 m_priv->m_delete_threads.pop_back(); 00518 } 00519 00520 if (m_priv->m_running && !m_priv->m_run_queues.empty()) 00521 { 00522 m_priv->m_wait.wait(locker.mutex()); 00523 continue; 00524 } 00525 00526 QSet<MPoolThread*> working = m_priv->m_running_threads; 00527 working = working.subtract(m_priv->m_avail_threads); 00528 if (working.empty()) 00529 break; 00530 m_priv->m_wait.wait(locker.mutex()); 00531 } 00532 } 00533 00534 /* vim: set expandtab tabstop=4 shiftwidth=4: */
1.7.6.1