前面说到channel与router的建立,实际上channel是node服务器与底层C++之间信令交互的通道。transport才是router给用户推流拉流的通道,而创建完Router过后就开始创建Transport了
用户想要加入房间,这时在应用层便会发送creat transport 的信令,传递给Worker
当Worker处理不了用户的请求时,它便把消息传递下一层,即Router
request便传递给router->HandleRequest这个函数
default: { // This may throw. RTC::Router* router = GetRouterFromInternal(request->internal); router->HandleRequest(request); break; }进入到Router这层,系统想要create transport,由于transport是个基类,它实际上是有很多个子类
case Channel::Request::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT: { std::string transportId; // This may throw. SetNewTransportIdFromInternal(request->internal, transportId); // This may throw. auto* webRtcTransport = new RTC::WebRtcTransport(transportId, this, request->data); // Insert into the map. this->mapTransports[transportId] = webRtcTransport; MS_DEBUG_DEV("WebRtcTransport created [transportId:%s]", transportId.c_str()); json data = json::object(); webRtcTransport->FillJson(data); request->Accept(data); break; }进入SetNewTransportIdFromInternal这个函数
这段代码就是从internal中搜索transport的ID,找到ID后返回
回到Router后,执行auto* webRtcTransport = new RTC::WebRtcTransport(transportId, this, request->data); 接着进入RTC::WebRtcTransport
从源码来看,这个构造函数的前半部分是解析json数据,它包括
enableUdpenableTcppreferUdppreferTcplistenIps等等 WebRtcTransport::WebRtcTransp ort(const std::string& id, RTC::Transport::Listener* listener, json& data) : RTC::Transport::Transport(id, listener, data) { MS_TRACE(); bool enableUdp{ true }; auto jsonEnableUdpIt = data.find("enableUdp"); if (jsonEnableUdpIt != data.end()) { if (!jsonEnableUdpIt->is_boolean()) MS_THROW_TYPE_ERROR("wrong enableUdp (not a boolean)"); enableUdp = jsonEnableUdpIt->get<bool>(); } bool enableTcp{ false }; auto jsonEnableTcpIt = data.find("enableTcp"); if (jsonEnableTcpIt != data.end()) { if (!jsonEnableTcpIt->is_boolean()) MS_THROW_TYPE_ERROR("wrong enableTcp (not a boolean)"); enableTcp = jsonEnableTcpIt->get<bool>(); } bool preferUdp{ false }; auto jsonPreferUdpIt = data.find("preferUdp"); if (jsonPreferUdpIt != data.end()) { if (!jsonPreferUdpIt->is_boolean()) MS_THROW_TYPE_ERROR("wrong preferUdp (not a boolean)"); preferUdp = jsonPreferUdpIt->get<bool>(); } bool preferTcp{ false }; auto jsonPreferTcpIt = data.find("preferTcp"); if (jsonPreferTcpIt != data.end()) { if (!jsonPreferTcpIt->is_boolean()) MS_THROW_TYPE_ERROR("wrong preferTcp (not a boolean)"); preferTcp = jsonPreferTcpIt->get<bool>(); } auto jsonListenIpsIt = data.find("listenIps"); if (jsonListenIpsIt == data.end()) MS_THROW_TYPE_ERROR("missing listenIps"); else if (!jsonListenIpsIt->is_array()) MS_THROW_TYPE_ERROR("wrong listenIps (not an array)"); else if (jsonListenIpsIt->empty()) MS_THROW_TYPE_ERROR("wrong listenIps (empty array)"); else if (jsonListenIpsIt->size() > 8) MS_THROW_TYPE_ERROR("wrong listenIps (too many IPs)"); std::vector<ListenIp> listenIps(jsonListenIpsIt->size()); for (size_t i{ 0 }; i < jsonListenIpsIt->size(); ++i) { auto& jsonListenIp = (*jsonListenIpsIt)[i]; auto& listenIp = listenIps[i]; if (!jsonListenIp.is_object()) MS_THROW_TYPE_ERROR("wrong listenIp (not an object)"); auto jsonIpIt = jsonListenIp.find("ip"); if (jsonIpIt == jsonListenIp.end()) MS_THROW_TYPE_ERROR("missing listenIp.ip"); else if (!jsonIpIt->is_string()) MS_THROW_TYPE_ERROR("wrong listenIp.ip (not an string"); listenIp.ip.assign(jsonIpIt->get<std::string>()); // This may throw. Utils::IP::NormalizeIp(listenIp.ip); auto jsonAnnouncedIpIt = jsonListenIp.find("announcedIp"); if (jsonAnnouncedIpIt != jsonListenIp.end()) { if (!jsonAnnouncedIpIt->is_string()) MS_THROW_TYPE_ERROR("wrong listenIp.announcedIp (not an string)"); listenIp.announcedIp.assign(jsonAnnouncedIpIt->get<std::string>()); } }解析完数据后,后半部分通过ip生成服务端的candidate,利用listenIp : listenIps来控制,有candidate就有权限
ICE通过IceServer创建,GetRandomString(16),就是产生一个16位的随机值,32就是32位的随机值
进入IceServer的构造函数可以发现,当有用户连接这个Transport时,将会生成16位的用户名和32位的密码
try { uint16_t iceLocalPreferenceDecrement{ 0 }; if (enableUdp && enableTcp) this->iceCandidates.reserve(2 * jsonListenIpsIt->size()); else this->iceCandidates.reserve(jsonListenIpsIt->size()); for (auto& listenIp : listenIps) { if (enableUdp) { uint16_t iceLocalPreference = IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement; if (preferUdp) iceLocalPreference += 1000; uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference); // This may throw. auto* udpSocket = new RTC::UdpSocket(this, listenIp.ip); this->udpSockets[udpSocket] = listenIp.announcedIp; if (listenIp.announcedIp.empty()) this->iceCandidates.emplace_back(udpSocket, icePriority); else this->iceCandidates.emplace_back(udpSocket, icePriority, listenIp.announcedIp); } if (enableTcp) { uint16_t iceLocalPreference = IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement; if (preferTcp) iceLocalPreference += 1000; uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference); // This may throw. auto* tcpServer = new RTC::TcpServer(this, this, listenIp.ip); this->tcpServers[tcpServer] = listenIp.announcedIp; if (listenIp.announcedIp.empty()) this->iceCandidates.emplace_back(tcpServer, icePriority); else this->iceCandidates.emplace_back(tcpServer, icePriority, listenIp.announcedIp); } // Decrement initial ICE local preference for next IP. iceLocalPreferenceDecrement += 100; } // Create a ICE server. this->iceServer = new RTC::IceServer( this, Utils::Crypto::GetRandomString(16), Utils::Crypto::GetRandomString(32)); // Create a DTLS transport. this->dtlsTransport = new RTC::DtlsTransport(this); } catch (const MediaSoupError& error) { // Must delete everything since the destructor won't be called. delete this->dtlsTransport; this->dtlsTransport = nullptr; delete this->iceServer; this->iceServer = nullptr; for (auto& kv : this->udpSockets) { auto* udpSocket = kv.first; delete udpSocket; } this->udpSockets.clear(); for (auto& kv : this->tcpServers) { auto* tcpServer = kv.first; delete tcpServer; } this->tcpServers.clear(); this->iceCandidates.clear(); throw; } }它还创建了一个DTLS transport 的对象
在DtlsTransport中,首先创建了一个SSL对象,通过sslCtx上下文,因为在上下文中是有证书和私钥的
SSL就相当于一个连接,类似socket。SSL本身也有一些数据,创建的时候通过this能够获取到这些参数
然后通过创建BIO(Buffer IO)来形成两个管道,通过管道分别实现接、收数据的功能,再通过SSL_set_bio(this->ssl, this->sslBioFromNetwork, this->sslBioToNetwork);完成SSL对两个管道的绑定
接着还设置timer,用来控制DTLS握手的时间,超时后会做其他处理
DtlsTransport::DtlsTransport(Listener* listener) : listener(listener) { MS_TRACE(); /* Set SSL. */ this->ssl = SSL_new(DtlsTransport::sslCtx); if (!this->ssl) { LOG_OPENSSL_ERROR("SSL_new() failed"); goto error; } // Set this as custom data. SSL_set_ex_data(this->ssl, 0, static_cast<void*>(this)); this->sslBioFromNetwork = BIO_new(BIO_s_mem()); if (!this->sslBioFromNetwork) { LOG_OPENSSL_ERROR("BIO_new() failed"); SSL_free(this->ssl); goto error; } this->sslBioToNetwork = BIO_new(BIO_s_mem()); if (!this->sslBioToNetwork) { LOG_OPENSSL_ERROR("BIO_new() failed"); BIO_free(this->sslBioFromNetwork); SSL_free(this->ssl); goto error; } SSL_set_bio(this->ssl, this->sslBioFromNetwork, this->sslBioToNetwork); // Set the MTU so that we don't send packets that are too large with no fragmentation. SSL_set_mtu(this->ssl, DtlsMtu); DTLS_set_link_mtu(this->ssl, DtlsMtu); // Set callback handler for setting DTLS timer interval. DTLS_set_timer_cb(this->ssl, onSslDtlsTimer); // Set the DTLS timer. this->timer = new Timer(this); return; error: // NOTE: At this point SSL_set_bio() was not called so we must free BIOs as // well. if (this->sslBioFromNetwork) BIO_free(this->sslBioFromNetwork); if (this->sslBioToNetwork) BIO_free(this->sslBioToNetwork); if (this->ssl) SSL_free(this->ssl); // NOTE: If this is not catched by the caller the program will abort, but // this should never happen. MS_THROW_ERROR("DtlsTransport instance creation failed"); }接下来进入HandleRequest,本文跳转至 第二章第2节
首先在main.cpp中各对与connect相关的所有模块进行了初始化
// Initialize static stuff. DepOpenSSL::ClassInit(); DepLibSRTP::ClassInit(); DepUsrSCTP::ClassInit(); DepLibWebRTC::ClassInit(); Utils::Crypto::ClassInit(); RTC::DtlsTransport::ClassInit(); RTC::SrtpSession::ClassInit(); Channel::Notifier::ClassInit(channel); PayloadChannel::Notifier::ClassInit(payloadChannel);这部分主要是对加密算法的随机数的生成
这部分调用了srtp_init 这个API,这个API的返回值对后面Session的初始化有着重要的作用
创建了Crypto的种子,使用的算法是HMAC,其中H代表的是hash哈希
在代码中首先判断有没有证书和私钥,若没有便通过 GenerateCertificateAndPrivateKey 这个函数产生一个证书和私钥
这部分代码一连串的if语句,调用API创建了证书,生成其版本、序列号等等
// Create the X509 certificate. DtlsTransport::certificate = X509_new();若有自己的证书和API,则进入这个函数
函数体用的开始是用文件类型打开,接着调用API读取证书和私钥
void DtlsTransport::ReadCertificateAndPrivateKeyFromFiles() { MS_TRACE(); FILE* file{ nullptr }; file = fopen(Settings::configuration.dtlsCertificateFile.c_str(), "r"); if (!file) { MS_ERROR("error reading DTLS certificate file: %s", std::strerror(errno)); goto error; } DtlsTransport::certificate = PEM_read_X509(file, nullptr, nullptr, nullptr); if (!DtlsTransport::certificate) { LOG_OPENSSL_ERROR("PEM_read_X509() failed"); goto error; } fclose(file); file = fopen(Settings::configuration.dtlsPrivateKeyFile.c_str(), "r"); if (!file) { MS_ERROR("error reading DTLS private key file: %s", std::strerror(errno)); goto error; } DtlsTransport::privateKey = PEM_read_PrivateKey(file, nullptr, nullptr, nullptr); if (!DtlsTransport::privateKey) { LOG_OPENSSL_ERROR("PEM_read_PrivateKey() failed"); goto error; } fclose(file); return; error: MS_THROW_ERROR("error reading DTLS certificate and private key PEM files"); }拿到证书和私钥后,将会创建一个SSL上下文。首先对Dtls的版本做了判断
// Both DTLS 1.0 and 1.2 (requires OpenSSL >= 1.1.0). DtlsTransport::sslCtx = SSL_CTX_new(DTLS_method());然后会把证书与上下文绑定
ret = SSL_CTX_use_certificate(DtlsTransport::sslCtx, DtlsTransport::certificate);key也会与上下文绑定
ret = SSL_CTX_use_PrivateKey(DtlsTransport::sslCtx, DtlsTransport::privateKey);接着对key做了检查,设置了一些选项
// Set options. SSL_CTX_set_options( DtlsTransport::sslCtx, SSL_OP_CIPHER_SERVER_PREFERENCE | SSL_OP_NO_TICKET | SSL_OP_SINGLE_ECDH_USE | SSL_OP_NO_QUERY_MTU);设置ciphers,它是一个密语的套件,具体是哪些加密算法就是通过它来设置的,用一个字符串来描述
// Set ciphers. ret = SSL_CTX_set_cipher_list( DtlsTransport::sslCtx, "DEFAULT:!NULL:!aNULL:!SHA256:!SHA384:!aECDH:!AESGCM+AES256:!aPSK");除此之外,Dtls使用srtp来加密解密。这个for循环通过对srtpCryptoSuites的遍历,累加在dtlsSrtpCryptoSuites后面,然后利用SSL_CTX_set_tlsext_use_srtp这个API设置进去
// Set the "use_srtp" DTLS extension. for (auto it = DtlsTransport::srtpCryptoSuites.begin(); it != DtlsTransport::srtpCryptoSuites.end(); ++it) { if (it != DtlsTransport::srtpCryptoSuites.begin()) dtlsSrtpCryptoSuites += ":"; SrtpCryptoSuiteMapEntry* cryptoSuiteEntry = std::addressof(*it); dtlsSrtpCryptoSuites += cryptoSuiteEntry->name; }指纹是通过证书来产生的,这个函数体内定义了多种产生指纹的算法,如
SHA1SHA224SHA256SHA384SHA512对于每种不同的算法它都有对应的函数,然后通过ret = X509_digest(DtlsTransport::certificate, hashFunction, binaryFingerprint, &size);这个API计算证书的指纹,结果存放在binaryFingerprint里
接着把binaryFingerprint转化为十六进制的格式std::sprintf(hexFingerprint + (i * 3), "%.2X:", binaryFingerprint[i]);
完成后把结果的值存放进fingerprint.value,使用的算法是fingerprint.algorithm
最后再把fingerprint这个对象存放进localFingerprints
这样指纹便创建好了
Session这块代码主要是在OnSrtpEvent()这个函数中定义了对不同事件的处理
如ssrc冲突、软件硬件的限制,主要是打印错误信息
当以上准备工作做好后了,WebRtcTransport要给客户端返回一个数据data,进入WebRtcTransport::FillJson
在FillJson里,iceRole总是设为被控制端
当创建一个transport时,会收到从服务端返回的ice_ufrag和ice_password
iceLite是指使用的ICE不是完整的ICE,而是其中部分ICE协议
// Add iceCandidates. jsonObject["iceCandidates"] = json::array(); auto jsonIceCandidatesIt = jsonObject.find("iceCandidates"); for (size_t i{ 0 }; i < this->iceCandidates.size(); ++i) { jsonIceCandidatesIt->emplace_back(json::value_t::object); auto& jsonEntry = (*jsonIceCandidatesIt)[i]; auto& iceCandidate = this->iceCandidates[i]; iceCandidate.FillJson(jsonEntry); }我们还从服务端收到了许多IP,它们形成了candidate,然后把这些candidate组成数组放入json中
// Add iceState. switch (this->iceServer->GetState()) { case RTC::IceServer::IceState::NEW: jsonObject["iceState"] = "new"; break; case RTC::IceServer::IceState::CONNECTED: jsonObject["iceState"] = "connected"; break; case RTC::IceServer::IceState::COMPLETED: jsonObject["iceState"] = "completed"; break; case RTC::IceServer::IceState::DISCONNECTED: jsonObject["iceState"] = "disconnected"; break; }ICE的状态,一般为new
// Add dtlsParameters. jsonObject["dtlsParameters"] = json::object(); auto jsonDtlsParametersIt = jsonObject.find("dtlsParameters"); // Add dtlsParameters.fingerprints. (*jsonDtlsParametersIt)["fingerprints"] = json::array(); auto jsonDtlsParametersFingerprintsIt = jsonDtlsParametersIt->find("fingerprints"); auto& fingerprints = this->dtlsTransport->GetLocalFingerprints(); for (size_t i{ 0 }; i < fingerprints.size(); ++i) { jsonDtlsParametersFingerprintsIt->emplace_back(json::value_t::object); auto& jsonEntry = (*jsonDtlsParametersFingerprintsIt)[i]; auto& fingerprint = fingerprints[i]; jsonEntry["algorithm"] = RTC::DtlsTransport::GetFingerprintAlgorithmString(fingerprint.algorithm); jsonEntry["value"] = fingerprint.value; }dtls的参数,其中最重要的是fingerprint,它是一组数据(因为用了不同的加密算法),然后形成一个数组把它塞入fingerprints,同时传给客户端,客户端就拿到了各种指纹和证书了
// Add dtlsParameters.role. switch (this->dtlsRole) { case RTC::DtlsTransport::Role::NONE: (*jsonDtlsParametersIt)["role"] = "none"; break; case RTC::DtlsTransport::Role::AUTO: (*jsonDtlsParametersIt)["role"] = "auto"; break; case RTC::DtlsTransport::Role::CLIENT: (*jsonDtlsParametersIt)["role"] = "client"; break; case RTC::DtlsTransport::Role::SERVER: (*jsonDtlsParametersIt)["role"] = "server"; break; } // Add dtlsState. switch (this->dtlsTransport->GetState()) { case RTC::DtlsTransport::DtlsState::NEW: jsonObject["dtlsState"] = "new"; break; case RTC::DtlsTransport::DtlsState::CONNECTING: jsonObject["dtlsState"] = "connecting"; break; case RTC::DtlsTransport::DtlsState::CONNECTED: jsonObject["dtlsState"] = "connected"; break; case RTC::DtlsTransport::DtlsState::FAILED: jsonObject["dtlsState"] = "failed"; break; case RTC::DtlsTransport::DtlsState::CLOSED: jsonObject["dtlsState"] = "closed"; break; } }除此之外还有dtls的角色和传输状态
以上所有信息都会一起打包传给应用层,应用层就拿到了服务端的信息,然后就可以进行验证
当收到客户端发送过来的TRANSPORT_CONNECT请求时,会创建远端的dtlsRemoteFingerprint和远端的dtlsRemoteRole,然后在for循环中找寻dtlsRemoteFingerprint.algorithm所使用的算法,以及对应的值jsonValueIt
这样客户端就拿到了证书的指纹、证书的值以及使用的算法
这些都配置好后便进入MayRunDtlsTransport()
这个函数会根据Dtls的角色来进行不同的操作
void WebRtcTransport::MayRunDtlsTransport() { MS_TRACE(); // Do nothing if we have the same local DTLS role as the DTLS transport. // NOTE: local role in DTLS transport can be NONE, but not ours. if (this->dtlsTransport->GetLocalRole() == this->dtlsRole) return; // Check our local DTLS role. switch (this->dtlsRole) { // If still 'auto' then transition to 'server' if ICE is 'connected' or // 'completed'. case RTC::DtlsTransport::Role::AUTO: { // clang-format off if ( this->iceServer->GetState() == RTC::IceServer::IceState::CONNECTED || this->iceServer->GetState() == RTC::IceServer::IceState::COMPLETED ) // clang-format on { MS_DEBUG_TAG( dtls, "transition from DTLS local role 'auto' to 'server' and running DTLS transport"); this->dtlsRole = RTC::DtlsTransport::Role::SERVER; this->dtlsTransport->Run(RTC::DtlsTransport::Role::SERVER); } break; } // 'client' is only set if a 'connect' request was previously called with // remote DTLS role 'server'. // // If 'client' then wait for ICE to be 'completed' (got USE-CANDIDATE). // // NOTE: This is the theory, however let's be more flexible as told here: // https://bugs.chromium.org/p/webrtc/issues/detail?id=3661 case RTC::DtlsTransport::Role::CLIENT: { // clang-format off if ( this->iceServer->GetState() == RTC::IceServer::IceState::CONNECTED || this->iceServer->GetState() == RTC::IceServer::IceState::COMPLETED ) // clang-format on { MS_DEBUG_TAG(dtls, "running DTLS transport in local role 'client'"); this->dtlsTransport->Run(RTC::DtlsTransport::Role::CLIENT); } break; } // If 'server' then run the DTLS transport if ICE is 'connected' (not yet // USE-CANDIDATE) or 'completed'. case RTC::DtlsTransport::Role::SERVER: { // clang-format off if ( this->iceServer->GetState() == RTC::IceServer::IceState::CONNECTED || this->iceServer->GetState() == RTC::IceServer::IceState::COMPLETED ) // clang-format on { MS_DEBUG_TAG(dtls, "running DTLS transport in local role 'server'"); this->dtlsTransport->Run(RTC::DtlsTransport::Role::SERVER); } break; } case RTC::DtlsTransport::Role::NONE: { MS_ABORT("local DTLS role not set"); } } }以客户端为例,进入this->dtlsTransport->Run()
// Set state and notify the listener. this->state = DtlsState::CONNECTING; this->listener->OnDtlsTransportConnecting(this);先将Dtls状态设置为CONNECTING,然后把dtlsTransport对象设置给侦听者(WebRtcTransport)
case Role::CLIENT: { MS_DEBUG_TAG(dtls, "running [role:client]"); SSL_set_connect_state(this->ssl); SSL_do_handshake(this->ssl); SendPendingOutgoingDtlsData(); SetTimeout(); break; }当用户发送连接请求,dtlsTransport的角色为客户端时,dtls就会主动发起连接请求调用SSL_set_connect_state这个API,将客户端当作dtls的服务端,对方收到这个连接请求时,就开始握手,调用SSL_do_handshake这个API主动握手,接着调用SendPendingOutgoingDtlsData()发送dtls的一些数据
case Role::SERVER: { MS_DEBUG_TAG(dtls, "running [role:server]"); SSL_set_accept_state(this->ssl); SSL_do_handshake(this->ssl); break; }若dtlsTransport的角色为服务端时,就会把SSL设置为阻塞模式,然后等待握手
通过以上便建立起了连接
建立连接的结果会赋给json data = json::object();
然后通过request->Accept(data);反馈给应用层
进入SetNewProducerIdFromInternal这个函数
搜索producerId,拿到ID后返回,进入构造函数RTC::Producer
在这个构造函数中,主要也是对json数据data进行解析,它包括
kind:audio和videortpparameters:mid、codecs、headerExtensions、encodings、rtcp等等 Producer::Producer(const std::string& id, RTC::Producer::Listener* listener, json& data) : id(id), listener(listener) { MS_TRACE(); auto jsonKindIt = data.find("kind"); if (jsonKindIt == data.end() || !jsonKindIt->is_string()) MS_THROW_TYPE_ERROR("missing kind"); // This may throw. this->kind = RTC::Media::GetKind(jsonKindIt->get<std::string>()); if (this->kind == RTC::Media::Kind::ALL) MS_THROW_TYPE_ERROR("invalid empty kind"); auto jsonRtpParametersIt = data.find("rtpParameters"); if (jsonRtpParametersIt == data.end() || !jsonRtpParametersIt->is_object()) MS_THROW_TYPE_ERROR("missing rtpParameters"); // This may throw. this->rtpParameters = RTC::RtpParameters(*jsonRtpParametersIt); // Evaluate type. this->type = RTC::RtpParameters::GetType(this->rtpParameters); // Reserve a slot in rtpStreamByEncodingIdx and rtpStreamsScores vectors // for each RTP stream. this->rtpStreamByEncodingIdx.resize(this->rtpParameters.encodings.size(), nullptr); this->rtpStreamScores.resize(this->rtpParameters.encodings.size(), 0u); auto& encoding = this->rtpParameters.encodings[0]; auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); if (!RTC::Codecs::Tools::IsValidTypeForCodec(this->type, mediaCodec->mimeType)) { MS_THROW_TYPE_ERROR( "%s codec not supported for %s", mediaCodec->mimeType.ToString().c_str(), RTC::RtpParameters::GetTypeString(this->type).c_str()); } auto jsonRtpMappingIt = data.find("rtpMapping"); if (jsonRtpMappingIt == data.end() || !jsonRtpMappingIt->is_object()) MS_THROW_TYPE_ERROR("missing rtpMapping"); auto jsonCodecsIt = jsonRtpMappingIt->find("codecs"); if (jsonCodecsIt == jsonRtpMappingIt->end() || !jsonCodecsIt->is_array()) MS_THROW_TYPE_ERROR("missing rtpMapping.codecs"); for (auto& codec : *jsonCodecsIt) { if (!codec.is_object()) MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (not an object)"); auto jsonPayloadTypeIt = codec.find("payloadType"); // clang-format off if ( jsonPayloadTypeIt == codec.end() || !Utils::Json::IsPositiveInteger(*jsonPayloadTypeIt) ) // clang-format on { MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (missing payloadType)"); } auto jsonMappedPayloadTypeIt = codec.find("mappedPayloadType"); // clang-format off if ( jsonMappedPayloadTypeIt == codec.end() || !Utils::Json::IsPositiveInteger(*jsonMappedPayloadTypeIt) ) // clang-format on { MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (missing mappedPayloadType)"); } this->rtpMapping.codecs[jsonPayloadTypeIt->get<uint8_t>()] = jsonMappedPayloadTypeIt->get<uint8_t>(); } auto jsonEncodingsIt = jsonRtpMappingIt->find("encodings"); if (jsonEncodingsIt == jsonRtpMappingIt->end() || !jsonEncodingsIt->is_array()) { MS_THROW_TYPE_ERROR("missing rtpMapping.encodings"); } this->rtpMapping.encodings.reserve(jsonEncodingsIt->size()); for (auto& encoding : *jsonEncodingsIt) { if (!encoding.is_object()) MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings"); this->rtpMapping.encodings.emplace_back(); auto& encodingMapping = this->rtpMapping.encodings.back(); // ssrc is optional. auto jsonSsrcIt = encoding.find("ssrc"); // clang-format off if ( jsonSsrcIt != encoding.end() && Utils::Json::IsPositiveInteger(*jsonSsrcIt) ) // clang-format on { encodingMapping.ssrc = jsonSsrcIt->get<uint32_t>(); } // rid is optional. auto jsonRidIt = encoding.find("rid"); if (jsonRidIt != encoding.end() && jsonRidIt->is_string()) encodingMapping.rid = jsonRidIt->get<std::string>(); // However ssrc or rid must be present (if more than 1 encoding). // clang-format off if ( jsonEncodingsIt->size() > 1 && jsonSsrcIt == encoding.end() && jsonRidIt == encoding.end() ) // clang-format on { MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings (missing ssrc or rid)"); } // If there is no mid and a single encoding, ssrc or rid must be present. // clang-format off if ( this->rtpParameters.mid.empty() && jsonEncodingsIt->size() == 1 && jsonSsrcIt == encoding.end() && jsonRidIt == encoding.end() ) // clang-format on { MS_THROW_TYPE_ERROR( "wrong entry in rtpMapping.encodings (missing ssrc or rid, or rtpParameters.mid)"); } // mappedSsrc is mandatory. auto jsonMappedSsrcIt = encoding.find("mappedSsrc"); // clang-format off if ( jsonMappedSsrcIt == encoding.end() || !Utils::Json::IsPositiveInteger(*jsonMappedSsrcIt) ) // clang-format on { MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings (missing mappedSsrc)"); } encodingMapping.mappedSsrc = jsonMappedSsrcIt->get<uint32_t>(); } auto jsonPausedIt = data.find("paused"); if (jsonPausedIt != data.end() && jsonPausedIt->is_boolean()) this->paused = jsonPausedIt->get<bool>(); // The number of encodings in rtpParameters must match the number of encodings // in rtpMapping. if (this->rtpParameters.encodings.size() != this->rtpMapping.encodings.size()) { MS_THROW_TYPE_ERROR("rtpParameters.encodings size does not match rtpMapping.encodings size"); } // Fill RTP header extension ids. // This may throw. for (auto& exten : this->rtpParameters.headerExtensions) { if (exten.id == 0u) MS_THROW_TYPE_ERROR("RTP extension id cannot be 0"); if (this->rtpHeaderExtensionIds.mid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::MID) { this->rtpHeaderExtensionIds.mid = exten.id; } if (this->rtpHeaderExtensionIds.rid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::RTP_STREAM_ID) { this->rtpHeaderExtensionIds.rid = exten.id; } if (this->rtpHeaderExtensionIds.rrid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::REPAIRED_RTP_STREAM_ID) { this->rtpHeaderExtensionIds.rrid = exten.id; } if (this->rtpHeaderExtensionIds.absSendTime == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::ABS_SEND_TIME) { this->rtpHeaderExtensionIds.absSendTime = exten.id; } if (this->rtpHeaderExtensionIds.transportWideCc01 == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::TRANSPORT_WIDE_CC_01) { this->rtpHeaderExtensionIds.transportWideCc01 = exten.id; } // NOTE: Remove this once framemarking draft becomes RFC. if (this->rtpHeaderExtensionIds.frameMarking07 == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING_07) { this->rtpHeaderExtensionIds.frameMarking07 = exten.id; } if (this->rtpHeaderExtensionIds.frameMarking == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING) { this->rtpHeaderExtensionIds.frameMarking = exten.id; } if (this->rtpHeaderExtensionIds.ssrcAudioLevel == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::SSRC_AUDIO_LEVEL) { this->rtpHeaderExtensionIds.ssrcAudioLevel = exten.id; } if (this->rtpHeaderExtensionIds.videoOrientation == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::VIDEO_ORIENTATION) { this->rtpHeaderExtensionIds.videoOrientation = exten.id; } if (this->rtpHeaderExtensionIds.toffset == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::TOFFSET) { this->rtpHeaderExtensionIds.toffset = exten.id; } }当RTC::Producer构造完成,返回Transport.cpp执行cpp this->rtpListener.AddProducer(producer);
这会把producer加入rtpListener,接着继续执行cpp this->listener->OnTransportNewProducer(this, producer);
这个listener是Router的侦听者,返回Router::OnTransportNewProducer
创建好producer之后,当有数据传来时,便可以根据ssrc找到它对应的producer,然后由producer查看哪些consumer订阅了它,接着就转发数据流了
在Transport.cpp里进入下面的case
case Channel::Request::MethodId::TRANSPORT_CONSUME在这个case里,会获取到producerId auto jsonProducerIdIt = request->internal.find("producerId");
然后还会获取到consumerId,不过它是调用的一个函数 SetNewConsumerIdFromInternal(request->internal, consumerId);
接着就开始创建consumer,由于它是多态的,所以会进入一个switch case,它有4种类型:
SIMPLE:简单的consumerSIMULCAST:SVCPIPE switch (type) { case RTC::RtpParameters::Type::NONE: { MS_THROW_TYPE_ERROR("invalid type 'none'"); break; } case RTC::RtpParameters::Type::SIMPLE: { // This may throw. consumer = new RTC::SimpleConsumer(consumerId, producerId, this, request->data); break; } case RTC::RtpParameters::Type::SIMULCAST: { // This may throw. consumer = new RTC::SimulcastConsumer(consumerId, producerId, this, request->data); break; } case RTC::RtpParameters::Type::SVC: { // This may throw. consumer = new RTC::SvcConsumer(consumerId, producerId, this, request->data); break; } case RTC::RtpParameters::Type::PIPE: { // This may throw. consumer = new RTC::PipeConsumer(consumerId, producerId, this, request->data); break; } }以SIMPLE为例,进入RTC::SimpleConsumer
SimpleConsumer::SimpleConsumer( const std::string& id, const std::string& producerId, RTC::Consumer::Listener* listener, json& data) : RTC::Consumer::Consumer(id, producerId, listener, data, RTC::RtpParameters::Type::SIMPLE) { MS_TRACE(); // Ensure there is a single encoding. if (this->consumableRtpEncodings.size() != 1u) MS_THROW_TYPE_ERROR("invalid consumableRtpEncodings with size != 1"); auto& encoding = this->rtpParameters.encodings[0]; const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); this->keyFrameSupported = RTC::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType); // Create RtpStreamSend instance for sending a single stream to the remote. CreateRtpStream(); }这段代码主要也是对data数据的解析,然后在代码的末尾段构造了一个Rtp数据流,进入CreateRtpStream()
Consumer的本质就是一个Rtp数据流
void SimpleConsumer::CreateRtpStream() { MS_TRACE(); auto& encoding = this->rtpParameters.encodings[0]; const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding); MS_DEBUG_TAG( rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType); // Set stream params. RTC::RtpStream::Params params; params.ssrc = encoding.ssrc; params.payloadType = mediaCodec->payloadType; params.mimeType = mediaCodec->mimeType; params.clockRate = mediaCodec->clockRate; params.cname = this->rtpParameters.rtcp.cname; // Check in band FEC in codec parameters. if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1) { MS_DEBUG_TAG(rtp, "in band FEC enabled"); params.useInBandFec = true; } // Check DTX in codec parameters. if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1) { MS_DEBUG_TAG(rtp, "DTX enabled"); params.useDtx = true; } // Check DTX in the encoding. if (encoding.dtx) { MS_DEBUG_TAG(rtp, "DTX enabled"); params.useDtx = true; } for (const auto& fb : mediaCodec->rtcpFeedback) { if (!params.useNack && fb.type == "nack" && fb.parameter.empty()) { MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported"); params.useNack = true; } else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli") { MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported"); params.usePli = true; } else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir") { MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported"); params.useFir = true; } } // Create a RtpStreamSend for sending a single media stream. size_t bufferSize = params.useNack ? 600u : 0u; this->rtpStream = new RTC::RtpStreamSend(this, params, bufferSize); this->rtpStreams.push_back(this->rtpStream); // If the Consumer is paused, tell the RtpStreamSend. if (IsPaused() || IsProducerPaused()) this->rtpStream->Pause(); const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding); if (rtxCodec && encoding.hasRtx) this->rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc); }通过创建params获得了5个参数,分别是
ssrcpayloadTypemimeTypeclockRatecname以及后面通过if判断来确定是否使用FEC、DTX,以及Feedback丢包重传机制是使用NACK、PLI、FIR的哪一种
接着创建了一个RTC::RtpStreamSend对象,传入的三个参数分别是
this——这个RtpStreamSend对象params——之前获得了5个参数bufferSize——缓存数组创建好了这个对象后,就把它压入rtpStreams中保存起来
当共享者的数据源源不断地传过来,mediasoup就会根据它的ssrc找到对应的生产者,通过生产者再找到对应的消费者,接着再通过RtpStreamSend发送给用户