1. 本程序功能
1) 要有完整的request 和 response;
2) 支持多进程并行处理任务;
3)子进程任务结束后无僵尸进程
2.Apache Thrift C++库的编译和安装
见
步步详解:Apache Thrift C++库从编译到工作模式DEMO_北雨南萍的博客-CSDN博客
3.框架生成
数据字段定义:
cat Datainfo.thrift
# Datainfo.thrift
struct message
{
1:i32 seqId,
2:string content
}
struct response
{
1:i32 seqId,
2:string content
}
service serDemo
{
response put(1:message msg)
}
thrift -gen cpp Datainfo.thrift
4.Server端源码
serDemo_server.skeleton.cpp
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>#include <iostream>
#include <stdexcept>
#include <thread>#include "serDemo.h"#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>using namespace ::apache::thrift;
using namespace ::apache::thrift::concurrency;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;void handler(int num) {//我接受到了SIGCHLD的信号啦int status;int pid = waitpid(-1,&status,WNOHANG);if(WIFEXITED(status)) {printf("The child exit with code %d\n",WEXITSTATUS(status));}
}class serDemoHandler : virtual public serDemoIf {public:serDemoHandler() {// 构造函数:用于初始任务处理句柄对象}void put(response& _return, const message& msg) {// Your implementation goes hereprintf("put starting ...\n");printf("receive message: id: %d, content: %s\n", msg.seqId, msg.content.c_str());char* ffmpeg_argv[]={"/ffmpeg","-i","/opt/videoroom-1234-user-161756615651626-1615963571436156-video.mp4","-t","60","-vcodec","libx264","-f","flv","-y","/opt/videoroom-1234-user-161756615651626-1615963571436156-video.flv",NULL};printf("ffmpeg_argv:\n");for (int i = 0; ffmpeg_argv[i] != NULL; i++) {printf("%s ", ffmpeg_argv[i]);}printf("\n");pid_t pid = fork();if (pid == 0) {execvp(ffmpeg_argv[0], ffmpeg_argv);exit(0);} else if (pid > 0) {printf("This is the parent process\n");} else {printf("Fork failed\n");}// 回复消息_return.seqId = msg.seqId;_return.content = "This is the response message.";printf("put stopped.\n");}};int main(int argc, char **argv) {int port = 19090;signal(SIGCHLD,handler);/*** TThreadPoolServer工作模式 ***/try {//创建一个线程管理器std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(16);std::shared_ptr<ThreadFactory> threadFactory = std::shared_ptr<ThreadFactory>(new ThreadFactory());threadManager->threadFactory(threadFactory);threadManager->start();std::shared_ptr<serDemoHandler> handler(new serDemoHandler());std::shared_ptr<TProcessor> processor(new serDemoProcessor(handler));::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory,threadManager);// 启动服务::std::cout << "Starting the TThreadPoolServer ..." << std::endl;server.serve();::std::cout << "Server stopped." << std::endl;} catch (std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;}return 0;
}
编译生成server
g++ -std=c++11 -g -Wall Datainfo_types.cpp serDemo.cpp serDemo_server.skeleton.cpp -o server -lthrift -lpthread
5. client端源码
client.cpp
// 在同级目录下创建 client.cpp 文件
// ----------替换成自己的头文件----------
#include "serDemo.h"
// --------------------------------------
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport; using boost::shared_ptr; int main(int argc, char **argv) { /* 不能用localhost, 否则会有运行时提示:"* TSocket::open() connect() <Host: localhost Port: 19090>: Connection refused*///std::shared_ptr<TSocket> socket(new TSocket("localhost", 19090)); std::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 19090)); std::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));serDemoClient client(protocol); transport->open(); // ----------------------------我们的代码写在这里------------------------------ message msg; msg.seqId = 1; msg.content = "client message"; response response_put;client.put(response_put, msg); printf("seqId: %d, content: %s\n", response_put.seqId, response_put.content.c_str());//-------------------------------------------------------------------------- transport->close(); return 0;
}
编译生成client
g++ -std=c++11 -g -Wall Datainfo_types.cpp serDemo.cpp client.cpp -o client -lthrift -lpthread