本章内容解读SRS开源代码框架,无二次开发,以学习交流为目的。
SRS是国人开发的流媒体服务器,C++语言开发,本章使用版本:https://github.com/ossrs/srs/tree/5.0release。
目录
- SRS协程库ST的使用
- 源码
- ST协程库测试
- SrsAutoFree测试
SRS协程库ST的使用
C语言协程库state-threads(简称ST库):https://sourceforge.net/projects/state-threads/。
SRS对state-threads库进行了2次开发:https://github.com/ossrs/state-threads。
1、ST库的编译
在下载的srs-5.0release.zip安装包里有ST源码,直接编译:
cd /srs-5.0release/trunk/3rdparty/st-srs
make linux-debug #编译
在构建目录生成库文件libst.a,头文件st.h。
2、ST库的使用
SRS封装了协程类SrsSTCoroutine,通过C++类的继承和虚函数回调,实现了在回调函数执行协程处理函数(和linux线程库函数pthread_create用法类似)。
这部分代码还包含了SrsAutoFree定义,可以在离开作用域时自动释放指针,也是很有用的一个模块。
源码
源码结构如下:
├── chw_adapt.h
├── srs_app_st.cpp
├── srs_app_st.hpp
├── srs_kernel_error.cpp
├── srs_kernel_error.hpp
├── srs_kernel_io.cpp
├── srs_kernel_io.hpp
├── srs_protocol_io.cpp
├── srs_protocol_io.hpp
├── srs_protocol_st.cpp
└── srs_protocol_st.hpp
其中srs_kernel_io、srs_protocol_io和SRS源码一样,可以在SRS源码里找,srs_kernel_error源码参考这里:SRS开源代码框架,错误类(SrsCplxError)的使用。
日志打印使用printf代替,上下文SrsContextId使用std::string代替。
chw_adapt.h
#ifndef CHW_ADAPT_H
#define CHW_ADAPT_H#include <string>
typedef std::string SrsContextId; //减少依赖,上下文ID使用SrsContextId代替
typedef int64_t srs_utime_t;
#define SRS_UTIME_MILLISECONDS 1000
#define srsu2ms(us) ((us) / SRS_UTIME_MILLISECONDS)
#define srsu2msi(us) int((us) / SRS_UTIME_MILLISECONDS)// Never timeout.
#define SRS_UTIME_NO_TIMEOUT ((srs_utime_t) -1LL)// To delete object.
#define SrsAutoFree(className, instance) \impl_SrsAutoFree<className> _auto_free_##instance(&instance, false, false, NULL)
// To delete array.
#define SrsAutoFreeA(className, instance) \impl_SrsAutoFree<className> _auto_free_array_##instance(&instance, true, false, NULL)
// Use free instead of delete.
#define SrsAutoFreeF(className, instance) \impl_SrsAutoFree<className> _auto_free_##instance(&instance, false, true, NULL)
// Use hook instead of delete.
#define SrsAutoFreeH(className, instance, hook) \impl_SrsAutoFree<className> _auto_free_##instance(&instance, false, false, hook)
// The template implementation.
template<class T>
class impl_SrsAutoFree
{
private:T** ptr;bool is_array;bool _use_free;void (*_hook)(T*);
public:// If use_free, use free(void*) to release the p.// If specified hook, use hook(p) to release it.// Use delete to release p, or delete[] if p is an array.impl_SrsAutoFree(T** p, bool array, bool use_free, void (*hook)(T*)) {ptr = p;is_array = array;_use_free = use_free;_hook = hook;}virtual ~impl_SrsAutoFree() {if (ptr == NULL || *ptr == NULL) {return;}if (_use_free) {free(*ptr);} else if (_hook) {_hook(*ptr);} else {if (is_array) {delete[] *ptr;} else {delete *ptr;}}*ptr = NULL;}
};#endif // CHW_ADAPT_H
srs_app_st.hpp
#ifndef SRS_APP_ST_HPP
#define SRS_APP_ST_HPP#include <string>
#include "chw_adapt.h"#include <srs_kernel_error.hpp>
#include <srs_protocol_st.hpp>class SrsFastCoroutine;
// 每个协程都要继承这个类
class ISrsCoroutineHandler
{
public:ISrsCoroutineHandler();virtual ~ISrsCoroutineHandler();
public:// Do the work. The ST-coroutine will terminated normally if it returned.// @remark If the cycle has its own loop, it must check the thread pull.// 协程处理函数,如果返回则协程结束virtual srs_error_t cycle() = 0;
};// Start the object, generally a croutine.
// 通常是启动一个ST对象
class ISrsStartable
{
public:ISrsStartable();virtual ~ISrsStartable();
public:virtual srs_error_t start() = 0;
};// The corotine object.
// 协程基类
class SrsCoroutine : public ISrsStartable
{
public:SrsCoroutine();virtual ~SrsCoroutine();
public:virtual void stop() = 0;virtual void interrupt() = 0;// @return a copy of error, which should be freed by user.// NULL if not terminated and user should pull again.virtual srs_error_t pull() = 0;// Get and set the context id of coroutine.virtual const SrsContextId& cid() = 0;virtual void set_cid(const SrsContextId& cid) = 0;
};// An empty coroutine, user can default to this object before create any real coroutine.
// @see https://github.com/ossrs/srs/pull/908
// 一个空的协程,用户可以在创建任何真正的协程序之前默认为这个对象。
class SrsDummyCoroutine : public SrsCoroutine
{
private:SrsContextId cid_;
public:SrsDummyCoroutine();virtual ~SrsDummyCoroutine();
public:virtual srs_error_t start();virtual void stop();virtual void interrupt();virtual srs_error_t pull();virtual const SrsContextId& cid();virtual void set_cid(const SrsContextId& cid);
};// A ST-coroutine is a lightweight thread, just like the goroutine.
// But the goroutine maybe run on different thread, while ST-coroutine only
// run in single thread, because it use setjmp and longjmp, so it may cause
// problem in multiple threads. For SRS, we only use single thread module,
// like NGINX to get very high performance, with asynchronous and non-blocking
// sockets.
// ST-coroutine是一个轻量级的线程,就像goroutine一样。
// 但是goroutine可能在不同的线程上运行,而ST-coroutine只在单个线程中运行,因为它使用了setjmp和longjmp,所以它可能会在多个线程中导致问题。
// 对于SRS,我们只使用单线程模块,类似NGINX,来获得非常高的性能,具有异步和非阻塞套接字。
// @reamrk For multiple processes, please use go-oryx to fork many SRS processes.
// 对于多个进程,请使用go-oryx来fork多个SRS进程。
// Please read https://github.com/ossrs/go-oryx
// @remark For debugging of ST-coroutine, read _st_iterate_threads_flag of ST/README
// https://github.com/ossrs/state-threads/blob/st-1.9/README#L115
// @remark We always create joinable thread, so we must join it or memory leak,
// Please read https://github.com/ossrs/srs/issues/78
class SrsSTCoroutine : public SrsCoroutine
{
private:SrsFastCoroutine* impl_;
public:// Create a thread with name n and handler h.// @remark User can specify a cid for thread to use, or we will allocate a new one.SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h);SrsSTCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid);virtual ~SrsSTCoroutine();
public:// Set the stack size of coroutine, default to 0(64KB).void set_stack_size(int v);
public:// Start the thread.// @remark Should never start it when stopped or terminated.virtual srs_error_t start();// Interrupt the thread then wait to terminated.// @remark If user want to notify thread to quit async, for example if there are// many threads to stop like the encoder, use the interrupt to notify all threads// to terminate then use stop to wait for each to terminate.virtual void stop();// Interrupt the thread and notify it to terminate, it will be wakeup if it's blocked// in some IO operations, such as st_read or st_write, then it will found should quit,// finally the thread should terminated normally, user can use the stop to join it.virtual void interrupt();// Check whether thread is terminated normally or error(stopped or termianted with error),// and the thread should be running if it return ERROR_SUCCESS.// @remark Return specified error when thread terminated normally with error.// @remark Return ERROR_THREAD_TERMINATED when thread terminated normally without error.// @remark Return ERROR_THREAD_INTERRUPED when thread is interrupted.virtual srs_error_t pull();// Get and set the context id of thread.virtual const SrsContextId& cid();virtual void set_cid(const SrsContextId& cid);
};// High performance coroutine.
// 高性能协程
class SrsFastCoroutine
{
private:std::string name;int stack_size;ISrsCoroutineHandler* handler;
private:srs_thread_t trd;SrsContextId cid_;srs_error_t trd_err;
private:bool started;bool interrupted;bool disposed;// Cycle done, no need to interrupt it.bool cycle_done;
private:// Sub state in disposed, we need to wait for thread to quit.// 子状态被处理后,我们需要等待线程退出。bool stopping_;SrsContextId stopping_cid_;
public:SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h);SrsFastCoroutine(std::string n, ISrsCoroutineHandler* h, SrsContextId cid);virtual ~SrsFastCoroutine();
public:void set_stack_size(int v);
public:srs_error_t start();void stop();void interrupt();inline srs_error_t pull() {if (trd_err == srs_success) {return srs_success;}return srs_error_copy(trd_err);}const SrsContextId& cid();virtual void set_cid(const SrsContextId& cid);
private:srs_error_t cycle();static void* pfn(void* arg);
};// Like goroutine sync.WaitGroup.
// 类似go语言的sync.WaitGroup
class SrsWaitGroup
{
private:int nn_;srs_cond_t done_;
public:SrsWaitGroup();virtual ~SrsWaitGroup();
public:// When start for n coroutines.void add(int n);// When coroutine is done.void done();// Wait for all corotine to be done.void wait();
};#endif
srs_app_st.cpp
#include <srs_app_st.hpp>#include <string>
using namespace std;#include <srs_kernel_error.hpp>ISrsCoroutineHandler::ISrsCoroutineHandler()
{
}ISrsCoroutineHandler::~ISrsCoroutineHandler()
{
}ISrsStartable::ISrsStartable()
{
}ISrsStartable::~ISrsStartable()
{
}SrsCoroutine::SrsCoroutine()
{
}SrsCoroutine::~SrsCoroutine()
{
}SrsDummyCoroutine::SrsDummyCoroutine()
{
}SrsDummyCoroutine::~SrsDummyCoroutine()
{
}srs_error_t SrsDummyCoroutine::start()
{return srs_error_new(ERROR_THREAD, "dummy coroutine");
}void SrsDummyCoroutine::stop()
{
}void SrsDummyCoroutine::interrupt()
{
}srs_error_t SrsDummyCoroutine::pull()
{return srs_error_new(ERROR_THREAD, "dummy pull");
}const SrsContextId& SrsDummyCoroutine::cid()
{return cid_;
}void SrsDummyCoroutine::set_cid(const SrsContextId& cid)
{cid_ = cid;
}SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h)
{impl_ = new SrsFastCoroutine(n, h);
}SrsSTCoroutine::SrsSTCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{impl_ = new SrsFastCoroutine(n, h, cid);
}SrsSTCoroutine::~SrsSTCoroutine()
{srs_freep(impl_);
}void SrsSTCoroutine::set_stack_size(int v)
{impl_->set_stack_size(v);
}srs_error_t SrsSTCoroutine::start()
{return impl_->start();
}void SrsSTCoroutine::stop()
{impl_->stop();
}void SrsSTCoroutine::interrupt()
{impl_->interrupt();
}srs_error_t SrsSTCoroutine::pull()
{return impl_->pull();
}const SrsContextId& SrsSTCoroutine::cid()
{return impl_->cid();
}void SrsSTCoroutine::set_cid(const SrsContextId& cid)
{impl_->set_cid(cid);
}SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h)
{// TODO: FIXME: Reduce duplicated code.name = n;handler = h;trd = NULL;trd_err = srs_success;started = interrupted = disposed = cycle_done = false;stopping_ = false;// 0 use default, default is 64K.stack_size = 0;
}SrsFastCoroutine::SrsFastCoroutine(string n, ISrsCoroutineHandler* h, SrsContextId cid)
{name = n;handler = h;cid_ = cid;trd = NULL;trd_err = srs_success;started = interrupted = disposed = cycle_done = false;stopping_ = false;// 0 use default, default is 64K.stack_size = 0;
}SrsFastCoroutine::~SrsFastCoroutine()
{stop();// TODO: FIXME: We must assert the cycle is done.srs_freep(trd_err);
}void SrsFastCoroutine::set_stack_size(int v)
{stack_size = v;
}srs_error_t SrsFastCoroutine::start()
{srs_error_t err = srs_success;if (started || disposed) {if (disposed) {err = srs_error_new(ERROR_THREAD, "disposed");} else {err = srs_error_new(ERROR_THREAD, "started");}if (trd_err == srs_success) {trd_err = srs_error_copy(err);}return err;}if ((trd = (srs_thread_t)_pfn_st_thread_create(pfn, this, 1, stack_size)) == NULL) {err = srs_error_new(ERROR_THREAD, "create failed");srs_freep(trd_err);trd_err = srs_error_copy(err);return err;}started = true;return err;
}void SrsFastCoroutine::stop()
{if (disposed) {if (stopping_) {/*srs_error*/printf("thread is stopping by %s\n", stopping_cid_.c_str());srs_assert(!stopping_);}return;}disposed = true;stopping_ = true;interrupt();// When not started, the trd is NULL.if (trd) {void* res = NULL;int r0 = srs_thread_join(trd, &res);if (r0) {// By st_thread_joinif (errno == EINVAL) srs_assert(!r0);if (errno == EDEADLK) srs_assert(!r0);// By st_cond_timedwaitif (errno == EINTR) srs_assert(!r0);if (errno == ETIME) srs_assert(!r0);// Otherssrs_assert(!r0);}srs_error_t err_res = (srs_error_t)res;if (err_res != srs_success) {// When worker cycle done, the error has already been overrided,// so the trd_err should be equal to err_res.srs_assert(trd_err == err_res);}}// If there's no error occur from worker, try to set to terminated error.if (trd_err == srs_success && !cycle_done) {trd_err = srs_error_new(ERROR_THREAD, "terminated");}// Now, we'are stopped.stopping_ = false;return;
}void SrsFastCoroutine::interrupt()
{if (!started || interrupted || cycle_done) {return;}interrupted = true;if (trd_err == srs_success) {trd_err = srs_error_new(ERROR_THREAD, "interrupted");}// Note that if another thread is stopping thread and waiting in st_thread_join,// the interrupt will make the st_thread_join fail.srs_thread_interrupt(trd);
}const SrsContextId& SrsFastCoroutine::cid()
{return cid_;
}void SrsFastCoroutine::set_cid(const SrsContextId& cid)
{cid_ = cid;
// srs_context_set_cid_of(trd, cid);
}srs_error_t SrsFastCoroutine::cycle()
{
// if (_srs_context) {
// if (cid_.empty()) {
// cid_ = _srs_context->generate_id();
// }
// _srs_context->set_id(cid_);
// }srs_error_t err = handler->cycle();if (err != srs_success) {return srs_error_wrap(err, "coroutine cycle");}// Set cycle done, no need to interrupt it.cycle_done = true;return err;
}void* SrsFastCoroutine::pfn(void* arg)
{SrsFastCoroutine* p = (SrsFastCoroutine*)arg;srs_error_t err = p->cycle();// Set the err for function pull to fetch it.// @see https://github.com/ossrs/srs/pull/1304#issuecomment-480484151if (err != srs_success) {srs_freep(p->trd_err);// It's ok to directly use it, because it's returned by st_thread_join.p->trd_err = err;}return (void*)err;
}SrsWaitGroup::SrsWaitGroup()
{nn_ = 0;done_ = srs_cond_new();
}SrsWaitGroup::~SrsWaitGroup()
{wait();srs_cond_destroy(done_);
}void SrsWaitGroup::add(int n)
{nn_ += n;
}void SrsWaitGroup::done()
{nn_--;if (nn_ <= 0) {srs_cond_signal(done_);}
}void SrsWaitGroup::wait()
{if (nn_ > 0) {srs_cond_wait(done_);}
}
srs_protocol_st.hpp
#ifndef SRS_PROTOCOL_ST_HPP
#define SRS_PROTOCOL_ST_HPP#include "chw_adapt.h"
#include <string>#include <srs_kernel_error.hpp>
#include "srs_protocol_io.hpp"// Wrap for coroutine.
typedef void* srs_netfd_t;
typedef void* srs_thread_t;
typedef void* srs_cond_t;
typedef void* srs_mutex_t;// Initialize ST, requires epoll for linux.
extern srs_error_t srs_st_init();
// Destroy ST, free resources for asan detecting.
extern void srs_st_destroy(void);// Close the netfd, and close the underlayer fd.
// @remark when close, user must ensure io completed.
extern void srs_close_stfd(srs_netfd_t& stfd);// Set the FD_CLOEXEC of FD.
extern srs_error_t srs_fd_closeexec(int fd);// Set the SO_REUSEADDR of fd.
extern srs_error_t srs_fd_reuseaddr(int fd);// Set the SO_REUSEPORT of fd.
extern srs_error_t srs_fd_reuseport(int fd);// Set the SO_KEEPALIVE of fd.
extern srs_error_t srs_fd_keepalive(int fd);// Get current coroutine/thread.
extern srs_thread_t srs_thread_self();
extern void srs_thread_exit(void* retval);
extern int srs_thread_join(srs_thread_t thread, void **retvalp);
extern void srs_thread_interrupt(srs_thread_t thread);
extern void srs_thread_yield();// For utest to mock the thread create.
typedef void* (*_ST_THREAD_CREATE_PFN)(void *(*start)(void *arg), void *arg, int joinable, int stack_size);
extern _ST_THREAD_CREATE_PFN _pfn_st_thread_create;// For client, to open socket and connect to server.
// @param tm The timeout in srs_utime_t.
extern srs_error_t srs_tcp_connect(std::string server, int port, srs_utime_t tm, srs_netfd_t* pstfd);// For server, listen at TCP endpoint.
extern srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd);// For server, listen at UDP endpoint.
extern srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd);// Wrap for coroutine.
extern srs_cond_t srs_cond_new();
extern int srs_cond_destroy(srs_cond_t cond);
extern int srs_cond_wait(srs_cond_t cond);
extern int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout);
extern int srs_cond_signal(srs_cond_t cond);
extern int srs_cond_broadcast(srs_cond_t cond);extern srs_mutex_t srs_mutex_new();
extern int srs_mutex_destroy(srs_mutex_t mutex);
extern int srs_mutex_lock(srs_mutex_t mutex);
extern int srs_mutex_unlock(srs_mutex_t mutex);extern int srs_key_create(int* keyp, void (*destructor)(void*));
extern int srs_thread_setspecific(int key, void* value);
extern int srs_thread_setspecific2(srs_thread_t thread, int key, void* value);
extern void* srs_thread_getspecific(int key);extern int srs_netfd_fileno(srs_netfd_t stfd);extern int srs_usleep(srs_utime_t usecs);extern srs_netfd_t srs_netfd_open_socket(int osfd);
extern srs_netfd_t srs_netfd_open(int osfd);extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout);
extern int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr *to, int tolen, srs_utime_t timeout);
extern int srs_recvmsg(srs_netfd_t stfd, struct msghdr *msg, int flags, srs_utime_t timeout);
extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout);extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout);extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout);extern bool srs_is_never_timeout(srs_utime_t tm);// The mutex locker.
#define SrsLocker(instance) \impl__SrsLocker _SRS_free_##instance(&instance)class impl__SrsLocker
{
private:srs_mutex_t* lock;
public:impl__SrsLocker(srs_mutex_t* l) {lock = l;int r0 = srs_mutex_lock(*lock);srs_assert(!r0);}virtual ~impl__SrsLocker() {int r0 = srs_mutex_unlock(*lock);srs_assert(!r0);}
};// the socket provides TCP socket over st,
// that is, the sync socket mechanism.
class SrsStSocket : public ISrsProtocolReadWriter
{
private:// The recv/send timeout in srs_utime_t.// @remark Use SRS_UTIME_NO_TIMEOUT for never timeout.srs_utime_t rtm;srs_utime_t stm;// The recv/send data in bytesint64_t rbytes;int64_t sbytes;// The underlayer st fd.srs_netfd_t stfd_;
public:SrsStSocket();SrsStSocket(srs_netfd_t fd);virtual ~SrsStSocket();
private:void init(srs_netfd_t fd);
public:virtual void set_recv_timeout(srs_utime_t tm);virtual srs_utime_t get_recv_timeout();virtual void set_send_timeout(srs_utime_t tm);virtual srs_utime_t get_send_timeout();virtual int64_t get_recv_bytes();virtual int64_t get_send_bytes();
public:// @param nread, the actual read bytes, ignore if NULL.virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);// @param nwrite, the actual write bytes, ignore if NULL.virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};// The client to connect to server over TCP.
// User must never reuse the client when close it.
// Usage:
// SrsTcpClient client("127.0.0.1", 1935, 9 * SRS_UTIME_SECONDS);
// client.connect();
// client.write("Hello world!", 12, NULL);
// client.read(buf, 4096, NULL);
// @remark User can directly free the object, which will close the fd.
class SrsTcpClient : public ISrsProtocolReadWriter
{
private:srs_netfd_t stfd_;SrsStSocket* io;
private:std::string host;int port;// The timeout in srs_utime_t.srs_utime_t timeout;
public:// Constructor.// @param h the ip or hostname of server.// @param p the port to connect to.// @param tm the timeout in srs_utime_t.SrsTcpClient(std::string h, int p, srs_utime_t tm);virtual ~SrsTcpClient();
public:// Connect to server over TCP.// @remark We will close the exists connection before do connect.virtual srs_error_t connect();
// Interface ISrsProtocolReadWriter
public:virtual void set_recv_timeout(srs_utime_t tm);virtual srs_utime_t get_recv_timeout();virtual void set_send_timeout(srs_utime_t tm);virtual srs_utime_t get_send_timeout();virtual int64_t get_recv_bytes();virtual int64_t get_send_bytes();virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};#endif
srs_protocol_st.cpp
#include <srs_protocol_st.hpp>#include <st.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
using namespace std;
#include <srs_kernel_error.hpp>// nginx also set to 512
#define SERVER_LISTEN_BACKLOG 512#ifdef __linux__
#include <sys/epoll.h>bool srs_st_epoll_is_supported(void)
{struct epoll_event ev;ev.events = EPOLLIN;ev.data.ptr = NULL;/* Guaranteed to fail */epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev);return (errno != ENOSYS);
}
#endifsrs_error_t srs_st_init()
{
#ifdef __linux__// check epoll, some old linux donot support epoll.if (!srs_st_epoll_is_supported()) {return srs_error_new(ERROR_THREAD, "linux epoll disabled");}
#endif// Select the best event system available on the OS. In Linux this is// epoll(). On BSD it will be kqueue.
#if defined(SRS_CYGWIN64)if (st_set_eventsys(ST_EVENTSYS_SELECT) == -1) {return srs_error_new(ERROR_ST_SET_SELECT, "st enable st failed, current is %s", st_get_eventsys_name());}
#elseif (st_set_eventsys(ST_EVENTSYS_ALT) == -1) {return srs_error_new(ERROR_THREAD, "st enable st failed, current is %s", st_get_eventsys_name());}
#endif// Before ST init, we might have already initialized the background cid.
// SrsContextId cid = _srs_context->get_id();
// if (cid.empty()) {
// cid = _srs_context->generate_id();
// }int r0 = 0;if((r0 = st_init()) != 0){return srs_error_new(ERROR_THREAD, "st initialize failed, r0=%d", r0);}// Switch to the background cid.
// _srs_context->set_id(cid);printf("st_init success, use %s", st_get_eventsys_name());return srs_success;
}void srs_st_destroy(void)
{st_destroy();
}void srs_close_stfd(srs_netfd_t& stfd)
{if (stfd) {// we must ensure the close is ok.int r0 = st_netfd_close((st_netfd_t)stfd);if (r0) {// By _st_epoll_fd_close or _st_kq_fd_closeif (errno == EBUSY) srs_assert(!r0);// By closeif (errno == EBADF) srs_assert(!r0);if (errno == EINTR) srs_assert(!r0);if (errno == EIO) srs_assert(!r0);// Otherssrs_assert(!r0);}stfd = NULL;}
}srs_error_t srs_fd_closeexec(int fd)
{int flags = fcntl(fd, F_GETFD);flags |= FD_CLOEXEC;if (fcntl(fd, F_SETFD, flags) == -1) {return srs_error_new(ERROR_THREAD, "FD_CLOEXEC fd=%d", fd);}return srs_success;
}srs_error_t srs_fd_reuseaddr(int fd)
{int v = 1;if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &v, sizeof(int)) == -1) {return srs_error_new(ERROR_THREAD, "SO_REUSEADDR fd=%d", fd);}return srs_success;
}srs_error_t srs_fd_reuseport(int fd)
{
#if defined(SO_REUSEPORT)int v = 1;if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &v, sizeof(int)) == -1) {printf("SO_REUSEPORT failed for fd=%d", fd);}
#else#warning "SO_REUSEPORT is not supported by your OS"srs_warn("SO_REUSEPORT is not supported util Linux kernel 3.9");
#endifreturn srs_success;
}srs_error_t srs_fd_keepalive(int fd)
{
#ifdef SO_KEEPALIVEint v = 1;if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &v, sizeof(int)) == -1) {return srs_error_new(ERROR_THREAD, "SO_KEEPALIVE fd=%d", fd);}
#endifreturn srs_success;
}srs_thread_t srs_thread_self()
{return (srs_thread_t)st_thread_self();
}void srs_thread_exit(void* retval)
{st_thread_exit(retval);
}int srs_thread_join(srs_thread_t thread, void **retvalp)
{return st_thread_join((st_thread_t)thread, retvalp);
}void srs_thread_interrupt(srs_thread_t thread)
{st_thread_interrupt((st_thread_t)thread);
}void srs_thread_yield()
{st_thread_yield();
}_ST_THREAD_CREATE_PFN _pfn_st_thread_create = (_ST_THREAD_CREATE_PFN)st_thread_create;srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t* pstfd)
{st_utime_t timeout = ST_UTIME_NO_TIMEOUT;if (tm != SRS_UTIME_NO_TIMEOUT) {timeout = tm;}*pstfd = NULL;srs_netfd_t stfd = NULL;char sport[8];int r0 = snprintf(sport, sizeof(sport), "%d", port);srs_assert(r0 > 0 && r0 < (int)sizeof(sport));addrinfo hints;memset(&hints, 0, sizeof(hints));hints.ai_family = AF_UNSPEC;hints.ai_socktype = SOCK_STREAM;addrinfo* r = NULL;SrsAutoFreeH(addrinfo, r, freeaddrinfo);if(getaddrinfo(server.c_str(), sport, (const addrinfo*)&hints, &r)) {return srs_error_new(ERROR_THREAD, "get address info");}int sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol);if(sock == -1){return srs_error_new(ERROR_SOCKET_CREATE, "create socket");}srs_assert(!stfd);stfd = st_netfd_open_socket(sock);if(stfd == NULL){::close(sock);return srs_error_new(ERROR_THREAD, "open socket");}if (st_connect((st_netfd_t)stfd, r->ai_addr, r->ai_addrlen, timeout) == -1){srs_close_stfd(stfd);return srs_error_new(ERROR_THREAD, "connect to %s:%d", server.c_str(), port);}*pstfd = stfd;return srs_success;
}srs_error_t do_srs_tcp_listen(int fd, addrinfo* r, srs_netfd_t* pfd)
{srs_error_t err = srs_success;// Detect alive for TCP connection.// @see https://github.com/ossrs/srs/issues/1044if ((err = srs_fd_keepalive(fd)) != srs_success) {return srs_error_wrap(err, "set keepalive");}if ((err = srs_fd_closeexec(fd)) != srs_success) {return srs_error_wrap(err, "set closeexec");}if ((err = srs_fd_reuseaddr(fd)) != srs_success) {return srs_error_wrap(err, "set reuseaddr");}if ((err = srs_fd_reuseport(fd)) != srs_success) {return srs_error_wrap(err, "set reuseport");}if (::bind(fd, r->ai_addr, r->ai_addrlen) == -1) {return srs_error_new(ERROR_THREAD, "bind");}if (::listen(fd, SERVER_LISTEN_BACKLOG) == -1) {return srs_error_new(ERROR_THREAD, "listen");}if ((*pfd = srs_netfd_open_socket(fd)) == NULL){return srs_error_new(ERROR_THREAD, "st open");}return err;
}srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd)
{srs_error_t err = srs_success;char sport[8];int r0 = snprintf(sport, sizeof(sport), "%d", port);srs_assert(r0 > 0 && r0 < (int)sizeof(sport));addrinfo hints;memset(&hints, 0, sizeof(hints));hints.ai_family = AF_UNSPEC;hints.ai_socktype = SOCK_STREAM;hints.ai_flags = AI_NUMERICHOST;addrinfo* r = NULL;SrsAutoFreeH(addrinfo, r, freeaddrinfo);if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {return srs_error_new(ERROR_THREAD, "getaddrinfo hints=(%d,%d,%d)",hints.ai_family, hints.ai_socktype, hints.ai_flags);}int fd = 0;if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",r->ai_family, r->ai_socktype, r->ai_protocol);}if ((err = do_srs_tcp_listen(fd, r, pfd)) != srs_success) {::close(fd);return srs_error_wrap(err, "fd=%d", fd);}return err;
}srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd)
{srs_error_t err = srs_success;if ((err = srs_fd_closeexec(fd)) != srs_success) {return srs_error_wrap(err, "set closeexec");}if ((err = srs_fd_reuseaddr(fd)) != srs_success) {return srs_error_wrap(err, "set reuseaddr");}if ((err = srs_fd_reuseport(fd)) != srs_success) {return srs_error_wrap(err, "set reuseport");}if (::bind(fd, r->ai_addr, r->ai_addrlen) == -1) {return srs_error_new(ERROR_THREAD, "bind");}if ((*pfd = srs_netfd_open_socket(fd)) == NULL){return srs_error_new(ERROR_THREAD, "st open");}return err;
}srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd)
{srs_error_t err = srs_success;char sport[8];int r0 = snprintf(sport, sizeof(sport), "%d", port);srs_assert(r0 > 0 && r0 < (int)sizeof(sport));addrinfo hints;memset(&hints, 0, sizeof(hints));hints.ai_family = AF_UNSPEC;hints.ai_socktype = SOCK_DGRAM;hints.ai_flags = AI_NUMERICHOST;addrinfo* r = NULL;SrsAutoFreeH(addrinfo, r, freeaddrinfo);if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) {return srs_error_new(ERROR_THREAD, "getaddrinfo hints=(%d,%d,%d)",hints.ai_family, hints.ai_socktype, hints.ai_flags);}int fd = 0;if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) {return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d",r->ai_family, r->ai_socktype, r->ai_protocol);}if ((err = do_srs_udp_listen(fd, r, pfd)) != srs_success) {::close(fd);return srs_error_wrap(err, "fd=%d", fd);}return err;
}srs_cond_t srs_cond_new()
{return (srs_cond_t)st_cond_new();
}int srs_cond_destroy(srs_cond_t cond)
{return st_cond_destroy((st_cond_t)cond);
}int srs_cond_wait(srs_cond_t cond)
{return st_cond_wait((st_cond_t)cond);
}int srs_cond_timedwait(srs_cond_t cond, srs_utime_t timeout)
{return st_cond_timedwait((st_cond_t)cond, (st_utime_t)timeout);
}int srs_cond_signal(srs_cond_t cond)
{return st_cond_signal((st_cond_t)cond);
}int srs_cond_broadcast(srs_cond_t cond)
{return st_cond_broadcast((st_cond_t)cond);
}srs_mutex_t srs_mutex_new()
{return (srs_mutex_t)st_mutex_new();
}int srs_mutex_destroy(srs_mutex_t mutex)
{if (!mutex) {return 0;}return st_mutex_destroy((st_mutex_t)mutex);
}int srs_mutex_lock(srs_mutex_t mutex)
{return st_mutex_lock((st_mutex_t)mutex);
}int srs_mutex_unlock(srs_mutex_t mutex)
{return st_mutex_unlock((st_mutex_t)mutex);
}int srs_key_create(int *keyp, void (*destructor)(void *))
{return st_key_create(keyp, destructor);
}int srs_thread_setspecific(int key, void *value)
{return st_thread_setspecific(key, value);
}void *srs_thread_getspecific(int key)
{return st_thread_getspecific(key);
}int srs_thread_setspecific2(srs_thread_t thread, int key, void* value)
{return st_thread_setspecific2((st_thread_t)thread, key, value);
}int srs_netfd_fileno(srs_netfd_t stfd)
{return st_netfd_fileno((st_netfd_t)stfd);
}int srs_usleep(srs_utime_t usecs)
{return st_usleep((st_utime_t)usecs);
}srs_netfd_t srs_netfd_open_socket(int osfd)
{return (srs_netfd_t)st_netfd_open_socket(osfd);
}srs_netfd_t srs_netfd_open(int osfd)
{return (srs_netfd_t)st_netfd_open(osfd);
}int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout)
{return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout);
}int srs_sendto(srs_netfd_t stfd, void *buf, int len, const struct sockaddr * to, int tolen, srs_utime_t timeout)
{return st_sendto((st_netfd_t)stfd, buf, len, to, tolen, (st_utime_t)timeout);
}int srs_recvmsg(srs_netfd_t stfd, struct msghdr *msg, int flags, srs_utime_t timeout)
{return st_recvmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, srs_utime_t timeout)
{return st_sendmsg((st_netfd_t)stfd, msg, flags, (st_utime_t)timeout);
}srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout)
{return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout);
}ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout)
{return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout);
}bool srs_is_never_timeout(srs_utime_t tm)
{return tm == SRS_UTIME_NO_TIMEOUT;
}SrsStSocket::SrsStSocket()
{init(NULL);
}SrsStSocket::SrsStSocket(srs_netfd_t fd)
{init(fd);
}SrsStSocket::~SrsStSocket()
{
}void SrsStSocket::init(srs_netfd_t fd)
{stfd_ = fd;stm = rtm = SRS_UTIME_NO_TIMEOUT;rbytes = sbytes = 0;
}void SrsStSocket::set_recv_timeout(srs_utime_t tm)
{rtm = tm;
}srs_utime_t SrsStSocket::get_recv_timeout()
{return rtm;
}void SrsStSocket::set_send_timeout(srs_utime_t tm)
{stm = tm;
}srs_utime_t SrsStSocket::get_send_timeout()
{return stm;
}int64_t SrsStSocket::get_recv_bytes()
{return rbytes;
}int64_t SrsStSocket::get_send_bytes()
{return sbytes;
}srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread)
{srs_error_t err = srs_success;srs_assert(stfd_);ssize_t nb_read;if (rtm == SRS_UTIME_NO_TIMEOUT) {nb_read = st_read((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);} else {nb_read = st_read((st_netfd_t)stfd_, buf, size, rtm);}if (nread) {*nread = nb_read;}// On success a non-negative integer indicating the number of bytes actually read is returned// (a value of 0 means the network connection is closed or end of file is reached).// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_read <= 0) {if (nb_read < 0 && errno == ETIME) {return srs_error_new(ERROR_THREAD, "timeout %d ms", srsu2msi(rtm));}if (nb_read == 0) {errno = ECONNRESET;}return srs_error_new(ERROR_THREAD, "read");}rbytes += nb_read;return err;
}srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread)
{srs_error_t err = srs_success;srs_assert(stfd_);ssize_t nb_read;if (rtm == SRS_UTIME_NO_TIMEOUT) {nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);} else {nb_read = st_read_fully((st_netfd_t)stfd_, buf, size, rtm);}if (nread) {*nread = nb_read;}// On success a non-negative integer indicating the number of bytes actually read is returned// (a value less than nbyte means the network connection is closed or end of file is reached)// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_read != (ssize_t)size) {if (nb_read < 0 && errno == ETIME) {return srs_error_new(ERROR_THREAD, "timeout %d ms", srsu2msi(rtm));}if (nb_read >= 0) {errno = ECONNRESET;}return srs_error_new(ERROR_THREAD, "read fully, size=%d, nn=%d", size, nb_read);}rbytes += nb_read;return err;
}srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite)
{srs_error_t err = srs_success;srs_assert(stfd_);ssize_t nb_write;if (stm == SRS_UTIME_NO_TIMEOUT) {nb_write = st_write((st_netfd_t)stfd_, buf, size, ST_UTIME_NO_TIMEOUT);} else {nb_write = st_write((st_netfd_t)stfd_, buf, size, stm);}if (nwrite) {*nwrite = nb_write;}// On success a non-negative integer equal to nbyte is returned.// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_write <= 0) {if (nb_write < 0 && errno == ETIME) {return srs_error_new(ERROR_THREAD, "write timeout %d ms", srsu2msi(stm));}return srs_error_new(ERROR_THREAD, "write");}sbytes += nb_write;return err;
}srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{srs_error_t err = srs_success;srs_assert(stfd_);ssize_t nb_write;if (stm == SRS_UTIME_NO_TIMEOUT) {nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, ST_UTIME_NO_TIMEOUT);} else {nb_write = st_writev((st_netfd_t)stfd_, iov, iov_size, stm);}if (nwrite) {*nwrite = nb_write;}// On success a non-negative integer equal to nbyte is returned.// Otherwise, a value of -1 is returned and errno is set to indicate the error.if (nb_write <= 0) {if (nb_write < 0 && errno == ETIME) {return srs_error_new(ERROR_THREAD, "writev timeout %d ms", srsu2msi(stm));}return srs_error_new(ERROR_THREAD, "writev");}sbytes += nb_write;return err;
}SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm)
{stfd_ = NULL;io = new SrsStSocket();host = h;port = p;timeout = tm;
}SrsTcpClient::~SrsTcpClient()
{srs_freep(io);srs_close_stfd(stfd_);
}srs_error_t SrsTcpClient::connect()
{srs_error_t err = srs_success;srs_netfd_t stfd = NULL;if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) {return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout));}// TODO: FIMXE: The timeout set on io need to be set to new object.srs_freep(io);io = new SrsStSocket(stfd);srs_close_stfd(stfd_);stfd_ = stfd;return err;
}void SrsTcpClient::set_recv_timeout(srs_utime_t tm)
{io->set_recv_timeout(tm);
}srs_utime_t SrsTcpClient::get_recv_timeout()
{return io->get_recv_timeout();
}void SrsTcpClient::set_send_timeout(srs_utime_t tm)
{io->set_send_timeout(tm);
}srs_utime_t SrsTcpClient::get_send_timeout()
{return io->get_send_timeout();
}int64_t SrsTcpClient::get_recv_bytes()
{return io->get_recv_bytes();
}int64_t SrsTcpClient::get_send_bytes()
{return io->get_send_bytes();
}srs_error_t SrsTcpClient::read(void* buf, size_t size, ssize_t* nread)
{return io->read(buf, size, nread);
}srs_error_t SrsTcpClient::read_fully(void* buf, size_t size, ssize_t* nread)
{return io->read_fully(buf, size, nread);
}srs_error_t SrsTcpClient::write(void* buf, size_t size, ssize_t* nwrite)
{return io->write(buf, size, nwrite);
}srs_error_t SrsTcpClient::writev(const iovec *iov, int iov_size, ssize_t* nwrite)
{return io->writev(iov, iov_size, nwrite);
}
ST协程库测试
#include "srs_app_st.hpp"
#include <st.h>//0.在ST_TEST对象里启动一个协程
class ST_TEST : public ISrsCoroutineHandler{
public:ST_TEST(){trd = NULL;srs_freep(trd);trd = new SrsSTCoroutine("ST_TEST-", this,"9527ID");//2.new一个ST对象}srs_error_t startST(){srs_error_t err = srs_success;if ((err = trd->start()) != srs_success) {//3.start()创建协程return srs_error_wrap(err, "start timer");}printf("\nST.startST\n");return err;}
public: virtual srs_error_t cycle() {//4.协程处理函数,回调cycle()srs_error_t err = srs_success;printf("ST.cycle\n");return err;}
private:SrsCoroutine* trd;
};srs_st_init();//1.初始化STST_TEST *pST_TEST = new ST_TEST;pST_TEST->startST();st_thread_exit(NULL);
打印
st_init success, use epoll
ST.startST
ST.cycle
SrsAutoFree测试
class SrsAutoFree_TEST{
public:SrsAutoFree_TEST(){printf("SrsAutoFree_TEST\n");}~SrsAutoFree_TEST(){printf("~SrsAutoFree_TEST\n");}
};void testAutoFree()
{SrsAutoFree_TEST *pSrsAutoFree_TEST = nullptr;pSrsAutoFree_TEST = new SrsAutoFree_TEST;SrsAutoFree(SrsAutoFree_TEST,pSrsAutoFree_TEST);
}testAutoFree();
打印
SrsAutoFree_TEST
~SrsAutoFree_TEST