首先,推流直播的配置文件如下:
# rtmp.conf
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
}
搭建的简陋直播步骤如下:
obs 推流过程分析可见如下链接:
下面即开始在第 8 步的基础上分析:vlc 连接 SRS,请求播放 obs 推流的视频(前面建立连接的过程和上面 1~7 差不多)。
下面的分析会简化一下源码。
/*
 * Serve one stream request on this RTMP connection: identify what the client
 * wants, run the security check, look up (or create) the matching source and
 * dispatch on the client type. Only the play branch is shown in this
 * simplified listing; elided code is marked with "...".
 * Returns ERROR_SUCCESS or the error code of the first failing step.
 */
int SrsRtmpConn::stream_service_cycle()
{
int ret = ERROR_SUCCESS;
SrsRtmpConnType type;
/* First identify the type of the client connection; in this walkthrough it is expected to be play. */
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration))
!= ERROR_SUCCESS) {
...
}
/* Normalize request fields such as url and host (original note: removes spaces and other unneeded characters). */
req->strip();
/* Original note: this check is effectively skipped when no security section is configured. */
// security check
if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
...
}
/* An empty stream name is rejected (original note: for HLS an empty name could end up being written to a file). */
if (req->stream.empty()) {
...
}
/* Set the server-side recv/send timeouts (original note: both constants are 30*1000*1000LL). */
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
/* Find the source that will serve this client, creating it if needed. */
SrsSource* source = NULL;
if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
...
}
/* update the statistic when source discovered. */
...
/* Original note: returns false by default when the vhost has no mode configured. */
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
/* Original note: the gop cache is enabled by default when gop_cache is not configured. */
bool enabled_cache = _srs_config->get_app_cache(req->vhost);
source->set_cache(enabled_cache);
/* Remember the identified type; in this walkthrough it is SrsRtmpConnPlay. */
client_type = type;
switch (type) {
case SrsRtmpConnPlay: {
/* response connection start play */
if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
...
}
/* Original note: a no-op when http_hooks is not configured. */
if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
...
}
/* Start delivering the published stream to the client; the stop hook runs once playing() returns. */
ret = playing(source);
http_hooks_on_stop();
return ret;
}
...
}
return ret;
}
该函数是通过接收一些客户端发来的消息来鉴别该客户端的请求的类型:publish or play。
/*
 * recv some message to identify the client.
 * @stream_id, client will createStream to play or publish by flash,
 * the stream_id used to response the createStream request.
 * @type, output the client type.
 * @stream_name, output the client publish/play stream name. @see: SrsRequest.stream
 * @duration, output the play client duration. @see: SrsRequest.duration
 * Only the createStream branch is shown in this simplified listing; elided
 * code is marked with "...".
 */
int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type,
string& stream_name, double& duration)
{
type = SrsRtmpConnUnknown;
int ret = ERROR_SUCCESS;
while (true) {
SrsCommonMessage* msg = NULL;
/* Receive one complete message. */
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsCommonMessage, msg);
SrsMessageHeader& h = msg->header;
/* Acknowledgement, set-chunk-size, window-ack-size and user-control messages are skipped. */
if (h.is_ackledgement() || h.is_set_chunk_size() ||
h.is_window_ackledgement_size() || h.is_user_control_message()) {
continue;
}
SrsPacket* pkt = NULL;
/* Decode the message into a packet. */
if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsPacket, pkt);
/* A createStream packet was received: continue identification in identify_create_stream_client. */
if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
srs_info("identify client by create stream, play or flash publish.");
return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt),
stream_id, type, stream_name, duration);
}
...
}
return ret;
}
该函数检测接收到的消息为 createStream 后,接着调用 identify_create_stream_client 做进一步的处理。
/*
 * Answer the client's createStream request, then keep receiving messages
 * until the follow-up command identifies the client. Only the play branch is
 * shown in this simplified listing; elided code is marked with "...".
 * @req, the createStream packet; its transaction_id is echoed in the response.
 * @stream_id, the stream id placed in the createStream response.
 * @type, @stream_name, @duration, outputs, filled in by identify_play_client.
 */
int SrsRtmpServer::identify_create_stream_client(SrsCreateStreamPacket* req, int stream_id,
SrsRtmpConnType& type, string& stream_name, double& duration)
{
int ret = ERROR_SUCCESS;
if (true) {
/* Build the packet used to respond to the createStream message. */
SrsCreateStreamResPacket* pkt =
new SrsCreateStreamResPacket(req->transaction_id, stream_id);
if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
...
}
}
while (true) {
SrsCommonMessage* msg = NULL;
if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsCommonMessage, msg);
SrsMessageHeader& h = msg->header;
/* Acknowledgement, set-chunk-size, window-ack-size and user-control messages are skipped. */
if (h.is_ackledgement() || h.is_set_chunk_size() ||
h.is_window_ackledgement_size() || h.is_user_control_message()) {
continue;
}
/* Anything that is not an AMF0/AMF3 command message is logged and ignored. */
if (!h.is_amf0_command() && !h.is_amf3_command()) {
srs_trace("identify ignore messages except "
"AMF0/AMF3 command message. type=%#x", h.message_type);
continue;
}
SrsPacket* pkt = NULL;
if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
...
}
/* In this simplified listing the loop returns once a play packet arrives. */
SrsAutoFree(SrsPacket, pkt);
if (dynamic_cast<SrsPlayPacket*>(pkt)) {
srs_info("level1 identify client by play.");
return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt),
type, stream_name, duration);
}
...
}
return ret;
}
/*
 * Mark the client as a player and report what it asked for.
 * @req, the decoded play packet.
 * @type, output, always set to SrsRtmpConnPlay.
 * @stream_name, output, copied from req->stream_name.
 * @duration, output, copied from req->duration.
 * Always returns ERROR_SUCCESS.
 */
int SrsRtmpServer::identify_play_client(SrsPlayPacket* req, SrsRtmpConnType& type,
    string& stream_name, double& duration)
{
    type = SrsRtmpConnPlay;

    /* Hand the requested stream name and duration back to the caller. */
    stream_name = req->stream_name;
    duration = req->duration;

    srs_info("identity client type=play, stream_name=%s, duration=%.2f",
        stream_name.c_str(), duration);

    return ERROR_SUCCESS;
}
鉴别到客户端请求的行为为 play 后,接着为该请求获取一个 SrsSource 类的 source,用于为该客户端的请求提供服务。
/*
 * Look the source up in the pool; build and register a new one on a miss.
 * @param r the client request.
 * @param h the event handler for source.
 * @param pps receives the matched or newly created source on success;
 *        left untouched when initialization of a new source fails.
 * @return ERROR_SUCCESS, or the error code from SrsSource::initialize.
 */
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
    /* Fast path: a source for this request is already in the pool. */
    SrsSource* cached = fetch(r);
    if (cached != NULL) {
        *pps = cached;
        return ERROR_SUCCESS;
    }

    /* Miss: the pool is keyed by the stream url derived from the request. */
    string stream_url = r->get_stream_url();
    string vhost = r->vhost;

    // fetch() missed, so the key must not be present yet.
    srs_assert (pool.find(stream_url) == pool.end());

    /* Build the new source; free it again if it cannot be initialized. */
    SrsSource* created = new SrsSource();
    int ret = created->initialize(r, h);
    if (ret != ERROR_SUCCESS) {
        srs_freep(created);
        return ret;
    }

    /* Register the source under its stream url and hand it to the caller. */
    pool[stream_url] = created;
    srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());

    *pps = created;
    return ret;
}
/*
 * Live streaming source.
 * Sets every field to its initial value, allocates the helper objects and
 * subscribes this source to the global config.
 */
SrsSource::SrsSource()
{
    _req = NULL;
    jitter_algorithm = SrsRtmpJitterAlgorithmOFF;

    mix_correct = false;
    mix_queue = new SrsMixQueue();

    /* Optional modules, present only when enabled at build time. */
#ifdef SRS_AUTO_HLS
    hls = new SrsHls();
#endif
#ifdef SRS_AUTO_DVR
    dvr = new SrsDvr();
#endif
#ifdef SRS_AUTO_TRANSCODE
    encoder = new SrsEncoder();
#endif
#ifdef SRS_AUTO_HDS
    hds = new SrsHds(this);
#endif

    /* Nothing cached yet:
     *   cache_sh_audio: cached audio sequence header
     *   cache_sh_video: cached video sequence header (sps, pps)
     *   cache_metadata: cached metadata */
    cache_sh_audio = NULL;
    cache_sh_video = NULL;
    cache_metadata = NULL;

    _can_publish = true;
    _source_id = -1;
    _pre_source_id = -1;
    die_at = -1;

    play_edge = new SrsPlayEdge();
    publish_edge = new SrsPublishEdge();

    /* The gop cache object is always created here. */
    gop_cache = new SrsGopCache();
    aggregate_stream = new SrsStream();

    is_monotonically_increase = false;
    last_packet_time = 0;

    _srs_config->subscribe(this);
    atc = false;
}
接下来,开始响应客户端的 play 命令。
/*
 * Respond to the client's play command by sending, in order:
 * StreamBegin, onStatus(NetStream.Play.Reset), onStatus(NetStream.Play.Start),
 * |RtmpSampleAccess and onStatus(NetStream.Data.Start).
 * @stream_id, the stream id carried in StreamBegin and passed as the second
 * argument when sending the remaining packets.
 * Returns ERROR_SUCCESS when every packet was sent; the error branches are
 * elided ("...") in this simplified listing.
 */
