MythTV  0.26-pre
jobqueue.cpp
Go to the documentation of this file.
00001 
00002 #include <unistd.h>
00003 #include <sys/types.h>
00004 #include <sys/stat.h>
00005 #include <iostream>
00006 #include <cstdlib>
00007 #include <fcntl.h>
00008 #include <pthread.h>
00009 using namespace std;
00010 
00011 #include <QDateTime>
00012 #include <QFileInfo>
00013 #include <QRegExp>
00014 #include <QEvent>
00015 #include <QCoreApplication>
00016 
00017 #include "mythconfig.h"
00018 
00019 #include "exitcodes.h"
00020 #include "jobqueue.h"
00021 #include "programinfo.h"
00022 #include "mythcorecontext.h"
00023 #include "mythmiscutil.h"
00024 #include "previewgenerator.h"
00025 #include "compat.h"
00026 #include "recordingprofile.h"
00027 #include "recordinginfo.h"
00028 #include "mthread.h"
00029 
00030 #include "mythdb.h"
00031 #include "mythdirs.h"
00032 #include "mythlogging.h"
00033 
00034 #ifndef O_STREAMING
00035 #define O_STREAMING 0
00036 #endif
00037 
00038 #ifndef O_LARGEFILE
00039 #define O_LARGEFILE 0
00040 #endif
00041 
00042 #define LOC     QString("JobQueue: ")
00043 
00044 JobQueue::JobQueue(bool master) :
00045     m_hostname(gCoreContext->GetHostName()),
00046     jobsRunning(0),
00047     jobQueueCPU(0),
00048     m_pginfo(NULL),
00049     runningJobsLock(new QMutex(QMutex::Recursive)),
00050     isMaster(master),
00051     queueThread(new MThread("JobQueue", this)),
00052     processQueue(false)
00053 {
00054     jobQueueCPU = gCoreContext->GetNumSetting("JobQueueCPU", 0);
00055 
00056 #ifndef USING_VALGRIND
00057     QMutexLocker locker(&queueThreadCondLock);
00058     processQueue = true;
00059     queueThread->start();
00060 #else
00061     LOG(VB_GENERAL, LOG_ERR, LOC +
00062         "The JobQueue has been disabled because "
00063         "you compiled with the --enable-valgrind option.");
00064 #endif // USING_VALGRIND
00065 
00066     gCoreContext->addListener(this);
00067 }
00068 
00069 JobQueue::~JobQueue(void)
00070 {
00071     queueThreadCondLock.lock();
00072     processQueue = false;
00073     queueThreadCond.wakeAll();
00074     queueThreadCondLock.unlock();
00075 
00076     queueThread->wait();
00077     delete queueThread;
00078     queueThread = NULL;
00079 
00080     gCoreContext->removeListener(this);
00081 
00082     delete runningJobsLock;
00083 }
00084 
00085 void JobQueue::customEvent(QEvent *e)
00086 {
00087     if ((MythEvent::Type)(e->type()) == MythEvent::MythEventMessage)
00088     {
00089         MythEvent *me = (MythEvent *)e;
00090         QString message = me->Message();
00091 
00092         if (message.left(9) == "LOCAL_JOB")
00093         {
00094             // LOCAL_JOB action ID jobID
00095             // LOCAL_JOB action type chanid recstartts hostname
00096             QString msg;
00097             message = message.simplified();
00098             QStringList tokens = message.split(" ", QString::SkipEmptyParts);
00099             QString action = tokens[1];
00100             int jobID = -1;
00101 
00102             if (tokens[2] == "ID")
00103                 jobID = tokens[3].toInt();
00104             else
00105             {
00106                 jobID = GetJobID(
00107                     tokens[2].toInt(),
00108                     tokens[3].toUInt(),
00109                     QDateTime::fromString(tokens[4], Qt::ISODate));
00110             }
00111 
00112             runningJobsLock->lock();
00113             if (!runningJobs.contains(jobID))
00114             {
00115                 msg = QString("Unable to determine jobID for message: "
00116                               "%1.  Program will not be flagged.")
00117                               .arg(message);
00118                 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
00119                 runningJobsLock->unlock();
00120                 return;
00121             }
00122             runningJobsLock->unlock();
00123 
00124             msg = QString("Received message '%1'").arg(message);
00125             LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
00126 
00127             if ((action == "STOP") ||
00128                 (action == "PAUSE") ||
00129                 (action == "RESTART") ||
00130                 (action == "RESUME" ))
00131             {
00132                 runningJobsLock->lock();
00133 
00134                 if (action == "STOP")
00135                     runningJobs[jobID].flag = JOB_STOP;
00136                 else if (action == "PAUSE")
00137                     runningJobs[jobID].flag = JOB_PAUSE;
00138                 else if (action == "RESUME")
00139                     runningJobs[jobID].flag = JOB_RUN;
00140                 else if (action == "RESTART")
00141                     runningJobs[jobID].flag = JOB_RESTART;
00142 
00143                 runningJobsLock->unlock();
00144             }
00145         }
00146     }
00147 }
00148 
00149 void JobQueue::run(void)
00150 {
00151     queueThreadCondLock.lock();
00152     queueThreadCond.wakeAll();
00153     queueThreadCondLock.unlock();
00154 
00155     RecoverQueue();
00156 
00157     queueThreadCondLock.lock();
00158     queueThreadCond.wait(&queueThreadCondLock, 10 * 1000);
00159     queueThreadCondLock.unlock();
00160 
00161     ProcessQueue();
00162 }
00163 
00164 void JobQueue::ProcessQueue(void)
00165 {
00166     LOG(VB_JOBQUEUE, LOG_INFO, LOC + "ProcessQueue() started");
00167 
00168     QString logInfo;
00169     int jobID;
00170     int cmds;
00171     int flags;
00172     int status;
00173     QString hostname;
00174     int sleepTime;
00175 
00176     QMap<int, int> jobStatus;
00177     int maxJobs;
00178     QString message;
00179     QMap<int, JobQueueEntry> jobs;
00180     bool atMax = false;
00181     bool inTimeWindow = true;
00182     bool startedJobAlready = false;
00183     QMap<int, RunningJobInfo>::Iterator rjiter;
00184 
00185     QMutexLocker locker(&queueThreadCondLock);
00186     while (processQueue)
00187     {
00188         locker.unlock();
00189 
00190         startedJobAlready = false;
00191         sleepTime = gCoreContext->GetNumSetting("JobQueueCheckFrequency", 30);
00192         maxJobs = gCoreContext->GetNumSetting("JobQueueMaxSimultaneousJobs", 3);
00193         LOG(VB_JOBQUEUE, LOG_INFO, LOC +
00194             QString("Currently set to run up to %1 job(s) max.")
00195                         .arg(maxJobs));
00196 
00197         jobStatus.clear();
00198 
00199         runningJobsLock->lock();
00200         for (rjiter = runningJobs.begin(); rjiter != runningJobs.end();
00201             ++rjiter)
00202         {
00203             if ((*rjiter).pginfo)
00204                 (*rjiter).pginfo->UpdateInUseMark();
00205         }
00206         runningJobsLock->unlock();
00207 
00208         jobsRunning = 0;
00209         GetJobsInQueue(jobs);
00210 
00211         if (jobs.size())
00212         {
00213             inTimeWindow = InJobRunWindow();
00214             for (int x = 0; x < jobs.size(); x++)
00215             {
00216                 status = jobs[x].status;
00217                 hostname = jobs[x].hostname;
00218 
00219                 if (((status == JOB_RUNNING) ||
00220                      (status == JOB_STARTING) ||
00221                      (status == JOB_PAUSED)) &&
00222                     (hostname == m_hostname))
00223                 jobsRunning++;
00224             }
00225 
00226             message = QString("Currently Running %1 jobs.")
00227                               .arg(jobsRunning);
00228             if (!inTimeWindow)
00229             {
00230                 message += QString("  Jobs in Queue, but we are outside of the "
00231                                    "Job Queue time window, no new jobs can be "
00232                                    "started.");
00233                 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00234             }
00235             else if (jobsRunning >= maxJobs)
00236             {
00237                 message += " (At Maximum, no new jobs can be started until "
00238                            "a running job completes)";
00239 
00240                 if (!atMax)
00241                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00242 
00243                 atMax = true;
00244             }
00245             else
00246             {
00247                 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00248                 atMax = false;
00249             }
00250 
00251 
00252             for ( int x = 0;
00253                  (x < jobs.size()) && (jobsRunning < maxJobs); x++)
00254             {
00255                 jobID = jobs[x].id;
00256                 cmds = jobs[x].cmds;
00257                 flags = jobs[x].flags;
00258                 status = jobs[x].status;
00259                 hostname = jobs[x].hostname;
00260 
00261                 if (!jobs[x].chanid)
00262                     logInfo = QString("jobID #%1").arg(jobID);
00263                 else
00264                     logInfo = QString("chanid %1 @ %2").arg(jobs[x].chanid)
00265                                       .arg(jobs[x].startts);
00266 
00267                 // Should we even be looking at this job?
00268                 if ((inTimeWindow) &&
00269                     (!hostname.isEmpty()) &&
00270                     (hostname != m_hostname))
00271                 {
00272                     // Setting the status here will prevent us from processing
00273                     // any other jobs for this recording until this one is
00274                     // completed on the remote host.
00275                     jobStatus[jobID] = status;
00276 
00277                     message = QString("Skipping '%1' job for %2, "
00278                                       "should run on '%3' instead")
00279                                       .arg(JobText(jobs[x].type)).arg(logInfo)
00280                                       .arg(hostname);
00281                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00282                     continue;
00283                 }
00284 
00285                 // Check to see if there was a previous job that is not done
00286                 if (inTimeWindow)
00287                 {
00288                     int otherJobID = GetRunningJobID(jobs[x].chanid,
00289                                                      jobs[x].recstartts);
00290                     if (otherJobID && (jobStatus.contains(otherJobID)) &&
00291                         (!(jobStatus[otherJobID] & JOB_DONE)))
00292                     {
00293                         message =
00294                             QString("Skipping '%1' job for %2, "
00295                                     "Job ID %3 is already running for "
00296                                     "this recording with a status of '%4'")
00297                                     .arg(JobText(jobs[x].type)).arg(logInfo)
00298                                     .arg(otherJobID)
00299                                     .arg(StatusText(jobStatus[otherJobID]));
00300                         LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00301                         continue;
00302                     }
00303                 }
00304 
00305                 jobStatus[jobID] = status;
00306 
00307                 // Are we allowed to run this job?
00308                 if ((inTimeWindow) && (!AllowedToRun(jobs[x])))
00309                 {
00310                     message = QString("Skipping '%1' job for %2, "
00311                                       "not allowed to run on this backend.")
00312                                       .arg(JobText(jobs[x].type)).arg(logInfo);
00313                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00314                     continue;
00315                 }
00316 
00317                 // Is this job scheduled for the future
00318                 if (jobs[x].schedruntime > QDateTime::currentDateTime())
00319                 {
00320                     message = QString("Skipping '%1' job for %2, this job is "
00321                                       "not scheduled to run until %3.")
00322                                       .arg(JobText(jobs[x].type)).arg(logInfo)
00323                                       .arg(jobs[x].schedruntime
00324                                            .toString(Qt::ISODate));
00325                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00326                     continue;
00327                 }
00328 
00329                 if (cmds & JOB_STOP)
00330                 {
00331                     // if we're trying to stop a job and it's not queued
00332                     //  then lets send a STOP command
00333                     if (status != JOB_QUEUED) {
00334                         message = QString("Stopping '%1' job for %2")
00335                                           .arg(JobText(jobs[x].type))
00336                                           .arg(logInfo);
00337                         LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00338 
00339                         runningJobsLock->lock();
00340                         if (runningJobs.contains(jobID))
00341                             runningJobs[jobID].flag = JOB_STOP;
00342                         runningJobsLock->unlock();
00343 
00344                         // ChangeJobCmds(m_db, jobID, JOB_RUN);
00345                         continue;
00346 
00347                     // if we're trying to stop a job and it's still queued
00348                     //  then let's just change the status to cancelled so
00349                     //  we don't try to run it from the queue
00350                     } else {
00351                         message = QString("Cancelling '%1' job for %2")
00352                                           .arg(JobText(jobs[x].type))
00353                                           .arg(logInfo);
00354                         LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00355 
00356                         // at the bottom of this loop we requeue any jobs that
00357                         //  are not currently queued and also not associated
00358                         //  with a hostname so we must claim this job before we
00359                         //  can cancel it
00360                         if (!ChangeJobHost(jobID, m_hostname))
00361                         {
00362                             message = QString("Unable to claim '%1' job for %2")
00363                                               .arg(JobText(jobs[x].type))
00364                                               .arg(logInfo);
00365                             LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
00366                             continue;
00367                         }
00368 
00369                         ChangeJobStatus(jobID, JOB_CANCELLED, "");
00370                         ChangeJobCmds(jobID, JOB_RUN);
00371                         continue;
00372                     }
00373                 }
00374 
00375                 if ((cmds & JOB_PAUSE) && (status != JOB_QUEUED))
00376                 {
00377                     message = QString("Pausing '%1' job for %2")
00378                                       .arg(JobText(jobs[x].type)).arg(logInfo);
00379                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00380 
00381                     runningJobsLock->lock();
00382                     if (runningJobs.contains(jobID))
00383                         runningJobs[jobID].flag = JOB_PAUSE;
00384                     runningJobsLock->unlock();
00385 
00386                     ChangeJobCmds(jobID, JOB_RUN);
00387                     continue;
00388                 }
00389 
00390                 if ((cmds & JOB_RESTART) && (status != JOB_QUEUED))
00391                 {
00392                     message = QString("Restart '%1' job for %2")
00393                                       .arg(JobText(jobs[x].type)).arg(logInfo);
00394                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00395 
00396                     runningJobsLock->lock();
00397                     if (runningJobs.contains(jobID))
00398                         runningJobs[jobID].flag = JOB_RUN;
00399                     runningJobsLock->unlock();
00400 
00401                     ChangeJobCmds(jobID, JOB_RUN);
00402                     continue;
00403                 }
00404 
00405                 if (status != JOB_QUEUED)
00406                 {
00407 
00408                     if (hostname.isEmpty())
00409                     {
00410                         message = QString("Resetting '%1' job for %2 to %3 "
00411                                           "status, because no hostname is set.")
00412                                           .arg(JobText(jobs[x].type))
00413                                           .arg(logInfo)
00414                                           .arg(StatusText(JOB_QUEUED));
00415                         LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00416 
00417                         ChangeJobStatus(jobID, JOB_QUEUED, "");
00418                         ChangeJobCmds(jobID, JOB_RUN);
00419                     }
00420                     else if (inTimeWindow)
00421                     {
00422                         message = QString("Skipping '%1' job for %2, "
00423                                           "current job status is '%3'")
00424                                           .arg(JobText(jobs[x].type))
00425                                           .arg(logInfo)
00426                                           .arg(StatusText(status));
00427                         LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00428                     }
00429                     continue;
00430                 }
00431 
00432                 // never start or claim more than one job in a single run
00433                 if (startedJobAlready)
00434                     continue;
00435 
00436                 if ((inTimeWindow) &&
00437                     (hostname.isEmpty()) &&
00438                     (!ChangeJobHost(jobID, m_hostname)))
00439                 {
00440                     message = QString("Unable to claim '%1' job for %2")
00441                                       .arg(JobText(jobs[x].type)).arg(logInfo);
00442                     LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
00443                     continue;
00444                 }
00445 
00446                 if (!inTimeWindow)
00447                 {
00448                     message = QString("Skipping '%1' job for %2, "
00449                                       "current time is outside of the "
00450                                       "Job Queue processing window.")
00451                                       .arg(JobText(jobs[x].type)).arg(logInfo);
00452                     LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00453                     continue;
00454                 }
00455 
00456                 message = QString("Processing '%1' job for %2, "
00457                                   "current status is '%3'")
00458                                   .arg(JobText(jobs[x].type)).arg(logInfo)
00459                                   .arg(StatusText(status));
00460                 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00461 
00462                 ProcessJob(jobs[x]);
00463 
00464                 startedJobAlready = true;
00465             }
00466         }
00467 
00468         if (QCoreApplication::applicationName() == MYTH_APPNAME_MYTHJOBQUEUE)
00469         {
00470             if (jobsRunning > 0)
00471             {
00472                 if (!(gCoreContext->IsBlockingClient()))
00473                 {
00474                     gCoreContext->BlockShutdown();
00475                     LOG(VB_JOBQUEUE, LOG_INFO, QString("%1 jobs running. "
00476                         "Blocking shutdown.").arg(jobsRunning));
00477                 }
00478             }
00479             else
00480             {
00481                 if (gCoreContext->IsBlockingClient())
00482                 {
00483                     gCoreContext->AllowShutdown();
00484                     LOG(VB_JOBQUEUE, LOG_INFO, "No jobs running. "
00485                                                "Allowing shutdown.");
00486                 }
00487             }
00488         }
00489 
00490 
00491         locker.relock();
00492         if (processQueue)
00493         {
00494             int st = (startedJobAlready) ? (5 * 1000) : (sleepTime * 1000);
00495             if (st > 0)
00496                 queueThreadCond.wait(locker.mutex(), st);
00497         }
00498     }
00499 }
00500 
00501 bool JobQueue::QueueRecordingJobs(const RecordingInfo &recinfo, int jobTypes)
00502 {
00503     if (jobTypes == JOB_NONE)
00504         jobTypes = recinfo.GetAutoRunJobs();
00505 
00506     if (recinfo.IsCommercialFree())
00507         jobTypes &= (~JOB_COMMFLAG);
00508 
00509     if (jobTypes != JOB_NONE)
00510     {
00511         QString jobHost = QString("");
00512 
00513         if (gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0))
00514             jobHost = recinfo.GetHostname();
00515 
00516         return JobQueue::QueueJobs(
00517             jobTypes, recinfo.GetChanID(), recinfo.GetRecordingStartTime(),
00518             "", "", jobHost);
00519     }
00520     else
00521         return false;
00522 
00523     return true;
00524 }
00525 
00526 bool JobQueue::QueueJob(int jobType, uint chanid, const QDateTime &recstartts,
00527                         QString args, QString comment, QString host,
00528                         int flags, int status, QDateTime schedruntime)
00529 {
00530     int tmpStatus = JOB_UNKNOWN;
00531     int tmpCmd = JOB_UNKNOWN;
00532     int jobID = -1;
00533     int chanidInt = -1;
00534 
00535     if(!schedruntime.isValid())
00536         schedruntime = QDateTime::currentDateTime();
00537 
00538     MSqlQuery query(MSqlQuery::InitCon());
00539 
00540     // In order to replace a job, we must have a chanid/recstartts combo
00541     if (chanid)
00542     {
00543         query.prepare("SELECT status, id, cmds FROM jobqueue "
00544                       "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00545                       "AND type = :JOBTYPE;");
00546         query.bindValue(":CHANID", chanid);
00547         query.bindValue(":STARTTIME", recstartts);
00548         query.bindValue(":JOBTYPE", jobType);
00549 
00550         if (!query.exec())
00551         {
00552             MythDB::DBError("Error in JobQueue::QueueJob()", query);
00553             return false;
00554         }
00555         else
00556         {
00557             if (query.next())
00558             {
00559                 tmpStatus = query.value(0).toInt();
00560                 jobID = query.value(1).toInt();
00561                 tmpCmd = query.value(2).toInt();
00562             }
00563         }
00564         switch (tmpStatus)
00565         {
00566             case JOB_UNKNOWN:
00567                      break;
00568             case JOB_STARTING:
00569             case JOB_RUNNING:
00570             case JOB_PAUSED:
00571             case JOB_STOPPING:
00572             case JOB_ERRORING:
00573             case JOB_ABORTING:
00574                      return false;
00575             default:
00576                      DeleteJob(jobID);
00577                      break;
00578         }
00579         if (! (tmpStatus & JOB_DONE) && (tmpCmd & JOB_STOP))
00580             return false;
00581 
00582         chanidInt = chanid;
00583     }
00584 
00585     if (host.isNull())
00586         host = QString("");
00587 
00588     query.prepare("INSERT INTO jobqueue (chanid, starttime, inserttime, type, "
00589                   "status, statustime, schedruntime, hostname, args, comment, "
00590                   "flags) "
00591                   "VALUES (:CHANID, :STARTTIME, now(), :JOBTYPE, :STATUS, "
00592                   "now(), :SCHEDRUNTIME, :HOST, :ARGS, :COMMENT, :FLAGS);");
00593 
00594     query.bindValue(":CHANID", chanidInt);
00595     query.bindValue(":STARTTIME", recstartts);
00596     query.bindValue(":JOBTYPE", jobType);
00597     query.bindValue(":STATUS", status);
00598     query.bindValue(":SCHEDRUNTIME", schedruntime);
00599     query.bindValue(":HOST", host);
00600     query.bindValue(":ARGS", args);
00601     query.bindValue(":COMMENT", comment);
00602     query.bindValue(":FLAGS", flags);
00603 
00604     if (!query.exec())
00605     {
00606         MythDB::DBError("Error in JobQueue::StartJob()", query);
00607         return false;
00608     }
00609 
00610     return true;
00611 }
00612 
00613 bool JobQueue::QueueJobs(int jobTypes, uint chanid, const QDateTime &recstartts,
00614                          QString args, QString comment, QString host)
00615 {
00616     if (gCoreContext->GetNumSetting("AutoTranscodeBeforeAutoCommflag", 0))
00617     {
00618         if (jobTypes & JOB_METADATA)
00619             QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
00620         if (jobTypes & JOB_TRANSCODE)
00621             QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host);
00622         if (jobTypes & JOB_COMMFLAG)
00623             QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
00624     }
00625     else
00626     {
00627         if (jobTypes & JOB_METADATA)
00628             QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
00629         if (jobTypes & JOB_COMMFLAG)
00630             QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
00631         if (jobTypes & JOB_TRANSCODE)
00632         {
00633             QDateTime schedruntime = QDateTime::currentDateTime();
00634 
00635             int defer = gCoreContext->GetNumSetting("DeferAutoTranscodeDays", 0);
00636             if (defer)
00637             {
00638                 schedruntime = schedruntime.addDays(defer);
00639                 schedruntime.setTime(QTime(0,0));
00640             }
00641 
00642             QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host,
00643               0, JOB_QUEUED, schedruntime);
00644         }
00645     }
00646 
00647     if (jobTypes & JOB_USERJOB1)
00648         QueueJob(JOB_USERJOB1, chanid, recstartts, args, comment, host);
00649     if (jobTypes & JOB_USERJOB2)
00650         QueueJob(JOB_USERJOB2, chanid, recstartts, args, comment, host);
00651     if (jobTypes & JOB_USERJOB3)
00652         QueueJob(JOB_USERJOB3, chanid, recstartts, args, comment, host);
00653     if (jobTypes & JOB_USERJOB4)
00654         QueueJob(JOB_USERJOB4, chanid, recstartts, args, comment, host);
00655 
00656     return true;
00657 }
00658 
00659 int JobQueue::GetJobID(int jobType, uint chanid, const QDateTime &recstartts)
00660 {
00661     MSqlQuery query(MSqlQuery::InitCon());
00662 
00663     query.prepare("SELECT id FROM jobqueue "
00664                   "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00665                   "AND type = :JOBTYPE;");
00666     query.bindValue(":CHANID", chanid);
00667     query.bindValue(":STARTTIME", recstartts);
00668     query.bindValue(":JOBTYPE", jobType);
00669 
00670     if (!query.exec())
00671     {
00672         MythDB::DBError("Error in JobQueue::GetJobID()", query);
00673         return -1;
00674     }
00675     else
00676     {
00677         if (query.next())
00678             return query.value(0).toInt();
00679     }
00680 
00681     return -1;
00682 }
00683 
00684 bool JobQueue::GetJobInfoFromID(
00685     int jobID, int &jobType, uint &chanid, QDateTime &recstartts)
00686 {
00687     MSqlQuery query(MSqlQuery::InitCon());
00688 
00689     query.prepare("SELECT type, chanid, starttime FROM jobqueue "
00690                   "WHERE id = :ID;");
00691 
00692     query.bindValue(":ID", jobID);
00693 
00694     if (!query.exec())
00695     {
00696         MythDB::DBError("Error in JobQueue::GetJobInfoFromID()", query);
00697         return false;
00698     }
00699     else
00700     {
00701         if (query.next())
00702         {
00703             jobType    = query.value(0).toInt();
00704             chanid     = query.value(1).toUInt();
00705             recstartts = query.value(2).toDateTime();
00706             return true;
00707         }
00708     }
00709 
00710     return false;
00711 }
00712 
00713 bool JobQueue::GetJobInfoFromID(
00714     int jobID, int &jobType, uint &chanid, QString &recstartts)
00715 {
00716     QDateTime tmpStarttime;
00717 
00718     bool result = JobQueue::GetJobInfoFromID(
00719         jobID, jobType, chanid, tmpStarttime);
00720 
00721     if (result)
00722         recstartts = tmpStarttime.toString("yyyyMMddhhmmss");
00723 
00724     return result;
00725 }
00726 
00727 bool JobQueue::PauseJob(int jobID)
00728 {
00729     QString message = QString("GLOBAL_JOB PAUSE ID %1").arg(jobID);
00730     MythEvent me(message);
00731     gCoreContext->dispatch(me);
00732 
00733     return ChangeJobCmds(jobID, JOB_PAUSE);
00734 }
00735 
00736 bool JobQueue::ResumeJob(int jobID)
00737 {
00738     QString message = QString("GLOBAL_JOB RESUME ID %1").arg(jobID);
00739     MythEvent me(message);
00740     gCoreContext->dispatch(me);
00741 
00742     return ChangeJobCmds(jobID, JOB_RESUME);
00743 }
00744 
00745 bool JobQueue::RestartJob(int jobID)
00746 {
00747     QString message = QString("GLOBAL_JOB RESTART ID %1").arg(jobID);
00748     MythEvent me(message);
00749     gCoreContext->dispatch(me);
00750 
00751     return ChangeJobCmds(jobID, JOB_RESTART);
00752 }
00753 
00754 bool JobQueue::StopJob(int jobID)
00755 {
00756     QString message = QString("GLOBAL_JOB STOP ID %1").arg(jobID);
00757     MythEvent me(message);
00758     gCoreContext->dispatch(me);
00759 
00760     return ChangeJobCmds(jobID, JOB_STOP);
00761 }
00762 
00763 bool JobQueue::DeleteAllJobs(uint chanid, const QDateTime &recstartts)
00764 {
00765     MSqlQuery query(MSqlQuery::InitCon());
00766     QString message;
00767 
00768     query.prepare("UPDATE jobqueue SET status = :CANCELLED "
00769                   "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00770                   "AND status = :QUEUED;");
00771 
00772     query.bindValue(":CANCELLED", JOB_CANCELLED);
00773     query.bindValue(":CHANID", chanid);
00774     query.bindValue(":STARTTIME", recstartts);
00775     query.bindValue(":QUEUED", JOB_QUEUED);
00776 
00777     if (!query.exec())
00778         MythDB::DBError("Cancel Pending Jobs", query);
00779 
00780     query.prepare("UPDATE jobqueue SET cmds = :CMD "
00781                   "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00782                   "AND status <> :CANCELLED;");
00783     query.bindValue(":CMD", JOB_STOP);
00784     query.bindValue(":CHANID", chanid);
00785     query.bindValue(":STARTTIME", recstartts);
00786     query.bindValue(":CANCELLED", JOB_CANCELLED);
00787 
00788     if (!query.exec())
00789     {
00790         MythDB::DBError("Stop Unfinished Jobs", query);
00791         return false;
00792     }
00793 
00794     // wait until running job(s) are done
00795     bool jobsAreRunning = true;
00796     int totalSlept = 0;
00797     int maxSleep = 90;
00798     while (jobsAreRunning && totalSlept < maxSleep)
00799     {
00800         usleep(1000);
00801         query.prepare("SELECT id FROM jobqueue "
00802                       "WHERE chanid = :CHANID and starttime = :STARTTIME "
00803                       "AND status NOT IN "
00804                       "(:FINISHED,:ABORTED,:ERRORED,:CANCELLED);");
00805         query.bindValue(":CHANID", chanid);
00806         query.bindValue(":STARTTIME", recstartts);
00807         query.bindValue(":FINISHED", JOB_FINISHED);
00808         query.bindValue(":ABORTED", JOB_ABORTED);
00809         query.bindValue(":ERRORED", JOB_ERRORED);
00810         query.bindValue(":CANCELLED", JOB_CANCELLED);
00811 
00812         if (!query.exec())
00813         {
00814             MythDB::DBError("Stop Unfinished Jobs", query);
00815             return false;
00816         }
00817 
00818         if (query.size() == 0)
00819         {
00820             jobsAreRunning = false;
00821             break;
00822         }
00823         else if ((totalSlept % 5) == 0)
00824         {
00825             message = QString("Waiting on %1 jobs still running for "
00826                               "chanid %2 @ %3").arg(query.size())
00827                               .arg(chanid).arg(recstartts.toString());
00828             LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00829         }
00830 
00831         sleep(1);
00832         totalSlept++;
00833     }
00834 
00835     if (totalSlept <= maxSleep)
00836     {
00837         query.prepare("DELETE FROM jobqueue "
00838                       "WHERE chanid = :CHANID AND starttime = :STARTTIME;");
00839         query.bindValue(":CHANID", chanid);
00840         query.bindValue(":STARTTIME", recstartts);
00841 
00842         if (!query.exec())
00843             MythDB::DBError("Delete All Jobs", query);
00844     }
00845     else
00846     {
00847         query.prepare("SELECT id, type, status, comment FROM jobqueue "
00848                       "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00849                       "AND status <> :CANCELLED ORDER BY id;");
00850 
00851         query.bindValue(":CHANID", chanid);
00852         query.bindValue(":STARTTIME", recstartts);
00853         query.bindValue(":CANCELLED", JOB_CANCELLED);
00854 
00855         if (!query.exec())
00856         {
00857             MythDB::DBError("Error in JobQueue::DeleteAllJobs(), Unable "
00858                             "to query list of Jobs left in Queue.", query);
00859             return 0;
00860         }
00861 
00862         LOG(VB_GENERAL, LOG_ERR, LOC +
00863             QString( "In DeleteAllJobs: There are Jobs "
00864                      "left in the JobQueue that are still running for "
00865                      "chanid %1 @ %2.").arg(chanid)
00866                 .arg(recstartts.toString()));
00867 
00868         while (query.next())
00869         {
00870             LOG(VB_GENERAL, LOG_ERR, LOC +
00871                 QString("Job ID %1: '%2' with status '%3' and comment '%4'")
00872                             .arg(query.value(0).toInt())
00873                             .arg(JobText(query.value(1).toInt()))
00874                             .arg(StatusText(query.value(2).toInt()))
00875                             .arg(query.value(3).toString()));
00876         }
00877 
00878         return false;
00879     }
00880 
00881     return true;
00882 }
00883 
00884 bool JobQueue::DeleteJob(int jobID)
00885 {
00886     if (jobID < 0)
00887         return false;
00888 
00889     MSqlQuery query(MSqlQuery::InitCon());
00890 
00891     query.prepare("DELETE FROM jobqueue WHERE id = :ID;");
00892 
00893     query.bindValue(":ID", jobID);
00894 
00895     if (!query.exec())
00896     {
00897         MythDB::DBError("Error in JobQueue::DeleteJob()", query);
00898         return false;
00899     }
00900 
00901     return true;
00902 }
00903 
00904 bool JobQueue::ChangeJobCmds(int jobID, int newCmds)
00905 {
00906     if (jobID < 0)
00907         return false;
00908 
00909     MSqlQuery query(MSqlQuery::InitCon());
00910 
00911     query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE id = :ID;");
00912 
00913     query.bindValue(":CMDS", newCmds);
00914     query.bindValue(":ID", jobID);
00915 
00916     if (!query.exec())
00917     {
00918         MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
00919         return false;
00920     }
00921 
00922     return true;
00923 }
00924 
00925 bool JobQueue::ChangeJobCmds(int jobType, uint chanid,
00926                              const QDateTime &recstartts,  int newCmds)
00927 {
00928     MSqlQuery query(MSqlQuery::InitCon());
00929 
00930     query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE type = :TYPE "
00931                   "AND chanid = :CHANID AND starttime = :STARTTIME;");
00932 
00933     query.bindValue(":CMDS", newCmds);
00934     query.bindValue(":TYPE", jobType);
00935     query.bindValue(":CHANID", chanid);
00936     query.bindValue(":STARTTIME", recstartts);
00937 
00938     if (!query.exec())
00939     {
00940         MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
00941         return false;
00942     }
00943 
00944     return true;
00945 }
00946 
00947 bool JobQueue::ChangeJobFlags(int jobID, int newFlags)
00948 {
00949     if (jobID < 0)
00950         return false;
00951 
00952     MSqlQuery query(MSqlQuery::InitCon());
00953 
00954     query.prepare("UPDATE jobqueue SET flags = :FLAGS WHERE id = :ID;");
00955 
00956     query.bindValue(":FLAGS", newFlags);
00957     query.bindValue(":ID", jobID);
00958 
00959     if (!query.exec())
00960     {
00961         MythDB::DBError("Error in JobQueue::ChangeJobFlags()", query);
00962         return false;
00963     }
00964 
00965     return true;
00966 }
00967 
00968 bool JobQueue::ChangeJobStatus(int jobID, int newStatus, QString comment)
00969 {
00970     if (jobID < 0)
00971         return false;
00972 
00973     LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobStatus(%1, %2, '%3')")
00974             .arg(jobID).arg(StatusText(newStatus)).arg(comment));
00975 
00976     MSqlQuery query(MSqlQuery::InitCon());
00977 
00978     query.prepare("UPDATE jobqueue SET status = :STATUS, comment = :COMMENT "
00979                   "WHERE id = :ID AND status <> :NEWSTATUS;");
00980 
00981     query.bindValue(":STATUS", newStatus);
00982     query.bindValue(":COMMENT", comment);
00983     query.bindValue(":ID", jobID);
00984     query.bindValue(":NEWSTATUS", newStatus);
00985 
00986     if (!query.exec())
00987     {
00988         MythDB::DBError("Error in JobQueue::ChangeJobStatus()", query);
00989         return false;
00990     }
00991 
00992     return true;
00993 }
00994 
00995 bool JobQueue::ChangeJobComment(int jobID, QString comment)
00996 {
00997     if (jobID < 0)
00998         return false;
00999 
01000     LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobComment(%1, '%2')")
01001             .arg(jobID).arg(comment));
01002 
01003     MSqlQuery query(MSqlQuery::InitCon());
01004 
01005     query.prepare("UPDATE jobqueue SET comment = :COMMENT "
01006                   "WHERE id = :ID;");
01007 
01008     query.bindValue(":COMMENT", comment);
01009     query.bindValue(":ID", jobID);
01010 
01011     if (!query.exec())
01012     {
01013         MythDB::DBError("Error in JobQueue::ChangeJobComment()", query);
01014         return false;
01015     }
01016 
01017     return true;
01018 }
01019 
01020 bool JobQueue::ChangeJobArgs(int jobID, QString args)
01021 {
01022     if (jobID < 0)
01023         return false;
01024 
01025     MSqlQuery query(MSqlQuery::InitCon());
01026 
01027     query.prepare("UPDATE jobqueue SET args = :ARGS "
01028                   "WHERE id = :ID;");
01029 
01030     query.bindValue(":ARGS", args);
01031     query.bindValue(":ID", jobID);
01032 
01033     if (!query.exec())
01034     {
01035         MythDB::DBError("Error in JobQueue::ChangeJobArgs()", query);
01036         return false;
01037     }
01038 
01039     return true;
01040 }
01041 
01042 int JobQueue::GetRunningJobID(uint chanid, const QDateTime &recstartts)
01043 {
01044     runningJobsLock->lock();
01045     QMap<int, RunningJobInfo>::iterator it = runningJobs.begin();
01046     for (; it != runningJobs.end(); ++it)
01047     {
01048         RunningJobInfo jInfo = *it;
01049 
01050         if ((jInfo.pginfo->GetChanID()             == chanid) &&
01051             (jInfo.pginfo->GetRecordingStartTime() == recstartts))
01052         {
01053             runningJobsLock->unlock();
01054 
01055             return jInfo.id;
01056         }
01057     }
01058     runningJobsLock->unlock();
01059 
01060     return 0;
01061 }
01062 
01063 bool JobQueue::IsJobRunning(int jobType,
01064                             uint chanid, const QDateTime &recstartts)
01065 {
01066     int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
01067 
01068     if ((tmpStatus != JOB_UNKNOWN) && (tmpStatus != JOB_QUEUED) &&
01069         (!(tmpStatus & JOB_DONE)))
01070         return true;
01071 
01072     return false;
01073 }
01074 
01075 bool JobQueue::IsJobRunning(int jobType, const ProgramInfo &pginfo)
01076 {
01077     return JobQueue::IsJobRunning(
01078         jobType, pginfo.GetChanID(), pginfo.GetRecordingStartTime());
01079 }
01080 
01081 bool JobQueue::IsJobQueuedOrRunning(
01082     int jobType, uint chanid, const QDateTime &recstartts)
01083 {
01084     int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
01085 
01086     if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE)))
01087         return true;
01088 
01089     return false;
01090 }
01091 
01092 bool JobQueue::IsJobQueued(
01093     int jobType, uint chanid, const QDateTime &recstartts)
01094 {
01095     int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
01096 
01097     if (tmpStatus & JOB_QUEUED)
01098         return true;
01099 
01100     return false;
01101 }
01102 
01103 QString JobQueue::JobText(int jobType)
01104 {
01105     switch (jobType)
01106     {
01107         case JOB_TRANSCODE:  return tr("Transcode");
01108         case JOB_COMMFLAG:   return tr("Flag Commercials");
01109         case JOB_METADATA:   return tr("Look up Metadata");
01110     }
01111 
01112     if (jobType & JOB_USERJOB)
01113     {
01114         QString settingName =
01115             QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
01116         return gCoreContext->GetSetting(settingName, settingName);
01117     }
01118 
01119     return tr("Unknown Job");
01120 }
01121 
01122 QString JobQueue::StatusText(int status)
01123 {
01124     switch (status)
01125     {
01126 #define JOBSTATUS_STATUSTEXT(A,B,C) case A: return C;
01127         JOBSTATUS_MAP(JOBSTATUS_STATUSTEXT)
01128         default: break;
01129     }
01130     return tr("Undefined");
01131 }
01132 
01133 bool JobQueue::InJobRunWindow(int orStartsWithinMins)
01134 {
01135     QString queueStartTimeStr;
01136     QString queueEndTimeStr;
01137     QTime queueStartTime;
01138     QTime queueEndTime;
01139     QTime curTime = QTime::currentTime();
01140     bool inTimeWindow = false;
01141     orStartsWithinMins = orStartsWithinMins < 0 ? 0 : orStartsWithinMins;
01142 
01143     queueStartTimeStr = gCoreContext->GetSetting("JobQueueWindowStart", "00:00");
01144     queueEndTimeStr = gCoreContext->GetSetting("JobQueueWindowEnd", "23:59");
01145 
01146     queueStartTime = QTime::fromString(queueStartTimeStr, "hh:mm");
01147     if (!queueStartTime.isValid())
01148     {
01149         LOG(VB_GENERAL, LOG_ERR,
01150             QString("Invalid JobQueueWindowStart time '%1', using 00:00")
01151                         .arg(queueStartTimeStr));
01152         queueStartTime = QTime(0, 0);
01153     }
01154 
01155     queueEndTime = QTime::fromString(queueEndTimeStr, "hh:mm");
01156     if (!queueEndTime.isValid())
01157     {
01158         LOG(VB_GENERAL, LOG_ERR,
01159             QString("Invalid JobQueueWindowEnd time '%1', using 23:59")
01160                         .arg(queueEndTimeStr));
01161         queueEndTime = QTime(23, 59);
01162     }
01163 
01164     LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01165         QString("Currently set to run new jobs from %1 to %2")
01166                     .arg(queueStartTimeStr).arg(queueEndTimeStr));
01167 
01168     if ((queueStartTime <= curTime) && (curTime < queueEndTime))
01169     {
01170         inTimeWindow = true;
01171     }
01172     else if ((queueStartTime > queueEndTime) &&
01173              ((curTime < queueEndTime) || (queueStartTime <= curTime)))
01174     {
01175         inTimeWindow = true;
01176     }
01177     else if (orStartsWithinMins > 0)
01178     {
01179         // Check if the window starts soon
01180         if (curTime <= queueStartTime)
01181         {
01182             // Start time hasn't passed yet today
01183             if (queueStartTime.secsTo(curTime) <= (orStartsWithinMins * 60))
01184             {
01185                 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01186                     QString("Job run window will start within %1 minutes")
01187                             .arg(orStartsWithinMins));
01188                 inTimeWindow = true;
01189             }
01190         }
01191         else
01192         {
01193             // We passed the start time for today, try tomorrow
01194             QDateTime curDateTime = QDateTime::currentDateTime();
01195             QDateTime startDateTime = QDateTime(QDate::currentDate(),
01196                                                 queueStartTime).addDays(1);
01197 
01198             if (curDateTime.secsTo(startDateTime) <= (orStartsWithinMins * 60))
01199             {
01200                 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01201                     QString("Job run window will start "
01202                             "within %1 minutes (tomorrow)")
01203                         .arg(orStartsWithinMins));
01204                 inTimeWindow = true;
01205             }
01206         }
01207     }
01208 
01209     return inTimeWindow;
01210 }
01211 
01212 bool JobQueue::HasRunningOrPendingJobs(int startingWithinMins)
01213 {
01214     /* startingWithinMins <= 0 - look for any pending jobs
01215            > 0 -  only consider pending starting within this time */
01216     QMap<int, JobQueueEntry> jobs;
01217     QMap<int, JobQueueEntry>::Iterator it;
01218     QDateTime maxSchedRunTime = QDateTime::currentDateTime();
01219     int tmpStatus = 0;
01220     bool checkForQueuedJobs = (startingWithinMins <= 0
01221                                 || InJobRunWindow(startingWithinMins));
01222 
01223     if (checkForQueuedJobs && startingWithinMins > 0) {
01224         maxSchedRunTime = maxSchedRunTime.addSecs(startingWithinMins * 60);
01225         LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01226             QString("HasRunningOrPendingJobs: checking for jobs "
01227                     "starting before: %1").arg(maxSchedRunTime.toString()));
01228     }
01229 
01230     JobQueue::GetJobsInQueue(jobs, JOB_LIST_NOT_DONE);
01231 
01232     if (jobs.size()) {
01233         for (it = jobs.begin(); it != jobs.end(); ++it)
01234         {
01235             tmpStatus = (*it).status;
01236             if (tmpStatus == JOB_RUNNING) {
01237                 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01238                     QString("HasRunningOrPendingJobs: found running job"));
01239                 return true;
01240             }
01241 
01242             if (checkForQueuedJobs) {
01243                 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) {
01244                     if (startingWithinMins <= 0) {
01245                         LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01246                             "HasRunningOrPendingJobs: found pending job");
01247                         return true;
01248                     }
01249                     else if ((*it).schedruntime <= maxSchedRunTime) {
01250                         LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01251                             QString("HasRunningOrPendingJobs: found pending "
01252                                     "job scheduled to start at: %1")
01253                                     .arg((*it).schedruntime.toString()));
01254                         return true;
01255                     }
01256                 }
01257             }
01258         }
01259     }
01260     return false;
01261 }
01262 
01263 
01264 int JobQueue::GetJobsInQueue(QMap<int, JobQueueEntry> &jobs, int findJobs)
01265 {
01266     JobQueueEntry thisJob;
01267     MSqlQuery query(MSqlQuery::InitCon());
01268     QDateTime recentDate = QDateTime::currentDateTime().addSecs(-4 * 3600);
01269     QString logInfo;
01270     int jobCount = 0;
01271     bool commflagWhileRecording =
01272              gCoreContext->GetNumSetting("AutoCommflagWhileRecording", 0);
01273 
01274     jobs.clear();
01275 
01276     query.prepare("SELECT j.id, j.chanid, j.starttime, j.inserttime, j.type, "
01277                       "j.cmds, j.flags, j.status, j.statustime, j.hostname, "
01278                       "j.args, j.comment, r.endtime, j.schedruntime "
01279                   "FROM jobqueue j "
01280                   "LEFT JOIN recorded r "
01281                   "  ON j.chanid = r.chanid AND j.starttime = r.starttime "
01282                   "ORDER BY j.schedruntime, j.id;");
01283 
01284     if (!query.exec())
01285     {
01286         MythDB::DBError("Error in JobQueue::GetJobs(), Unable to "
01287                         "query list of Jobs in Queue.", query);
01288         return 0;
01289     }
01290 
01291     LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01292         QString("GetJobsInQueue: findJobs search bitmask %1, "
01293                 "found %2 total jobs")
01294                     .arg(findJobs).arg(query.size()));
01295 
01296     while (query.next())
01297     {
01298         bool wantThisJob = false;
01299 
01300         thisJob.id = query.value(0).toInt();
01301         thisJob.recstartts = query.value(2).toDateTime();
01302         thisJob.schedruntime = query.value(13).toDateTime();
01303         thisJob.type = query.value(4).toInt();
01304         thisJob.status = query.value(7).toInt();
01305         thisJob.statustime = query.value(8).toDateTime();
01306         thisJob.startts = thisJob.recstartts.toString("yyyyMMddhhmmss");
01307 
01308         // -1 indicates the chanid is empty
01309         if (query.value(1).toInt() == -1)
01310         {
01311             thisJob.chanid = 0;
01312             logInfo = QString("jobID #%1").arg(thisJob.id);
01313         }
01314         else
01315         {
01316             thisJob.chanid = query.value(1).toUInt();
01317             logInfo = QString("chanid %1 @ %2").arg(thisJob.chanid)
01318                               .arg(thisJob.startts);
01319         }
01320 
01321         if ((query.value(12).toDateTime() > QDateTime::currentDateTime()) &&
01322             ((!commflagWhileRecording) ||
01323              ((thisJob.type != JOB_COMMFLAG) &&
01324               (thisJob.type != JOB_METADATA))))
01325         {
01326             LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01327                 QString("GetJobsInQueue: Ignoring '%1' Job "
01328                         "for %2 in %3 state.  Endtime in future.")
01329                             .arg(JobText(thisJob.type))
01330                             .arg(logInfo).arg(StatusText(thisJob.status)));
01331             continue;
01332         }
01333 
01334         if ((findJobs & JOB_LIST_ALL) ||
01335             ((findJobs & JOB_LIST_DONE) &&
01336              (thisJob.status & JOB_DONE)) ||
01337             ((findJobs & JOB_LIST_NOT_DONE) &&
01338              (!(thisJob.status & JOB_DONE))) ||
01339             ((findJobs & JOB_LIST_ERROR) &&
01340              (thisJob.status == JOB_ERRORED)) ||
01341             ((findJobs & JOB_LIST_RECENT) &&
01342              (thisJob.statustime > recentDate)))
01343             wantThisJob = true;
01344 
01345         if (!wantThisJob)
01346         {
01347             LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01348                 QString("GetJobsInQueue: Ignore '%1' Job for %2 in %3 state.")
01349                             .arg(JobText(thisJob.type))
01350                             .arg(logInfo).arg(StatusText(thisJob.status)));
01351             continue;
01352         }
01353 
01354         LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01355             QString("GetJobsInQueue: Found '%1' Job for %2 in %3 state.")
01356                         .arg(JobText(thisJob.type))
01357                         .arg(logInfo).arg(StatusText(thisJob.status)));
01358 
01359         thisJob.inserttime = query.value(3).toDateTime();
01360         thisJob.cmds = query.value(5).toInt();
01361         thisJob.flags = query.value(6).toInt();
01362         thisJob.hostname = query.value(9).toString();
01363         thisJob.args = query.value(10).toString();
01364         thisJob.comment = query.value(11).toString();
01365 
01366         if ((thisJob.type & JOB_USERJOB) &&
01367             (UserJobTypeToIndex(thisJob.type) == 0))
01368         {
01369             thisJob.type = JOB_NONE;
01370             LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01371                 QString("GetJobsInQueue: Unknown Job Type: %1")
01372                     .arg(thisJob.type));
01373         }
01374 
01375         if (thisJob.type != JOB_NONE)
01376             jobs[jobCount++] = thisJob;
01377     }
01378 
01379     return jobCount;
01380 }
01381 
01382 bool JobQueue::ChangeJobHost(int jobID, QString newHostname)
01383 {
01384     MSqlQuery query(MSqlQuery::InitCon());
01385 
01386     if (!newHostname.isEmpty())
01387     {
01388         query.prepare("UPDATE jobqueue SET hostname = :NEWHOSTNAME "
01389                       "WHERE hostname = :EMPTY AND id = :ID;");
01390         query.bindValue(":NEWHOSTNAME", newHostname);
01391         query.bindValue(":EMPTY", "");
01392         query.bindValue(":ID", jobID);
01393     }
01394     else
01395     {
01396         query.prepare("UPDATE jobqueue SET hostname = :EMPTY "
01397                       "WHERE id = :ID;");
01398         query.bindValue(":EMPTY", "");
01399         query.bindValue(":ID", jobID);
01400     }
01401 
01402     if (!query.exec())
01403     {
01404         MythDB::DBError(QString("Error in JobQueue::ChangeJobHost(), "
01405                                 "Unable to set hostname to '%1' for "
01406                                 "job %2.").arg(newHostname).arg(jobID),
01407                         query);
01408         return false;
01409     }
01410 
01411     if (query.numRowsAffected() > 0)
01412         return true;
01413 
01414     return false;
01415 }
01416 
01417 bool JobQueue::AllowedToRun(JobQueueEntry job)
01418 {
01419     QString allowSetting;
01420 
01421     if ((!job.hostname.isEmpty()) &&
01422         (job.hostname != m_hostname))
01423         return false;
01424 
01425     if (job.type & JOB_USERJOB)
01426     {
01427         allowSetting =
01428             QString("JobAllowUserJob%1").arg(UserJobTypeToIndex(job.type));
01429     }
01430     else
01431     {
01432         switch (job.type)
01433         {
01434             case JOB_TRANSCODE:  allowSetting = "JobAllowTranscode";
01435                                  break;
01436             case JOB_COMMFLAG:   allowSetting = "JobAllowCommFlag";
01437                                  break;
01438             case JOB_METADATA:   allowSetting = "JobAllowMetadata";
01439                                  break;
01440             default:             return false;
01441         }
01442     }
01443 
01444     if (gCoreContext->GetNumSetting(allowSetting, 1))
01445         return true;
01446 
01447     return false;
01448 }
01449 
01450 enum JobCmds JobQueue::GetJobCmd(int jobID)
01451 {
01452     MSqlQuery query(MSqlQuery::InitCon());
01453 
01454     query.prepare("SELECT cmds FROM jobqueue WHERE id = :ID;");
01455 
01456     query.bindValue(":ID", jobID);
01457 
01458     if (query.exec())
01459     {
01460         if (query.next())
01461             return (enum JobCmds)query.value(0).toInt();
01462     }
01463     else
01464     {
01465         MythDB::DBError("Error in JobQueue::GetJobCmd()", query);
01466     }
01467 
01468     return JOB_RUN;
01469 }
01470 
01471 QString JobQueue::GetJobArgs(int jobID)
01472 {
01473     MSqlQuery query(MSqlQuery::InitCon());
01474 
01475     query.prepare("SELECT args FROM jobqueue WHERE id = :ID;");
01476 
01477     query.bindValue(":ID", jobID);
01478 
01479     if (query.exec())
01480     {
01481         if (query.next())
01482             return query.value(0).toString();
01483     }
01484     else
01485     {
01486         MythDB::DBError("Error in JobQueue::GetJobArgs()", query);
01487     }
01488 
01489     return QString("");
01490 }
01491 
01492 enum JobFlags JobQueue::GetJobFlags(int jobID)
01493 {
01494     MSqlQuery query(MSqlQuery::InitCon());
01495 
01496     query.prepare("SELECT flags FROM jobqueue WHERE id = :ID;");
01497 
01498     query.bindValue(":ID", jobID);
01499 
01500     if (query.exec())
01501     {
01502         if (query.next())
01503             return (enum JobFlags)query.value(0).toInt();
01504     }
01505     else
01506     {
01507         MythDB::DBError("Error in JobQueue::GetJobFlags()", query);
01508     }
01509 
01510     return JOB_NO_FLAGS;
01511 }
01512 
01513 enum JobStatus JobQueue::GetJobStatus(int jobID)
01514 {
01515     MSqlQuery query(MSqlQuery::InitCon());
01516 
01517     query.prepare("SELECT status FROM jobqueue WHERE id = :ID;");
01518 
01519     query.bindValue(":ID", jobID);
01520 
01521     if (query.exec())
01522     {
01523         if (query.next())
01524             return (enum JobStatus)query.value(0).toInt();
01525     }
01526     else
01527     {
01528         MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
01529     }
01530     return JOB_UNKNOWN;
01531 }
01532 
01533 enum JobStatus JobQueue::GetJobStatus(
01534     int jobType, uint chanid, const QDateTime &recstartts)
01535 {
01536     MSqlQuery query(MSqlQuery::InitCon());
01537 
01538     query.prepare("SELECT status FROM jobqueue WHERE type = :TYPE "
01539                   "AND chanid = :CHANID AND starttime = :STARTTIME;");
01540 
01541     query.bindValue(":TYPE", jobType);
01542     query.bindValue(":CHANID", chanid);
01543     query.bindValue(":STARTTIME", recstartts);
01544 
01545     if (query.exec())
01546     {
01547         if (query.next())
01548             return (enum JobStatus)query.value(0).toInt();
01549     }
01550     else
01551     {
01552         MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
01553     }
01554     return JOB_UNKNOWN;
01555 }
01556 
01557 void JobQueue::RecoverQueue(bool justOld)
01558 {
01559     QMap<int, JobQueueEntry> jobs;
01560     QString msg;
01561     QString logInfo;
01562 
01563     msg = QString("RecoverQueue: Checking for unfinished jobs to "
01564                   "recover.");
01565     LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
01566 
01567     GetJobsInQueue(jobs);
01568 
01569     if (jobs.size())
01570     {
01571         QMap<int, JobQueueEntry>::Iterator it;
01572         QDateTime oldDate = QDateTime::currentDateTime().addDays(-1);
01573         QString hostname = gCoreContext->GetHostName();
01574         int tmpStatus;
01575         int tmpCmds;
01576 
01577         for (it = jobs.begin(); it != jobs.end(); ++it)
01578         {
01579             tmpCmds = (*it).cmds;
01580             tmpStatus = (*it).status;
01581 
01582             if (!(*it).chanid)
01583                 logInfo = QString("jobID #%1").arg((*it).id);
01584             else
01585                 logInfo = QString("chanid %1 @ %2").arg((*it).chanid)
01586                                   .arg((*it).startts);
01587 
01588             if (((tmpStatus == JOB_STARTING) ||
01589                  (tmpStatus == JOB_RUNNING) ||
01590                  (tmpStatus == JOB_PAUSED) ||
01591                  (tmpCmds & JOB_STOP) ||
01592                  (tmpStatus == JOB_STOPPING)) &&
01593                 (((!justOld) &&
01594                   ((*it).hostname == hostname)) ||
01595                  ((*it).statustime < oldDate)))
01596             {
01597                 msg = QString("RecoverQueue: Recovering '%1' for %2 "
01598                               "from '%3' state.")
01599                               .arg(JobText((*it).type))
01600                               .arg(logInfo).arg(StatusText((*it).status));
01601                 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
01602 
01603                 ChangeJobStatus((*it).id, JOB_QUEUED, "");
01604                 ChangeJobCmds((*it).id, JOB_RUN);
01605                 if (!gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0))
01606                     ChangeJobHost((*it).id, "");
01607             }
01608             else
01609             {
01610 #if 0
01611                 msg = QString("RecoverQueue: Ignoring '%1' for %2 "
01612                               "in '%3' state.")
01613                               .arg(JobText((*it).type))
01614                               .arg(logInfo).arg(StatusText((*it).status));
01615                 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
01616 #endif
01617             }
01618         }
01619     }
01620 }
01621 
01622 void JobQueue::CleanupOldJobsInQueue()
01623 {
01624     MSqlQuery delquery(MSqlQuery::InitCon());
01625     QDateTime donePurgeDate = QDateTime::currentDateTime().addDays(-2);
01626     QDateTime errorsPurgeDate = QDateTime::currentDateTime().addDays(-4);
01627 
01628     delquery.prepare("DELETE FROM jobqueue "
01629                      "WHERE (status in (:FINISHED, :ABORTED, :CANCELLED) "
01630                      "AND statustime < :DONEPURGEDATE) "
01631                      "OR (status in (:ERRORED) "
01632                      "AND statustime < :ERRORSPURGEDATE) ");
01633     delquery.bindValue(":FINISHED", JOB_FINISHED);
01634     delquery.bindValue(":ABORTED", JOB_ABORTED);
01635     delquery.bindValue(":CANCELLED", JOB_CANCELLED);
01636     delquery.bindValue(":ERRORED", JOB_ERRORED);
01637     delquery.bindValue(":DONEPURGEDATE", donePurgeDate);
01638     delquery.bindValue(":ERRORSPURGEDATE", errorsPurgeDate);
01639 
01640     if (!delquery.exec())
01641     {
01642         MythDB::DBError("JobQueue::CleanupOldJobsInQueue: Error deleting "
01643                         "old finished jobs.", delquery);
01644     }
01645 }
01646 
01647 void JobQueue::ProcessJob(JobQueueEntry job)
01648 {
01649     int jobID = job.id;
01650     QString name = QString("jobqueue%1%2").arg(jobID).arg(random());
01651 
01652     if (!MSqlQuery::testDBConnection())
01653     {
01654         LOG(VB_JOBQUEUE, LOG_ERR, LOC +
01655                 "ProcessJob(): Unable to open database connection");
01656         return;
01657     }
01658 
01659     ChangeJobStatus(jobID, JOB_PENDING);
01660     ProgramInfo *pginfo = NULL;
01661 
01662     if (job.chanid)
01663     {
01664         pginfo = new ProgramInfo(job.chanid, job.recstartts);
01665 
01666         if (!pginfo->GetChanID())
01667         {
01668             LOG(VB_JOBQUEUE, LOG_ERR, LOC +
01669                 QString("Unable to retrieve program info for chanid %1 @ %2")
01670                     .arg(job.chanid).arg(job.recstartts.toString()));
01671 
01672             ChangeJobStatus(jobID, JOB_ERRORED,
01673                             "Unable to retrieve Program Info from database");
01674 
01675             delete pginfo;
01676 
01677             return;
01678         }
01679 
01680         pginfo->SetPathname(pginfo->GetPlaybackURL());
01681     }
01682 
01683 
01684     runningJobsLock->lock();
01685 
01686     ChangeJobStatus(jobID, JOB_STARTING);
01687     RunningJobInfo jInfo;
01688     jInfo.type    = job.type;
01689     jInfo.id      = jobID;
01690     jInfo.flag    = JOB_RUN;
01691     jInfo.desc    = GetJobDescription(job.type);
01692     jInfo.command = GetJobCommand(jobID, job.type, pginfo);
01693     jInfo.pginfo  = pginfo;
01694 
01695     runningJobs[jobID] = jInfo;
01696 
01697     if (pginfo)
01698         pginfo->MarkAsInUse(true, kJobQueueInUseID);
01699 
01700     if (pginfo && pginfo->GetRecordingGroup() == "Deleted")
01701     {
01702         ChangeJobStatus(jobID, JOB_CANCELLED,
01703                         "Program has been deleted");
01704         RemoveRunningJob(jobID);
01705     }
01706     else if ((job.type == JOB_TRANSCODE) ||
01707         (runningJobs[jobID].command == "mythtranscode"))
01708     {
01709         StartChildJob(TranscodeThread, jobID);
01710     }
01711     else if ((job.type == JOB_COMMFLAG) ||
01712              (runningJobs[jobID].command == "mythcommflag"))
01713     {
01714         StartChildJob(FlagCommercialsThread, jobID);
01715     }
01716     else if ((job.type == JOB_METADATA) ||
01717              (runningJobs[jobID].command == "mythmetadatalookup"))
01718     {
01719         StartChildJob(MetadataLookupThread, jobID);
01720     }
01721     else if (job.type & JOB_USERJOB)
01722     {
01723         StartChildJob(UserJobThread, jobID);
01724     }
01725     else
01726     {
01727         ChangeJobStatus(jobID, JOB_ERRORED,
01728                         "UNKNOWN JobType, unable to process!");
01729         RemoveRunningJob(jobID);
01730     }
01731 
01732     runningJobsLock->unlock();
01733 }
01734 
01735 void JobQueue::StartChildJob(void *(*ChildThreadRoutine)(void *), int jobID)
01736 {
01737     JobThreadStruct *jts = new JobThreadStruct;
01738     jts->jq = this;
01739     jts->jobID = jobID;
01740 
01741     pthread_t childThread;
01742     pthread_attr_t attr;
01743     pthread_attr_init(&attr);
01744     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
01745     pthread_create(&childThread, &attr, ChildThreadRoutine, jts);
01746     pthread_attr_destroy(&attr);
01747 }
01748 
01749 QString JobQueue::GetJobDescription(int jobType)
01750 {
01751     if (jobType == JOB_TRANSCODE)
01752         return "Transcode";
01753     else if (jobType == JOB_COMMFLAG)
01754         return "Commercial Detection";
01755     else if (!(jobType & JOB_USERJOB))
01756         return "Unknown Job";
01757 
01758     QString descSetting =
01759         QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
01760 
01761     return gCoreContext->GetSetting(descSetting, "Unknown Job");
01762 }
01763 
01764 QString JobQueue::GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo)
01765 {
01766     QString command;
01767     MSqlQuery query(MSqlQuery::InitCon());
01768 
01769     if (jobType == JOB_TRANSCODE)
01770     {
01771         command = gCoreContext->GetSetting("JobQueueTranscodeCommand");
01772         if (command.trimmed().isEmpty())
01773             command = "mythtranscode";
01774 
01775         if (command == "mythtranscode")
01776             return command;
01777     }
01778     else if (jobType == JOB_COMMFLAG)
01779     {
01780         command = gCoreContext->GetSetting("JobQueueCommFlagCommand");
01781         if (command.trimmed().isEmpty())
01782             command = "mythcommflag";
01783 
01784         if (command == "mythcommflag")
01785             return command;
01786     }
01787     else if (jobType & JOB_USERJOB)
01788     {
01789         command = gCoreContext->GetSetting(
01790                     QString("UserJob%1").arg(UserJobTypeToIndex(jobType)), "");
01791     }
01792 
01793     if (!command.isEmpty())
01794     {
01795         command.replace("%JOBID%", QString("%1").arg(id));
01796     }
01797 
01798     if (!command.isEmpty() && tmpInfo)
01799     {
01800         tmpInfo->SubstituteMatches(command);
01801 
01802         command.replace("%VERBOSELEVEL%", QString("%1").arg(verboseMask));
01803         command.replace("%VERBOSEMODE%", QString("%1").arg(logPropagateArgs));
01804 
01805         uint transcoder = tmpInfo->QueryTranscoderID();
01806         command.replace("%TRANSPROFILE%",
01807                         (RecordingProfile::TranscoderAutodetect == transcoder) ?
01808                         "autodetect" : QString::number(transcoder));
01809     }
01810 
01811     return command;
01812 }
01813 
01814 void JobQueue::RemoveRunningJob(int id)
01815 {
01816     runningJobsLock->lock();
01817 
01818     if (runningJobs.contains(id))
01819     {
01820         ProgramInfo *pginfo = runningJobs[id].pginfo;
01821         if (pginfo)
01822         {
01823             pginfo->MarkAsInUse(false, kJobQueueInUseID);
01824             delete pginfo;
01825         }
01826 
01827         runningJobs.remove(id);
01828     }
01829 
01830     runningJobsLock->unlock();
01831 }
01832 
01833 QString JobQueue::PrettyPrint(off_t bytes)
01834 {
01835     // Pretty print "bytes" as KB, MB, GB, TB, etc., subject to the desired
01836     // number of units
01837     static const struct {
01838         const char      *suffix;
01839         unsigned int    max;
01840         int         precision;
01841     } pptab[] = {
01842         { "bytes", 9999, 0 },
01843         { "kB", 999, 0 },
01844         { "MB", 999, 1 },
01845         { "GB", 999, 1 },
01846         { "TB", 999, 1 },
01847         { "PB", 999, 1 },
01848         { "EB", 999, 1 },
01849         { "ZB", 999, 1 },
01850         { "YB", 0, 0 },
01851     };
01852     unsigned int    ii;
01853     float           fbytes = bytes;
01854 
01855     ii = 0;
01856     while (pptab[ii].max && fbytes > pptab[ii].max) {
01857         fbytes /= 1024;
01858         ii++;
01859     }
01860 
01861     return QString("%1 %2")
01862         .arg(fbytes, 0, 'f', pptab[ii].precision)
01863         .arg(pptab[ii].suffix);
01864 }
01865 
01866 void *JobQueue::TranscodeThread(void *param)
01867 {
01868     JobThreadStruct *jts = (JobThreadStruct *)param;
01869     JobQueue *jq = jts->jq;
01870 
01871     MThread::ThreadSetup(QString("Transcode_%1").arg(jts->jobID));
01872     jq->DoTranscodeThread(jts->jobID);
01873     MThread::ThreadCleanup();
01874 
01875     delete jts;
01876 
01877     return NULL;
01878 }
01879 
01880 void JobQueue::DoTranscodeThread(int jobID)
01881 {
01882     // We can't currently transcode non-recording files w/o a ProgramInfo
01883     runningJobsLock->lock();
01884     if (!runningJobs[jobID].pginfo)
01885     {
01886         LOG(VB_JOBQUEUE, LOG_ERR, LOC +
01887             "The JobQueue cannot currently transcode files that do not "
01888             "have a chanid/starttime in the recorded table.");
01889         ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
01890         RemoveRunningJob(jobID);
01891         runningJobsLock->unlock();
01892         return;
01893     }
01894 
01895     ProgramInfo *program_info = runningJobs[jobID].pginfo;
01896     runningJobsLock->unlock();
01897 
01898     ChangeJobStatus(jobID, JOB_RUNNING);
01899 
01900     // make sure flags are up to date
01901     program_info->Reload();
01902 
01903     bool useCutlist = program_info->HasCutlist() &&
01904         !!(GetJobFlags(jobID) & JOB_USE_CUTLIST);
01905 
01906     uint transcoder = program_info->QueryTranscoderID();
01907     QString profilearg =
01908         (RecordingProfile::TranscoderAutodetect == transcoder) ?
01909         "autodetect" : QString::number(transcoder);
01910 
01911     QString path;
01912     QString command;
01913 
01914     runningJobsLock->lock();
01915     if (runningJobs[jobID].command == "mythtranscode")
01916     {
01917         path = GetInstallPrefix() + "/bin/mythtranscode";
01918         command = QString("%1 -j %2 --profile %3")
01919                   .arg(path).arg(jobID).arg(profilearg);
01920         if (useCutlist)
01921             command += " --honorcutlist";
01922         command += logPropagateArgs;
01923     }
01924     else
01925     {
01926         command = runningJobs[jobID].command;
01927 
01928         QStringList tokens = command.split(" ", QString::SkipEmptyParts);
01929         if (!tokens.empty())
01930             path = tokens[0];
01931     }
01932     runningJobsLock->unlock();
01933 
01934     if (jobQueueCPU < 2)
01935     {
01936         myth_nice(17);
01937         myth_ioprio((0 == jobQueueCPU) ? 8 : 7);
01938     }
01939 
01940     QString transcoderName;
01941     if (transcoder == RecordingProfile::TranscoderAutodetect)
01942     {
01943         transcoderName = "Autodetect";
01944     }
01945     else
01946     {
01947         MSqlQuery query(MSqlQuery::InitCon());
01948         query.prepare("SELECT name FROM recordingprofiles WHERE id = :ID;");
01949         query.bindValue(":ID", transcoder);
01950         if (query.exec() && query.next())
01951         {
01952             transcoderName = query.value(0).toString();
01953         }
01954         else
01955         {
01956             /* Unexpected value; log it. */
01957             transcoderName = QString("Autodetect(%1)").arg(transcoder);
01958         }
01959     }
01960 
01961     QString msg;
01962     bool retry = true;
01963     int retrylimit = 3;
01964     while (retry)
01965     {
01966         retry = false;
01967 
01968         ChangeJobStatus(jobID, JOB_STARTING);
01969         program_info->SaveTranscodeStatus(TRANSCODING_RUNNING);
01970 
01971         QString filename = program_info->GetPlaybackURL(false, true);
01972 
01973         long long filesize = 0;
01974         long long origfilesize = QFileInfo(filename).size();
01975 
01976         QString msg = QString("Transcode %1")
01977                               .arg(StatusText(GetJobStatus(jobID)));
01978 
01979         QString detailstr = QString("%1: %2 (%3)")
01980             .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
01981             .arg(transcoderName)
01982             .arg(PrettyPrint(origfilesize));
01983         QByteArray details = detailstr.toLocal8Bit();
01984 
01985         LOG(VB_GENERAL, LOG_INFO, LOC + QString("%1 for %2")
01986                 .arg(msg).arg(details.constData()));
01987 
01988         LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
01989                                            .arg(command));
01990 
01991         GetMythDB()->GetDBManager()->CloseDatabases();
01992         uint result = myth_system(command);
01993         int status = GetJobStatus(jobID);
01994 
01995         if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
01996             (result == GENERIC_EXIT_CMD_NOT_FOUND))
01997         {
01998             ChangeJobStatus(jobID, JOB_ERRORED,
01999                 "ERROR: Unable to find mythtranscode, check backend logs.");
02000             program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED);
02001 
02002             msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
02003             detailstr = QString("%1: %2 does not exist or is not executable")
02004                 .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
02005                 .arg(path);
02006             details = detailstr.toLocal8Bit();
02007 
02008             LOG(VB_GENERAL, LOG_ERR, LOC +
02009                 QString("%1 for %2").arg(msg).arg(details.constData()));
02010         }
02011         else if (result == GENERIC_EXIT_RESTART && retrylimit > 0)
02012         {
02013             LOG(VB_JOBQUEUE, LOG_INFO, LOC + "Transcode command restarting");
02014             retry = true;
02015             retrylimit--;
02016 
02017             program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED);
02018         }
02019         else
02020         {
02021             if (status == JOB_FINISHED)
02022             {
02023                 ChangeJobStatus(jobID, JOB_FINISHED, "Finished.");
02024                 retry = false;
02025 
02026                 filename = program_info->GetPlaybackURL(false, true);
02027                 QFileInfo st(filename);
02028 
02029                 if (st.exists())
02030                 {
02031                     filesize = st.size();
02032 
02033                     QString comment = QString("%1: %2 => %3")
02034                                             .arg(transcoderName)
02035                                             .arg(PrettyPrint(origfilesize))
02036                                             .arg(PrettyPrint(filesize));
02037                     ChangeJobComment(jobID, comment);
02038 
02039                     if (filesize > 0)
02040                         program_info->SaveFilesize(filesize);
02041 
02042                     details = (QString("%1: %2 (%3)")
02043                                .arg(program_info->toString(
02044                                         ProgramInfo::kTitleSubtitle))
02045                                .arg(transcoderName)
02046                                .arg(PrettyPrint(filesize))).toLocal8Bit();
02047                 }
02048                 else
02049                 {
02050                     QString comment =
02051                         QString("could not stat '%1'").arg(filename);
02052 
02053                     ChangeJobStatus(jobID, JOB_FINISHED, comment);
02054 
02055                     details = (QString("%1: %2")
02056                                .arg(program_info->toString(
02057                                         ProgramInfo::kTitleSubtitle))
02058                                .arg(comment)).toLocal8Bit();
02059                 }
02060 
02061                 program_info->SaveTranscodeStatus(TRANSCODING_COMPLETE);
02062             }
02063             else
02064             {
02065                 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED);
02066 
02067                 QString comment =
02068                     QString("exit status %1, job status was \"%2\"")
02069                     .arg(result)
02070                     .arg(StatusText(status));
02071 
02072                 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02073 
02074                 details = (QString("%1: %2 (%3)")
02075                            .arg(program_info->toString(
02076                                     ProgramInfo::kTitleSubtitle))
02077                            .arg(transcoderName)
02078                            .arg(comment)).toLocal8Bit().constData();
02079             }
02080 
02081             msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
02082             LOG(VB_GENERAL, LOG_INFO, LOC + msg + ": " + details);
02083         }
02084     }
02085 
02086     if (retrylimit == 0)
02087     {
02088         LOG(VB_JOBQUEUE, LOG_ERR, LOC + "Retry limit exceeded for transcoder, "
02089                                         "setting job status to errored.");
02090         ChangeJobStatus(jobID, JOB_ERRORED, "Retry limit exceeded");
02091     }
02092 
02093     RemoveRunningJob(jobID);
02094 }
02095 
02096 void *JobQueue::MetadataLookupThread(void *param)
02097 {
02098     JobThreadStruct *jts = (JobThreadStruct *)param;
02099     JobQueue *jq = jts->jq;
02100 
02101     MThread::ThreadSetup(QString("Metadata_%1").arg(jts->jobID));
02102     jq->DoMetadataLookupThread(jts->jobID);
02103     MThread::ThreadCleanup();
02104 
02105     delete jts;
02106 
02107     return NULL;
02108 }
02109 
02110 void JobQueue::DoMetadataLookupThread(int jobID)
02111 {
02112     // We can't currently lookup non-recording files w/o a ProgramInfo
02113     runningJobsLock->lock();
02114     if (!runningJobs[jobID].pginfo)
02115     {
02116         LOG(VB_JOBQUEUE, LOG_ERR, LOC +
02117             "The JobQueue cannot currently perform lookups for items which do "
02118             "not have a chanid/starttime in the recorded table.");
02119         ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
02120         RemoveRunningJob(jobID);
02121         runningJobsLock->unlock();
02122         return;
02123     }
02124 
02125     ProgramInfo *program_info = runningJobs[jobID].pginfo;
02126     runningJobsLock->unlock();
02127 
02128     QString detailstr = QString("%1 recorded from channel %3")
02129         .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
02130         .arg(program_info->toString(ProgramInfo::kRecordingKey));
02131     QByteArray details = detailstr.toLocal8Bit();
02132 
02133     if (!MSqlQuery::testDBConnection())
02134     {
02135         QString msg = QString("Metadata Lookup failed.  Could not open "
02136                               "new database connection for %1. "
02137                               "Program cannot be looked up.")
02138                               .arg(details.constData());
02139         LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02140 
02141         ChangeJobStatus(jobID, JOB_ERRORED,
02142                         "Could not open new database connection for "
02143                         "metadata lookup.");
02144 
02145         delete program_info;
02146         return;
02147     }
02148 
02149     QString msg = tr("Metadata Lookup Starting");
02150     LOG(VB_GENERAL, LOG_INFO,
02151         LOC + "Metadata Lookup Starting for " + detailstr);
02152 
02153     uint retVal = 0;
02154     QString path;
02155     QString command;
02156 
02157     path = GetInstallPrefix() + "/bin/mythmetadatalookup";
02158     command = QString("%1 -j %2")
02159                       .arg(path).arg(jobID);
02160     command += logPropagateArgs;
02161 
02162     LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
02163             .arg(command));
02164 
02165     GetMythDB()->GetDBManager()->CloseDatabases();
02166     retVal = myth_system(command);
02167     int priority = LOG_NOTICE;
02168     QString comment;
02169 
02170     runningJobsLock->lock();
02171 
02172     if ((retVal == GENERIC_EXIT_DAEMONIZING_ERROR) ||
02173         (retVal == GENERIC_EXIT_CMD_NOT_FOUND))
02174     {
02175         comment = tr("Unable to find mythmetadatalookup");
02176         ChangeJobStatus(jobID, JOB_ERRORED, comment);
02177         priority = LOG_WARNING;
02178     }
02179     else if (runningJobs[jobID].flag == JOB_STOP)
02180     {
02181         comment = tr("Aborted by user");
02182         ChangeJobStatus(jobID, JOB_ABORTED, comment);
02183         priority = LOG_WARNING;
02184     }
02185     else if (retVal == GENERIC_EXIT_NO_RECORDING_DATA)
02186     {
02187         comment = tr("Unable to open file or init decoder");
02188         ChangeJobStatus(jobID, JOB_ERRORED, comment);
02189         priority = LOG_WARNING;
02190     }
02191     else if (retVal >= GENERIC_EXIT_NOT_OK) // 256 or above - error
02192     {
02193         comment = tr("Failed with exit status %1").arg(retVal);
02194         ChangeJobStatus(jobID, JOB_ERRORED, comment);
02195         priority = LOG_WARNING;
02196     }
02197     else
02198     {
02199         comment = tr("Metadata Lookup Complete.");
02200         ChangeJobStatus(jobID, JOB_FINISHED, comment);
02201 
02202         program_info->SendUpdateEvent();
02203     }
02204 
02205     msg = tr("Metadata Lookup %1", "Job ID")
02206         .arg(StatusText(GetJobStatus(jobID)));
02207 
02208     if (!comment.isEmpty())
02209     {
02210         detailstr += QString(" (%1)").arg(comment);
02211         details = detailstr.toLocal8Bit();
02212     }
02213 
02214     if (priority <= LOG_WARNING)
02215         LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData());
02216 
02217     RemoveRunningJob(jobID);
02218     runningJobsLock->unlock();
02219 }
02220 
02221 void *JobQueue::FlagCommercialsThread(void *param)
02222 {
02223     JobThreadStruct *jts = (JobThreadStruct *)param;
02224     JobQueue *jq = jts->jq;
02225 
02226     MThread::ThreadSetup(QString("Commflag_%1").arg(jts->jobID));
02227     jq->DoFlagCommercialsThread(jts->jobID);
02228     MThread::ThreadCleanup();
02229 
02230     delete jts;
02231 
02232     return NULL;
02233 }
02234 
02235 void JobQueue::DoFlagCommercialsThread(int jobID)
02236 {
02237     // We can't currently commflag non-recording files w/o a ProgramInfo
02238     runningJobsLock->lock();
02239     if (!runningJobs[jobID].pginfo)
02240     {
02241         LOG(VB_JOBQUEUE, LOG_ERR, LOC +
02242                 "The JobQueue cannot currently commflag files that do not "
02243                 "have a chanid/starttime in the recorded table.");
02244         ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
02245         RemoveRunningJob(jobID);
02246         runningJobsLock->unlock();
02247         return;
02248     }
02249 
02250     ProgramInfo *program_info = runningJobs[jobID].pginfo;
02251     runningJobsLock->unlock();
02252 
02253     QString detailstr = QString("%1 recorded from channel %3")
02254         .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
02255         .arg(program_info->toString(ProgramInfo::kRecordingKey));
02256     QByteArray details = detailstr.toLocal8Bit();
02257 
02258     if (!MSqlQuery::testDBConnection())
02259     {
02260         QString msg = QString("Commercial Detection failed.  Could not open "
02261                               "new database connection for %1. "
02262                               "Program cannot be flagged.")
02263                               .arg(details.constData());
02264         LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02265 
02266         ChangeJobStatus(jobID, JOB_ERRORED,
02267                         "Could not open new database connection for "
02268                         "commercial detector.");
02269 
02270         delete program_info;
02271         return;
02272     }
02273 
02274     QString msg = tr("Commercial Detection Starting");
02275     LOG(VB_GENERAL, LOG_INFO,
02276         LOC + "Commercial Detection Starting for " + detailstr);
02277 
02278     uint breaksFound = 0;
02279     QString path;
02280     QString command;
02281 
02282     runningJobsLock->lock();
02283     if (runningJobs[jobID].command == "mythcommflag")
02284     {
02285         path = GetInstallPrefix() + "/bin/mythcommflag";
02286         command = QString("%1 -j %2 --noprogress")
02287                           .arg(path).arg(jobID);
02288         command += logPropagateArgs;
02289     }
02290     else
02291     {
02292         command = runningJobs[jobID].command;
02293         QStringList tokens = command.split(" ", QString::SkipEmptyParts);
02294         if (!tokens.empty())
02295             path = tokens[0];
02296     }
02297     runningJobsLock->unlock();
02298 
02299     LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
02300             .arg(command));
02301 
02302     GetMythDB()->GetDBManager()->CloseDatabases();
02303     breaksFound = myth_system(command, kMSLowExitVal);
02304     int priority = LOG_NOTICE;
02305     QString comment;
02306 
02307     runningJobsLock->lock();
02308 
02309     if ((breaksFound == GENERIC_EXIT_DAEMONIZING_ERROR) ||
02310         (breaksFound == GENERIC_EXIT_CMD_NOT_FOUND))
02311     {
02312         comment = tr("Unable to find mythcommflag");
02313         ChangeJobStatus(jobID, JOB_ERRORED, comment);
02314         priority = LOG_WARNING;
02315     }
02316     else if (runningJobs[jobID].flag == JOB_STOP)
02317     {
02318         comment = tr("Aborted by user");
02319         ChangeJobStatus(jobID, JOB_ABORTED, comment);
02320         priority = LOG_WARNING;
02321     }
02322     else if (breaksFound == GENERIC_EXIT_NO_RECORDING_DATA)
02323     {
02324         comment = tr("Unable to open file or init decoder");
02325         ChangeJobStatus(jobID, JOB_ERRORED, comment);
02326         priority = LOG_WARNING;
02327     }
02328     else if (breaksFound >= GENERIC_EXIT_NOT_OK) // 256 or above - error
02329     {
02330         comment = tr("Failed with exit status %1").arg(breaksFound);
02331         ChangeJobStatus(jobID, JOB_ERRORED, comment);
02332         priority = LOG_WARNING;
02333     }
02334     else
02335     {
02336         comment = tr("%n commercial break(s)", "", breaksFound);
02337         ChangeJobStatus(jobID, JOB_FINISHED, comment);
02338 
02339         program_info->SendUpdateEvent();
02340 
02341         if (!program_info->IsLocal())
02342             program_info->SetPathname(program_info->GetPlaybackURL(false,true));
02343         if (program_info->IsLocal())
02344         {
02345             PreviewGenerator *pg = new PreviewGenerator(
02346                 program_info, QString(), PreviewGenerator::kLocal);
02347             pg->Run();
02348             pg->deleteLater();
02349         }
02350     }
02351 
02352     msg = tr("Commercial Detection %1", "Job ID")
02353         .arg(StatusText(GetJobStatus(jobID)));
02354 
02355     if (!comment.isEmpty())
02356     {
02357         detailstr += QString(" (%1)").arg(comment);
02358         details = detailstr.toLocal8Bit();
02359     }
02360 
02361     if (priority <= LOG_WARNING)
02362         LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData());
02363 
02364     RemoveRunningJob(jobID);
02365     runningJobsLock->unlock();
02366 }
02367 
02368 void *JobQueue::UserJobThread(void *param)
02369 {
02370     JobThreadStruct *jts = (JobThreadStruct *)param;
02371     JobQueue *jq = jts->jq;
02372 
02373     MThread::ThreadSetup(QString("UserJob_%1").arg(jts->jobID));
02374     jq->DoUserJobThread(jts->jobID);
02375     MThread::ThreadCleanup();
02376 
02377     delete jts;
02378 
02379     return NULL;
02380 }
02381 
02382 void JobQueue::DoUserJobThread(int jobID)
02383 {
02384     runningJobsLock->lock();
02385     ProgramInfo *pginfo = runningJobs[jobID].pginfo;
02386     QString jobDesc = runningJobs[jobID].desc;
02387     QString command = runningJobs[jobID].command;
02388     runningJobsLock->unlock();
02389 
02390     ChangeJobStatus(jobID, JOB_RUNNING);
02391 
02392     QString msg;
02393 
02394     if (pginfo)
02395     {
02396         msg = QString("Started %1 for %2 recorded from channel %3")
02397             .arg(jobDesc)
02398             .arg(pginfo->toString(ProgramInfo::kTitleSubtitle))
02399             .arg(pginfo->toString(ProgramInfo::kRecordingKey));
02400     }
02401     else
02402         msg = QString("Started %1 for jobID %2").arg(jobDesc).arg(jobID);
02403 
02404     LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
02405 
02406     switch (jobQueueCPU)
02407     {
02408         case  0: myth_nice(17);
02409                  myth_ioprio(8);
02410                  break;
02411         case  1: myth_nice(10);
02412                  myth_ioprio(7);
02413                  break;
02414         case  2:
02415         default: break;
02416     }
02417 
02418     LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
02419                                        .arg(command));
02420     GetMythDB()->GetDBManager()->CloseDatabases();
02421     uint result = myth_system(command);
02422 
02423     if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
02424         (result == GENERIC_EXIT_CMD_NOT_FOUND))
02425     {
02426         msg = QString("User Job '%1' failed, unable to find "
02427                       "executable, check your PATH and backend logs.")
02428                       .arg(command);
02429         LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02430         LOG(VB_GENERAL, LOG_NOTICE, LOC + QString("Current PATH: '%1'")
02431                                             .arg(getenv("PATH")));
02432 
02433         ChangeJobStatus(jobID, JOB_ERRORED,
02434             "ERROR: Unable to find executable, check backend logs.");
02435     }
02436     else if (result != 0)
02437     {
02438         msg = QString("User Job '%1' failed.").arg(command);
02439         LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02440 
02441         ChangeJobStatus(jobID, JOB_ERRORED,
02442             "ERROR: User Job returned non-zero, check logs.");
02443     }
02444     else
02445     {
02446         if (pginfo)
02447         {
02448             msg = QString("Finished %1 for %2 recorded from channel %3")
02449                 .arg(jobDesc)
02450                 .arg(pginfo->toString(ProgramInfo::kTitleSubtitle))
02451                 .arg(pginfo->toString(ProgramInfo::kRecordingKey));
02452         }
02453         else
02454             msg = QString("Finished %1 for jobID %2").arg(jobDesc).arg(jobID);
02455 
02456         LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
02457 
02458         ChangeJobStatus(jobID, JOB_FINISHED, "Successfully Completed.");
02459 
02460         if (pginfo)
02461             pginfo->SendUpdateEvent();
02462     }
02463 
02464     RemoveRunningJob(jobID);
02465 }
02466 
02467 int JobQueue::UserJobTypeToIndex(int jobType)
02468 {
02469     if (jobType & JOB_USERJOB)
02470     {
02471         int x = ((jobType & JOB_USERJOB)>> 8);
02472         int bits = 1;
02473         while ((x != 0) && ((x & 0x01) == 0))
02474         {
02475             bits++;
02476             x = x >> 1;
02477         }
02478         if ( bits > 4 )
02479             return JOB_NONE;
02480 
02481         return bits;
02482     }
02483     return JOB_NONE;
02484 }
02485 
02486 /* vim: set expandtab tabstop=4 shiftwidth=4: */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends