介绍
随着实时音视频技术的发展,音视频聊天已经走入寻常百姓家,微信2017年每天有4.1亿次的音视频通话。贝壳找房IM作为线上商机来源,也实现了该功能,作为经纪人和客户沟通的便捷工具。除了IM音视频通话,实时音视频技术在贝壳还用到了直播自动审核、经纪人讲盘训练等各种场景,这些场景中都用到了实时音频流网关。以经纪人讲盘训练为例,通过构造一个虚拟人进入音频房间,可以和经纪人进行实时问答,模拟买房和讲盘,提高经纪人作业效率。要达到该效果,虚拟人需要能够识别经纪人讲话并且进行回答或者提问。经纪人讲盘训练整个业务架构如下所示:
其中,自动语音识别技术ASR(Automatic Speech Recognition),是一种将人的语音转换为文本的技术,反之,TTS(Text To Speech),可以将文本转换为语音。实时音视频房间可以使用腾讯云或者声网等公司的技术,或者使用webRTC进行自研,本文重点关注图中标色部分的贝壳音频流网关,其功能为将经纪人的语音推送到后端ASR服务,进行识别和分析,之后通过TTS生成一个回答或者提问并通过音频流网关将语音回传给经纪人。本文主要介绍音频流网关的架构、演进、稳定性建设及使用场景。
音频基础知识
声音是振动产生的声波,经过空气、固体、液体这些介质传播并被人或动物听觉器官所感知的波动现象。音频信号采集需要将声音通过麦克风等转变为模拟信号,然后对模拟信号进行抽样、量化和编码转换成离散的数字信号。PCM(Pulse Code Modulation)就是一种将模拟信号数字化的方法,一般也用来表示未经过封装的音频原始文件。模数转换过程中涉及三个基本概念:采样位深、采样率和通道数。
采样位深:每个采样点用多少bit表示,该值越大,能够表达振动幅度的精确程度就越高。例如采样位深为16bit,则意味着可以将振动幅度划分为65536个等级。
采样率:每秒的采样点数,一般用 Hz来表示,比如1s如果有48000个采样点,则采样率就是48kHz。因为人耳的听觉范围是20Hz-20KHz,根据奈奎斯特采样定理,模数转换过程中,采样频率大于信号中最高频率的2倍时,采样后的数字信号可以完整地保留原始信号的信息。因此如果采样率为48kHz,可以完整保留24kHz以下频率的完整音频信息。
通道数:声音通道个数,常见的为单通道和双通道。双通道可以理解为单通道数据保存两份。人左右耳因为空间位置导致听到声音时间不同,双通道通过播放时模拟这种情况营造声音从不同方向传来的空间感。
音频采样过程中持续采样时间称为帧长,可以使用20ms,也可以使用200ms,时间越短延时越小。假设一次采样,采样位深是16bit,采样率为16kHz,单通道,帧长为20ms,使用PCM,则每帧的大小为:
帧大小 = 位深 * 采样率 * 帧长 * 通道/ (1000*8)
= 16 * 16000 * 20 * 1 /(1000*8)
= 640字节
贝壳音频流网关处理的每帧大小就是640字节。
架构介绍
贝壳音频流网关使用实时音视频云平台提供的服务端SDK进入房间、退出房间以及在房间内推拉流,除SDK功能外,主要需要解决如下问题:
控制进入、退出房间时机
分角色拉取,合并以及重传流
有状态服务横向扩展方法
服务稳定性建设
本文逐个解答这些问题。
架构图
整体架构为Dispatcher-Worker模式,Dispatcher通过对外暴露http接口管理任务,控制Worker启动停止,以及与ASR、TTS服务交互;每个Worker都是独立的进程,调用实时音视频云平台的服务端SDK进入房间进行推流和拉流。Dispatcher与Worker使用双向管道(PIPE)进行通信,例如推拉流的交互,控制信息的交互。贝壳音频流网关使用Go代码实现,Go标准库的os/exec包可以直接调起一个外部程序执行,并且生成和外部程序的双向管道进行通信,os/exec包的关键结构体及方法如下:
type Cmd struct {
Path string // 程序名称
Args []string // 程序参数
Env []string // 程序环境变量
Dir string // 程序路径
Stdin io.Reader // 程序标准输入
Stdout io.Writer // 程序标准输出
Stderr io.Writer // 程序标准错误
...
Process *os.Process //生成的进程
...
}
func Command(name string, arg ...string) *Cmd // 返回一个Cmd结构体
func (c *Cmd) Start() error // 启动一个新的进程
...
func (c *Cmd) StdinPipe() (io.WriteCloser, error) // 返回一个pipe,该pipe和新进程的标准输入进行连接
func (c *Cmd) StdoutPipe() (io.ReadCloser, error) // 返回一个pipe,该pipe和新进程的标准输出进行连接
...
func (c *Cmd) Wait() error // 等待新进程退出
...
Cmd结构体中的Process成员变量代表新生成的进程,通过向Process发送信号可以控制Worker进程的退出
cmd.Process.Signal(syscall.SIGTERM)
拉流
进入房间
音频流网关何时生成一个Worker进程进入房间取决于具体的业务场景,例如在经纪人训练场中,当经纪人点击开始训练之后需要将虚拟人拉入房间,进行交互。Dispatcher通过提供进房接口,由业务方根据业务场景灵活的控制开始拉流的时机,接口及返回值信息如下:
接口:POST /task HTTP/1.1
返回值:
{
"errno": 0,
"errmsg": "success",
"data": {
"address": "10.x.x.x:8888"
}
}
可以看到,返回值中还包含了处理该次请求的机器IP+端口,这是为了能够横向扩展做的一处优化,跟推流有关系,下文详细描述
流传输
业务方调用进房接口后,Dispatcher会生成一个Worker进入房间开始拉取房间音频流,之后通过管道传输给Dispatcher,Dispatcher再传输给ASR服务。在我们的业务场景中,音频流每帧是20ms,16KHz的采样率,16bit的位深,单声道PCM格式,因此每帧大小为640字节,我们做个简单的计算,假设每个房间中有一个经纪人,同时在线房间数为5000(即有5000个经纪人同时在线训练),则每秒钟的QPS为(1000ms/20ms)*5000 = 250000。为了提高效率,音频流网关会默认累积10帧(200ms)之后传输给ASR服务。此时示例中的QPS相应降为250000/10 = 25000。示意图如下:
从图中还可以看出两点信息:
最后一个包可能不满10帧
每个包传输到ASR时会加入一个Sequence字段,表明包的顺序,最后一个包会将顺序字段取负,表明该次拉流已经结束。Sequence字段会在流重传时使用。
流重传
ASR服务在某些场景丢包之后无法继续准确识别。因此将包传输到ASR服务时,除了正常的重试策略,还会在音频网关保留最近的30个包。当ASR服务长时间未收到一个包时,可以将该包的Sequence返回到音频流网关,网关启动时会开启一定数量的goroutine,这些goroutine收到重传信息后会从保留包中查找并重传。
退出房间
退出房间有两种机制:
当房间内有用户进入和退出时,实时音视频通道的SDK会有回调信息通知此类事件。因此可以通过设置状态标记和计数器,当房间内所有人都退出后,虚拟人也自动退出
提供退房接口,业务方需要退出房间时调用该接口,Dispatcher给Worker进程发送信号,通知其退出。
推流
推流也通过提供一个接口实现,如下:
POST /upstream HTTP/1.1
通过TTS生成语音之后,需要以16bit位深、16kHz的采样率、单声道PCM格式上传,同时需要指定对应的实时音视频房间号。上传成功之后音频流网关启动一个20ms间隔的Ticker(Ticker是Go语言中的一个标准库,经过一个固定时间间隔之后执行某项任务的组件),代码示例如下:
delayTicker := time.NewTicker(20 * time.Millisecond)
defer delayTicker.Stop()
count := len(alls) //alls中保存待推送的音频流
var err error
for i := 0; i < count; i++ {
...
<-delayTicker.C // 每20ms会返回一次
err = dispatcher.SendToTrtc(alls[i], e)// 发送音频流
...
}
间隔20ms发送一个音频包,直到发送完毕
如何找到推流机器
上文讲述拉流时,可以看到每个Worker会进入一个实时音视频的房间,而Worker是跟具体的机器绑定的有状态服务。那么推流时如何找到某个房间号对应的Worker属于哪台机器呢?
有两种方法,一是调用task接口进入房间时会返回该房间对应的机器IP+Port,业务方可以记录该返回信息,推流时直接使用。
第二种方法是音频流网关提供了一个查询接口,业务方可以通过查询接口得到房间号对应的机器IP+PORT。
演进及优化
心跳机制
第一版上线后发现虚拟人推流时,经纪人端的播放会出现缺失开头几个字的吞音现象。查看实时音视频通道侧的用户事件,发现推流之前会发生经纪人退出房间的现象,原因为经纪人侧认为房间内已没有其他人员,因此主动退房。改进机制为每秒钟发送一个静音帧作为心跳。
内存分配
上文提到每个音频帧是640字节,如果每次我们都直接新生成一个代表音频帧的结构体,则内存分配与GC耗时都会成为瓶颈,因此需要提前生成一批音频帧结构体进行复用,放入一个池子,使用时取出,使用完毕后放回。在Go语言中,很自然会想到sync.Pool标准包。
type Pool struct {
New func() interface{}
}
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
sync.Pool包中只有简单的Get与Put两个方法,用来获取和放回一个结构体;New成员是一个函数,作用为生成一个新的结构体。
高可用建设
音频流网关的高可用建设需要考虑如下几个方面:
入口处:限流
强依赖:无,不需考虑降级
单次请求生命周期平均为5-10分钟(即每个Worker进程会持续5-10分钟),需要根据单进程占用资源控制进程的总数量
实时音视频通道:现在使用某云平台的实时音视频通道,是一个单点,多通道建设成本高,暂不考虑
限流
通过使用Sentinel,配合配置中心下发限流开关以及限流值,达到接口维度的限流
单机容量
音频流网关为计算密集型服务,主要资源瓶颈为CPU,经过测试单Worker CPU使用率为5%,因此单CPU支持的Worker个数为20个。因此根据服务器总的CPU核心个数,计算Worker的容量值,通过配置中心下发进行限制。
容量状态
通过提供一个接口,可以观察音频流网关当前容量状态,并且业务方可以据此决定是否需要进行业务侧限流,接口如下:
GET /current/workers
{
"errno":0,
"errmsg":"success",
"data":{
"current_nums":807,
"max_total_nums":1620
}
}
上述示例表明当前总的Worker个数为807,音频流网关可承载的总的Worker个数为1620,即使用了接近50%的容量
场景
音频流网关主要用到如下5种场景:
1. 经纪人讲盘训练:将经纪人的语音推送到后端ASR服务,进行识别和分析,之后通过TTS生成一个回答或者提问并通过音频流网关将语音回传给经纪人。
2. 录制转码:拉取音频流之后可以分别按经纪人和客户录制为单流音频,也可以将多路单流合并后录制混流音频,转码之后上传存储进行音频分析和音频回
3. 经纪人提词器:通过拉取音频流转码为文本并进行分析,可以生成对应的答案给到经纪人端
4. 直播审核:传统方式需要审核人员进入直播间全程观看,通过音频流网关可以将多个直播间音频拉取到ASR服务转换为文本并通过九宫格方式展示,提高审核效率
5. 网络语音播报:使用TTS服务生成语音后向音频流网关推流并接通经纪人APP,可以进行网络语音播报,例如通知类消息或者提醒类消息
结论
从文字、书籍到电话、短信,进一步到IM、语音通话、视频通话,技术的进步使得人与人间的沟通交流更加方便。对企业来说,通过技术赋能,可以让服务者提供更好的服务,客户拥有更好的体验,技术的魅力就在于让人类生活的更美好吧。