|
MythTV
0.26-pre
|
00001 00002 #include <unistd.h> 00003 #include <sys/types.h> 00004 #include <sys/stat.h> 00005 #include <iostream> 00006 #include <cstdlib> 00007 #include <fcntl.h> 00008 #include <pthread.h> 00009 using namespace std; 00010 00011 #include <QDateTime> 00012 #include <QFileInfo> 00013 #include <QRegExp> 00014 #include <QEvent> 00015 #include <QCoreApplication> 00016 00017 #include "mythconfig.h" 00018 00019 #include "exitcodes.h" 00020 #include "jobqueue.h" 00021 #include "programinfo.h" 00022 #include "mythcorecontext.h" 00023 #include "mythmiscutil.h" 00024 #include "previewgenerator.h" 00025 #include "compat.h" 00026 #include "recordingprofile.h" 00027 #include "recordinginfo.h" 00028 #include "mthread.h" 00029 00030 #include "mythdb.h" 00031 #include "mythdirs.h" 00032 #include "mythlogging.h" 00033 00034 #ifndef O_STREAMING 00035 #define O_STREAMING 0 00036 #endif 00037 00038 #ifndef O_LARGEFILE 00039 #define O_LARGEFILE 0 00040 #endif 00041 00042 #define LOC QString("JobQueue: ") 00043 00044 JobQueue::JobQueue(bool master) : 00045 m_hostname(gCoreContext->GetHostName()), 00046 jobsRunning(0), 00047 jobQueueCPU(0), 00048 m_pginfo(NULL), 00049 runningJobsLock(new QMutex(QMutex::Recursive)), 00050 isMaster(master), 00051 queueThread(new MThread("JobQueue", this)), 00052 processQueue(false) 00053 { 00054 jobQueueCPU = gCoreContext->GetNumSetting("JobQueueCPU", 0); 00055 00056 #ifndef USING_VALGRIND 00057 QMutexLocker locker(&queueThreadCondLock); 00058 processQueue = true; 00059 queueThread->start(); 00060 #else 00061 LOG(VB_GENERAL, LOG_ERR, LOC + 00062 "The JobQueue has been disabled because " 00063 "you compiled with the --enable-valgrind option."); 00064 #endif // USING_VALGRIND 00065 00066 gCoreContext->addListener(this); 00067 } 00068 00069 JobQueue::~JobQueue(void) 00070 { 00071 queueThreadCondLock.lock(); 00072 processQueue = false; 00073 queueThreadCond.wakeAll(); 00074 queueThreadCondLock.unlock(); 00075 00076 queueThread->wait(); 00077 delete queueThread; 00078 queueThread = NULL; 00079 00080 gCoreContext->removeListener(this); 00081 00082 delete runningJobsLock; 00083 } 00084 00085 void JobQueue::customEvent(QEvent *e) 00086 { 00087 if ((MythEvent::Type)(e->type()) == MythEvent::MythEventMessage) 00088 { 00089 MythEvent *me = (MythEvent *)e; 00090 QString message = me->Message(); 00091 00092 if (message.left(9) == "LOCAL_JOB") 00093 { 00094 // LOCAL_JOB action ID jobID 00095 // LOCAL_JOB action type chanid recstartts hostname 00096 QString msg; 00097 message = message.simplified(); 00098 QStringList tokens = message.split(" ", QString::SkipEmptyParts); 00099 QString action = tokens[1]; 00100 int jobID = -1; 00101 00102 if (tokens[2] == "ID") 00103 jobID = tokens[3].toInt(); 00104 else 00105 { 00106 jobID = GetJobID( 00107 tokens[2].toInt(), 00108 tokens[3].toUInt(), 00109 QDateTime::fromString(tokens[4], Qt::ISODate)); 00110 } 00111 00112 runningJobsLock->lock(); 00113 if (!runningJobs.contains(jobID)) 00114 { 00115 msg = QString("Unable to determine jobID for message: " 00116 "%1. Program will not be flagged.") 00117 .arg(message); 00118 LOG(VB_GENERAL, LOG_ERR, LOC + msg); 00119 runningJobsLock->unlock(); 00120 return; 00121 } 00122 runningJobsLock->unlock(); 00123 00124 msg = QString("Received message '%1'").arg(message); 00125 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg); 00126 00127 if ((action == "STOP") || 00128 (action == "PAUSE") || 00129 (action == "RESTART") || 00130 (action == "RESUME" )) 00131 { 00132 runningJobsLock->lock(); 00133 00134 if (action == "STOP") 00135 runningJobs[jobID].flag = JOB_STOP; 00136 else if (action == "PAUSE") 00137 runningJobs[jobID].flag = JOB_PAUSE; 00138 else if (action == "RESUME") 00139 runningJobs[jobID].flag = JOB_RUN; 00140 else if (action == "RESTART") 00141 runningJobs[jobID].flag = JOB_RESTART; 00142 00143 runningJobsLock->unlock(); 00144 } 00145 } 00146 } 00147 } 00148 00149 void JobQueue::run(void) 00150 { 00151 queueThreadCondLock.lock(); 00152 queueThreadCond.wakeAll(); 00153 queueThreadCondLock.unlock(); 00154 00155 RecoverQueue(); 00156 00157 queueThreadCondLock.lock(); 00158 queueThreadCond.wait(&queueThreadCondLock, 10 * 1000); 00159 queueThreadCondLock.unlock(); 00160 00161 ProcessQueue(); 00162 } 00163 00164 void JobQueue::ProcessQueue(void) 00165 { 00166 LOG(VB_JOBQUEUE, LOG_INFO, LOC + "ProcessQueue() started"); 00167 00168 QString logInfo; 00169 int jobID; 00170 int cmds; 00171 int flags; 00172 int status; 00173 QString hostname; 00174 int sleepTime; 00175 00176 QMap<int, int> jobStatus; 00177 int maxJobs; 00178 QString message; 00179 QMap<int, JobQueueEntry> jobs; 00180 bool atMax = false; 00181 bool inTimeWindow = true; 00182 bool startedJobAlready = false; 00183 QMap<int, RunningJobInfo>::Iterator rjiter; 00184 00185 QMutexLocker locker(&queueThreadCondLock); 00186 while (processQueue) 00187 { 00188 locker.unlock(); 00189 00190 startedJobAlready = false; 00191 sleepTime = gCoreContext->GetNumSetting("JobQueueCheckFrequency", 30); 00192 maxJobs = gCoreContext->GetNumSetting("JobQueueMaxSimultaneousJobs", 3); 00193 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 00194 QString("Currently set to run up to %1 job(s) max.") 00195 .arg(maxJobs)); 00196 00197 jobStatus.clear(); 00198 00199 runningJobsLock->lock(); 00200 for (rjiter = runningJobs.begin(); rjiter != runningJobs.end(); 00201 ++rjiter) 00202 { 00203 if ((*rjiter).pginfo) 00204 (*rjiter).pginfo->UpdateInUseMark(); 00205 } 00206 runningJobsLock->unlock(); 00207 00208 jobsRunning = 0; 00209 GetJobsInQueue(jobs); 00210 00211 if (jobs.size()) 00212 { 00213 inTimeWindow = InJobRunWindow(); 00214 for (int x = 0; x < jobs.size(); x++) 00215 { 00216 status = jobs[x].status; 00217 hostname = jobs[x].hostname; 00218 00219 if (((status == JOB_RUNNING) || 00220 (status == JOB_STARTING) || 00221 (status == JOB_PAUSED)) && 00222 (hostname == m_hostname)) 00223 jobsRunning++; 00224 } 00225 00226 message = QString("Currently Running %1 jobs.") 00227 .arg(jobsRunning); 00228 if (!inTimeWindow) 00229 { 00230 message += QString(" Jobs in Queue, but we are outside of the " 00231 "Job Queue time window, no new jobs can be " 00232 "started."); 00233 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00234 } 00235 else if (jobsRunning >= maxJobs) 00236 { 00237 message += " (At Maximum, no new jobs can be started until " 00238 "a running job completes)"; 00239 00240 if (!atMax) 00241 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00242 00243 atMax = true; 00244 } 00245 else 00246 { 00247 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00248 atMax = false; 00249 } 00250 00251 00252 for ( int x = 0; 00253 (x < jobs.size()) && (jobsRunning < maxJobs); x++) 00254 { 00255 jobID = jobs[x].id; 00256 cmds = jobs[x].cmds; 00257 flags = jobs[x].flags; 00258 status = jobs[x].status; 00259 hostname = jobs[x].hostname; 00260 00261 if (!jobs[x].chanid) 00262 logInfo = QString("jobID #%1").arg(jobID); 00263 else 00264 logInfo = QString("chanid %1 @ %2").arg(jobs[x].chanid) 00265 .arg(jobs[x].startts); 00266 00267 // Should we even be looking at this job? 00268 if ((inTimeWindow) && 00269 (!hostname.isEmpty()) && 00270 (hostname != m_hostname)) 00271 { 00272 // Setting the status here will prevent us from processing 00273 // any other jobs for this recording until this one is 00274 // completed on the remote host. 00275 jobStatus[jobID] = status; 00276 00277 message = QString("Skipping '%1' job for %2, " 00278 "should run on '%3' instead") 00279 .arg(JobText(jobs[x].type)).arg(logInfo) 00280 .arg(hostname); 00281 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00282 continue; 00283 } 00284 00285 // Check to see if there was a previous job that is not done 00286 if (inTimeWindow) 00287 { 00288 int otherJobID = GetRunningJobID(jobs[x].chanid, 00289 jobs[x].recstartts); 00290 if (otherJobID && (jobStatus.contains(otherJobID)) && 00291 (!(jobStatus[otherJobID] & JOB_DONE))) 00292 { 00293 message = 00294 QString("Skipping '%1' job for %2, " 00295 "Job ID %3 is already running for " 00296 "this recording with a status of '%4'") 00297 .arg(JobText(jobs[x].type)).arg(logInfo) 00298 .arg(otherJobID) 00299 .arg(StatusText(jobStatus[otherJobID])); 00300 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00301 continue; 00302 } 00303 } 00304 00305 jobStatus[jobID] = status; 00306 00307 // Are we allowed to run this job? 00308 if ((inTimeWindow) && (!AllowedToRun(jobs[x]))) 00309 { 00310 message = QString("Skipping '%1' job for %2, " 00311 "not allowed to run on this backend.") 00312 .arg(JobText(jobs[x].type)).arg(logInfo); 00313 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00314 continue; 00315 } 00316 00317 // Is this job scheduled for the future 00318 if (jobs[x].schedruntime > QDateTime::currentDateTime()) 00319 { 00320 message = QString("Skipping '%1' job for %2, this job is " 00321 "not scheduled to run until %3.") 00322 .arg(JobText(jobs[x].type)).arg(logInfo) 00323 .arg(jobs[x].schedruntime 00324 .toString(Qt::ISODate)); 00325 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00326 continue; 00327 } 00328 00329 if (cmds & JOB_STOP) 00330 { 00331 // if we're trying to stop a job and it's not queued 00332 // then lets send a STOP command 00333 if (status != JOB_QUEUED) { 00334 message = QString("Stopping '%1' job for %2") 00335 .arg(JobText(jobs[x].type)) 00336 .arg(logInfo); 00337 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00338 00339 runningJobsLock->lock(); 00340 if (runningJobs.contains(jobID)) 00341 runningJobs[jobID].flag = JOB_STOP; 00342 runningJobsLock->unlock(); 00343 00344 // ChangeJobCmds(m_db, jobID, JOB_RUN); 00345 continue; 00346 00347 // if we're trying to stop a job and it's still queued 00348 // then let's just change the status to cancelled so 00349 // we don't try to run it from the queue 00350 } else { 00351 message = QString("Cancelling '%1' job for %2") 00352 .arg(JobText(jobs[x].type)) 00353 .arg(logInfo); 00354 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00355 00356 // at the bottom of this loop we requeue any jobs that 00357 // are not currently queued and also not associated 00358 // with a hostname so we must claim this job before we 00359 // can cancel it 00360 if (!ChangeJobHost(jobID, m_hostname)) 00361 { 00362 message = QString("Unable to claim '%1' job for %2") 00363 .arg(JobText(jobs[x].type)) 00364 .arg(logInfo); 00365 LOG(VB_JOBQUEUE, LOG_ERR, LOC + message); 00366 continue; 00367 } 00368 00369 ChangeJobStatus(jobID, JOB_CANCELLED, ""); 00370 ChangeJobCmds(jobID, JOB_RUN); 00371 continue; 00372 } 00373 } 00374 00375 if ((cmds & JOB_PAUSE) && (status != JOB_QUEUED)) 00376 { 00377 message = QString("Pausing '%1' job for %2") 00378 .arg(JobText(jobs[x].type)).arg(logInfo); 00379 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00380 00381 runningJobsLock->lock(); 00382 if (runningJobs.contains(jobID)) 00383 runningJobs[jobID].flag = JOB_PAUSE; 00384 runningJobsLock->unlock(); 00385 00386 ChangeJobCmds(jobID, JOB_RUN); 00387 continue; 00388 } 00389 00390 if ((cmds & JOB_RESTART) && (status != JOB_QUEUED)) 00391 { 00392 message = QString("Restart '%1' job for %2") 00393 .arg(JobText(jobs[x].type)).arg(logInfo); 00394 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00395 00396 runningJobsLock->lock(); 00397 if (runningJobs.contains(jobID)) 00398 runningJobs[jobID].flag = JOB_RUN; 00399 runningJobsLock->unlock(); 00400 00401 ChangeJobCmds(jobID, JOB_RUN); 00402 continue; 00403 } 00404 00405 if (status != JOB_QUEUED) 00406 { 00407 00408 if (hostname.isEmpty()) 00409 { 00410 message = QString("Resetting '%1' job for %2 to %3 " 00411 "status, because no hostname is set.") 00412 .arg(JobText(jobs[x].type)) 00413 .arg(logInfo) 00414 .arg(StatusText(JOB_QUEUED)); 00415 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00416 00417 ChangeJobStatus(jobID, JOB_QUEUED, ""); 00418 ChangeJobCmds(jobID, JOB_RUN); 00419 } 00420 else if (inTimeWindow) 00421 { 00422 message = QString("Skipping '%1' job for %2, " 00423 "current job status is '%3'") 00424 .arg(JobText(jobs[x].type)) 00425 .arg(logInfo) 00426 .arg(StatusText(status)); 00427 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00428 } 00429 continue; 00430 } 00431 00432 // never start or claim more than one job in a single run 00433 if (startedJobAlready) 00434 continue; 00435 00436 if ((inTimeWindow) && 00437 (hostname.isEmpty()) && 00438 (!ChangeJobHost(jobID, m_hostname))) 00439 { 00440 message = QString("Unable to claim '%1' job for %2") 00441 .arg(JobText(jobs[x].type)).arg(logInfo); 00442 LOG(VB_JOBQUEUE, LOG_ERR, LOC + message); 00443 continue; 00444 } 00445 00446 if (!inTimeWindow) 00447 { 00448 message = QString("Skipping '%1' job for %2, " 00449 "current time is outside of the " 00450 "Job Queue processing window.") 00451 .arg(JobText(jobs[x].type)).arg(logInfo); 00452 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00453 continue; 00454 } 00455 00456 message = QString("Processing '%1' job for %2, " 00457 "current status is '%3'") 00458 .arg(JobText(jobs[x].type)).arg(logInfo) 00459 .arg(StatusText(status)); 00460 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00461 00462 ProcessJob(jobs[x]); 00463 00464 startedJobAlready = true; 00465 } 00466 } 00467 00468 if (QCoreApplication::applicationName() == MYTH_APPNAME_MYTHJOBQUEUE) 00469 { 00470 if (jobsRunning > 0) 00471 { 00472 if (!(gCoreContext->IsBlockingClient())) 00473 { 00474 gCoreContext->BlockShutdown(); 00475 LOG(VB_JOBQUEUE, LOG_INFO, QString("%1 jobs running. " 00476 "Blocking shutdown.").arg(jobsRunning)); 00477 } 00478 } 00479 else 00480 { 00481 if (gCoreContext->IsBlockingClient()) 00482 { 00483 gCoreContext->AllowShutdown(); 00484 LOG(VB_JOBQUEUE, LOG_INFO, "No jobs running. " 00485 "Allowing shutdown."); 00486 } 00487 } 00488 } 00489 00490 00491 locker.relock(); 00492 if (processQueue) 00493 { 00494 int st = (startedJobAlready) ? (5 * 1000) : (sleepTime * 1000); 00495 if (st > 0) 00496 queueThreadCond.wait(locker.mutex(), st); 00497 } 00498 } 00499 } 00500 00501 bool JobQueue::QueueRecordingJobs(const RecordingInfo &recinfo, int jobTypes) 00502 { 00503 if (jobTypes == JOB_NONE) 00504 jobTypes = recinfo.GetAutoRunJobs(); 00505 00506 if (recinfo.IsCommercialFree()) 00507 jobTypes &= (~JOB_COMMFLAG); 00508 00509 if (jobTypes != JOB_NONE) 00510 { 00511 QString jobHost = QString(""); 00512 00513 if (gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0)) 00514 jobHost = recinfo.GetHostname(); 00515 00516 return JobQueue::QueueJobs( 00517 jobTypes, recinfo.GetChanID(), recinfo.GetRecordingStartTime(), 00518 "", "", jobHost); 00519 } 00520 else 00521 return false; 00522 00523 return true; 00524 } 00525 00526 bool JobQueue::QueueJob(int jobType, uint chanid, const QDateTime &recstartts, 00527 QString args, QString comment, QString host, 00528 int flags, int status, QDateTime schedruntime) 00529 { 00530 int tmpStatus = JOB_UNKNOWN; 00531 int tmpCmd = JOB_UNKNOWN; 00532 int jobID = -1; 00533 int chanidInt = -1; 00534 00535 if(!schedruntime.isValid()) 00536 schedruntime = QDateTime::currentDateTime(); 00537 00538 MSqlQuery query(MSqlQuery::InitCon()); 00539 00540 // In order to replace a job, we must have a chanid/recstartts combo 00541 if (chanid) 00542 { 00543 query.prepare("SELECT status, id, cmds FROM jobqueue " 00544 "WHERE chanid = :CHANID AND starttime = :STARTTIME " 00545 "AND type = :JOBTYPE;"); 00546 query.bindValue(":CHANID", chanid); 00547 query.bindValue(":STARTTIME", recstartts); 00548 query.bindValue(":JOBTYPE", jobType); 00549 00550 if (!query.exec()) 00551 { 00552 MythDB::DBError("Error in JobQueue::QueueJob()", query); 00553 return false; 00554 } 00555 else 00556 { 00557 if (query.next()) 00558 { 00559 tmpStatus = query.value(0).toInt(); 00560 jobID = query.value(1).toInt(); 00561 tmpCmd = query.value(2).toInt(); 00562 } 00563 } 00564 switch (tmpStatus) 00565 { 00566 case JOB_UNKNOWN: 00567 break; 00568 case JOB_STARTING: 00569 case JOB_RUNNING: 00570 case JOB_PAUSED: 00571 case JOB_STOPPING: 00572 case JOB_ERRORING: 00573 case JOB_ABORTING: 00574 return false; 00575 default: 00576 DeleteJob(jobID); 00577 break; 00578 } 00579 if (! (tmpStatus & JOB_DONE) && (tmpCmd & JOB_STOP)) 00580 return false; 00581 00582 chanidInt = chanid; 00583 } 00584 00585 if (host.isNull()) 00586 host = QString(""); 00587 00588 query.prepare("INSERT INTO jobqueue (chanid, starttime, inserttime, type, " 00589 "status, statustime, schedruntime, hostname, args, comment, " 00590 "flags) " 00591 "VALUES (:CHANID, :STARTTIME, now(), :JOBTYPE, :STATUS, " 00592 "now(), :SCHEDRUNTIME, :HOST, :ARGS, :COMMENT, :FLAGS);"); 00593 00594 query.bindValue(":CHANID", chanidInt); 00595 query.bindValue(":STARTTIME", recstartts); 00596 query.bindValue(":JOBTYPE", jobType); 00597 query.bindValue(":STATUS", status); 00598 query.bindValue(":SCHEDRUNTIME", schedruntime); 00599 query.bindValue(":HOST", host); 00600 query.bindValue(":ARGS", args); 00601 query.bindValue(":COMMENT", comment); 00602 query.bindValue(":FLAGS", flags); 00603 00604 if (!query.exec()) 00605 { 00606 MythDB::DBError("Error in JobQueue::StartJob()", query); 00607 return false; 00608 } 00609 00610 return true; 00611 } 00612 00613 bool JobQueue::QueueJobs(int jobTypes, uint chanid, const QDateTime &recstartts, 00614 QString args, QString comment, QString host) 00615 { 00616 if (gCoreContext->GetNumSetting("AutoTranscodeBeforeAutoCommflag", 0)) 00617 { 00618 if (jobTypes & JOB_METADATA) 00619 QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host); 00620 if (jobTypes & JOB_TRANSCODE) 00621 QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host); 00622 if (jobTypes & JOB_COMMFLAG) 00623 QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host); 00624 } 00625 else 00626 { 00627 if (jobTypes & JOB_METADATA) 00628 QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host); 00629 if (jobTypes & JOB_COMMFLAG) 00630 QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host); 00631 if (jobTypes & JOB_TRANSCODE) 00632 { 00633 QDateTime schedruntime = QDateTime::currentDateTime(); 00634 00635 int defer = gCoreContext->GetNumSetting("DeferAutoTranscodeDays", 0); 00636 if (defer) 00637 { 00638 schedruntime = schedruntime.addDays(defer); 00639 schedruntime.setTime(QTime(0,0)); 00640 } 00641 00642 QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host, 00643 0, JOB_QUEUED, schedruntime); 00644 } 00645 } 00646 00647 if (jobTypes & JOB_USERJOB1) 00648 QueueJob(JOB_USERJOB1, chanid, recstartts, args, comment, host); 00649 if (jobTypes & JOB_USERJOB2) 00650 QueueJob(JOB_USERJOB2, chanid, recstartts, args, comment, host); 00651 if (jobTypes & JOB_USERJOB3) 00652 QueueJob(JOB_USERJOB3, chanid, recstartts, args, comment, host); 00653 if (jobTypes & JOB_USERJOB4) 00654 QueueJob(JOB_USERJOB4, chanid, recstartts, args, comment, host); 00655 00656 return true; 00657 } 00658 00659 int JobQueue::GetJobID(int jobType, uint chanid, const QDateTime &recstartts) 00660 { 00661 MSqlQuery query(MSqlQuery::InitCon()); 00662 00663 query.prepare("SELECT id FROM jobqueue " 00664 "WHERE chanid = :CHANID AND starttime = :STARTTIME " 00665 "AND type = :JOBTYPE;"); 00666 query.bindValue(":CHANID", chanid); 00667 query.bindValue(":STARTTIME", recstartts); 00668 query.bindValue(":JOBTYPE", jobType); 00669 00670 if (!query.exec()) 00671 { 00672 MythDB::DBError("Error in JobQueue::GetJobID()", query); 00673 return -1; 00674 } 00675 else 00676 { 00677 if (query.next()) 00678 return query.value(0).toInt(); 00679 } 00680 00681 return -1; 00682 } 00683 00684 bool JobQueue::GetJobInfoFromID( 00685 int jobID, int &jobType, uint &chanid, QDateTime &recstartts) 00686 { 00687 MSqlQuery query(MSqlQuery::InitCon()); 00688 00689 query.prepare("SELECT type, chanid, starttime FROM jobqueue " 00690 "WHERE id = :ID;"); 00691 00692 query.bindValue(":ID", jobID); 00693 00694 if (!query.exec()) 00695 { 00696 MythDB::DBError("Error in JobQueue::GetJobInfoFromID()", query); 00697 return false; 00698 } 00699 else 00700 { 00701 if (query.next()) 00702 { 00703 jobType = query.value(0).toInt(); 00704 chanid = query.value(1).toUInt(); 00705 recstartts = query.value(2).toDateTime(); 00706 return true; 00707 } 00708 } 00709 00710 return false; 00711 } 00712 00713 bool JobQueue::GetJobInfoFromID( 00714 int jobID, int &jobType, uint &chanid, QString &recstartts) 00715 { 00716 QDateTime tmpStarttime; 00717 00718 bool result = JobQueue::GetJobInfoFromID( 00719 jobID, jobType, chanid, tmpStarttime); 00720 00721 if (result) 00722 recstartts = tmpStarttime.toString("yyyyMMddhhmmss"); 00723 00724 return result; 00725 } 00726 00727 bool JobQueue::PauseJob(int jobID) 00728 { 00729 QString message = QString("GLOBAL_JOB PAUSE ID %1").arg(jobID); 00730 MythEvent me(message); 00731 gCoreContext->dispatch(me); 00732 00733 return ChangeJobCmds(jobID, JOB_PAUSE); 00734 } 00735 00736 bool JobQueue::ResumeJob(int jobID) 00737 { 00738 QString message = QString("GLOBAL_JOB RESUME ID %1").arg(jobID); 00739 MythEvent me(message); 00740 gCoreContext->dispatch(me); 00741 00742 return ChangeJobCmds(jobID, JOB_RESUME); 00743 } 00744 00745 bool JobQueue::RestartJob(int jobID) 00746 { 00747 QString message = QString("GLOBAL_JOB RESTART ID %1").arg(jobID); 00748 MythEvent me(message); 00749 gCoreContext->dispatch(me); 00750 00751 return ChangeJobCmds(jobID, JOB_RESTART); 00752 } 00753 00754 bool JobQueue::StopJob(int jobID) 00755 { 00756 QString message = QString("GLOBAL_JOB STOP ID %1").arg(jobID); 00757 MythEvent me(message); 00758 gCoreContext->dispatch(me); 00759 00760 return ChangeJobCmds(jobID, JOB_STOP); 00761 } 00762 00763 bool JobQueue::DeleteAllJobs(uint chanid, const QDateTime &recstartts) 00764 { 00765 MSqlQuery query(MSqlQuery::InitCon()); 00766 QString message; 00767 00768 query.prepare("UPDATE jobqueue SET status = :CANCELLED " 00769 "WHERE chanid = :CHANID AND starttime = :STARTTIME " 00770 "AND status = :QUEUED;"); 00771 00772 query.bindValue(":CANCELLED", JOB_CANCELLED); 00773 query.bindValue(":CHANID", chanid); 00774 query.bindValue(":STARTTIME", recstartts); 00775 query.bindValue(":QUEUED", JOB_QUEUED); 00776 00777 if (!query.exec()) 00778 MythDB::DBError("Cancel Pending Jobs", query); 00779 00780 query.prepare("UPDATE jobqueue SET cmds = :CMD " 00781 "WHERE chanid = :CHANID AND starttime = :STARTTIME " 00782 "AND status <> :CANCELLED;"); 00783 query.bindValue(":CMD", JOB_STOP); 00784 query.bindValue(":CHANID", chanid); 00785 query.bindValue(":STARTTIME", recstartts); 00786 query.bindValue(":CANCELLED", JOB_CANCELLED); 00787 00788 if (!query.exec()) 00789 { 00790 MythDB::DBError("Stop Unfinished Jobs", query); 00791 return false; 00792 } 00793 00794 // wait until running job(s) are done 00795 bool jobsAreRunning = true; 00796 int totalSlept = 0; 00797 int maxSleep = 90; 00798 while (jobsAreRunning && totalSlept < maxSleep) 00799 { 00800 usleep(1000); 00801 query.prepare("SELECT id FROM jobqueue " 00802 "WHERE chanid = :CHANID and starttime = :STARTTIME " 00803 "AND status NOT IN " 00804 "(:FINISHED,:ABORTED,:ERRORED,:CANCELLED);"); 00805 query.bindValue(":CHANID", chanid); 00806 query.bindValue(":STARTTIME", recstartts); 00807 query.bindValue(":FINISHED", JOB_FINISHED); 00808 query.bindValue(":ABORTED", JOB_ABORTED); 00809 query.bindValue(":ERRORED", JOB_ERRORED); 00810 query.bindValue(":CANCELLED", JOB_CANCELLED); 00811 00812 if (!query.exec()) 00813 { 00814 MythDB::DBError("Stop Unfinished Jobs", query); 00815 return false; 00816 } 00817 00818 if (query.size() == 0) 00819 { 00820 jobsAreRunning = false; 00821 break; 00822 } 00823 else if ((totalSlept % 5) == 0) 00824 { 00825 message = QString("Waiting on %1 jobs still running for " 00826 "chanid %2 @ %3").arg(query.size()) 00827 .arg(chanid).arg(recstartts.toString()); 00828 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message); 00829 } 00830 00831 sleep(1); 00832 totalSlept++; 00833 } 00834 00835 if (totalSlept <= maxSleep) 00836 { 00837 query.prepare("DELETE FROM jobqueue " 00838 "WHERE chanid = :CHANID AND starttime = :STARTTIME;"); 00839 query.bindValue(":CHANID", chanid); 00840 query.bindValue(":STARTTIME", recstartts); 00841 00842 if (!query.exec()) 00843 MythDB::DBError("Delete All Jobs", query); 00844 } 00845 else 00846 { 00847 query.prepare("SELECT id, type, status, comment FROM jobqueue " 00848 "WHERE chanid = :CHANID AND starttime = :STARTTIME " 00849 "AND status <> :CANCELLED ORDER BY id;"); 00850 00851 query.bindValue(":CHANID", chanid); 00852 query.bindValue(":STARTTIME", recstartts); 00853 query.bindValue(":CANCELLED", JOB_CANCELLED); 00854 00855 if (!query.exec()) 00856 { 00857 MythDB::DBError("Error in JobQueue::DeleteAllJobs(), Unable " 00858 "to query list of Jobs left in Queue.", query); 00859 return 0; 00860 } 00861 00862 LOG(VB_GENERAL, LOG_ERR, LOC + 00863 QString( "In DeleteAllJobs: There are Jobs " 00864 "left in the JobQueue that are still running for " 00865 "chanid %1 @ %2.").arg(chanid) 00866 .arg(recstartts.toString())); 00867 00868 while (query.next()) 00869 { 00870 LOG(VB_GENERAL, LOG_ERR, LOC + 00871 QString("Job ID %1: '%2' with status '%3' and comment '%4'") 00872 .arg(query.value(0).toInt()) 00873 .arg(JobText(query.value(1).toInt())) 00874 .arg(StatusText(query.value(2).toInt())) 00875 .arg(query.value(3).toString())); 00876 } 00877 00878 return false; 00879 } 00880 00881 return true; 00882 } 00883 00884 bool JobQueue::DeleteJob(int jobID) 00885 { 00886 if (jobID < 0) 00887 return false; 00888 00889 MSqlQuery query(MSqlQuery::InitCon()); 00890 00891 query.prepare("DELETE FROM jobqueue WHERE id = :ID;"); 00892 00893 query.bindValue(":ID", jobID); 00894 00895 if (!query.exec()) 00896 { 00897 MythDB::DBError("Error in JobQueue::DeleteJob()", query); 00898 return false; 00899 } 00900 00901 return true; 00902 } 00903 00904 bool JobQueue::ChangeJobCmds(int jobID, int newCmds) 00905 { 00906 if (jobID < 0) 00907 return false; 00908 00909 MSqlQuery query(MSqlQuery::InitCon()); 00910 00911 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE id = :ID;"); 00912 00913 query.bindValue(":CMDS", newCmds); 00914 query.bindValue(":ID", jobID); 00915 00916 if (!query.exec()) 00917 { 00918 MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query); 00919 return false; 00920 } 00921 00922 return true; 00923 } 00924 00925 bool JobQueue::ChangeJobCmds(int jobType, uint chanid, 00926 const QDateTime &recstartts, int newCmds) 00927 { 00928 MSqlQuery query(MSqlQuery::InitCon()); 00929 00930 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE type = :TYPE " 00931 "AND chanid = :CHANID AND starttime = :STARTTIME;"); 00932 00933 query.bindValue(":CMDS", newCmds); 00934 query.bindValue(":TYPE", jobType); 00935 query.bindValue(":CHANID", chanid); 00936 query.bindValue(":STARTTIME", recstartts); 00937 00938 if (!query.exec()) 00939 { 00940 MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query); 00941 return false; 00942 } 00943 00944 return true; 00945 } 00946 00947 bool JobQueue::ChangeJobFlags(int jobID, int newFlags) 00948 { 00949 if (jobID < 0) 00950 return false; 00951 00952 MSqlQuery query(MSqlQuery::InitCon()); 00953 00954 query.prepare("UPDATE jobqueue SET flags = :FLAGS WHERE id = :ID;"); 00955 00956 query.bindValue(":FLAGS", newFlags); 00957 query.bindValue(":ID", jobID); 00958 00959 if (!query.exec()) 00960 { 00961 MythDB::DBError("Error in JobQueue::ChangeJobFlags()", query); 00962 return false; 00963 } 00964 00965 return true; 00966 } 00967 00968 bool JobQueue::ChangeJobStatus(int jobID, int newStatus, QString comment) 00969 { 00970 if (jobID < 0) 00971 return false; 00972 00973 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobStatus(%1, %2, '%3')") 00974 .arg(jobID).arg(StatusText(newStatus)).arg(comment)); 00975 00976 MSqlQuery query(MSqlQuery::InitCon()); 00977 00978 query.prepare("UPDATE jobqueue SET status = :STATUS, comment = :COMMENT " 00979 "WHERE id = :ID AND status <> :NEWSTATUS;"); 00980 00981 query.bindValue(":STATUS", newStatus); 00982 query.bindValue(":COMMENT", comment); 00983 query.bindValue(":ID", jobID); 00984 query.bindValue(":NEWSTATUS", newStatus); 00985 00986 if (!query.exec()) 00987 { 00988 MythDB::DBError("Error in JobQueue::ChangeJobStatus()", query); 00989 return false; 00990 } 00991 00992 return true; 00993 } 00994 00995 bool JobQueue::ChangeJobComment(int jobID, QString comment) 00996 { 00997 if (jobID < 0) 00998 return false; 00999 01000 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobComment(%1, '%2')") 01001 .arg(jobID).arg(comment)); 01002 01003 MSqlQuery query(MSqlQuery::InitCon()); 01004 01005 query.prepare("UPDATE jobqueue SET comment = :COMMENT " 01006 "WHERE id = :ID;"); 01007 01008 query.bindValue(":COMMENT", comment); 01009 query.bindValue(":ID", jobID); 01010 01011 if (!query.exec()) 01012 { 01013 MythDB::DBError("Error in JobQueue::ChangeJobComment()", query); 01014 return false; 01015 } 01016 01017 return true; 01018 } 01019 01020 bool JobQueue::ChangeJobArgs(int jobID, QString args) 01021 { 01022 if (jobID < 0) 01023 return false; 01024 01025 MSqlQuery query(MSqlQuery::InitCon()); 01026 01027 query.prepare("UPDATE jobqueue SET args = :ARGS " 01028 "WHERE id = :ID;"); 01029 01030 query.bindValue(":ARGS", args); 01031 query.bindValue(":ID", jobID); 01032 01033 if (!query.exec()) 01034 { 01035 MythDB::DBError("Error in JobQueue::ChangeJobArgs()", query); 01036 return false; 01037 } 01038 01039 return true; 01040 } 01041 01042 int JobQueue::GetRunningJobID(uint chanid, const QDateTime &recstartts) 01043 { 01044 runningJobsLock->lock(); 01045 QMap<int, RunningJobInfo>::iterator it = runningJobs.begin(); 01046 for (; it != runningJobs.end(); ++it) 01047 { 01048 RunningJobInfo jInfo = *it; 01049 01050 if ((jInfo.pginfo->GetChanID() == chanid) && 01051 (jInfo.pginfo->GetRecordingStartTime() == recstartts)) 01052 { 01053 runningJobsLock->unlock(); 01054 01055 return jInfo.id; 01056 } 01057 } 01058 runningJobsLock->unlock(); 01059 01060 return 0; 01061 } 01062 01063 bool JobQueue::IsJobRunning(int jobType, 01064 uint chanid, const QDateTime &recstartts) 01065 { 01066 int tmpStatus = GetJobStatus(jobType, chanid, recstartts); 01067 01068 if ((tmpStatus != JOB_UNKNOWN) && (tmpStatus != JOB_QUEUED) && 01069 (!(tmpStatus & JOB_DONE))) 01070 return true; 01071 01072 return false; 01073 } 01074 01075 bool JobQueue::IsJobRunning(int jobType, const ProgramInfo &pginfo) 01076 { 01077 return JobQueue::IsJobRunning( 01078 jobType, pginfo.GetChanID(), pginfo.GetRecordingStartTime()); 01079 } 01080 01081 bool JobQueue::IsJobQueuedOrRunning( 01082 int jobType, uint chanid, const QDateTime &recstartts) 01083 { 01084 int tmpStatus = GetJobStatus(jobType, chanid, recstartts); 01085 01086 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) 01087 return true; 01088 01089 return false; 01090 } 01091 01092 bool JobQueue::IsJobQueued( 01093 int jobType, uint chanid, const QDateTime &recstartts) 01094 { 01095 int tmpStatus = GetJobStatus(jobType, chanid, recstartts); 01096 01097 if (tmpStatus & JOB_QUEUED) 01098 return true; 01099 01100 return false; 01101 } 01102 01103 QString JobQueue::JobText(int jobType) 01104 { 01105 switch (jobType) 01106 { 01107 case JOB_TRANSCODE: return tr("Transcode"); 01108 case JOB_COMMFLAG: return tr("Flag Commercials"); 01109 case JOB_METADATA: return tr("Look up Metadata"); 01110 } 01111 01112 if (jobType & JOB_USERJOB) 01113 { 01114 QString settingName = 01115 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType)); 01116 return gCoreContext->GetSetting(settingName, settingName); 01117 } 01118 01119 return tr("Unknown Job"); 01120 } 01121 01122 QString JobQueue::StatusText(int status) 01123 { 01124 switch (status) 01125 { 01126 #define JOBSTATUS_STATUSTEXT(A,B,C) case A: return C; 01127 JOBSTATUS_MAP(JOBSTATUS_STATUSTEXT) 01128 default: break; 01129 } 01130 return tr("Undefined"); 01131 } 01132 01133 bool JobQueue::InJobRunWindow(int orStartsWithinMins) 01134 { 01135 QString queueStartTimeStr; 01136 QString queueEndTimeStr; 01137 QTime queueStartTime; 01138 QTime queueEndTime; 01139 QTime curTime = QTime::currentTime(); 01140 bool inTimeWindow = false; 01141 orStartsWithinMins = orStartsWithinMins < 0 ? 0 : orStartsWithinMins; 01142 01143 queueStartTimeStr = gCoreContext->GetSetting("JobQueueWindowStart", "00:00"); 01144 queueEndTimeStr = gCoreContext->GetSetting("JobQueueWindowEnd", "23:59"); 01145 01146 queueStartTime = QTime::fromString(queueStartTimeStr, "hh:mm"); 01147 if (!queueStartTime.isValid()) 01148 { 01149 LOG(VB_GENERAL, LOG_ERR, 01150 QString("Invalid JobQueueWindowStart time '%1', using 00:00") 01151 .arg(queueStartTimeStr)); 01152 queueStartTime = QTime(0, 0); 01153 } 01154 01155 queueEndTime = QTime::fromString(queueEndTimeStr, "hh:mm"); 01156 if (!queueEndTime.isValid()) 01157 { 01158 LOG(VB_GENERAL, LOG_ERR, 01159 QString("Invalid JobQueueWindowEnd time '%1', using 23:59") 01160 .arg(queueEndTimeStr)); 01161 queueEndTime = QTime(23, 59); 01162 } 01163 01164 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01165 QString("Currently set to run new jobs from %1 to %2") 01166 .arg(queueStartTimeStr).arg(queueEndTimeStr)); 01167 01168 if ((queueStartTime <= curTime) && (curTime < queueEndTime)) 01169 { 01170 inTimeWindow = true; 01171 } 01172 else if ((queueStartTime > queueEndTime) && 01173 ((curTime < queueEndTime) || (queueStartTime <= curTime))) 01174 { 01175 inTimeWindow = true; 01176 } 01177 else if (orStartsWithinMins > 0) 01178 { 01179 // Check if the window starts soon 01180 if (curTime <= queueStartTime) 01181 { 01182 // Start time hasn't passed yet today 01183 if (queueStartTime.secsTo(curTime) <= (orStartsWithinMins * 60)) 01184 { 01185 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01186 QString("Job run window will start within %1 minutes") 01187 .arg(orStartsWithinMins)); 01188 inTimeWindow = true; 01189 } 01190 } 01191 else 01192 { 01193 // We passed the start time for today, try tomorrow 01194 QDateTime curDateTime = QDateTime::currentDateTime(); 01195 QDateTime startDateTime = QDateTime(QDate::currentDate(), 01196 queueStartTime).addDays(1); 01197 01198 if (curDateTime.secsTo(startDateTime) <= (orStartsWithinMins * 60)) 01199 { 01200 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01201 QString("Job run window will start " 01202 "within %1 minutes (tomorrow)") 01203 .arg(orStartsWithinMins)); 01204 inTimeWindow = true; 01205 } 01206 } 01207 } 01208 01209 return inTimeWindow; 01210 } 01211 01212 bool JobQueue::HasRunningOrPendingJobs(int startingWithinMins) 01213 { 01214 /* startingWithinMins <= 0 - look for any pending jobs 01215 > 0 - only consider pending starting within this time */ 01216 QMap<int, JobQueueEntry> jobs; 01217 QMap<int, JobQueueEntry>::Iterator it; 01218 QDateTime maxSchedRunTime = QDateTime::currentDateTime(); 01219 int tmpStatus = 0; 01220 bool checkForQueuedJobs = (startingWithinMins <= 0 01221 || InJobRunWindow(startingWithinMins)); 01222 01223 if (checkForQueuedJobs && startingWithinMins > 0) { 01224 maxSchedRunTime = maxSchedRunTime.addSecs(startingWithinMins * 60); 01225 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01226 QString("HasRunningOrPendingJobs: checking for jobs " 01227 "starting before: %1").arg(maxSchedRunTime.toString())); 01228 } 01229 01230 JobQueue::GetJobsInQueue(jobs, JOB_LIST_NOT_DONE); 01231 01232 if (jobs.size()) { 01233 for (it = jobs.begin(); it != jobs.end(); ++it) 01234 { 01235 tmpStatus = (*it).status; 01236 if (tmpStatus == JOB_RUNNING) { 01237 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01238 QString("HasRunningOrPendingJobs: found running job")); 01239 return true; 01240 } 01241 01242 if (checkForQueuedJobs) { 01243 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) { 01244 if (startingWithinMins <= 0) { 01245 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01246 "HasRunningOrPendingJobs: found pending job"); 01247 return true; 01248 } 01249 else if ((*it).schedruntime <= maxSchedRunTime) { 01250 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01251 QString("HasRunningOrPendingJobs: found pending " 01252 "job scheduled to start at: %1") 01253 .arg((*it).schedruntime.toString())); 01254 return true; 01255 } 01256 } 01257 } 01258 } 01259 } 01260 return false; 01261 } 01262 01263 01264 int JobQueue::GetJobsInQueue(QMap<int, JobQueueEntry> &jobs, int findJobs) 01265 { 01266 JobQueueEntry thisJob; 01267 MSqlQuery query(MSqlQuery::InitCon()); 01268 QDateTime recentDate = QDateTime::currentDateTime().addSecs(-4 * 3600); 01269 QString logInfo; 01270 int jobCount = 0; 01271 bool commflagWhileRecording = 01272 gCoreContext->GetNumSetting("AutoCommflagWhileRecording", 0); 01273 01274 jobs.clear(); 01275 01276 query.prepare("SELECT j.id, j.chanid, j.starttime, j.inserttime, j.type, " 01277 "j.cmds, j.flags, j.status, j.statustime, j.hostname, " 01278 "j.args, j.comment, r.endtime, j.schedruntime " 01279 "FROM jobqueue j " 01280 "LEFT JOIN recorded r " 01281 " ON j.chanid = r.chanid AND j.starttime = r.starttime " 01282 "ORDER BY j.schedruntime, j.id;"); 01283 01284 if (!query.exec()) 01285 { 01286 MythDB::DBError("Error in JobQueue::GetJobs(), Unable to " 01287 "query list of Jobs in Queue.", query); 01288 return 0; 01289 } 01290 01291 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01292 QString("GetJobsInQueue: findJobs search bitmask %1, " 01293 "found %2 total jobs") 01294 .arg(findJobs).arg(query.size())); 01295 01296 while (query.next()) 01297 { 01298 bool wantThisJob = false; 01299 01300 thisJob.id = query.value(0).toInt(); 01301 thisJob.recstartts = query.value(2).toDateTime(); 01302 thisJob.schedruntime = query.value(13).toDateTime(); 01303 thisJob.type = query.value(4).toInt(); 01304 thisJob.status = query.value(7).toInt(); 01305 thisJob.statustime = query.value(8).toDateTime(); 01306 thisJob.startts = thisJob.recstartts.toString("yyyyMMddhhmmss"); 01307 01308 // -1 indicates the chanid is empty 01309 if (query.value(1).toInt() == -1) 01310 { 01311 thisJob.chanid = 0; 01312 logInfo = QString("jobID #%1").arg(thisJob.id); 01313 } 01314 else 01315 { 01316 thisJob.chanid = query.value(1).toUInt(); 01317 logInfo = QString("chanid %1 @ %2").arg(thisJob.chanid) 01318 .arg(thisJob.startts); 01319 } 01320 01321 if ((query.value(12).toDateTime() > QDateTime::currentDateTime()) && 01322 ((!commflagWhileRecording) || 01323 ((thisJob.type != JOB_COMMFLAG) && 01324 (thisJob.type != JOB_METADATA)))) 01325 { 01326 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01327 QString("GetJobsInQueue: Ignoring '%1' Job " 01328 "for %2 in %3 state. Endtime in future.") 01329 .arg(JobText(thisJob.type)) 01330 .arg(logInfo).arg(StatusText(thisJob.status))); 01331 continue; 01332 } 01333 01334 if ((findJobs & JOB_LIST_ALL) || 01335 ((findJobs & JOB_LIST_DONE) && 01336 (thisJob.status & JOB_DONE)) || 01337 ((findJobs & JOB_LIST_NOT_DONE) && 01338 (!(thisJob.status & JOB_DONE))) || 01339 ((findJobs & JOB_LIST_ERROR) && 01340 (thisJob.status == JOB_ERRORED)) || 01341 ((findJobs & JOB_LIST_RECENT) && 01342 (thisJob.statustime > recentDate))) 01343 wantThisJob = true; 01344 01345 if (!wantThisJob) 01346 { 01347 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01348 QString("GetJobsInQueue: Ignore '%1' Job for %2 in %3 state.") 01349 .arg(JobText(thisJob.type)) 01350 .arg(logInfo).arg(StatusText(thisJob.status))); 01351 continue; 01352 } 01353 01354 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01355 QString("GetJobsInQueue: Found '%1' Job for %2 in %3 state.") 01356 .arg(JobText(thisJob.type)) 01357 .arg(logInfo).arg(StatusText(thisJob.status))); 01358 01359 thisJob.inserttime = query.value(3).toDateTime(); 01360 thisJob.cmds = query.value(5).toInt(); 01361 thisJob.flags = query.value(6).toInt(); 01362 thisJob.hostname = query.value(9).toString(); 01363 thisJob.args = query.value(10).toString(); 01364 thisJob.comment = query.value(11).toString(); 01365 01366 if ((thisJob.type & JOB_USERJOB) && 01367 (UserJobTypeToIndex(thisJob.type) == 0)) 01368 { 01369 thisJob.type = JOB_NONE; 01370 LOG(VB_JOBQUEUE, LOG_INFO, LOC + 01371 QString("GetJobsInQueue: Unknown Job Type: %1") 01372 .arg(thisJob.type)); 01373 } 01374 01375 if (thisJob.type != JOB_NONE) 01376 jobs[jobCount++] = thisJob; 01377 } 01378 01379 return jobCount; 01380 } 01381 01382 bool JobQueue::ChangeJobHost(int jobID, QString newHostname) 01383 { 01384 MSqlQuery query(MSqlQuery::InitCon()); 01385 01386 if (!newHostname.isEmpty()) 01387 { 01388 query.prepare("UPDATE jobqueue SET hostname = :NEWHOSTNAME " 01389 "WHERE hostname = :EMPTY AND id = :ID;"); 01390 query.bindValue(":NEWHOSTNAME", newHostname); 01391 query.bindValue(":EMPTY", ""); 01392 query.bindValue(":ID", jobID); 01393 } 01394 else 01395 { 01396 query.prepare("UPDATE jobqueue SET hostname = :EMPTY " 01397 "WHERE id = :ID;"); 01398 query.bindValue(":EMPTY", ""); 01399 query.bindValue(":ID", jobID); 01400 } 01401 01402 if (!query.exec()) 01403 { 01404 MythDB::DBError(QString("Error in JobQueue::ChangeJobHost(), " 01405 "Unable to set hostname to '%1' for " 01406 "job %2.").arg(newHostname).arg(jobID), 01407 query); 01408 return false; 01409 } 01410 01411 if (query.numRowsAffected() > 0) 01412 return true; 01413 01414 return false; 01415 } 01416 01417 bool JobQueue::AllowedToRun(JobQueueEntry job) 01418 { 01419 QString allowSetting; 01420 01421 if ((!job.hostname.isEmpty()) && 01422 (job.hostname != m_hostname)) 01423 return false; 01424 01425 if (job.type & JOB_USERJOB) 01426 { 01427 allowSetting = 01428 QString("JobAllowUserJob%1").arg(UserJobTypeToIndex(job.type)); 01429 } 01430 else 01431 { 01432 switch (job.type) 01433 { 01434 case JOB_TRANSCODE: allowSetting = "JobAllowTranscode"; 01435 break; 01436 case JOB_COMMFLAG: allowSetting = "JobAllowCommFlag"; 01437 break; 01438 case JOB_METADATA: allowSetting = "JobAllowMetadata"; 01439 break; 01440 default: return false; 01441 } 01442 } 01443 01444 if (gCoreContext->GetNumSetting(allowSetting, 1)) 01445 return true; 01446 01447 return false; 01448 } 01449 01450 enum JobCmds JobQueue::GetJobCmd(int jobID) 01451 { 01452 MSqlQuery query(MSqlQuery::InitCon()); 01453 01454 query.prepare("SELECT cmds FROM jobqueue WHERE id = :ID;"); 01455 01456 query.bindValue(":ID", jobID); 01457 01458 if (query.exec()) 01459 { 01460 if (query.next()) 01461 return (enum JobCmds)query.value(0).toInt(); 01462 } 01463 else 01464 { 01465 MythDB::DBError("Error in JobQueue::GetJobCmd()", query); 01466 } 01467 01468 return JOB_RUN; 01469 } 01470 01471 QString JobQueue::GetJobArgs(int jobID) 01472 { 01473 MSqlQuery query(MSqlQuery::InitCon()); 01474 01475 query.prepare("SELECT args FROM jobqueue WHERE id = :ID;"); 01476 01477 query.bindValue(":ID", jobID); 01478 01479 if (query.exec()) 01480 { 01481 if (query.next()) 01482 return query.value(0).toString(); 01483 } 01484 else 01485 { 01486 MythDB::DBError("Error in JobQueue::GetJobArgs()", query); 01487 } 01488 01489 return QString(""); 01490 } 01491 01492 enum JobFlags JobQueue::GetJobFlags(int jobID) 01493 { 01494 MSqlQuery query(MSqlQuery::InitCon()); 01495 01496 query.prepare("SELECT flags FROM jobqueue WHERE id = :ID;"); 01497 01498 query.bindValue(":ID", jobID); 01499 01500 if (query.exec()) 01501 { 01502 if (query.next()) 01503 return (enum JobFlags)query.value(0).toInt(); 01504 } 01505 else 01506 { 01507 MythDB::DBError("Error in JobQueue::GetJobFlags()", query); 01508 } 01509 01510 return JOB_NO_FLAGS; 01511 } 01512 01513 enum JobStatus JobQueue::GetJobStatus(int jobID) 01514 { 01515 MSqlQuery query(MSqlQuery::InitCon()); 01516 01517 query.prepare("SELECT status FROM jobqueue WHERE id = :ID;"); 01518 01519 query.bindValue(":ID", jobID); 01520 01521 if (query.exec()) 01522 { 01523 if (query.next()) 01524 return (enum JobStatus)query.value(0).toInt(); 01525 } 01526 else 01527 { 01528 MythDB::DBError("Error in JobQueue::GetJobStatus()", query); 01529 } 01530 return JOB_UNKNOWN; 01531 } 01532 01533 enum JobStatus JobQueue::GetJobStatus( 01534 int jobType, uint chanid, const QDateTime &recstartts) 01535 { 01536 MSqlQuery query(MSqlQuery::InitCon()); 01537 01538 query.prepare("SELECT status FROM jobqueue WHERE type = :TYPE " 01539 "AND chanid = :CHANID AND starttime = :STARTTIME;"); 01540 01541 query.bindValue(":TYPE", jobType); 01542 query.bindValue(":CHANID", chanid); 01543 query.bindValue(":STARTTIME", recstartts); 01544 01545 if (query.exec()) 01546 { 01547 if (query.next()) 01548 return (enum JobStatus)query.value(0).toInt(); 01549 } 01550 else 01551 { 01552 MythDB::DBError("Error in JobQueue::GetJobStatus()", query); 01553 } 01554 return JOB_UNKNOWN; 01555 } 01556 01557 void JobQueue::RecoverQueue(bool justOld) 01558 { 01559 QMap<int, JobQueueEntry> jobs; 01560 QString msg; 01561 QString logInfo; 01562 01563 msg = QString("RecoverQueue: Checking for unfinished jobs to " 01564 "recover."); 01565 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg); 01566 01567 GetJobsInQueue(jobs); 01568 01569 if (jobs.size()) 01570 { 01571 QMap<int, JobQueueEntry>::Iterator it; 01572 QDateTime oldDate = QDateTime::currentDateTime().addDays(-1); 01573 QString hostname = gCoreContext->GetHostName(); 01574 int tmpStatus; 01575 int tmpCmds; 01576 01577 for (it = jobs.begin(); it != jobs.end(); ++it) 01578 { 01579 tmpCmds = (*it).cmds; 01580 tmpStatus = (*it).status; 01581 01582 if (!(*it).chanid) 01583 logInfo = QString("jobID #%1").arg((*it).id); 01584 else 01585 logInfo = QString("chanid %1 @ %2").arg((*it).chanid) 01586 .arg((*it).startts); 01587 01588 if (((tmpStatus == JOB_STARTING) || 01589 (tmpStatus == JOB_RUNNING) || 01590 (tmpStatus == JOB_PAUSED) || 01591 (tmpCmds & JOB_STOP) || 01592 (tmpStatus == JOB_STOPPING)) && 01593 (((!justOld) && 01594 ((*it).hostname == hostname)) || 01595 ((*it).statustime < oldDate))) 01596 { 01597 msg = QString("RecoverQueue: Recovering '%1' for %2 " 01598 "from '%3' state.") 01599 .arg(JobText((*it).type)) 01600 .arg(logInfo).arg(StatusText((*it).status)); 01601 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg); 01602 01603 ChangeJobStatus((*it).id, JOB_QUEUED, ""); 01604 ChangeJobCmds((*it).id, JOB_RUN); 01605 if (!gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0)) 01606 ChangeJobHost((*it).id, ""); 01607 } 01608 else 01609 { 01610 #if 0 01611 msg = QString("RecoverQueue: Ignoring '%1' for %2 " 01612 "in '%3' state.") 01613 .arg(JobText((*it).type)) 01614 .arg(logInfo).arg(StatusText((*it).status)); 01615 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg); 01616 #endif 01617 } 01618 } 01619 } 01620 } 01621 01622 void JobQueue::CleanupOldJobsInQueue() 01623 { 01624 MSqlQuery delquery(MSqlQuery::InitCon()); 01625 QDateTime donePurgeDate = QDateTime::currentDateTime().addDays(-2); 01626 QDateTime errorsPurgeDate = QDateTime::currentDateTime().addDays(-4); 01627 01628 delquery.prepare("DELETE FROM jobqueue " 01629 "WHERE (status in (:FINISHED, :ABORTED, :CANCELLED) " 01630 "AND statustime < :DONEPURGEDATE) " 01631 "OR (status in (:ERRORED) " 01632 "AND statustime < :ERRORSPURGEDATE) "); 01633 delquery.bindValue(":FINISHED", JOB_FINISHED); 01634 delquery.bindValue(":ABORTED", JOB_ABORTED); 01635 delquery.bindValue(":CANCELLED", JOB_CANCELLED); 01636 delquery.bindValue(":ERRORED", JOB_ERRORED); 01637 delquery.bindValue(":DONEPURGEDATE", donePurgeDate); 01638 delquery.bindValue(":ERRORSPURGEDATE", errorsPurgeDate); 01639 01640 if (!delquery.exec()) 01641 { 01642 MythDB::DBError("JobQueue::CleanupOldJobsInQueue: Error deleting " 01643 "old finished jobs.", delquery); 01644 } 01645 } 01646 01647 void JobQueue::ProcessJob(JobQueueEntry job) 01648 { 01649 int jobID = job.id; 01650 QString name = QString("jobqueue%1%2").arg(jobID).arg(random()); 01651 01652 if (!MSqlQuery::testDBConnection()) 01653 { 01654 LOG(VB_JOBQUEUE, LOG_ERR, LOC + 01655 "ProcessJob(): Unable to open database connection"); 01656 return; 01657 } 01658 01659 ChangeJobStatus(jobID, JOB_PENDING); 01660 ProgramInfo *pginfo = NULL; 01661 01662 if (job.chanid) 01663 { 01664 pginfo = new ProgramInfo(job.chanid, job.recstartts); 01665 01666 if (!pginfo->GetChanID()) 01667 { 01668 LOG(VB_JOBQUEUE, LOG_ERR, LOC + 01669 QString("Unable to retrieve program info for chanid %1 @ %2") 01670 .arg(job.chanid).arg(job.recstartts.toString())); 01671 01672 ChangeJobStatus(jobID, JOB_ERRORED, 01673 "Unable to retrieve Program Info from database"); 01674 01675 delete pginfo; 01676 01677 return; 01678 } 01679 01680 pginfo->SetPathname(pginfo->GetPlaybackURL()); 01681 } 01682 01683 01684 runningJobsLock->lock(); 01685 01686 ChangeJobStatus(jobID, JOB_STARTING); 01687 RunningJobInfo jInfo; 01688 jInfo.type = job.type; 01689 jInfo.id = jobID; 01690 jInfo.flag = JOB_RUN; 01691 jInfo.desc = GetJobDescription(job.type); 01692 jInfo.command = GetJobCommand(jobID, job.type, pginfo); 01693 jInfo.pginfo = pginfo; 01694 01695 runningJobs[jobID] = jInfo; 01696 01697 if (pginfo) 01698 pginfo->MarkAsInUse(true, kJobQueueInUseID); 01699 01700 if (pginfo && pginfo->GetRecordingGroup() == "Deleted") 01701 { 01702 ChangeJobStatus(jobID, JOB_CANCELLED, 01703 "Program has been deleted"); 01704 RemoveRunningJob(jobID); 01705 } 01706 else if ((job.type == JOB_TRANSCODE) || 01707 (runningJobs[jobID].command == "mythtranscode")) 01708 { 01709 StartChildJob(TranscodeThread, jobID); 01710 } 01711 else if ((job.type == JOB_COMMFLAG) || 01712 (runningJobs[jobID].command == "mythcommflag")) 01713 { 01714 StartChildJob(FlagCommercialsThread, jobID); 01715 } 01716 else if ((job.type == JOB_METADATA) || 01717 (runningJobs[jobID].command == "mythmetadatalookup")) 01718 { 01719 StartChildJob(MetadataLookupThread, jobID); 01720 } 01721 else if (job.type & JOB_USERJOB) 01722 { 01723 StartChildJob(UserJobThread, jobID); 01724 } 01725 else 01726 { 01727 ChangeJobStatus(jobID, JOB_ERRORED, 01728 "UNKNOWN JobType, unable to process!"); 01729 RemoveRunningJob(jobID); 01730 } 01731 01732 runningJobsLock->unlock(); 01733 } 01734 01735 void JobQueue::StartChildJob(void *(*ChildThreadRoutine)(void *), int jobID) 01736 { 01737 JobThreadStruct *jts = new JobThreadStruct; 01738 jts->jq = this; 01739 jts->jobID = jobID; 01740 01741 pthread_t childThread; 01742 pthread_attr_t attr; 01743 pthread_attr_init(&attr); 01744 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 01745 pthread_create(&childThread, &attr, ChildThreadRoutine, jts); 01746 pthread_attr_destroy(&attr); 01747 } 01748 01749 QString JobQueue::GetJobDescription(int jobType) 01750 { 01751 if (jobType == JOB_TRANSCODE) 01752 return "Transcode"; 01753 else if (jobType == JOB_COMMFLAG) 01754 return "Commercial Detection"; 01755 else if (!(jobType & JOB_USERJOB)) 01756 return "Unknown Job"; 01757 01758 QString descSetting = 01759 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType)); 01760 01761 return gCoreContext->GetSetting(descSetting, "Unknown Job"); 01762 } 01763 01764 QString JobQueue::GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo) 01765 { 01766 QString command; 01767 MSqlQuery query(MSqlQuery::InitCon()); 01768 01769 if (jobType == JOB_TRANSCODE) 01770 { 01771 command = gCoreContext->GetSetting("JobQueueTranscodeCommand"); 01772 if (command.trimmed().isEmpty()) 01773 command = "mythtranscode"; 01774 01775 if (command == "mythtranscode") 01776 return command; 01777 } 01778 else if (jobType == JOB_COMMFLAG) 01779 { 01780 command = gCoreContext->GetSetting("JobQueueCommFlagCommand"); 01781 if (command.trimmed().isEmpty()) 01782 command = "mythcommflag"; 01783 01784 if (command == "mythcommflag") 01785 return command; 01786 } 01787 else if (jobType & JOB_USERJOB) 01788 { 01789 command = gCoreContext->GetSetting( 01790 QString("UserJob%1").arg(UserJobTypeToIndex(jobType)), ""); 01791 } 01792 01793 if (!command.isEmpty()) 01794 { 01795 command.replace("%JOBID%", QString("%1").arg(id)); 01796 } 01797 01798 if (!command.isEmpty() && tmpInfo) 01799 { 01800 tmpInfo->SubstituteMatches(command); 01801 01802 command.replace("%VERBOSELEVEL%", QString("%1").arg(verboseMask)); 01803 command.replace("%VERBOSEMODE%", QString("%1").arg(logPropagateArgs)); 01804 01805 uint transcoder = tmpInfo->QueryTranscoderID(); 01806 command.replace("%TRANSPROFILE%", 01807 (RecordingProfile::TranscoderAutodetect == transcoder) ? 01808 "autodetect" : QString::number(transcoder)); 01809 } 01810 01811 return command; 01812 } 01813 01814 void JobQueue::RemoveRunningJob(int id) 01815 { 01816 runningJobsLock->lock(); 01817 01818 if (runningJobs.contains(id)) 01819 { 01820 ProgramInfo *pginfo = runningJobs[id].pginfo; 01821 if (pginfo) 01822 { 01823 pginfo->MarkAsInUse(false, kJobQueueInUseID); 01824 delete pginfo; 01825 } 01826 01827 runningJobs.remove(id); 01828 } 01829 01830 runningJobsLock->unlock(); 01831 } 01832 01833 QString JobQueue::PrettyPrint(off_t bytes) 01834 { 01835 // Pretty print "bytes" as KB, MB, GB, TB, etc., subject to the desired 01836 // number of units 01837 static const struct { 01838 const char *suffix; 01839 unsigned int max; 01840 int precision; 01841 } pptab[] = { 01842 { "bytes", 9999, 0 }, 01843 { "kB", 999, 0 }, 01844 { "MB", 999, 1 }, 01845 { "GB", 999, 1 }, 01846 { "TB", 999, 1 }, 01847 { "PB", 999, 1 }, 01848 { "EB", 999, 1 }, 01849 { "ZB", 999, 1 }, 01850 { "YB", 0, 0 }, 01851 }; 01852 unsigned int ii; 01853 float fbytes = bytes; 01854 01855 ii = 0; 01856 while (pptab[ii].max && fbytes > pptab[ii].max) { 01857 fbytes /= 1024; 01858 ii++; 01859 } 01860 01861 return QString("%1 %2") 01862 .arg(fbytes, 0, 'f', pptab[ii].precision) 01863 .arg(pptab[ii].suffix); 01864 } 01865 01866 void *JobQueue::TranscodeThread(void *param) 01867 { 01868 JobThreadStruct *jts = (JobThreadStruct *)param; 01869 JobQueue *jq = jts->jq; 01870 01871 MThread::ThreadSetup(QString("Transcode_%1").arg(jts->jobID)); 01872 jq->DoTranscodeThread(jts->jobID); 01873 MThread::ThreadCleanup(); 01874 01875 delete jts; 01876 01877 return NULL; 01878 } 01879 01880 void JobQueue::DoTranscodeThread(int jobID) 01881 { 01882 // We can't currently transcode non-recording files w/o a ProgramInfo 01883 runningJobsLock->lock(); 01884 if (!runningJobs[jobID].pginfo) 01885 { 01886 LOG(VB_JOBQUEUE, LOG_ERR, LOC + 01887 "The JobQueue cannot currently transcode files that do not " 01888 "have a chanid/starttime in the recorded table."); 01889 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found"); 01890 RemoveRunningJob(jobID); 01891 runningJobsLock->unlock(); 01892 return; 01893 } 01894 01895 ProgramInfo *program_info = runningJobs[jobID].pginfo; 01896 runningJobsLock->unlock(); 01897 01898 ChangeJobStatus(jobID, JOB_RUNNING); 01899 01900 // make sure flags are up to date 01901 program_info->Reload(); 01902 01903 bool useCutlist = program_info->HasCutlist() && 01904 !!(GetJobFlags(jobID) & JOB_USE_CUTLIST); 01905 01906 uint transcoder = program_info->QueryTranscoderID(); 01907 QString profilearg = 01908 (RecordingProfile::TranscoderAutodetect == transcoder) ? 01909 "autodetect" : QString::number(transcoder); 01910 01911 QString path; 01912 QString command; 01913 01914 runningJobsLock->lock(); 01915 if (runningJobs[jobID].command == "mythtranscode") 01916 { 01917 path = GetInstallPrefix() + "/bin/mythtranscode"; 01918 command = QString("%1 -j %2 --profile %3") 01919 .arg(path).arg(jobID).arg(profilearg); 01920 if (useCutlist) 01921 command += " --honorcutlist"; 01922 command += logPropagateArgs; 01923 } 01924 else 01925 { 01926 command = runningJobs[jobID].command; 01927 01928 QStringList tokens = command.split(" ", QString::SkipEmptyParts); 01929 if (!tokens.empty()) 01930 path = tokens[0]; 01931 } 01932 runningJobsLock->unlock(); 01933 01934 if (jobQueueCPU < 2) 01935 { 01936 myth_nice(17); 01937 myth_ioprio((0 == jobQueueCPU) ? 8 : 7); 01938 } 01939 01940 QString transcoderName; 01941 if (transcoder == RecordingProfile::TranscoderAutodetect) 01942 { 01943 transcoderName = "Autodetect"; 01944 } 01945 else 01946 { 01947 MSqlQuery query(MSqlQuery::InitCon()); 01948 query.prepare("SELECT name FROM recordingprofiles WHERE id = :ID;"); 01949 query.bindValue(":ID", transcoder); 01950 if (query.exec() && query.next()) 01951 { 01952 transcoderName = query.value(0).toString(); 01953 } 01954 else 01955 { 01956 /* Unexpected value; log it. */ 01957 transcoderName = QString("Autodetect(%1)").arg(transcoder); 01958 } 01959 } 01960 01961 QString msg; 01962 bool retry = true; 01963 int retrylimit = 3; 01964 while (retry) 01965 { 01966 retry = false; 01967 01968 ChangeJobStatus(jobID, JOB_STARTING); 01969 program_info->SaveTranscodeStatus(TRANSCODING_RUNNING); 01970 01971 QString filename = program_info->GetPlaybackURL(false, true); 01972 01973 long long filesize = 0; 01974 long long origfilesize = QFileInfo(filename).size(); 01975 01976 QString msg = QString("Transcode %1") 01977 .arg(StatusText(GetJobStatus(jobID))); 01978 01979 QString detailstr = QString("%1: %2 (%3)") 01980 .arg(program_info->toString(ProgramInfo::kTitleSubtitle)) 01981 .arg(transcoderName) 01982 .arg(PrettyPrint(origfilesize)); 01983 QByteArray details = detailstr.toLocal8Bit(); 01984 01985 LOG(VB_GENERAL, LOG_INFO, LOC + QString("%1 for %2") 01986 .arg(msg).arg(details.constData())); 01987 01988 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'") 01989 .arg(command)); 01990 01991 GetMythDB()->GetDBManager()->CloseDatabases(); 01992 uint result = myth_system(command); 01993 int status = GetJobStatus(jobID); 01994 01995 if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) || 01996 (result == GENERIC_EXIT_CMD_NOT_FOUND)) 01997 { 01998 ChangeJobStatus(jobID, JOB_ERRORED, 01999 "ERROR: Unable to find mythtranscode, check backend logs."); 02000 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED); 02001 02002 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID))); 02003 detailstr = QString("%1: %2 does not exist or is not executable") 02004 .arg(program_info->toString(ProgramInfo::kTitleSubtitle)) 02005 .arg(path); 02006 details = detailstr.toLocal8Bit(); 02007 02008 LOG(VB_GENERAL, LOG_ERR, LOC + 02009 QString("%1 for %2").arg(msg).arg(details.constData())); 02010 } 02011 else if (result == GENERIC_EXIT_RESTART && retrylimit > 0) 02012 { 02013 LOG(VB_JOBQUEUE, LOG_INFO, LOC + "Transcode command restarting"); 02014 retry = true; 02015 retrylimit--; 02016 02017 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED); 02018 } 02019 else 02020 { 02021 if (status == JOB_FINISHED) 02022 { 02023 ChangeJobStatus(jobID, JOB_FINISHED, "Finished."); 02024 retry = false; 02025 02026 filename = program_info->GetPlaybackURL(false, true); 02027 QFileInfo st(filename); 02028 02029 if (st.exists()) 02030 { 02031 filesize = st.size(); 02032 02033 QString comment = QString("%1: %2 => %3") 02034 .arg(transcoderName) 02035 .arg(PrettyPrint(origfilesize)) 02036 .arg(PrettyPrint(filesize)); 02037 ChangeJobComment(jobID, comment); 02038 02039 if (filesize > 0) 02040 program_info->SaveFilesize(filesize); 02041 02042 details = (QString("%1: %2 (%3)") 02043 .arg(program_info->toString( 02044 ProgramInfo::kTitleSubtitle)) 02045 .arg(transcoderName) 02046 .arg(PrettyPrint(filesize))).toLocal8Bit(); 02047 } 02048 else 02049 { 02050 QString comment = 02051 QString("could not stat '%1'").arg(filename); 02052 02053 ChangeJobStatus(jobID, JOB_FINISHED, comment); 02054 02055 details = (QString("%1: %2") 02056 .arg(program_info->toString( 02057 ProgramInfo::kTitleSubtitle)) 02058 .arg(comment)).toLocal8Bit(); 02059 } 02060 02061 program_info->SaveTranscodeStatus(TRANSCODING_COMPLETE); 02062 } 02063 else 02064 { 02065 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED); 02066 02067 QString comment = 02068 QString("exit status %1, job status was \"%2\"") 02069 .arg(result) 02070 .arg(StatusText(status)); 02071 02072 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02073 02074 details = (QString("%1: %2 (%3)") 02075 .arg(program_info->toString( 02076 ProgramInfo::kTitleSubtitle)) 02077 .arg(transcoderName) 02078 .arg(comment)).toLocal8Bit().constData(); 02079 } 02080 02081 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID))); 02082 LOG(VB_GENERAL, LOG_INFO, LOC + msg + ": " + details); 02083 } 02084 } 02085 02086 if (retrylimit == 0) 02087 { 02088 LOG(VB_JOBQUEUE, LOG_ERR, LOC + "Retry limit exceeded for transcoder, " 02089 "setting job status to errored."); 02090 ChangeJobStatus(jobID, JOB_ERRORED, "Retry limit exceeded"); 02091 } 02092 02093 RemoveRunningJob(jobID); 02094 } 02095 02096 void *JobQueue::MetadataLookupThread(void *param) 02097 { 02098 JobThreadStruct *jts = (JobThreadStruct *)param; 02099 JobQueue *jq = jts->jq; 02100 02101 MThread::ThreadSetup(QString("Metadata_%1").arg(jts->jobID)); 02102 jq->DoMetadataLookupThread(jts->jobID); 02103 MThread::ThreadCleanup(); 02104 02105 delete jts; 02106 02107 return NULL; 02108 } 02109 02110 void JobQueue::DoMetadataLookupThread(int jobID) 02111 { 02112 // We can't currently lookup non-recording files w/o a ProgramInfo 02113 runningJobsLock->lock(); 02114 if (!runningJobs[jobID].pginfo) 02115 { 02116 LOG(VB_JOBQUEUE, LOG_ERR, LOC + 02117 "The JobQueue cannot currently perform lookups for items which do " 02118 "not have a chanid/starttime in the recorded table."); 02119 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found"); 02120 RemoveRunningJob(jobID); 02121 runningJobsLock->unlock(); 02122 return; 02123 } 02124 02125 ProgramInfo *program_info = runningJobs[jobID].pginfo; 02126 runningJobsLock->unlock(); 02127 02128 QString detailstr = QString("%1 recorded from channel %3") 02129 .arg(program_info->toString(ProgramInfo::kTitleSubtitle)) 02130 .arg(program_info->toString(ProgramInfo::kRecordingKey)); 02131 QByteArray details = detailstr.toLocal8Bit(); 02132 02133 if (!MSqlQuery::testDBConnection()) 02134 { 02135 QString msg = QString("Metadata Lookup failed. Could not open " 02136 "new database connection for %1. " 02137 "Program cannot be looked up.") 02138 .arg(details.constData()); 02139 LOG(VB_GENERAL, LOG_ERR, LOC + msg); 02140 02141 ChangeJobStatus(jobID, JOB_ERRORED, 02142 "Could not open new database connection for " 02143 "metadata lookup."); 02144 02145 delete program_info; 02146 return; 02147 } 02148 02149 QString msg = tr("Metadata Lookup Starting"); 02150 LOG(VB_GENERAL, LOG_INFO, 02151 LOC + "Metadata Lookup Starting for " + detailstr); 02152 02153 uint retVal = 0; 02154 QString path; 02155 QString command; 02156 02157 path = GetInstallPrefix() + "/bin/mythmetadatalookup"; 02158 command = QString("%1 -j %2") 02159 .arg(path).arg(jobID); 02160 command += logPropagateArgs; 02161 02162 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'") 02163 .arg(command)); 02164 02165 GetMythDB()->GetDBManager()->CloseDatabases(); 02166 retVal = myth_system(command); 02167 int priority = LOG_NOTICE; 02168 QString comment; 02169 02170 runningJobsLock->lock(); 02171 02172 if ((retVal == GENERIC_EXIT_DAEMONIZING_ERROR) || 02173 (retVal == GENERIC_EXIT_CMD_NOT_FOUND)) 02174 { 02175 comment = tr("Unable to find mythmetadatalookup"); 02176 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02177 priority = LOG_WARNING; 02178 } 02179 else if (runningJobs[jobID].flag == JOB_STOP) 02180 { 02181 comment = tr("Aborted by user"); 02182 ChangeJobStatus(jobID, JOB_ABORTED, comment); 02183 priority = LOG_WARNING; 02184 } 02185 else if (retVal == GENERIC_EXIT_NO_RECORDING_DATA) 02186 { 02187 comment = tr("Unable to open file or init decoder"); 02188 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02189 priority = LOG_WARNING; 02190 } 02191 else if (retVal >= GENERIC_EXIT_NOT_OK) // 256 or above - error 02192 { 02193 comment = tr("Failed with exit status %1").arg(retVal); 02194 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02195 priority = LOG_WARNING; 02196 } 02197 else 02198 { 02199 comment = tr("Metadata Lookup Complete."); 02200 ChangeJobStatus(jobID, JOB_FINISHED, comment); 02201 02202 program_info->SendUpdateEvent(); 02203 } 02204 02205 msg = tr("Metadata Lookup %1", "Job ID") 02206 .arg(StatusText(GetJobStatus(jobID))); 02207 02208 if (!comment.isEmpty()) 02209 { 02210 detailstr += QString(" (%1)").arg(comment); 02211 details = detailstr.toLocal8Bit(); 02212 } 02213 02214 if (priority <= LOG_WARNING) 02215 LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData()); 02216 02217 RemoveRunningJob(jobID); 02218 runningJobsLock->unlock(); 02219 } 02220 02221 void *JobQueue::FlagCommercialsThread(void *param) 02222 { 02223 JobThreadStruct *jts = (JobThreadStruct *)param; 02224 JobQueue *jq = jts->jq; 02225 02226 MThread::ThreadSetup(QString("Commflag_%1").arg(jts->jobID)); 02227 jq->DoFlagCommercialsThread(jts->jobID); 02228 MThread::ThreadCleanup(); 02229 02230 delete jts; 02231 02232 return NULL; 02233 } 02234 02235 void JobQueue::DoFlagCommercialsThread(int jobID) 02236 { 02237 // We can't currently commflag non-recording files w/o a ProgramInfo 02238 runningJobsLock->lock(); 02239 if (!runningJobs[jobID].pginfo) 02240 { 02241 LOG(VB_JOBQUEUE, LOG_ERR, LOC + 02242 "The JobQueue cannot currently commflag files that do not " 02243 "have a chanid/starttime in the recorded table."); 02244 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found"); 02245 RemoveRunningJob(jobID); 02246 runningJobsLock->unlock(); 02247 return; 02248 } 02249 02250 ProgramInfo *program_info = runningJobs[jobID].pginfo; 02251 runningJobsLock->unlock(); 02252 02253 QString detailstr = QString("%1 recorded from channel %3") 02254 .arg(program_info->toString(ProgramInfo::kTitleSubtitle)) 02255 .arg(program_info->toString(ProgramInfo::kRecordingKey)); 02256 QByteArray details = detailstr.toLocal8Bit(); 02257 02258 if (!MSqlQuery::testDBConnection()) 02259 { 02260 QString msg = QString("Commercial Detection failed. Could not open " 02261 "new database connection for %1. " 02262 "Program cannot be flagged.") 02263 .arg(details.constData()); 02264 LOG(VB_GENERAL, LOG_ERR, LOC + msg); 02265 02266 ChangeJobStatus(jobID, JOB_ERRORED, 02267 "Could not open new database connection for " 02268 "commercial detector."); 02269 02270 delete program_info; 02271 return; 02272 } 02273 02274 QString msg = tr("Commercial Detection Starting"); 02275 LOG(VB_GENERAL, LOG_INFO, 02276 LOC + "Commercial Detection Starting for " + detailstr); 02277 02278 uint breaksFound = 0; 02279 QString path; 02280 QString command; 02281 02282 runningJobsLock->lock(); 02283 if (runningJobs[jobID].command == "mythcommflag") 02284 { 02285 path = GetInstallPrefix() + "/bin/mythcommflag"; 02286 command = QString("%1 -j %2 --noprogress") 02287 .arg(path).arg(jobID); 02288 command += logPropagateArgs; 02289 } 02290 else 02291 { 02292 command = runningJobs[jobID].command; 02293 QStringList tokens = command.split(" ", QString::SkipEmptyParts); 02294 if (!tokens.empty()) 02295 path = tokens[0]; 02296 } 02297 runningJobsLock->unlock(); 02298 02299 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'") 02300 .arg(command)); 02301 02302 GetMythDB()->GetDBManager()->CloseDatabases(); 02303 breaksFound = myth_system(command, kMSLowExitVal); 02304 int priority = LOG_NOTICE; 02305 QString comment; 02306 02307 runningJobsLock->lock(); 02308 02309 if ((breaksFound == GENERIC_EXIT_DAEMONIZING_ERROR) || 02310 (breaksFound == GENERIC_EXIT_CMD_NOT_FOUND)) 02311 { 02312 comment = tr("Unable to find mythcommflag"); 02313 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02314 priority = LOG_WARNING; 02315 } 02316 else if (runningJobs[jobID].flag == JOB_STOP) 02317 { 02318 comment = tr("Aborted by user"); 02319 ChangeJobStatus(jobID, JOB_ABORTED, comment); 02320 priority = LOG_WARNING; 02321 } 02322 else if (breaksFound == GENERIC_EXIT_NO_RECORDING_DATA) 02323 { 02324 comment = tr("Unable to open file or init decoder"); 02325 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02326 priority = LOG_WARNING; 02327 } 02328 else if (breaksFound >= GENERIC_EXIT_NOT_OK) // 256 or above - error 02329 { 02330 comment = tr("Failed with exit status %1").arg(breaksFound); 02331 ChangeJobStatus(jobID, JOB_ERRORED, comment); 02332 priority = LOG_WARNING; 02333 } 02334 else 02335 { 02336 comment = tr("%n commercial break(s)", "", breaksFound); 02337 ChangeJobStatus(jobID, JOB_FINISHED, comment); 02338 02339 program_info->SendUpdateEvent(); 02340 02341 if (!program_info->IsLocal()) 02342 program_info->SetPathname(program_info->GetPlaybackURL(false,true)); 02343 if (program_info->IsLocal()) 02344 { 02345 PreviewGenerator *pg = new PreviewGenerator( 02346 program_info, QString(), PreviewGenerator::kLocal); 02347 pg->Run(); 02348 pg->deleteLater(); 02349 } 02350 } 02351 02352 msg = tr("Commercial Detection %1", "Job ID") 02353 .arg(StatusText(GetJobStatus(jobID))); 02354 02355 if (!comment.isEmpty()) 02356 { 02357 detailstr += QString(" (%1)").arg(comment); 02358 details = detailstr.toLocal8Bit(); 02359 } 02360 02361 if (priority <= LOG_WARNING) 02362 LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData()); 02363 02364 RemoveRunningJob(jobID); 02365 runningJobsLock->unlock(); 02366 } 02367 02368 void *JobQueue::UserJobThread(void *param) 02369 { 02370 JobThreadStruct *jts = (JobThreadStruct *)param; 02371 JobQueue *jq = jts->jq; 02372 02373 MThread::ThreadSetup(QString("UserJob_%1").arg(jts->jobID)); 02374 jq->DoUserJobThread(jts->jobID); 02375 MThread::ThreadCleanup(); 02376 02377 delete jts; 02378 02379 return NULL; 02380 } 02381 02382 void JobQueue::DoUserJobThread(int jobID) 02383 { 02384 runningJobsLock->lock(); 02385 ProgramInfo *pginfo = runningJobs[jobID].pginfo; 02386 QString jobDesc = runningJobs[jobID].desc; 02387 QString command = runningJobs[jobID].command; 02388 runningJobsLock->unlock(); 02389 02390 ChangeJobStatus(jobID, JOB_RUNNING); 02391 02392 QString msg; 02393 02394 if (pginfo) 02395 { 02396 msg = QString("Started %1 for %2 recorded from channel %3") 02397 .arg(jobDesc) 02398 .arg(pginfo->toString(ProgramInfo::kTitleSubtitle)) 02399 .arg(pginfo->toString(ProgramInfo::kRecordingKey)); 02400 } 02401 else 02402 msg = QString("Started %1 for jobID %2").arg(jobDesc).arg(jobID); 02403 02404 LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData())); 02405 02406 switch (jobQueueCPU) 02407 { 02408 case 0: myth_nice(17); 02409 myth_ioprio(8); 02410 break; 02411 case 1: myth_nice(10); 02412 myth_ioprio(7); 02413 break; 02414 case 2: 02415 default: break; 02416 } 02417 02418 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'") 02419 .arg(command)); 02420 GetMythDB()->GetDBManager()->CloseDatabases(); 02421 uint result = myth_system(command); 02422 02423 if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) || 02424 (result == GENERIC_EXIT_CMD_NOT_FOUND)) 02425 { 02426 msg = QString("User Job '%1' failed, unable to find " 02427 "executable, check your PATH and backend logs.") 02428 .arg(command); 02429 LOG(VB_GENERAL, LOG_ERR, LOC + msg); 02430 LOG(VB_GENERAL, LOG_NOTICE, LOC + QString("Current PATH: '%1'") 02431 .arg(getenv("PATH"))); 02432 02433 ChangeJobStatus(jobID, JOB_ERRORED, 02434 "ERROR: Unable to find executable, check backend logs."); 02435 } 02436 else if (result != 0) 02437 { 02438 msg = QString("User Job '%1' failed.").arg(command); 02439 LOG(VB_GENERAL, LOG_ERR, LOC + msg); 02440 02441 ChangeJobStatus(jobID, JOB_ERRORED, 02442 "ERROR: User Job returned non-zero, check logs."); 02443 } 02444 else 02445 { 02446 if (pginfo) 02447 { 02448 msg = QString("Finished %1 for %2 recorded from channel %3") 02449 .arg(jobDesc) 02450 .arg(pginfo->toString(ProgramInfo::kTitleSubtitle)) 02451 .arg(pginfo->toString(ProgramInfo::kRecordingKey)); 02452 } 02453 else 02454 msg = QString("Finished %1 for jobID %2").arg(jobDesc).arg(jobID); 02455 02456 LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData())); 02457 02458 ChangeJobStatus(jobID, JOB_FINISHED, "Successfully Completed."); 02459 02460 if (pginfo) 02461 pginfo->SendUpdateEvent(); 02462 } 02463 02464 RemoveRunningJob(jobID); 02465 } 02466 02467 int JobQueue::UserJobTypeToIndex(int jobType) 02468 { 02469 if (jobType & JOB_USERJOB) 02470 { 02471 int x = ((jobType & JOB_USERJOB)>> 8); 02472 int bits = 1; 02473 while ((x != 0) && ((x & 0x01) == 0)) 02474 { 02475 bits++; 02476 x = x >> 1; 02477 } 02478 if ( bits > 4 ) 02479 return JOB_NONE; 02480 02481 return bits; 02482 } 02483 return JOB_NONE; 02484 } 02485 02486 /* vim: set expandtab tabstop=4 shiftwidth=4: */
1.7.6.1