int SrsRtmpServer::start_play(int stream_id)
{
int ret = ERROR_SUCCESS;
// StreamBegin
if (true) {
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrsPCUCStreamBegin;
pkt->event_data = stream_id;
/* Send the Stream Begin user control message to the client (0 is passed as the second argument). */
if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
...
}
}
// onStatus(NetStream.Play.Reset)
if (true) {
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamReset));
pkt->data->set(StatusDescription,
SrsAmf0Any::str("Playing and resetting stream."));
pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
}
// onStatus(NetStream.Play.Start)
if (true) {
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamStart));
pkt->data->set(StatusDescription, SrsAmf0Any::str("Started playing stream."));
pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
}
// |RtmpSampleAccess: both flags are set to true below, even though the
// log text still reads "(false, false)".
if (true) {
SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
// allow audio/video sample.
// @see: https://github.com/ossrs/srs/issues/49
pkt->audio_sample_access = true;
pkt->video_sample_access = true;
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
srs_info("send |RtmpSampleAccess(false, false) message success.");
}
// onStatus(NetStream.Data.Start)
if (true) {
SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeDataStart));
if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
...
}
srs_info("send onStatus(NetStream.Data.Start) message success.");
}
srs_info("start play success.");
return ret;
}
该函数依次向客户端发送的消息如下面几幅图所示。
/*
 * Play the given source to this connection: create a consumer on the source,
 * start a separate receive thread, run do_playing(), then stop the receive
 * thread again.
 * @source, the source whose stream is delivered to the client.
 * Returns the result of do_playing(); the error branches for consumer
 * creation and thread start are elided ("...") in this simplified listing.
 */
int SrsRtmpConn::playing(SrsSource* source)
{
int ret = ERROR_SUCCESS;
/* create consumer of source. */
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
...
}
SrsAutoFree(SrsConsumer, consumer);
/* use isolate thread to recv,
 * @see: https://github.com/ossrs/srs/issues/217 */
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP);
/* start isolate recv thread */
if ((ret = trd.start()) != ERROR_SUCCESS) {
...
}
/* delivery message for clients playing stream.
 * wakable points at the consumer only for the duration of do_playing(). */
wakable = consumer;
ret = do_playing(source, consumer, &trd);
wakable = NULL;
/* stop isolate recv thread */
trd.stop();
/* warn for the message is dropped. */
if (!trd.empty()) {
srs_warn("drop the received %d messages", trd.size());
}
return ret;
}
/*
 * Create a consumer for a playing client and prime its queue from the
 * source's caches. Error branches other than the gop cache dump are elided
 * ("...") in this simplified listing.
 * @conn, the connection the consumer is created for.
 * @consumer, output, the newly created consumer (also appended to consumers).
 * @ds, when true, the cached audio/video sequence headers are enqueued.
 * @dm, when true, the cached metadata is enqueued.
 * @dg, when true, the gop cache is dumped to the consumer.
 * @return ERROR_SUCCESS, or the error code of the first failing step.
 */
int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer,
    bool ds, bool dm, bool dg)
{
    int ret = ERROR_SUCCESS;

    consumer = new SrsConsumer(this, conn);
    consumers.push_back(consumer);

    double queue_size = _srs_config->get_queue_length(_req->vhost);
    consumer->set_queue_size(queue_size);

    /* if atc, update the sequence header to gop cache time. */
    if (atc && !gop_cache->empty()) {
        if (cache_metadata) {
            cache_metadata->timestamp = gop_cache->start_time();
        }
        if (cache_sh_video) {
            /* Fixed: the listing had "gop->cache", but the member is gop_cache. */
            cache_sh_video->timestamp = gop_cache->start_time();
        }
        if (cache_sh_audio) {
            cache_sh_audio->timestamp = gop_cache->start_time();
        }
    }

    /* copy metadata. */
    if (dm && cache_metadata &&
        (ret = consumer->enqueue(cache_metadata, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        ...
    }

    /* copy sequence header
     * copy audio sequence first, for hls to fast parse the "right" audio codec.
     * @see https://github.com/ossrs/srs/issues/301 */
    if (ds && cache_sh_audio && (ret = consumer->enqueue(cache_sh_audio, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        ...
    }
    /* Fixed: the listing misspelled jitter_algorithm as "jitter_algotithm". */
    if (ds && cache_sh_video && (ret = consumer->enqueue(cache_sh_video, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        ...
    }

    /* copy gop cache to client. */
    if (dg && (ret = gop_cache->dump(consumer, atc, jitter_algorithm)) != ERROR_SUCCESS) {
        return ret;
    }

    /* print status. */
    if (dg) {
        srs_trace("create consumer, queue_size=%.2f, jitter=%d", queue_size, jitter_algorithm);
    } else {
        srs_trace("create consumer, ignore gop cache, jitter=%d", jitter_algorithm);
    }

    /* for edge, when play edge stream, check the state */
    if (_srs_config->get_vhost_is_edge(_req->vhost)) {
        /* notice edge to start for the first client. */
        if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
            ...
        }
    }

    return ret;
}
原文:https://www.cnblogs.com/jimodetiantang/p/9113606.html