40分钟学 Go 语言高并发:Context包与并发控制

Context包与并发控制

学习目标

知识点掌握程度应用场景
context原理深入理解实现机制并发控制和请求链路追踪
超时控制掌握超时设置和处理API请求超时、任务限时控制
取消信号传播理解取消机制和传播链优雅退出、资源释放
context最佳实践掌握使用规范和技巧工程实践中的常见场景

1. Context原理

1.1 Context基本结构和实现

让我们先看一个完整的Context使用示例:

package mainimport ("context""fmt""log""time"
)// 请求追踪信息
type RequestInfo struct {TraceID    stringSessionID  stringStartTime  time.Time
}// 服务接口
type Service interface {HandleRequest(ctx context.Context, req string) (string, error)
}// 业务服务实现
type BusinessService struct {name string
}func NewBusinessService(name string) *BusinessService {return &BusinessService{name: name}
}// 处理请求
func (s *BusinessService) HandleRequest(ctx context.Context, req string) (string, error) {// 获取请求追踪信息info, ok := ctx.Value("request-info").(*RequestInfo)if !ok {return "", fmt.Errorf("request info not found in context")}log.Printf("[%s] Processing request: %s, TraceID: %s, Session: %s\n",s.name, req, info.TraceID, info.SessionID)// 模拟处理过程select {case <-time.After(2 * time.Second):return fmt.Sprintf("Result for %s", req), nilcase <-ctx.Done():return "", ctx.Err()}
}// 请求中间件
func requestMiddleware(next Service) Service {return &middlewareService{next: next}
}type middlewareService struct {next Service
}func (m *middlewareService) HandleRequest(ctx context.Context, req string) (string, error) {// 开始时间startTime := time.Now()// 添加请求信息到contextinfo := &RequestInfo{TraceID:   fmt.Sprintf("trace-%d", time.Now().UnixNano()),SessionID: fmt.Sprintf("session-%d", time.Now().Unix()),StartTime: startTime,}ctx = context.WithValue(ctx, "request-info", info)// 调用下一个处理器result, err := m.next.HandleRequest(ctx, req)// 记录处理时间duration := time.Since(startTime)log.Printf("Request completed in %v, TraceID: %s\n", duration, info.TraceID)return result, err
}func main() {// 创建服务service := requestMiddleware(NewBusinessService("UserService"))// 创建基础contextctx := context.Background()// 添加超时控制ctx, cancel := context.WithTimeout(ctx, 3*time.Second)defer cancel()// 处理请求result, err := service.HandleRequest(ctx, "get user profile")if err != nil {log.Printf("Request failed: %v\n", err)return}log.Printf("Request succeeded: %s\n", result)// 模拟超时场景ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)defer cancel()result, err = service.HandleRequest(ctx, "get user settings")if err != nil {log.Printf("Request failed: %v\n", err)return}
}

2. 超时控制

让我们实现一个带有超时控制的HTTP服务:

