文章目录
- 异常信息
- 原由
- 代码
- 错误点
- 解决办法
异常信息
panic: concurrent write to websocket connection
原由
golang 编写 websocket
go版本:1.19
使用了第三方框架: https://github.com/gorilla/websocket/tree/main
代码
server.go
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package mainimport ("flag""fmt""html/template""log""net/http""github.com/gorilla/websocket"
)var addr = flag.String("addr", "localhost:8080", "http service address")var upgrader = websocket.Upgrader{} // use default optionsfunc echo(w http.ResponseWriter, r *http.Request) {c, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Print("upgrade:", err)return}defer c.Close()for {mt, message, err := c.ReadMessage()if err != nil {log.Println("read:", err)break}fmt.Println(string(message))fmt.Println(mt)//log.Printf("recv: %s, type: %s", message, websocket.FormatMessageType(mt))err = c.WriteMessage(mt, message)if err != nil {log.Println("write:", err)break}}
}func home(w http.ResponseWriter, r *http.Request) {homeTemplate.Execute(w, "ws://"+r.Host+"/echo")
}func main() {flag.Parse()log.SetFlags(0)http.HandleFunc("/echo", echo)http.HandleFunc("/", home)log.Fatal(http.ListenAndServe(*addr, nil))
}var homeTemplate = template.Must(template.New("").Parse(`
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script>
window.addEventListener("load", function(evt) {var output = document.getElementById("output");var input = document.getElementById("input");var ws;var print = function(message) {var d = document.createElement("div");d.textContent = message;output.appendChild(d);output.scroll(0, output.scrollHeight);};document.getElementById("open").onclick = function(evt) {if (ws) {return false;}ws = new WebSocket("{{.}}");ws.onopen = function(evt) {print("OPEN");}ws.onclose = function(evt) {print("CLOSE");ws = null;}ws.onmessage = function(evt) {print("RESPONSE: " + evt.data);}ws.onerror = function(evt) {print("ERROR: " + evt.data);}return false;};document.getElementById("send").onclick = function(evt) {if (!ws) {return false;}print("SEND: " + input.value);ws.send(input.value);return false;};document.getElementById("close").onclick = function(evt) {if (!ws) {return false;}ws.close();return false;};});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server,
"Send" to send a message to the server and "Close" to close the connection.
You can change the message and send multiple times.
<p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output" style="max-height: 70vh;overflow-y: scroll;"></div>
</td></tr></table>
</body>
</html>
`))
client.go
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package mainimport ("flag""fmt""log""net/url""os""os/signal""time""github.com/gorilla/websocket"
)var addr = flag.String("addr", "localhost:8080", "http service address")func main() {flag.Parse()log.SetFlags(0)interrupt := make(chan os.Signal, 1)signal.Notify(interrupt, os.Interrupt)u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}log.Printf("connecting to %s", u.String())c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)if err != nil {log.Fatal("dial:", err)}defer c.Close()done := make(chan struct{})go func() {// 发送Ping帧,检查连接是否活跃for {if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {log.Println("Failed to send Ping: ", err)return}fmt.Println("Ping success")time.Sleep(10 * time.Second)}}()go func() {defer close(done)for {mt, message, err := c.ReadMessage()if err != nil {log.Println("read:", err)return}fmt.Println(mt)fmt.Println(string(message)) // 时间//log.Printf("recv: %s, type: %s", message, websocket.FormatMessageType(mt))}}()ticker := time.NewTicker(time.Second)defer ticker.Stop()for {select {case <-done:returncase t := <-ticker.C:err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))if err != nil {log.Println("write:", err)return}case <-interrupt:log.Println("interrupt")// Cleanly close the connection by sending a close message and then// waiting (with timeout) for the server to close the connection.err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))if err != nil {log.Println("write close:", err)return}select {case <-done:case <-time.After(time.Second):}return}}
}
错误点
我希望在连接过程中,通信双方一直检测,也就使用了 PING
,检测活跃。
go func() {// 发送Ping帧,检查连接是否活跃for {if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {log.Println("Failed to send Ping: ", err)return}fmt.Println("Ping success")time.Sleep(10 * time.Second)}}()
出现了开始的错误信息:panic: concurrent write to websocket connection
,错误信息说:不能并发的给 socket 发消息。
错误 “concurrent write to websocket connection” 指的是有多个goroutine尝试同时向同一个WebSocket连接写入数据,这是不被允许的。gorilla/websocket 库并不是为并发写操作设计的,因此你需要确保对每个WebSocket连接的写操作在任何时候只由一个goroutine执行。
解决这个问题的方法是使用同步机制,比如互斥锁(sync.Mutex),来同步对WebSocket连接的写操作。下面是一个修改后的示例,展示如何使用互斥锁来避免并发写的问题:
解决办法
在这个示例中,我们定义了一个WebSocketConnection结构体,它包含一个websocket.Conn和一个sync.Mutex。在发送Ping消息的goroutine中,我们在写操作之前获取互斥锁,并在写操作完成后释放锁。这样可以确保在任何时候只有一个goroutine能够执行写操作。
请注意,如果还有其他goroutine需要写入WebSocket连接,它们也需要在执行写操作前获取互斥锁,并在完成后释放锁。这样可以避免并发写入的问题,并确保WebSocket连接的正确使用。
示例代码
package mainimport ("log""net/http""sync""github.com/gorilla/websocket"
)var upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true},
}// 定义一个结构体来包含WebSocket连接和互斥锁
type WebSocketConnection struct {Conn *websocket.ConnLock sync.Mutex
}func handleConnections(ws *websocket.Conn) {defer ws.Close()log.Println("Connection established")// 创建WebSocketConnection实例conn := &WebSocketConnection{Conn: ws,Lock: sync.Mutex{},}// Ping goroutinego func() {for {// 使用互斥锁来同步写操作conn.Lock()if err := ws.WriteMessage(websocket.PingMessage, nil); err != nil {log.Println("Failed to send Ping: ", err)return}conn.Unlock()time.Sleep(10 * time.Second)}}()// 消息处理goroutinego func() {// 这里可以处理接收到的消息等// ...}()// 这里可以添加更多的goroutine来处理不同的任务// ...
}func main() {http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {ws, err := upgrader.Upgrade(w, r, nil)if err != nil {log.Println("upgrade:", err)return}go handleConnections(ws)})log.Fatal(http.ListenAndServe(":8080", nil))
}