RTMP协议

RTMP是工作在TCP之上的应用层协议。

握手包

使用OBS推流, Wireshark过滤网络层的源ip和目标ip: ip.dst == 10.248.154.188 || ip.src == 10.248.154.188, 10.248.154.188是推流服务器的地址. 之前使用ffmpeg推流时, 发现握手包的第一个4个字节全为0, 第二个4个字节不为全0, 第二个4个字节和协议规定不同符合, 所以换成了OBS推流, 握手的C0+C1格式符合协议.

C1的格式如下(1536 bytes):
time(时间戳) 4 bytes|zero(全0) 4 bytes|random(随机数) 1528 bytes

C2和S2的数据部分大小也是1536 bytes, 作为S1和C1的回应, 由下面几个字段组成:
time(时间戳) 4 bytes|time2(时间戳) 4 bytes|random(随机数) 1528 bytes

  • time: 字段必须包含对应的发送方发过来的包中的时间戳, C2对应S1, S2对应C1
  • time2: 字段必须包含前一个包(s1/c1)包含的时间戳
  • random: 随机数部分包含对方对应上一个包的随机数部分, C2对应S1, S2对应C1, 通信双方可以利用时间戳探测带宽和延迟, 随机数这个字段看起来没啥用处

分块(Chunking)

握手完成以后,连接包含一个或多个chunk (分块)流, 每个chunk流用于运输消息流中的一种类型的消息, 每个chunk创建的时候有个唯一的ID和它关联, 称之为chunkstream ID, 这些chunk通过网络传输, 在传输过程中, 下一个块必须等待上一个块被完全传输, 在接收方一端, 这些chunk基于stream ID组装成消息.

分块允许高级别协议的大消息拆分成更小的消息, 例如避免大的低优先级的消息(例如: 视频)阻塞更小的高优先级的消息(例如: 音频和控制消息).

分块同样允许小的消息以更少的开销发送, 块的头部包含表示压缩的信息, 这些信息必须包含在消息本身中.

块的大小是可以配置的. 可以通过使用Set Chunk Size控制消息来设置. 较大的块大小可以减少CPU占用, 但是同样的, 在低带宽连接下提交一个较大的块可能延迟其他内容. 较小的分块对高比特流来说并不好. 块的大小可以由每个目标独立维护.

1
2
3
4
5
6
7
+--------------+----------------+--------------------+--------------+
| Basic Header | Message Header | Extended Timestamp | Chunk Data |
+--------------+----------------+--------------------+--------------+
| |
|<------------------- Chunk Header ----------------->|

Chunk Format
  • Basic Header: 基本头(1到3个字节),这个字段编码了chunk的stream id和chunk的类型(chunk type), chunk的类型决定了编码的消息字段的格式, 这个字段的长度完全依赖于chunk的stream id, 是一个可变长度的字段
  • Message Header: 消息头(0、3、7或11个字节), 这个字段编码了消息被发送的一些信息, 长度由位于块头的块类型(chunk type)字段决定.
  • Extended Timestamp: 拓展时间戳(0或4个字节), 这个字段存在于具体的场景, 这些场景取决于编码时间戳或者块的Message Header的时间戳增量.
  • Chunk Data: 块数据(可变大小), 块的负载可以达到配置的最大块大小.

分块基础头部

协议支持多达65597个流, ID范围365599, ID 0, 1, 2是保留的. 值0表示2个字节的形式, ID在64319这个序列范围(第二个字节(0 ~ 255)+64). 值1表示3个字节的形式, ID在6465599这个序列范围(第三个字节(0 ~ 255) * 256 + 第二个字节(0 ~ 255) + 64). 值在3 ~ 63之间代表一个完整的流ID, 流ID的值等于2是为低级别的协议控制消息和命令保留的.

fmt占两位, 这个字段用来表示块消息头支持的格式,下一节介绍

1
2
3
4
5
6
/// 两个字节的形式 stream id 为第二个字节 + 64. 
0 1
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt| 0 | cs id - 64 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
1
2
3
4
5
/// 三个字节的形式, 剩余的字节 + 64
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt| 1 | cs id - 64 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

块的stream id占6位的情况

1
2
3
4
 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
|fmt| cs id |
+-+-+-+-+-+-+-+-+

用来表示2 ~ 63之间的ID, 值为0和1是用来指示这个字段两个字节和三个字节的版本.

chunk的stream ID在64 ~ 319的既可以用两个字节的版本表示也可以用三个字节的版本表示.

块消息头