package mainimport ("context""encoding/json""fmt""log""net/http""time"
)// 响应结构
type Response struct {Data  interface{} `json:"data,omitempty"`Error string      `json:"error,omitempty"`
}// 服务配置
type ServiceConfig struct {Timeout        time.DurationMaxConcurrent  intRetryAttempts  intRetryDelay     time.Duration
}// HTTP客户端包装器
type HTTPClient struct {client  *http.Clientconfig  ServiceConfiglimiter chan struct{} // 并发限制器
}// 创建新的HTTP客户端
func NewHTTPClient(config ServiceConfig) *HTTPClient {return &HTTPClient{client: &http.Client{Timeout: config.Timeout,},config: config,limiter: make(chan struct{}, config.MaxConcurrent),}
}// 发送HTTP请求
func (c *HTTPClient) DoRequest(ctx context.Context, method, url string) (*Response, error) {var lastErr errorfor attempt := 0; attempt <= c.config.RetryAttempts; attempt++ {select {case <-ctx.Done():return nil, ctx.Err()case c.limiter <- struct{}{}: // 获取并发许可}// 确保释放并发许可defer func() {<-c.limiter}()// 创建请求req, err := http.NewRequestWithContext(ctx, method, url, nil)if err != nil {return nil, fmt.Errorf("create request failed: %w", err)}// 设置请求超时reqCtx, cancel := context.WithTimeout(ctx, c.config.Timeout)defer cancel()// 执行请求resp, err := c.client.Do(req.WithContext(reqCtx))if err != nil {lastErr = errlog.Printf("Request failed (attempt %d): %v\n", attempt+1, err)// 如果不是最后一次尝试,等待后重试if attempt < c.config.RetryAttempts {select {case <-ctx.Done():return nil, ctx.Err()case <-time.After(c.config.RetryDelay):continue}}continue}defer resp.Body.Close()// 解析响应var result Responseif err := json.NewDecoder(resp.Body).Decode(&result); err != nil {return nil, fmt.Errorf("decode response failed: %w", err)}return &result, nil}return nil, fmt.Errorf("all retry attempts failed, last error: %v", lastErr)
}// 处理HTTP请求的处理器
func handleRequest(w http.ResponseWriter, r *http.Request) {// 创建contextctx := r.Context()// 模拟长时间处理select {case <-time.After(2 * time.Second):response := Response{Data: "Request processed successfully",}json.NewEncoder(w).Encode(response)case <-ctx.Done():response := Response{Error: "Request timeout",}w.WriteHeader(http.StatusGatewayTimeout)json.NewEncoder(w).Encode(response)}
}func main() {// 配置HTTP客户端config := ServiceConfig{Timeout:       5 * time.Second,MaxConcurrent: 10,RetryAttempts: 3,RetryDelay:    time.Second,}client := NewHTTPClient(config)// 创建HTTP服务器http.HandleFunc("/api", handleRequest)// 启动服务器go func() {log.Println("Server starting on :8080")if err := http.ListenAndServe(":8080", nil); err != nil {log.Fatal(err)}}()// 等待服务器启动time.Sleep(time.Second)// 测试请求ctx := context.Background()// 测试正常请求resp, err := client.DoRequest(ctx, "GET", "http://localhost:8080/api")if err != nil {log.Printf("Request failed: %v\n", err)} else {log.Printf("Response: %+v\n", resp)}// 测试超时请求ctx, cancel := context.WithTimeout(ctx, time.Second)defer cancel()resp, err = client.DoRequest(ctx, "GET", "http://localhost:8080/api")if err != nil {log.Printf("Request failed (expected): %v\n", err)} else {log.Printf("Response: %+v\n", resp)}// 保持主程序运行select {}
}
package mainimport ("context""fmt""math/rand""sync""time"
)// 请求处理器
type RequestHandler struct {requests  chan Requestresponses chan Responsedone      chan struct{}wg        sync.WaitGroup
}// 请求结构
type Request struct {ID      intTimeout time.DurationData    string
}// 响应结构
type Response struct {RequestID intResult    stringError     error
}// 创建新的请求处理器
func NewRequestHandler() *RequestHandler {return &RequestHandler{requests:  make(chan Request, 100),responses: make(chan Response, 100),done:      make(chan struct{}),}
}// 启动处理器
func (h *RequestHandler) Start(workers int) {for i := 0; i < workers; i++ {h.wg.Add(1)go h.worker(i)}
}// 工作协程
func (h *RequestHandler) worker(id int) {defer h.wg.Done()for {select {case req, ok := <-h.requests:if !ok {fmt.Printf("Worker %d: request channel closed\n", id)return}// 创建context用于超时控制ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)// 处理请求response := h.processRequest(ctx, req)// 发送响应select {case h.responses <- response:fmt.Printf("Worker %d: sent response for request %d\n", id, req.ID)case <-h.done:cancel()return}cancel() // 清理contextcase <-h.done:fmt.Printf("Worker %d: received stop signal\n", id)return}}
}// 处理单个请求
func (h *RequestHandler) processRequest(ctx context.Context, req Request) Response {// 模拟处理时间processTime := time.Duration(rand.Intn(int(req.Timeout))) + req.Timeout/2select {case <-time.After(processTime):return Response{RequestID: req.ID,Result:   fmt.Sprintf("Processed: %s", req.Data),}case <-ctx.Done():return Response{RequestID: req.ID,Error:    ctx.Err(),}}
}// 提交请求
func (h *RequestHandler) SubmitRequest(req Request) error {select {case h.requests <- req:return nilcase <-h.done:return fmt.Errorf("handler is stopped")}
}// 获取响应
func (h *RequestHandler) GetResponse() (Response, error) {select {case resp := <-h.responses:return resp, nilcase <-h.done:return Response{}, fmt.Errorf("handler is stopped")}
}// 停止处理器
func (h *RequestHandler) Stop() {close(h.done)h.wg.Wait()close(h.requests)close(h.responses)
}func main() {// 创建请求处理器handler := NewRequestHandler()handler.Start(3)// 发送一些测试请求requests := []Request{{ID: 1, Timeout: time.Second, Data: "Fast request"},{ID: 2, Timeout: time.Second * 2, Data: "Normal request"},{ID: 3, Timeout: time.Millisecond * 500, Data: "Quick request"},{ID: 4, Timeout: time.Second * 3, Data: "Slow request"},}// 提交请求for _, req := range requests {if err := handler.SubmitRequest(req); err != nil {fmt.Printf("Failed to submit request %d: %v\n", req.ID, err)continue}fmt.Printf("Submitted request %d\n", req.ID)}// 收集响应var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for i := 0; i < len(requests); i++ {resp, err := handler.GetResponse()if err != nil {fmt.Printf("Failed to get response: %v\n", err)continue}if resp.Error != nil {fmt.Printf("Request %d failed: %v\n", resp.RequestID, resp.Error)} else {fmt.Printf("Request %d succeeded: %s\n", resp.RequestID, resp.Result)}}}()// 等待所有响应处理完成wg.Wait()// 停止处理器handler.Stop()fmt.Println("Main: processing completed")
}

3. 取消信号传播

让我们继续完成取消信号传播的示例代码:

package mainimport ("context""fmt""log""sync""time"
)// 任务定义
type Task struct {ID      stringProcess func(ctx context.Context) (interface{}, error)
}// 工作池
type WorkerPool struct {workers    inttasks      chan Taskresults    chan interface{}errors     chan errordone       chan struct{}wg         sync.WaitGroup
}// 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {return &WorkerPool{workers: workers,tasks:   make(chan Task, workers*2),results: make(chan interface{}, workers*2),errors:  make(chan error, workers*2),done:    make(chan struct{}),}
}// 启动工作池
func (p *WorkerPool) Start(ctx context.Context) {// 启动workersfor i := 0; i < p.workers; i++ {p.wg.Add(1)go p.worker(ctx, i)}// 等待所有worker完成go func() {p.wg.Wait()close(p.done)close(p.results)close(p.errors)}()
}// worker处理任务
func (p *WorkerPool) worker(ctx context.Context, id int) {defer p.wg.Done()log.Printf("Worker %d started\n", id)for {select {case <-ctx.Done():log.Printf("Worker %d stopped: %v\n", id, ctx.Err())returncase task, ok := <-p.tasks:if !ok {log.Printf("Worker %d: task channel closed\n", id)return}log.Printf("Worker %d processing task %s\n", id, task.ID)// 创建任务专用的contexttaskCtx, cancel := context.WithTimeout(ctx, 5*time.Second)// 执行任务result, err := task.Process(taskCtx)cancel() // 释放任务context资源if err != nil {select {case p.errors <- fmt.Errorf("task %s failed: %w", task.ID, err):case <-ctx.Done():return}} else {select {case p.results <- result:case <-ctx.Done():return}}}}
}// 提交任务
func (p *WorkerPool) Submit(task Task) error {select {case p.tasks <- task:return nilcase <-p.done:return fmt.Errorf("worker pool is closed")}
}// 关闭工作池
func (p *WorkerPool) Close() {close(p.tasks)
}// 获取结果通道
func (p *WorkerPool) Results() <-chan interface{} {return p.results
}// 获取错误通道
func (p *WorkerPool) Errors() <-chan error {return p.errors
}func main() {// 创建根contextctx, cancel := context.WithCancel(context.Background())defer cancel()// 创建工作池pool := NewWorkerPool(3)pool.Start(ctx)// 创建模拟任务tasks := []Task{{ID: "task-1",Process: func(ctx context.Context) (interface{}, error) {select {case <-time.After(2 * time.Second):return "Task 1 completed", nilcase <-ctx.Done():return nil, ctx.Err()}},},{ID: "task-2",Process: func(ctx context.Context) (interface{}, error) {select {case <-time.After(3 * time.Second):return "Task 2 completed", nilcase <-ctx.Done():return nil, ctx.Err()}},},{ID: "task-3",Process: func(ctx context.Context) (interface{}, error) {select {case <-time.After(1 * time.Second):return nil, fmt.Errorf("task 3 failed")case <-ctx.Done():return nil, ctx.Err()}},},}// 提交任务for _, task := range tasks {if err := pool.Submit(task); err != nil {log.Printf("Failed to submit task %s: %v\n", task.ID, err)}}// 等待3秒后取消所有任务go func() {time.Sleep(3 * time.Second)log.Println("Cancelling all tasks...")cancel()}()// 收集结果和错误completed := 0expected := len(tasks)for completed < expected {select {case result, ok := <-pool.Results():if !ok {continue}log.Printf("Got result: %v\n", result)completed++case err, ok := <-pool.Errors():if !ok {continue}log.Printf("Got error: %v\n", err)completed++case <-ctx.Done():log.Printf("Main: context cancelled: %v\n", ctx.Err())completed = expected // 强制退出循环}}// 关闭工作池pool.Close()// 等待工作池完全关闭<-pool.donelog.Println("All workers stopped")
}

3.1 取消信号传播流程图

在这里插入图片描述

4. Context最佳实践

4.1 Context使用规范

  1. 函数调用链传递
// 推荐
func HandleRequest(ctx context.Context, req *Request) error// 不推荐
func HandleRequest(timeout time.Duration, req *Request) error
  1. Context应作为第一个参数
// 推荐
func ProcessTask(ctx context.Context, task *Task) error// 不推荐
func ProcessTask(task *Task, ctx context.Context) error
  1. 不要储存Context在结构体中
// 不推荐
type Service struct {ctx context.Context
}// 推荐
type Service struct {// 其他字段
}func (s *Service) DoWork(ctx context.Context) error

4.2 Context使用注意事项

  1. 不要将nil传递给context参数
// 推荐
ctx := context.Background()
ProcessTask(ctx, task)// 不推荐
ProcessTask(nil, task)
  1. context.Value应该只用于请求作用域数据
// 推荐
ctx = context.WithValue(ctx, "request-id", requestID)// 不推荐 - 配置信息应该通过其他方式传递
ctx = context.WithValue(ctx, "db-config", dbConfig)
  1. 正确处理取消信号
select {
case <-ctx.Done():return ctx.Err()
default:// 继续处理
}

4.3 实践建议

  1. 超时控制
  • 设置合理的超时时间
  • 在不同层级使用不同的超时时间
  • 确保资源正确释放
  1. 错误处理
  • 区分超时和取消错误
  • 传递有意义的错误信息
  • 实现优雅降级
  1. 性能优化
  • 避免创建过多的context
  • 合理使用context.Value
  • 及时取消不需要的操作
  1. 日志追踪
  • 记录关键操作的耗时
  • 追踪请求的完整链路
  • 记录取消原因

总结

关键点回顾

  1. Context原理
  • 继承关系
  • 值传递机制
  • 生命周期管理
  1. 超时控制
  • 设置超时时间
  • 处理超时信号
  • 资源清理
  1. 取消信号传播
  • 信号传递机制
  • 取消处理流程
  • 资源释放
  1. 最佳实践
  • 使用规范
  • 注意事项
  • 优化建议

实践建议

  1. 代码规范
  • 遵循命名约定
  • 合理组织代码结构
  • 添加必要的注释
  1. 错误处理
  • 使用有意义的错误信息
  • 实现错误恢复机制
  • 记录错误日志
  1. 性能优化
  • 减少不必要的context创建
  • 避免context.Value滥用
  • 及时释放资源

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/61694.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

音频信号采集前端电路分析

音频信号采集前端电路 一、实验要求 要求设计一个声音采集系统 信号幅度&#xff1a;0.1mVpp到1Vpp 信号频率&#xff1a;100Hz到16KHz 搭建一个带通滤波器&#xff0c;滤除高频和低频部分 ADC采用套件中的AD7920&#xff0c;转换率设定为96Ksps &#xff1b;96*161536 …

SpringBoot中使用Sharding-JDBC实战(实战+版本兼容+Bug解决)

一、实战 1、引入 ShardingSphere-JDBC 的依赖 https://mvnrepository.com/artifact/org.apache.shardingsphere/shardingsphere-jdbc/5.5.0 <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/shardingsphere-jdbc --> <dependency><grou…

网络编程 day1.2~day2——TCP和UDP的通信基础(TCP)

笔记脑图 作业&#xff1a; 1、将虚拟机调整到桥接模式联网。 2、TCP客户端服务器实现一遍。 服务器 #include <stdio.h> #include <string.h> #include <myhead.h> #define IP "192.168.60.44" #define PORT 6666 #define BACKLOG 20 int mai…

PyQT开发与实践:全面掌握跨平台桌面应用开发

目录 引言 PyQT简介 PyQT的主要特点 开发环境搭建 PyQT开发流程 1. 创建项目和主窗口 2. 添加控件和布局 3. 信号与槽 4. 样式和美化 高级特性 数据绑定和模型/视图编程 多线程和并发 国际化和本地化 实践案例&#xff1a;简单的计算器应用 1. 界面设计 2. 逻辑…

微信小程序条件渲染与列表渲染的全面教程

微信小程序条件渲染与列表渲染的全面教程 引言 在微信小程序的开发中,条件渲染和列表渲染是构建动态用户界面的重要技术。通过条件渲染,我们可以根据不同的状态展示不同的内容,而列表渲染则使得我们能够高效地展示一组数据。本文将详细讲解这两种渲染方式的用法,结合实例…

Origin教程003:数据导入(2)-从文件导入和导入矩阵数据

文章目录 3.3 从文件导入3.3.1 导入txt文件3.3.2 导入excel文件3.3.3 合并工作表3.4 导入矩阵数据3.3 从文件导入 所需数据 https://download.csdn.net/download/WwLK123/900267473.3.1 导入txt文件 选择【数据->从文件导入->导入向导】: 选择文件之后,点击完成即可…

刷题计划 day22回溯(一)【组合】【组合总和 III】【电话号码的字母组合】

⚡刷题计划day22 回溯&#xff08;一&#xff09;开始&#xff0c;此期开启回溯专题&#xff0c;敬请期待关注&#xff0c;可以点个免费的赞哦~ 往期可看专栏&#xff0c;关注不迷路&#xff0c; 您的支持是我的最大动力&#x1f339;~ 目录 回溯算法理论基础 回溯法解决的…

访问限定符

文章目录 一、访问限定符 一、访问限定符 C⼀种实现封装的⽅式&#xff0c;用类将对象的属性与方法结合在⼀块&#xff0c;让对象更加完善&#xff0c;通过访问权限选择性的将其接口提供给外部的用户使用。 public修饰的成员在类外可以直接被访问&#xff1b;protected和priva…

【论文阅读】WGSR

0. 摘要 0.1. 问题提出 1.超分辨率(SR)是一个不适定逆问题&#xff0c;可行解众多。 2.超分辨率(SR)算法在可行解中寻找一个在保真度和感知质量之间取得平衡的“良好”解。 3.现有的方法重建高频细节时会产生伪影和幻觉&#xff0c;模型区分图像细节与伪影仍是难题。 0.2. …

CSP/信奥赛C++语法基础刷题训练(23):洛谷P1217:[USACO1.5] 回文质数 Prime Palindromes

CSP/信奥赛C语法基础刷题训练&#xff08;23&#xff09;&#xff1a;洛谷P1217&#xff1a;[USACO1.5] 回文质数 Prime Palindromes 题目描述 因为 151 151 151 既是一个质数又是一个回文数&#xff08;从左到右和从右到左是看一样的&#xff09;&#xff0c;所以 151 151 …

【单元测试】【Android】JUnit 4 和 JUnit 5 的差异记录

背景 Jetbrain IDE 支持生成 Test 类&#xff0c;其中选择JUnit5 和 JUnit&#xff0c;但是感觉这不是标准的单元测试&#xff0c;因为接口命名吧。 差异对比 两者生成的单测API名称同原API&#xff0c;没加test前缀的。使用差异主要表现在&#xff1a; setUp &#xff06; …

Kylin Server V10 下基于Sentinel(哨兵)实现Redis高可用集群

一、什么是哨兵模式 Redis Sentinel 是一个分布式系统,为 Redis 提供高可用性解决方案。可以在一个架构中运行多个 Sentinel 进程(progress)这些进程使用流言协议(gossip protocols)来接收关于主服务器是否下线信息,并使用投票协议(agreement protocols)来决定是否执行…

扩散模型从原理到实战 入门

diffusion-models-class-CN/unit1/README_CN.md at main darcula1993/diffusion-models-class-CN GitHub 你可以使用命令行来通过此令牌登录 (huggingface-cli login) 或者运行以下单元来登录&#xff1a; from huggingface_hub import notebook_loginnotebook_login() http…

阅读《先进引信技术的发展与展望》识别和控制部分_笔记

基本信息 题名&#xff1a;先进引信技术的发展与展望 作者&#xff1a; 张合;戴可人 发表时间&#xff1a;2023-07-20 可装定、可探测、可处理、可控制是灵巧引信设计的四项基本能力。与之对应&#xff0c;先进引信的基础研究涵盖了信息交联技术、末端探测技术、目标识别技术…

07-Making a Bar Chart with D3.js and SVG

课程链接 Curran的课程&#xff0c;通过 D3.js 的 scaleLinear, max, scaleBand, axisLeft, axisBottom&#xff0c;根据 .csv 文件生成一个横向柱状图。 【注】如果想造csv数据&#xff0c;可以使用通义千问&#xff0c;关于LinearScale与BandScale不懂的地方也可以在通义千…

Fakelocation Server服务器/专业版 ubuntu

前言:需要Ubuntu系统 Fakelocation开源文件系统需求 Ubuntu | Fakelocation | 任务一 任务一 更新Ubuntu&#xff08;安装下载不再赘述&#xff09; sudo -i # 提权 sudo apt update # 更新软件包列表 sudo apt upgrade # 升级已安装的软…

从搭建uni-app+vue3工程开始

技术栈 uni-app、vue3、typescript、vite、sass、uview-plus、pinia 一、项目搭建 1、创建以 typescript 开发的工程 npx degit dcloudio/uni-preset-vue#vite-ts my-vue3-project2、安装sass npm install -D sass// 安装sass-loader&#xff0c;注意需要版本10&#xff0c;…

SMMU软件指南操作之流(stream)安全性和流标识

安全之安全(security)博客目录导读 目录 1、流安全性 2、流标识 2.1 什么是 StreamID? 2.2 SubstreamID 的作用 1、流安全性 SMMUv3 架构在没有实现 RME 设备分配的情况下,支持两种可选的安全状态,这由 SMMU_S_IDR1.SECURE_IMPL 报告。如果实现了 RME 设备分配,则通过…

Android仿前端分页组件pagination

仿前端pagination Android仿前端分页组件pagination 最近Android原生有个需求就是做个分页组件&#xff0c;不用上拉加载&#xff0c;因为数据量太大用户喜欢前端的方式&#xff0c;UI主要是拼凑比较简单&#xff0c;主要补充了一些判断越界和数据不全的细节&#xff0c;记录方…

贴代码框架PasteForm特性介绍之query,linkquery

简介 PasteForm是贴代码推出的 “新一代CRUD” &#xff0c;基于ABPvNext&#xff0c;目的是通过对Dto的特性的标注&#xff0c;从而实现管理端的统一UI&#xff0c;借助于配套的PasteBuilder代码生成器&#xff0c;你可以快速的为自己的项目构建后台管理端&#xff01;目前管…