|
MythTV
0.26-pre
|
00001 /* 00002 An HttpServer Extension that manages subscriptions to UPnP services. 00003 00004 An object wishing to subscribe to a service needs to register as a listener for 00005 events and subscribe using a valid usn and subscription path. The subscriber 00006 is responsible for requesting a renewal before the subscription expires, 00007 removing any stale subscriptions, unsubsubscribing on exit and must re-implement 00008 QObject::customEvent to receive event notifications for subscribed services. 00009 */ 00010 00011 #include <QTextCodec> 00012 00013 #include "mythcorecontext.h" 00014 #include "mythlogging.h" 00015 #include "upnpsubscription.h" 00016 00017 // default requested time for subscription (actual is dictated by server) 00018 #define SUBSCRIPTION_TIME 1800 00019 // maximum time to wait for responses to subscription requests (UPnP spec. 30s) 00020 #define MAX_WAIT 30000 00021 00022 #define LOC QString("UPnPSub: ") 00023 00024 class Subscription 00025 { 00026 public: 00027 Subscription(QUrl url, QString path) 00028 : m_url(url), m_path(path), m_uuid(QString()) { } 00029 QUrl m_url; 00030 QString m_path; 00031 QString m_uuid; 00032 }; 00033 00034 UPNPSubscription::UPNPSubscription(const QString &share_path, int port) 00035 : HttpServerExtension("UPnPSubscriptionManager", share_path), 00036 m_subscriptionLock(QMutex::Recursive), m_callback(QString("NOTSET")) 00037 { 00038 QString host; 00039 if (!UPnp::g_IPAddrList.isEmpty()) 00040 host = UPnp::g_IPAddrList.at(0); 00041 00042 // taken from MythCoreContext 00043 #if !defined(QT_NO_IPV6) 00044 QHostAddress addr(host); 00045 if ((addr.protocol() == QAbstractSocket::IPv6Protocol) || 00046 (host.contains(":"))) 00047 host = "[" + host + "]"; 00048 #endif 00049 00050 m_callback = QString("http://%1:%2/Subscriptions/event?usn=") 00051 .arg(host).arg(QString::number(port)); 00052 } 00053 00054 UPNPSubscription::~UPNPSubscription() 00055 { 00056 m_subscriptionLock.lock(); 00057 QList<QString> usns = m_subscriptions.keys(); 00058 while (!usns.isEmpty()) 00059 Unsubscribe(usns.takeLast()); 00060 m_subscriptions.clear(); 00061 m_subscriptionLock.unlock(); 00062 00063 LOG(VB_UPNP, LOG_DEBUG, LOC + "Finished"); 00064 } 00065 00066 int UPNPSubscription::Subscribe(const QString &usn, const QUrl &url, 00067 const QString &path) 00068 { 00069 LOG(VB_UPNP, LOG_DEBUG, LOC + QString("Subscribe %1 %2 %3") 00070 .arg(usn).arg(url.toString()).arg(path)); 00071 00072 // N.B. this is called from the client object's thread. Hence we have to 00073 // lock until the subscription request has returned, otherwise we may 00074 // receive the first event notification (in the HttpServer thread) 00075 // before the subscription is processed and the event will fail 00076 00077 QMutexLocker lock(&m_subscriptionLock); 00078 if (m_subscriptions.contains(usn)) 00079 { 00080 if (m_subscriptions[usn]->m_url != url || 00081 m_subscriptions[usn]->m_path != path) 00082 { 00083 LOG(VB_GENERAL, LOG_WARNING, LOC + 00084 "Re-subscribing with different url and path."); 00085 m_subscriptions[usn]->m_url = url; 00086 m_subscriptions[usn]->m_path = path; 00087 m_subscriptions[usn]->m_uuid = QString(); 00088 } 00089 } 00090 else 00091 { 00092 m_subscriptions.insert(usn, new Subscription(url, path)); 00093 } 00094 00095 return SendSubscribeRequest(m_callback, usn, url, path, QString(), 00096 m_subscriptions[usn]->m_uuid); 00097 } 00098 00099 void UPNPSubscription::Unsubscribe(const QString &usn) 00100 { 00101 QUrl url; 00102 QString path; 00103 QString uuid = QString(); 00104 m_subscriptionLock.lock(); 00105 if (m_subscriptions.contains(usn)) 00106 { 00107 url = m_subscriptions[usn]->m_url; 00108 path = m_subscriptions[usn]->m_path; 00109 uuid = m_subscriptions[usn]->m_uuid; 00110 delete m_subscriptions.value(usn); 00111 m_subscriptions.remove(usn); 00112 } 00113 m_subscriptionLock.unlock(); 00114 00115 if (!uuid.isEmpty()) 00116 SendUnsubscribeRequest(usn, url, path, uuid); 00117 } 00118 00119 int UPNPSubscription::Renew(const QString &usn) 00120 { 00121 LOG(VB_UPNP, LOG_DEBUG, LOC + QString("Renew: %1").arg(usn)); 00122 00123 QUrl url; 00124 QString path; 00125 QString sid; 00126 00127 // see locking comment in Subscribe 00128 QMutexLocker lock(&m_subscriptionLock); 00129 if (m_subscriptions.contains(usn)) 00130 { 00131 url = m_subscriptions[usn]->m_url; 00132 path = m_subscriptions[usn]->m_path; 00133 sid = m_subscriptions[usn]->m_uuid; 00134 } 00135 else 00136 { 00137 LOG(VB_UPNP, LOG_ERR, LOC + QString("Unrecognised renewal usn: %1") 00138 .arg(usn)); 00139 return 0; 00140 } 00141 00142 if (!sid.isEmpty()) 00143 { 00144 return SendSubscribeRequest(m_callback, usn, url, path, sid, 00145 m_subscriptions[usn]->m_uuid); 00146 } 00147 00148 LOG(VB_UPNP, LOG_ERR, LOC + QString("No uuid - not renewing usn: %1") 00149 .arg(usn)); 00150 return 0; 00151 } 00152 00153 void UPNPSubscription::Remove(const QString &usn) 00154 { 00155 // this could be handled by hooking directly into the SSDPCache updates 00156 // but the subscribing object will also be doing so. Having the that 00157 // object initiate the removal avoids temoporary race conditions during 00158 // periods of UPnP/SSDP activity 00159 m_subscriptionLock.lock(); 00160 if (m_subscriptions.contains(usn)) 00161 { 00162 LOG(VB_UPNP, LOG_INFO, LOC + QString("Removing %1").arg(usn)); 00163 delete m_subscriptions.value(usn); 00164 m_subscriptions.remove(usn); 00165 } 00166 m_subscriptionLock.unlock(); 00167 } 00168 00169 bool UPNPSubscription::ProcessRequest(HTTPRequest *pRequest) 00170 { 00171 if (!pRequest) 00172 return false; 00173 00174 if (pRequest->m_sBaseUrl != "/Subscriptions") 00175 return false; 00176 if (pRequest->m_sMethod != "event") 00177 return false; 00178 00179 LOG(VB_UPNP, LOG_DEBUG, LOC + QString("%1\n%2") 00180 .arg(pRequest->m_sRawRequest).arg(pRequest->m_sPayload)); 00181 00182 if (pRequest->m_sPayload.isEmpty()) 00183 return true; 00184 00185 pRequest->m_eResponseType = ResponseTypeHTML; 00186 00187 QString nt = pRequest->m_mapHeaders["nt"]; 00188 QString nts = pRequest->m_mapHeaders["nts"]; 00189 bool no = pRequest->m_sRawRequest.startsWith("NOTIFY"); 00190 00191 if (nt.isEmpty() || nts.isEmpty() || !no) 00192 { 00193 pRequest->m_nResponseStatus = 400; 00194 return true; 00195 } 00196 00197 pRequest->m_nResponseStatus = 412; 00198 if (nt != "upnp:event" || nts != "upnp:propchange") 00199 return true; 00200 00201 QString usn = pRequest->m_mapParams["usn"]; 00202 QString sid = pRequest->m_mapHeaders["sid"]; 00203 if (usn.isEmpty() || sid.isEmpty()) 00204 return true; 00205 00206 // N.B. Validating the usn and uuid here might mean blocking for some time 00207 // while waiting for a subscription to complete. While this operates in a 00208 // worker thread, worker threads are a limited resource which we could 00209 // rapidly overload if a number of events arrive. Instead let the 00210 // subscribing objects validate the usn - the uuid should be superfluous. 00211 00212 QString seq = pRequest->m_mapHeaders["seq"]; 00213 00214 // mediatomb sends some extra character(s) at the end of the payload 00215 // which throw Qt, so try and trim them off 00216 int loc = pRequest->m_sPayload.lastIndexOf("propertyset>"); 00217 QString payload = (loc > -1) ? pRequest->m_sPayload.left(loc + 12) : 00218 pRequest->m_sPayload; 00219 00220 LOG(VB_UPNP, LOG_DEBUG, LOC + QString("Payload:\n%1").arg(payload)); 00221 00222 pRequest->m_nResponseStatus = 400; 00223 QDomDocument body; 00224 QString error; 00225 int errorCol = 0; 00226 int errorLine = 0; 00227 if (!body.setContent(payload, true, &error, &errorLine, &errorCol)) 00228 { 00229 LOG(VB_GENERAL, LOG_ERR, LOC + 00230 QString("Failed to parse event: Line: %1 Col: %2 Error: '%3'") 00231 .arg(errorLine).arg(errorCol).arg(error)); 00232 return true; 00233 } 00234 00235 LOG(VB_UPNP, LOG_DEBUG, LOC + "/n/n" + body.toString(4) + "/n/n"); 00236 00237 QDomNodeList properties = body.elementsByTagName("property"); 00238 QHash<QString,QString> results; 00239 00240 // this deals with both one argument per property (compliant) and mutliple 00241 // arguments per property as sent by mediatomb 00242 for (int i = 0; i < properties.size(); i++) 00243 { 00244 QDomNodeList arguments = properties.at(i).childNodes(); 00245 for (int j = 0; j < arguments.size(); j++) 00246 { 00247 QDomElement e = arguments.at(j).toElement(); 00248 if (!e.isNull() && !e.text().isEmpty() && !e.tagName().isEmpty()) 00249 results.insert(e.tagName(), e.text()); 00250 } 00251 } 00252 00253 // using MythObservable allows multiple objects to subscribe to the same 00254 // service but is less efficient from an eventing perspective, especially 00255 // if multiple objects are subscribing 00256 if (!results.isEmpty()) 00257 { 00258 pRequest->m_nResponseStatus = 200; 00259 results.insert("usn", usn); 00260 results.insert("seq", seq); 00261 MythInfoMapEvent me("UPNP_EVENT", results); 00262 dispatch(me); 00263 } 00264 00265 return true; 00266 } 00267 00268 bool UPNPSubscription::SendUnsubscribeRequest(const QString &usn, 00269 const QUrl &url, 00270 const QString &path, 00271 const QString &uuid) 00272 { 00273 bool success = false; 00274 QString host = url.host(); 00275 int port = url.port(); 00276 00277 QByteArray sub; 00278 QTextStream data(&sub); 00279 data.setCodec(QTextCodec::codecForName("UTF-8")); 00280 // N.B. Play On needs an extra space between UNSUBSCRIBE and path... 00281 data << QString("UNSUBSCRIBE %1 HTTP/1.1\r\n").arg(path); 00282 data << QString("HOST: %1:%2\r\n").arg(host).arg(QString::number(port)); 00283 data << QString("SID: uuid:%1\r\n").arg(uuid); 00284 data << "\r\n"; 00285 data.flush(); 00286 00287 LOG(VB_UPNP, LOG_DEBUG, LOC + "\n\n" + sub); 00288 00289 MSocketDevice *sockdev = new MSocketDevice(MSocketDevice::Stream); 00290 BufferedSocketDevice *sock = new BufferedSocketDevice(sockdev); 00291 sockdev->setBlocking(true); 00292 00293 if (sock->Connect(QHostAddress(host), port)) 00294 { 00295 if (sock->WriteBlockDirect(sub.constData(), sub.size()) != -1) 00296 { 00297 QString line = sock->ReadLine(MAX_WAIT); 00298 success = !line.isEmpty(); 00299 } 00300 else 00301 { 00302 LOG(VB_GENERAL, LOG_ERR, LOC + 00303 QString("Socket write error for %1:%2") .arg(host).arg(port)); 00304 } 00305 sock->Close(); 00306 } 00307 else 00308 { 00309 LOG(VB_GENERAL, LOG_ERR, LOC + 00310 QString("Failed to open socket for %1:%2") .arg(host).arg(port)); 00311 } 00312 00313 delete sock; 00314 delete sockdev; 00315 if (success) 00316 LOG(VB_GENERAL, LOG_INFO, LOC + QString("Unsubscribed to %1").arg(usn)); 00317 else 00318 LOG(VB_UPNP, LOG_WARNING, LOC + QString("Failed to unsubscribe to %1") 00319 .arg(usn)); 00320 return success; 00321 } 00322 00323 int UPNPSubscription::SendSubscribeRequest(const QString &callback, 00324 const QString &usn, 00325 const QUrl &url, 00326 const QString &path, 00327 const QString &uuidin, 00328 QString &uuidout) 00329 { 00330 QString host = url.host(); 00331 int port = url.port(); 00332 00333 QByteArray sub; 00334 QTextStream data(&sub); 00335 data.setCodec(QTextCodec::codecForName("UTF-8")); 00336 // N.B. Play On needs an extra space between SUBSCRIBE and path... 00337 data << QString("SUBSCRIBE %1 HTTP/1.1\r\n").arg(path); 00338 data << QString("HOST: %1:%2\r\n").arg(host).arg(QString::number(port)); 00339 00340 00341 if (uuidin.isEmpty()) // new subscription 00342 { 00343 data << QString("CALLBACK: <%1%2>\r\n") 00344 .arg(callback).arg(usn); 00345 data << "NT: upnp:event\r\n"; 00346 } 00347 else // renewal 00348 data << QString("SID: uuid:%1\r\n").arg(uuidin); 00349 00350 data << QString("TIMEOUT: Second-%1\r\n").arg(SUBSCRIPTION_TIME); 00351 data << "\r\n"; 00352 data.flush(); 00353 00354 LOG(VB_UPNP, LOG_DEBUG, LOC + "\n\n" + sub); 00355 00356 MSocketDevice *sockdev = new MSocketDevice(MSocketDevice::Stream); 00357 BufferedSocketDevice *sock = new BufferedSocketDevice(sockdev); 00358 sockdev->setBlocking(true); 00359 00360 QString uuid; 00361 QString timeout; 00362 uint result = 0; 00363 00364 if (sock->Connect(QHostAddress(host), port)) 00365 { 00366 if (sock->WriteBlockDirect(sub.constData(), sub.size()) != -1) 00367 { 00368 bool ok = false; 00369 QString line = sock->ReadLine(MAX_WAIT); 00370 while (!line.isEmpty()) 00371 { 00372 LOG(VB_UPNP, LOG_DEBUG, LOC + line); 00373 if (line.contains("HTTP/1.1 200 OK", Qt::CaseInsensitive)) 00374 ok = true; 00375 if (line.startsWith("SID:", Qt::CaseInsensitive)) 00376 uuid = line.mid(4).trimmed().mid(5).trimmed(); 00377 if (line.startsWith("TIMEOUT:", Qt::CaseInsensitive)) 00378 timeout = line.mid(8).trimmed().mid(7).trimmed(); 00379 if (ok && !uuid.isEmpty() && !timeout.isEmpty()) 00380 break; 00381 line = sock->ReadLine(MAX_WAIT); 00382 } 00383 00384 if (ok && !uuid.isEmpty() && !timeout.isEmpty()) 00385 { 00386 uuidout = uuid; 00387 result = timeout.toUInt(); 00388 } 00389 else 00390 { 00391 LOG(VB_GENERAL, LOG_ERR, LOC + 00392 QString("Failed to subscribe to %1").arg(usn)); 00393 } 00394 } 00395 else 00396 { 00397 LOG(VB_GENERAL, LOG_ERR, LOC + 00398 QString("Socket write error for %1:%2") .arg(host).arg(port)); 00399 } 00400 sock->Close(); 00401 } 00402 else 00403 { 00404 LOG(VB_GENERAL, LOG_ERR, LOC + 00405 QString("Failed to open socket for %1:%2") .arg(host).arg(port)); 00406 } 00407 00408 delete sock; 00409 delete sockdev; 00410 return result; 00411 }
1.7.6.1