- 什么是rpc, rpc是干什么的?
- 几种协议的压测数据对比:
- tcp
- http
- 使用tcp实现一个简单的rpc服务
什么是rpc, rpc是干什么的?
rpc的作用就是实现远程的服务调用
工作流程: 客户端携带服务信息(服务名,方法名)+数据 去请求服务端,服务端拿到数据,解析后执行对应的方法,将结果返回给客户端.
看上去与http的请求流程很类似. 的确,有的rpc框架就是使用的http作为通信协议.(grpc使用的http2;rpcx也支持http的通信协议)
要提高rpc架构的关键在于选用高性能的网络协议,很少有人会使用http1作为rpc的网络协议;因为http1的性能并不是很好,单线程的形式以及http1.0还会频繁的创建与断开tcp,这会拉低rpc这种频繁调用的框架的性能
几种协议的压测数据对比:
tcp
serve.go
package mainimport ("io""log""net"
)func handleConnection(conn net.Conn) {defer conn.Close()buffer := make([]byte, 4)// 读取客户端的消息for {_, err := conn.Read(buffer)if err != nil {if err != io.EOF {log.Println("Error reading from connection:", err)}return}go conn.Write([]byte("pong"))}
}func startServer(address string) {listener, err := net.Listen("tcp", address)if err != nil {log.Fatal("Error starting server:", err)}defer listener.Close()log.Printf("Server listening on %s...\n", address)// 等待并处理客户端连接for {conn, err := listener.Accept()if err != nil {log.Println("Error accepting connection:", err)continue}// 每个连接启动一个 goroutine 来处理go handleConnection(conn)}
}func main() {startServer(":8080")
}
client_test.go
func pingServer(conn net.Conn) error {// 发送 ping 请求_, err := conn.Write([]byte("ping"))if err != nil {return fmt.Errorf("error sending ping: %v", err)}// 读取响应buffer := make([]byte, 4)_, err = conn.Read(buffer)if err != nil {return fmt.Errorf("error reading response: %v", err)}return nil}// BenchmarkPingServer 是基准测试函数,用于测试 TCP 客户端的性能func BenchmarkPingServer(b *testing.B) {conn, err := net.Dial("tcp", "localhost:8080")if err != nil {b.Fatalf("Failed to connect to server: %v", err)}defer conn.Close()// 设置并发数,使用 b.RunParallel 来模拟并发b.RunParallel(func(pb *testing.PB) {for pb.Next() {// 在每个 goroutine 中执行 ping 请求if err := pingServer(conn); err != nil {b.Errorf("Failed to ping server: %v", err)}}})//BenchmarkPingServer-8 191895 6838 ns/op 8 B/op 2 allocs/op}func BenchmarkPingServer2(b *testing.B) {conn, err := net.Dial("tcp", "localhost:8080")if err != nil {b.Fatalf("Failed to connect to server: %v", err)}defer conn.Close()for i := 0; i < b.N; i++ {// 每个 goroutine 发送一次 ping 请求if err := pingServer(conn); err != nil {b.Errorf("Failed to ping server: %v", err)}}//BenchmarkPingServer2-8 49065 20747 ns/op 8 B/op 2 allocs/op}
http
serve.go
// http_server.go
package mainimport ("log""net/http"
)// pingHandler 是处理 ping 请求的 HTTP 处理函数
func pingHandler(w http.ResponseWriter, r *http.Request) {// 如果请求是 ping,就返回 pongif r.Method == http.MethodGet && r.URL.Path == "/ping" {w.Write([]byte("pong"))return}// 如果请求路径不正确,返回 404http.NotFound(w, r)
}// 启动 HTTP 服务器
func startServer(address string) {http.HandleFunc("/ping", pingHandler)log.Printf("Server listening on %s...\n", address)err := http.ListenAndServe(address, nil)if err != nil {log.Fatal("Error starting server: ", err)}
}func main() {// 启动 HTTP 服务端startServer(":8081")
}
client_test.go
// pingServer 向服务器发送 ping 请求并接收响应
func pingServerHttp(address string) error {// 向服务器的 /ping 接口发送 GET 请求resp, err := http.Get(address + "/ping")if err != nil {return fmt.Errorf("error sending GET request: %v", err)}defer resp.Body.Close()// 确保返回的状态码是 200 OKif resp.StatusCode != http.StatusOK {return fmt.Errorf("unexpected status code: %d", resp.StatusCode)}return nil
}// BenchmarkPingServer 是基准测试函数,用于测试 HTTP 客户端的性能
func BenchmarkPingServerHttp(b *testing.B) {address := "http://localhost:8081"// 使用 b.RunParallel 来模拟并发请求b.RunParallel(func(pb *testing.PB) {for pb.Next() {// 每个 goroutine 发送一次 ping 请求if err := pingServerHttp(address); err != nil {b.Errorf("Failed to ping server: %v", err)}}})
}func BenchmarkPingServerHttp2(b *testing.B) {address := "http://localhost:8081"for i := 0; i < b.N; i++ {// 每个 goroutine 发送一次 ping 请求if err := pingServerHttp(address); err != nil b.Errorf("Failed to ping server: %v", err)}//BenchmarkPingServerHttp2-8 4976 225154 ns/op 16762 B/op 122 allocs/op}
}
在单线程单连接 的情况下tcp有近5万的qps;单连接多线程的情况下有近20万的qps;而http在单线程情况下只有5000左右,并发的情况下有2万左右, 性能差距有10倍左右.
使用tcp实现一个简单的rpc服务
public.go
package public
//公共的方法与类
import ("bytes""encoding/binary"
)func Encode(data []byte) []byte {l := len(data)lBytes := IntToBytes(l)return append(lBytes, data...)
}func IntToBytes(n int) []byte {data := int64(n)bytebuf := bytes.NewBuffer([]byte{})binary.Write(bytebuf, binary.BigEndian, data)return bytebuf.Bytes()
}func BytesToInt(bys []byte) int {bytebuff := bytes.NewBuffer(bys)var data int64binary.Read(bytebuff, binary.BigEndian, &data)return int(data)
}type ReqData struct {ServerName stringTag string //标记哪个线程调用的服务,返回的时候带上可以将数据传输到对应的县城Data []byte
}
type RspData struct {Tag string //标记哪个线程调用的服务,返回的时候带上可以将数据传输到对应的县城Data []byte
}type AddReq struct {NumA intNumB int
}type AddRsp struct {Sum int
}
server.go
package mainimport ("bufio""encoding/json""fmt""net""rpc_demo/public"
)type Server struct{}func (s *Server) Add(a *public.AddReq) *public.AddRsp {return &public.AddRsp{Sum: a.NumA + a.NumB}
}// 服务调用
// 服务名+方法名
// 封装对应的服务调用过程:根据方法名解析数据,并调用对应的方法
// 数据打包返回
// 这里做简化板手写处理:1. 没有实现自动化的服务方法注册;2. 我暂定使用uuid进行标识请求,以便于客户端可以将数据读取到对应的请求线程上,但事实上uuid过长,应该使用更为简单的标识方式
func serve(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {//解析长度lBytes := make([]byte, 8)_, err := reader.Read(lBytes[:])if err != nil {fmt.Printf("数据读取失败%v\n", err)return}l := public.BytesToInt(lBytes)reqBytes := make([]byte, l)_, err = reader.Read(reqBytes)if err != nil {fmt.Printf("数据读取失败%v\n", err)return}go func(reqData []byte) {req := new(public.ReqData)err = json.Unmarshal(reqData, req)if err != nil {fmt.Printf("json 解析失败%v\n", err)return}//解析处理(这里只注册了一个服务接口)switch req.ServerName {case "Server.Add":s := &Server{}data := new(public.AddReq)err := json.Unmarshal(req.Data, data)if err != nil {fmt.Printf("json 解析失败%v\n", err)return}rsp := s.Add(data)result, err := json.Marshal(rsp)if err != nil {fmt.Printf("数据编码失败%v\n", err)return}rspBytes, err := json.Marshal(&public.RspData{Tag: req.Tag, Data: result})if err != nil {fmt.Printf("数据编码失败%v\n", err)return}rspData := append(public.IntToBytes(len(rspBytes)), rspBytes...)conn.Write(rspData)default:conn.Write([]byte("该方法没有注册"))}}(reqBytes)}}func main() {listen, err := net.Listen("tcp", "127.0.0.1:9999")if err != nil {fmt.Println("Listen() failed, err: ", err)return}for {conn, err := listen.Accept() // 监听客户端的连接请求if err != nil {fmt.Println("Accept() failed, err: ", err)continue}go serve(conn) // 启动一个goroutine来处理客户端的连接请求}
}
client.go
package mainimport ("bufio""encoding/json""fmt""net""rpc_demo/public""time""github.com/google/uuid"
)type Client struct{ Conn net.Conn }func NewClient() *Client {conn, err := net.Dial("tcp", "127.0.0.1:9999")if err != nil {fmt.Println("err : ", err)return nil}return &Client{Conn: conn}
}// 每次调用都生成单独的uuid,并作为key,请求后select uuid对应的chan,直到有数据,读取数据,关闭通道,清除对应的map记录
var M map[string]chan ([]byte)// 启动客户端连接服务端并解析数据
func (c *Client) Run() {defer c.Conn.Close() // 关闭TCP连接reader := bufio.NewReader(c.Conn)for {lBytes := make([]byte, 8)_, err := reader.Read(lBytes[:])if err != nil {fmt.Printf("数据读取失败")return}l := public.BytesToInt(lBytes)reqBytes := make([]byte, l)_, err = reader.Read(reqBytes)if err != nil {fmt.Printf("数据读取失败")return}//解析数据体并写入对应的chango func(data []byte) {rspData := new(public.RspData)err := json.Unmarshal(data, rspData)if err != nil {fmt.Printf("数据解析失败")return}M[rspData.Tag] <- rspData.Data}(reqBytes)}
}// 我这边就不封装自动call了,直接手动call
func (c *Client) Call(serverAndfunc string, data []byte) []byte {//生成uuidtag := uuid.New().String()reqData := &public.ReqData{ServerName: serverAndfunc, Tag: tag, Data: data}r, err := json.Marshal(reqData)if err != nil {fmt.Println("编码错误")return nil}Ch := make(chan []byte)defer close(Ch)defer delete(M, tag)M[tag] = Chc.Conn.Write(append(public.IntToBytes(len(r)), r...))return <-Ch
}
压力测试
func BenchmarkJson(b *testing.B) {M = make(map[string]chan []byte)//建立tcp连接服务端client := NewClient()// 启动处理go client.Run()//模拟调用call方法req1 := &public.AddReq{NumA: 1,NumB: 2,}reqdata1, err := json.Marshal(req1)if err != nil {fmt.Println("编码错误")return}// 开始计时b.StartTimer()for i := 0; i < b.N; i++ {_ = client.Call("Server.Add", reqdata1)}// BenchmarkJson-8 29181 39678 ns/op 1065 B/op 21 allocs/op
}
在单线程单连接的情况下有近3万的吞吐量,性能也还可以.因为这个做得很粗糙
总结: 网络通信才是rpc框架的瓶颈,json在本地编码解码可以有1000万的qps;proto的性能是json的4-5倍;但是rpcx的吞吐量是20万左右,grpc是15万左右,因为tcp的通信是20万,很http2是15万左右;http1.0是两万左右 这就是瓶颈;
这样我们可以联系到redis;redis可以有10万qps,为什么是10万?因为网络的瓶颈就是10多万
mysql为什么只有1万,因为mysql储存磁盘的,会更慢.