块消息头有4种格式,由块基本头的fmt字段确定.

类型 0

类型0的块头部占11个字节.

1
2
3
4
5
6
7
8
9
10
11
 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont) |message type id| msg stream id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message stream id (cont) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Chunk Message Header - Type 0
  • 时间戳字段占3个字节, 对于类型 0的块, 消息的绝对时间戳在这里发送, 如果时间戳部分大于或等于0xFFFFFF, 这个字段必须设置为0xFFFFFF, 表明拓展时间戳编码为完整的32位时间戳. 否则这个部分就是整个时间戳.

类型 1

类型 1的块消息头占7个字节, 消息的stream ID不包含在其中, 这个块使用的stream ID与前一个块相同. 流包含可变大小的消息(例如: 大部分视频格式)应该使用这种格式–第一个消息后的每个新的消息的第一个块使用这种格式

1
2
3
4
5
6
7
8
9
10
 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp delta |message length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message length (cont) |message type id|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Chunk Message Header - Type 1

类型 2

类型 2的快消息占3个字节, 既不包含stream ID也不包含消息的长度. 这个消息的stream ID和消息长度与上一个块相同, 不变大小的消息流(例如: 某些音频和数据格式)应该使用这个格式–第一个消息后的每个消息的第一个块使用这种格式.

1
2
3
4
5
6
7
8
 0                   1                   2
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp delta |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Chunk Message Header - Type 2

类型 3

类型 3没有消息头, stream ID, 消息长度以及时间戳增量这些信息会从上一个相同的Chunk Stream ID的块中得到. 当一个单独的消息被分割成了许多块, 这个消息的所有块除了第一块都应该使用这个格式. 参考第二个例子, 一个流如果有许多相同大小、相同stream ID和时间戳字段占用相同大小, 一个块使用类型2以后其他所有的块都应该使用这种类型. 参考第一个例子, 如果第一个消息和第二个消息的增量和第一个消息的时间戳增量相同, 这是一个块使用类型3能够立即跟随一个类型0的块, 这个块就没必要使用类型2来注册时间戳增量. 如果一个类型3的块跟随一个类型0的块, 类型3的块的时间戳和类型0的块相同.

公共头部字段

拓展时间戳

拓展时间戳字段,用来编码时间戳或者时间增量, 这些时间戳或者时间增量无法在类型0、1和2的块中用24位表示, 这个字段编码完整32位时间戳或者时间增量, 这个字段是否存在由类型0的块配置时间戳字段, 或者类型1或类型2的块配置时间戳增量字段为0XFFFFFF. 最近类型0、类型1或者类型2的块有相同的chunk stream ID, 并且指明拓展时间戳字段存在,那么这个字段可以在类型3的块中存在

一些例子

协议控制消息

RTMP分块流协议控制消息使用消息类型的ID有1, 2, 3, 5, 6. 这些协议控制消息必须用0作为消息的stream ID, 并且需要在块的stream ID为2的流中发送, 协议消息在收到后尽可能快的生效, 它们的时间戳会被忽略.

命令的类型

客户端和服务交换AMF格式编码的命令. 发送方发送一个命令消息, 包含命令名称、事物ID以及包含相关参数的命令对象. 例如, connect这个命令包含app这个参数, 这个参数用来说明客户端连接的服务端应用. 接收方处理这个命令并且发送一个事务ID相同的响应. 响应的字符串不是_result_error就是一个方法的名称. 例如verifyClientcontactExternalServer

一个命令名为_result_error发出一个响应. 事务ID指明响应引用的未完成的命令. 这和IMAP以及很多其他协议中的tag完全相同. 命令的string字段中的方法名称表明,发送方尝试运行接收方的方法.

常见问题

  1. 网络序是大端法, 对于浮点数这类包含多个字节的数据类型, 字节顺序弄错了就可能造成很多问题, 比如RTMP协议中有一个字段是3个字节的时间戳, 如果这个时间戳大于0xFFFFFF, 就会使用拓展的时间戳字段, 使用拓展的时间戳字段, 那么Chunk的格式也会改变, 最常见的问题就是, C语言中没有提供3个字节的整数类型, 有4个字节的整数类型, 如果赋值时不小心将4个字节全部写进了报文就可能造成意想不到的问题

  2. 考虑到局部性原理和Cache的命中问题, 编写程序时需要注意字节对齐, 一般可以调整结构体中属性的顺序, 但是为了保证属性的顺序和报文的顺序一一对应, 一般通过pack指令来设置对齐, 比如将原始的PCM数据转换为WAV文件时, 需要添加一个音频数据的描述头.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#pragma pack(1)
