SRS的线程池介绍SrsThreadPool—SRS源码分析

作者:罗上文,微信:Loken1,公众号:FFmpeg弦外之音

SRS 里面跟线程池相关的数据结构是 SrsThreadPoolSrsThreadEntry,他们的关系如下:

1-1

刚开始看这个这个结构的时候,我以为 SRS 的线程池的设计是,预先创建一堆线程,然后把任务丢进去,根据任务的数量或者相关的情况去自动消耗线程,

但后来我发现,SRS 的线程池设计并没有那么复杂,他很简单的。SrsThreadEntry 类就是单个线程的信息,它的 start() 函数就是执行的具体的任务。而 SrsThreadPool 类可以理解为线程的管理器,SrsThreadPool::threads_ 字段是一个 vector 数组,负责管理所有的线程。

SrsThreadPool 类的 execute 函数就负责创建一个 新的线程实例 SrsThreadEntryrun() 函数负责定时监控所有线程的状态。


下面我们来仔细讲一下 SrsThreadPoolSrsThreadEntry 这两个类里面的各个字段跟方法。

SrsThreadPool 类

SrsThreadPool 类的字段如下:

1,SrsThreadEntry* entry_

entry_ 存储的是始祖线程的信息,也就是 main 线程自己。

2,srs_utime_t interval_

interval_ 是间隔的意思吧?xxx,后面补充。

3,SrsThreadMutex* lock_

这个字段是配合 SrsThreadLocker() 函数用的, SrsThreadLocker() 是一个宏函数,他实际上就是创建一个 锁 对象实例,SrsThreadLocker() 的定义如下:

#define SrsThreadLocker(instance) \
    impl__SrsThreadLocker _SRS_free_##instance(instance)
SrsThreadLocker(lock_);
展开之后如下
impl__SrsThreadLocker _SRS_free_lock_(lock_)

实际上就是创建了一个 _SRS_free_lock_ 变量,而这个变量的类型是 impl__SrsThreadLockerimpl__SrsThreadLocker 类的构造函数就是加锁,而析构函数就是释放锁。如下:

1-2


4,std::vector<SrsThreadEntry*> threads_

threads_ 是线程列表,execute() 创建的线程 都会加进去 线程列表,方便管理。


5,SrsThreadEntry* hybrid_

hybrid_ 存储的是 混合线程 的信息,由于 SRS 的线程化还没有完成,所以他并不是每个模块里面开一个线程或者多个线程,例如 RTSP 一个线程,RTMP 一个线程,或者 WebRTC 开多个线程。他暂时还不是这样的。他是所有的模块都混在 hybrid_ 线程里面,目前还没完成线程化。但是线程池设计已经做好了的,我觉得线程池的设计后面改动应该不会太大。

所以 hybrid_ 顾名思义就是混合线程的意思,在 execute() 创建的线程,如果 labelhybird


6,int pid_fd

进程ID 的文件描述符,注意,这个不是进程 ID,是一个文件的描述符,这个文件的内容就是进程的ID。


SrsThreadPool 类的方法如下:

1,static srs_error_t setup_thread_locals()

setup_thread_locals() 里面就是调了一下 srs_st_init(),这个是 StateThreads 协程库的初始化函数。里面是对一些全局变量做初始化,由于 SRS 的多线程改造,已经把这些全局变量变成 thread local 模式了,所以在每次执行新的线程任务之前,都要调一下 setup_thread_locals(),如下:

1-3

2,srs_error_t initialize()

initialize() 函数非常简单,就是调 acquire_pid_file() 获取一个进程 ID,保存进去文件。还有获取配置文件的 interval_interval_ 是干什么的后面补充一下。

3,virtual srs_error_t acquire_pid_file()

acquire_pid_file() 的作用就是获取进程ID,保存进去文件。然后把文件描述符赋值给 SrsThreadPool::pid_fd 字段。

4,srs_error_t execute(std::string label,...)

execute() 函数是负责创建线程来执行任务的,他会先开线程运行 SrsThreadPool::start() 函数,然后再 start() 里面才真正运行 SrsThreadEntry 里的任务函数。流程 如下:

1-4

5,srs_error_t run();

run() 函数负责运行 始祖线程,始祖线程 的作用就是检测其他线程的运行状态,类似 watch dog,如下:

1-5

他的检测算法 还有 interval_ 变量比较难懂,上面的代码 for 循环里面有两个 i,逻辑看起来比较绕。不知道是写错了?还是故意设计出两个 i 的?

补充:是写错了,是个 bug。应该是 int iint j


SrsThreadEntry 类

SrsThreadEntry 类的字段如下:

1,SrsThreadPool* pool

你可以把 SrsThreadEntry 理解为一个线程实例,而 pool 负责把 线程实例 绑定到具体的线程池里面。

2,std::string label

label 是线程的标签,目前只有两个标签,primordialhybrid,其实我个人觉得这个 label 应该用枚举类型来实现更好一些。

3,std::string name

name 是线程的名称,他还用了 pthread_setname_np() 来把这个 name 设置成线程的名称,所以你在调试器里面是可以直接看到这个 名称,这样可以在调试的时候非常容易的区分各个线程。

#ifndef SRS_OSX
    // https://man7.org/linux/man-pages/man3/pthread_setname_np.3.html
    pthread_setname_np(pthread_self(), entry->name.c_str());
#else
    pthread_setname_np(entry->name.c_str());
#endif

1-6


4,int num

这个 num 好像就是简单统一一下线程的编号,例如第一个线程的 num 是 1,第二个线程的 num 是 2。

5,pid_t tid

tid 是线程的ID,通过 gettid() 函数来获取的。

6,srs_error_t err

线程任务的错误,默认是 srs_success 成功。始祖线程就是检测其他子线程的这个 err 字段,判断是不是 srs_success ,如果不是就写日志记录退出的原因等信息。


SrsThreadEntrystart() 函数指针 跟 arg 是一个重点。

srs_error_t (*start)(void* arg);
void* arg;

首先 argstart 函数的参数,而 start() 的指向是在 SrsThreadPool::execute() 的时候被赋值的,如下:

1-7

1-8


SrsThreadPoolSrsThreadEntry 这两个类的数据结构已经分析完毕,下面画一张图展示 SRS 从 main 开始 创建 线程的流程。如下:

1-9

上图中中总共就 2 个线程,已经用颜色标注出来了,一个是 hybird 线程,一个是 始祖线程,始祖线程本身就是存在的,不需要 pthread_create 创建,始祖线程是在创建全局变量 _srs_thread_pool 的时候在构造函数里面进行初始化的,如下:

2-1


虽然 SRS 的线程池设计做好了,我觉得至少完成了 80%,后面改动应该不会太大。虽然他做好了,但是并没有在项目里面完全用上这个设计,可以看到,除开 SRT 模块的线程,一共就 2 个线程,就是 primordialhybird 线程。

理想状态下,我觉得应该是 RTMP,RTSP,WebRTC 等等 各个模块有各自的线程。


版权所属 xianwanzhiyin.net 罗上文 2023 all right reserved,powered by Gitbook该文件修订时间: 2024-01-08 01:12:46

results matching ""

    No results matching ""