srs_app_mpegts_udp.cpp
其中的 on_udp_packet() 响应接收数据事件,并保存到 buffer 变量中,最后调用 on_udp_bytes() 进行处理
int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf) { std::string peer_ip = inet_ntoa(from->sin_addr); int peer_port = ntohs(from->sin_port); // append to buffer. buffer->append(buf, nb_buf); srs_info("udp: got %s:%d packet %d/%d bytes", peer_ip.c_str(), peer_port, nb_buf, buffer->length()); return on_udp_bytes(peer_ip, peer_port, buf, nb_buf); }
on_udp_bytes 收到数据后进行处理:
/**
 * Parse the bytes accumulated in the member `buffer` as MPEG-TS packets.
 *
 * The buffered data is split into SRS_TS_PACKET_SIZE chunks; each chunk is
 * wrapped by the member `stream` and decoded by the member `context`, with
 * this object passed as the handler. Bytes of all whole packets are erased
 * from `buffer` afterwards.
 *
 * @param host peer address (not referenced by the code shown here).
 * @param port peer port (not referenced by the code shown here).
 * @param buf the datagram just received (not referenced by the code shown here;
 *      the member `buffer` is used instead).
 * @param nb_buf size of buf (not referenced by the code shown here).
 * @return the value left in `ret` (see the review notes in the body).
 */
int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf)
{
    // ...
    // (code elided in this excerpt; presumably `ret` is declared here, since
    // it is used below without a visible declaration)

    // use stream to parse ts packet.
    int nb_packet = buffer->length() / SRS_TS_PACKET_SIZE;
    for (int i = 0; i < nb_packet; i++) {
        char* p = buffer->bytes() + (i * SRS_TS_PACKET_SIZE);

        // NOTE(review): this early return skips the erase below, so the same
        // bytes stay in the buffer for the next call. Confirm this is intended.
        if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
            return ret;
        }

        // process each ts packet
        if ((ret = context->decode(stream, this)) != ERROR_SUCCESS) {
            // NOTE(review): the failure is logged as ignored, but if it happens
            // on the last packet the error code is still what gets returned.
            srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);
            continue;
        }
        srs_info("mpegts: parse ts packet completed");
    }
    srs_info("mpegts: parse udp packet completed");

    // erase consumed bytes
    if (nb_packet > 0) {
        buffer->erase(nb_packet * SRS_TS_PACKET_SIZE);
    }

    return ret;
}
注意参数里的buf没有用到,而是用的成员变量 buffer
首先,将数据流按 188 字节(SRS_TS_PACKET_SIZE)切成 ts 分组;每个分组仍然以 sync byte 0x47 开头,后面的 SrsTsPacket::decode 会读取并校验这个字节
然后,调用 SrsStream 类来存储每个 ts 分组(其实就是封装一下)
最后,调用 SrsTsContext::decode 解码
/**
 * Decode TS packets from the stream until it is empty, dispatching every
 * completed message to the handler.
 *
 * @param stream the bytes to decode; consumed until stream->empty().
 * @param handler receives each decoded message through on_ts_message().
 * @return ERROR_SUCCESS when the whole stream was consumed; otherwise the
 *      first error code from packet decoding or from the handler.
 */
int SrsTsContext::decode(SrsStream* stream, ISrsTsHandler* handler)
{
    int ret = ERROR_SUCCESS;

    // parse until EOF of stream.
    // for example, parse multiple times for the PES_packet_length(0) packet.
    while (!stream->empty()) {
        SrsTsPacket* packet = new SrsTsPacket(this);
        SrsAutoFree(SrsTsPacket, packet);

        SrsTsMessage* msg = NULL;
        if ((ret = packet->decode(stream, &msg)) != ERROR_SUCCESS) {
            srs_error("mpegts: decode ts packet failed. ret=%d", ret);
            return ret;
        }

        // the packet did not produce a message; keep parsing.
        if (!msg) {
            continue;
        }
        SrsAutoFree(SrsTsMessage, msg);

        if ((ret = handler->on_ts_message(msg)) != ERROR_SUCCESS) {
            srs_error("mpegts: handler ts message failed. ret=%d", ret);
            return ret;
        }
    }

    return ret;
}
参数 handler 就是 SrsMpegtsOverUdp 本身
首先还是调用 SrsTsPacket::decode 进行解码
int SrsTsPacket::decode(SrsStream* stream, SrsTsMessage** ppmsg) { int ret = ERROR_SUCCESS; int pos = stream->pos(); // 4B ts packet header. if (!stream->require(4)) { ret = ERROR_STREAM_CASTER_TS_HEADER; srs_error("ts: demux header failed. ret=%d", ret); return ret; } sync_byte = stream->read_1bytes(); if (sync_byte != 0x47) { ret = ERROR_STREAM_CASTER_TS_SYNC_BYTE; srs_error("ts: sync_bytes must be 0x47, actual=%#x. ret=%d", sync_byte, ret); return ret; } int16_t pidv = stream->read_2bytes(); transport_error_indicator = (pidv >> 15) & 0x01; payload_unit_start_indicator = (pidv >> 14) & 0x01; transport_priority = (pidv >> 13) & 0x01; pid = (SrsTsPid)(pidv & 0x1FFF); int8_t ccv = stream->read_1bytes(); transport_scrambling_control = (SrsTsScrambled)((ccv >> 6) & 0x03); adaption_field_control = (SrsTsAdaptationFieldType)((ccv >> 4) & 0x03); continuity_counter = ccv & 0x0F; // TODO: FIXME: create pids map when got new pid. srs_info("ts: header sync=%#x error=%d unit_start=%d priotiry=%d pid=%d scrambling=%d adaption=%d counter=%d", sync_byte, transport_error_indicator, payload_unit_start_indicator, transport_priority, pid, transport_scrambling_control, adaption_field_control, continuity_counter); // optional: adaptation field if (adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) { srs_freep(adaptation_field); adaptation_field = new SrsTsAdaptationField(this); if ((ret = adaptation_field->decode(stream)) != ERROR_SUCCESS) { srs_error("ts: demux af faield. ret=%d", ret); return ret; } srs_verbose("ts: demux af ok."); } // calc the user defined data size for payload. int nb_payload = SRS_TS_PACKET_SIZE - (stream->pos() - pos); // optional: payload. 
if (adaption_field_control == SrsTsAdaptationFieldTypePayloadOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) { if (pid == SrsTsPidPAT) { // 2.4.4.3 Program association Table srs_freep(payload); payload = new SrsTsPayloadPAT(this); } else { SrsTsChannel* channel = context->get(pid); if (channel && channel->apply == SrsTsPidApplyPMT) { // 2.4.4.8 Program Map Table srs_freep(payload); payload = new SrsTsPayloadPMT(this); } else if (channel && (channel->apply == SrsTsPidApplyVideo || channel->apply == SrsTsPidApplyAudio)) { // 2.4.3.6 PES packet srs_freep(payload); payload = new SrsTsPayloadPES(this); } else { // left bytes as reserved. stream->skip(nb_payload); } } if (payload && (ret = payload->decode(stream, ppmsg)) != ERROR_SUCCESS) { srs_error("ts: demux payload failed. ret=%d", ret); return ret; } } return ret; }
先解析 4 字节的 ts 头得到 PID(Packet ID)等字段,然后根据 PID new 出对应的 payload 解码类(PAT、PMT 或 PES)。
解码得到 SrsTsMessage,再回到 SrsMpegtsOverUdp::on_ts_message 进行处理
/**
 * Handle one message produced by the TS decoder.
 *
 * The message payload is wrapped in a local SrsStream and routed by the
 * channel's stream type: H.264 video goes to on_ts_video(), AAC audio goes
 * to on_ts_audio(). Other stream types fall through and `ret` is returned.
 *
 * @param msg the decoded TS message; its payload and channel are read.
 * @return the stream initialize error, or the result of on_ts_video() /
 *      on_ts_audio(), or `ret` for stream types not handled here.
 */
int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg)
{
    // ...
    // (code elided in this excerpt; presumably `ret` is declared here, since
    // it is used below without a visible declaration)

    // parse the stream.
    SrsStream avs;
    if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
        srs_error("mpegts: initialize av stream failed. ret=%d", ret);
        return ret;
    }

    // publish audio or video.
    if (msg->channel->stream == SrsTsStreamVideoH264) {
        return on_ts_video(msg, &avs);
    }
    if (msg->channel->stream == SrsTsStreamAudioAAC) {
        return on_ts_audio(msg, &avs);
    }

    // TODO: FIXME: implements it.
    return ret;
}
其重点是把 ts message 中的 payload 数据封装在 SrsStream 中,
然后调用 on_ts_video 和 on_ts_audio 分别进行处理。
对于 video,从 ts 中解出来的是 H.264 流,所以先进行 frame 提取,再重新封装成 flv 格式,最后通过 rtmp 转发出去。
对于audio,是对AAC数据提取frame,再封装成ADTS进行传输。
原文:https://www.cnblogs.com/zoneofmine/p/10893497.html