typedef unsigned char byte;
typedef struct WAVHeader {
// big endian
char no[4];
// little endian
uint32_t size;
// big endian
char format[4];
char formatTag[4];
// little endian
uint32_t formatTagSize;
// little endian
uint16_t audioFormat;
uint16_t audioChannel;
uint32_t audioSampleRate;
uint32_t audioByteRate;
uint16_t audioBlockAlign;
uint16_t audioBitsPerSample;

// big endian
char dataTag[4];
uint32_t dataTagSize;
} WAVHeader;
#pragma pack()

对每个属性赋值以后, 结构体内存中的数据就是我们需要的, 就不需要手动处理了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int channels       = option.channels;
int samplerate = option.samplerate;
int bitspeersample = option.bitspeersample;
int byterate = channels * samplerate * bitspeersample / 8;
WAVHeader header = {.no = {'R', 'I', 'F', 'F'},
.size = i2l((dataSize + 36)),
.format = {'W', 'A', 'V', 'E'},
.formatTag = {'f', 'm', 't', ' '},
.formatTagSize = i2l(16),
.audioFormat = s2l(1),
.audioChannel = s2l(channels),
.audioSampleRate = i2l(samplerate),
.audioByteRate = i2l(byterate),
.audioBlockAlign = s2l(4),
.audioBitsPerSample = s2l(bitspeersample),
.dataTag = {'d', 'a', 't', 'a'},
.dataTagSize = i2l(dataSize)};

fwrite(&header, sizeof(WAVHeader), 1, file);
byte * buff = new byte[128];
int sizeRead = 0;
while ((sizeRead = fread(buff, 1, 128, pcmFile)) != 0) {
fwrite(buff, 1, sizeRead, file);
}
delete[] buff;
fclose(pcmFile);
fclose(file);
  1. 字符串这种是由字符组成, 每个字符是单个字节, 所以不需要考虑字节序, C语言里面向内存中写入字符串, 一般可以通过字符数组直接赋值, "example"这个双号包裹的, 得到的是一个地址, 还得使用memcpy复制到指定内存

  2. 苹果提供的硬编码函数AudioConverterFillComplexBuffer, 你塞的数据不够的时候, 它就会认为遇到了EOF, 音频采集的数据有时候很少, 就必须缓存下来, 数据足够多时再编码。苹果提供了一个音频编码的例子AudioConverter and AudioFile Sample

  3. 内存管理, 消息分块, 每个块需要拼接块头, 有两种方法, 都需要手动管理内存.

SRS源码分析

read_message_payload 读取消息体,包括消息拼接

分块消息缓存与拼接, 在RTMP协议中,一个重要的内容就是如何高效的传输音频和视频数据, 协议中例举一些具体的应用场景, 我们可以看看服务进程是如何处理分块的。
获取缓存的分块,当一个消息太大了, 协议要求分块,除了最后一块, 每一块有效载荷的大小都必须和协商的块大小一致, 因为TCP能保证分组按序达到, 所以除了第一块需要设置有效载荷的总长度, 后面的块格式也需要修改, 服务进程会将这些块拼接

下面是获取缓存的块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// get the cached chunk stream.
SrsChunkStream* chunk = NULL;

// use chunk stream cache to get the chunk info.
// @see https://github.com/ossrs/srs/issues/249
if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
// already init, use it direclty
chunk = cs_cache[cid];
} else {
// chunk stream cache miss, use map.
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid);
// set the perfer cid of chunk,
// which will copy to the message received.
chunk->header.perfer_cid = cid;
} else {
chunk = chunk_streams[cid];
}
}

拼接剩余的块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// the chunk payload size.
int payload_size = chunk->header.payload_length - chunk->msg->size;
payload_size = srs_min(payload_size, in_chunk_size);

// create msg payload if not initialized
if (!chunk->msg->payload) {
chunk->msg->create_payload(chunk->header.payload_length);
}

// read payload to buffer
if ((err = in_buffer->grow(skt, payload_size)) != srs_success) {
return srs_error_wrap(err, "read %d bytes payload", payload_size);
}
memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size);
chunk->msg->size += payload_size;

// got entire RTMP message?
if (chunk->header.payload_length == chunk->msg->size) {
*pmsg = chunk->msg;
chunk->msg = NULL;
return err;
}

其中in_chunk_size默认是128, 支持配置, 可以通过setChunkSize设置。

参考文档