MythTV  0.26-pre
mthreadpool.cpp
Go to the documentation of this file.
00001 /*  -*- Mode: c++ -*-
00002  *
00003  *   Class MThreadPool
00004  *
00005  *   Copyright (C) Daniel Kristjansson 2011
00006  *
00007  *   This program is free software; you can redistribute it and/or modify
00008  *   it under the terms of the GNU General Public License as published by
00009  *   the Free Software Foundation; either version 2 of the License, or
00010  *   (at your option) any later version.
00011  *
00012  *   This program is distributed in the hope that it will be useful,
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  *   GNU General Public License for more details.
00016  *
00017  *   You should have received a copy of the GNU General Public License
00018  *   along with this program; if not, write to the Free Software
00019  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  */
00021 
00022 // C++ headers
00023 #include <algorithm>
00024 using namespace std;
00025 
00026 // Qt headers
00027 #include <QCoreApplication>
00028 #include <QWaitCondition>
00029 #include <QMutexLocker>
00030 #include <QRunnable>
00031 #include <QMutex>
00032 #include <QList>
00033 #include <QPair>
00034 #include <QMap>
00035 #include <QSet>
00036 
00037 // MythTV headers
00038 #include "mthreadpool.h"
00039 #include "mythlogging.h"
00040 #include "mythtimer.h"
00041 #include "logging.h"
00042 #include "mthread.h"
00043 #include "mythdb.h"
00044 
00045 typedef QPair<QRunnable*,QString> MPoolEntry;
00046 typedef QList<MPoolEntry> MPoolQueue;
00047 typedef QMap<int, MPoolQueue> MPoolQueues;
00048 
00049 class MPoolThread : public MThread
00050 {
00051   public:
00052     MPoolThread(MThreadPool &pool, int timeout) :
00053         MThread("PT"), m_pool(pool), m_expiry_timeout(timeout),
00054         m_do_run(true), m_reserved(false)
00055     {
00056         QMutexLocker locker(&s_lock);
00057         setObjectName(QString("PT%1").arg(s_thread_num));
00058         s_thread_num++;
00059     }
00060 
00061     void run(void)
00062     {
00063         RunProlog();
00064 
00065         MythTimer t;
00066         t.start();
00067         QMutexLocker locker(&m_lock);
00068         while (true)
00069         {
00070             if (m_do_run && !m_runnable)
00071                 m_wait.wait(locker.mutex(), m_expiry_timeout+1);
00072 
00073             if (!m_runnable)
00074             {
00075                 m_do_run = false;
00076 
00077                 locker.unlock();
00078                 m_pool.NotifyDone(this);
00079                 locker.relock();
00080                 break;
00081             }
00082 
00083             if (!m_runnable_name.isEmpty())
00084                 loggingRegisterThread(m_runnable_name);
00085 
00086             bool autodelete = m_runnable->autoDelete();
00087             m_runnable->run();
00088             if (autodelete)
00089                 delete m_runnable;
00090             if (m_reserved)
00091                 m_pool.ReleaseThread();
00092             m_reserved = false;
00093             m_runnable = NULL;
00094 
00095             loggingDeregisterThread();
00096             loggingRegisterThread(objectName());
00097 
00098             GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
00099             QCoreApplication::processEvents();
00100 
00101             t.start();
00102 
00103             if (m_do_run)
00104             {
00105                 locker.unlock();
00106                 m_pool.NotifyAvailable(this);
00107                 locker.relock();
00108             }
00109             else
00110             {
00111                 locker.unlock();
00112                 m_pool.NotifyDone(this);
00113                 locker.relock();
00114                 break;
00115             }
00116         }
00117 
00118         RunEpilog();
00119     }
00120 
00121     bool SetRunnable(QRunnable *runnable, QString runnableName,
00122                      bool reserved)
00123     {
00124         QMutexLocker locker(&m_lock);
00125         if (m_do_run && (m_runnable == NULL))
00126         {
00127             m_runnable = runnable;
00128             m_runnable_name = runnableName;
00129             m_reserved = reserved;
00130             m_wait.wakeAll();
00131             return true;
00132         }
00133         return false;
00134     }
00135 
00136     void Shutdown(void)
00137     {
00138         QMutexLocker locker(&m_lock);
00139         m_do_run = false;
00140         m_wait.wakeAll();
00141     }
00142 
00143     QMutex m_lock;
00144     QWaitCondition m_wait;
00145     MThreadPool &m_pool;
00146     int m_expiry_timeout;
00147     bool m_do_run;
00148     QString m_runnable_name;
00149     bool m_reserved;
00150 
00151     static QMutex s_lock;
00152     static uint s_thread_num;
00153 };
00154 QMutex MPoolThread::s_lock;
00155 uint MPoolThread::s_thread_num = 0;
00156 
00158 
00159 class MThreadPoolPrivate
00160 {
00161   public:
00162     MThreadPoolPrivate(const QString &name) :
00163         m_name(name),
00164         m_running(true),
00165         m_expiry_timeout(120 * 1000),
00166         m_max_thread_count(QThread::idealThreadCount()),
00167         m_reserve_thread(0)
00168     {
00169     }
00170 
00171     int GetRealMaxThread(void)
00172     {
00173         return max(m_max_thread_count,1) + m_reserve_thread;
00174     }
00175 
00176     mutable QMutex m_lock;
00177     QString m_name;
00178     QWaitCondition m_wait;
00179     bool m_running;
00180     int m_expiry_timeout;
00181     int m_max_thread_count;
00182     int m_reserve_thread;
00183 
00184     MPoolQueues m_run_queues;
00185     QSet<MPoolThread*> m_avail_threads;
00186     QSet<MPoolThread*> m_running_threads;
00187     QList<MPoolThread*> m_delete_threads;
00188 
00189     static QMutex s_pool_lock;
00190     static MThreadPool *s_pool;
00191     static QList<MThreadPool*> s_all_pools;
00192 };
00193 
00194 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
00195 MThreadPool *MThreadPoolPrivate::s_pool = NULL;
00196 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
00197 
00199 
00200 MThreadPool::MThreadPool(const QString &name) :
00201     m_priv(new MThreadPoolPrivate(name))
00202 {
00203     QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00204     MThreadPoolPrivate::s_all_pools.push_back(this);
00205 }
00206 
00207 MThreadPool::~MThreadPool()
00208 {
00209     Stop();
00210     DeletePoolThreads();
00211     {
00212         QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00213         MThreadPoolPrivate::s_all_pools.removeAll(this);
00214     }
00215     delete m_priv;
00216     m_priv = NULL;
00217 }
00218 
00219 void MThreadPool::Stop(void)
00220 {
00221     QMutexLocker locker(&m_priv->m_lock);
00222     m_priv->m_running = false;
00223     QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin();
00224     for (; it != m_priv->m_avail_threads.end(); ++it)
00225         (*it)->Shutdown();
00226     it = m_priv->m_running_threads.begin();
00227     for (; it != m_priv->m_running_threads.end(); ++it)
00228         (*it)->Shutdown();
00229     m_priv->m_wait.wakeAll();
00230 }
00231 
00232 void MThreadPool::DeletePoolThreads(void)
00233 {
00234     waitForDone();
00235 
00236     QMutexLocker locker(&m_priv->m_lock);
00237     QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin();
00238     for (; it != m_priv->m_avail_threads.end(); ++it)
00239     {
00240         m_priv->m_delete_threads.push_front(*it);
00241     }
00242     m_priv->m_avail_threads.clear();
00243 
00244     while (!m_priv->m_delete_threads.empty())
00245     {
00246         MPoolThread *thread = m_priv->m_delete_threads.back();
00247         locker.unlock();
00248 
00249         thread->wait();
00250 
00251         locker.relock();
00252         delete thread;
00253         if (m_priv->m_delete_threads.back() == thread)
00254             m_priv->m_delete_threads.pop_back();
00255         else
00256             m_priv->m_delete_threads.removeAll(thread);
00257     }
00258 }
00259 
00260 MThreadPool *MThreadPool::globalInstance(void)
00261 {
00262     QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00263     if (!MThreadPoolPrivate::s_pool)
00264         MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
00265     return MThreadPoolPrivate::s_pool;
00266 }
00267 
00268 void MThreadPool::StopAllPools(void)
00269 {
00270     QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00271     QList<MThreadPool*>::iterator it;
00272     for (it = MThreadPoolPrivate::s_all_pools.begin();
00273          it != MThreadPoolPrivate::s_all_pools.end(); ++it)
00274     {
00275         (*it)->Stop();
00276     }
00277 }
00278 
00279 void MThreadPool::ShutdownAllPools(void)
00280 {
00281     QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00282     QList<MThreadPool*>::iterator it;
00283     for (it = MThreadPoolPrivate::s_all_pools.begin();
00284          it != MThreadPoolPrivate::s_all_pools.end(); ++it)
00285     {
00286         (*it)->Stop();
00287     }
00288     for (it = MThreadPoolPrivate::s_all_pools.begin();
00289          it != MThreadPoolPrivate::s_all_pools.end(); ++it)
00290     {
00291         (*it)->DeletePoolThreads();
00292     }
00293 }
00294 
00295 void MThreadPool::start(QRunnable *runnable, QString debugName, int priority)
00296 {
00297     QMutexLocker locker(&m_priv->m_lock);
00298     if (TryStartInternal(runnable, debugName, false))
00299         return;
00300 
00301     MPoolQueues::iterator it = m_priv->m_run_queues.find(priority);
00302     if (it != m_priv->m_run_queues.end())
00303     {
00304         (*it).push_back(MPoolEntry(runnable,debugName));
00305     }
00306     else
00307     {
00308         MPoolQueue list;
00309         list.push_back(MPoolEntry(runnable,debugName));
00310         m_priv->m_run_queues[priority] = list;
00311     }
00312 }
00313 
00314 void MThreadPool::startReserved(
00315     QRunnable *runnable, QString debugName, int waitForAvailMS)
00316 {
00317     QMutexLocker locker(&m_priv->m_lock);
00318     if (waitForAvailMS > 0 && m_priv->m_avail_threads.empty() &&
00319         m_priv->m_running_threads.size() >= m_priv->m_max_thread_count)
00320     {
00321         MythTimer t;
00322         t.start();
00323         int left = waitForAvailMS - t.elapsed();
00324         while (left > 0 && m_priv->m_avail_threads.empty() &&
00325                m_priv->m_running_threads.size() >= m_priv->m_max_thread_count)
00326         {
00327             m_priv->m_wait.wait(locker.mutex(), left);
00328             left = waitForAvailMS - t.elapsed();
00329         }
00330     }
00331     TryStartInternal(runnable, debugName, true);
00332 }
00333 
00334 
00335 bool MThreadPool::tryStart(QRunnable *runnable, QString debugName)
00336 {
00337     QMutexLocker locker(&m_priv->m_lock);
00338     return TryStartInternal(runnable, debugName, false);
00339 }
00340 
00341 bool MThreadPool::TryStartInternal(
00342     QRunnable *runnable, QString debugName, bool reserved)
00343 {
00344     if (!m_priv->m_running)
00345         return false;
00346 
00347     while (!m_priv->m_delete_threads.empty())
00348     {
00349         m_priv->m_delete_threads.back()->wait();
00350         delete m_priv->m_delete_threads.back();
00351         m_priv->m_delete_threads.pop_back();
00352     }
00353 
00354     while (m_priv->m_avail_threads.begin() != m_priv->m_avail_threads.end())
00355     {
00356         MPoolThread *thread = *m_priv->m_avail_threads.begin();
00357         m_priv->m_avail_threads.erase(m_priv->m_avail_threads.begin());
00358         m_priv->m_running_threads.insert(thread);
00359         if (reserved)
00360             m_priv->m_reserve_thread++;
00361         if (thread->SetRunnable(runnable, debugName, reserved))
00362         {
00363             return true;
00364         }
00365         else
00366         {
00367             if (reserved)
00368                 m_priv->m_reserve_thread--;
00369             thread->Shutdown();
00370             m_priv->m_running_threads.remove(thread);
00371             m_priv->m_delete_threads.push_front(thread);
00372         }
00373     }
00374 
00375     if (reserved ||
00376         m_priv->m_running_threads.size() < m_priv->GetRealMaxThread())
00377     {
00378         if (reserved)
00379             m_priv->m_reserve_thread++;
00380         MPoolThread *thread = new MPoolThread(*this, m_priv->m_expiry_timeout);
00381         m_priv->m_running_threads.insert(thread);
00382         thread->SetRunnable(runnable, debugName, reserved);
00383         thread->start();
00384         return true;
00385     }
00386 
00387     return false;
00388 }
00389 
00390 void MThreadPool::NotifyAvailable(MPoolThread *thread)
00391 {
00392     QMutexLocker locker(&m_priv->m_lock);
00393 
00394     if (!m_priv->m_running)
00395     {
00396         m_priv->m_running_threads.remove(thread);
00397         thread->Shutdown();
00398         m_priv->m_delete_threads.push_front(thread);
00399         m_priv->m_wait.wakeAll();
00400         return;
00401     }
00402 
00403     MPoolQueues::iterator it = m_priv->m_run_queues.begin();
00404     if (it == m_priv->m_run_queues.end())
00405     {
00406         m_priv->m_running_threads.remove(thread);
00407         m_priv->m_avail_threads.insert(thread);
00408         m_priv->m_wait.wakeAll();
00409         return;
00410     }
00411 
00412     MPoolEntry e = (*it).front();
00413     if (!thread->SetRunnable(e.first, e.second, false))
00414     {
00415         m_priv->m_running_threads.remove(thread);
00416         m_priv->m_wait.wakeAll();
00417         if (!TryStartInternal(e.first, e.second, false))
00418         {
00419             thread->Shutdown();
00420             m_priv->m_delete_threads.push_front(thread);
00421             return;
00422         }
00423         thread->Shutdown();
00424         m_priv->m_delete_threads.push_front(thread);
00425     }
00426 
00427     (*it).pop_front();
00428     if ((*it).empty())
00429         m_priv->m_run_queues.erase(it);
00430 }
00431 
00432 void MThreadPool::NotifyDone(MPoolThread *thread)
00433 {
00434     QMutexLocker locker(&m_priv->m_lock);
00435     m_priv->m_running_threads.remove(thread);
00436     m_priv->m_avail_threads.remove(thread);
00437     if (!m_priv->m_delete_threads.contains(thread))
00438         m_priv->m_delete_threads.push_front(thread);
00439     m_priv->m_wait.wakeAll();
00440 }
00441 
00442 int MThreadPool::expiryTimeout(void) const
00443 {
00444     QMutexLocker locker(&m_priv->m_lock);
00445     return m_priv->m_expiry_timeout;
00446 }
00447 
00448 void MThreadPool::setExpiryTimeout(int expiryTimeout)
00449 {
00450     QMutexLocker locker(&m_priv->m_lock);
00451     m_priv->m_expiry_timeout = expiryTimeout;
00452 }
00453 
00454 int MThreadPool::maxThreadCount(void) const
00455 {
00456     QMutexLocker locker(&m_priv->m_lock);
00457     return m_priv->m_max_thread_count;
00458 }
00459 
00460 void MThreadPool::setMaxThreadCount(int maxThreadCount)
00461 {
00462     QMutexLocker locker(&m_priv->m_lock);
00463     m_priv->m_max_thread_count = maxThreadCount;
00464 }
00465 
00466 int MThreadPool::activeThreadCount(void) const
00467 {
00468     QMutexLocker locker(&m_priv->m_lock);
00469     return m_priv->m_avail_threads.size() + m_priv->m_running_threads.size();
00470 }
00471 
00472 /*
00473 void MThreadPool::reserveThread(void)
00474 {
00475     QMutexLocker locker(&m_priv->m_lock);
00476     m_priv->m_reserve_thread++;
00477 }
00478 
00479 void MThreadPool::releaseThread(void)
00480 {
00481     QMutexLocker locker(&m_priv->m_lock);
00482     if (m_priv->m_reserve_thread > 0)
00483         m_priv->m_reserve_thread--;
00484 }
00485 */
00486 
00487 void MThreadPool::ReleaseThread(void)
00488 {
00489     QMutexLocker locker(&m_priv->m_lock);
00490     if (m_priv->m_reserve_thread > 0)
00491         m_priv->m_reserve_thread--;
00492 }
00493 
#if 0
// Debug helper (compiled out): logs a title followed by the address of
// every thread in the set, one log line per entry.
static void print_set(QString title, QSet<MPoolThread*> set)
{
    LOG(VB_GENERAL, LOG_INFO, title);
    QSet<MPoolThread*>::iterator entry = set.begin();
    while (entry != set.end())
    {
        LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
            .arg((quint64)(*entry),0,16));
        ++entry;
    }
    LOG(VB_GENERAL, LOG_INFO, "");
}
#endif
00507 
00508 void MThreadPool::waitForDone(void)
00509 {
00510     QMutexLocker locker(&m_priv->m_lock);
00511     while (true)
00512     {
00513         while (!m_priv->m_delete_threads.empty())
00514         {
00515             m_priv->m_delete_threads.back()->wait();
00516             delete m_priv->m_delete_threads.back();
00517             m_priv->m_delete_threads.pop_back();
00518         }
00519 
00520         if (m_priv->m_running && !m_priv->m_run_queues.empty())
00521         {
00522             m_priv->m_wait.wait(locker.mutex());
00523             continue;
00524         }
00525 
00526         QSet<MPoolThread*> working = m_priv->m_running_threads;
00527         working = working.subtract(m_priv->m_avail_threads);
00528         if (working.empty())
00529             break;
00530         m_priv->m_wait.wait(locker.mutex());
00531     }
00532 }
00533 
00534 /* vim: set expandtab tabstop=4 shiftwidth=4: */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends