cache教程 5.分布式节点的通信

0.对原教程的一些见解

其回顾完请求流程就是抽象了两个接口,PeerPicker和PeerGetter。这样操作,读者阅读时可能很难快速明白其含义,不好理解为什么就创建出两个接口,感觉会比较疑惑。原教程的评论中也有讨论这点。

 本教程就先不创建接口,而是使用struct方式,这样可能好理解点。

1.节点请求处理的流程

先弄清楚我们查询缓存的逻辑。

单节点: 

客户发送查询请求到节点A,该节点有缓存就立即返回,若是没有就执行用户设置的回调函数获取值并添加到缓存中,然后返回。

分布式节点:

客户端发送查询请求到某个缓存节点,该节点会判断该key是否在本地,若是不在本地,使用一致性哈希选择节点,若不是在远程节点,则就退回到本地节点处理;若在远程节点,该节点会发送请求去访问其他 node 节点。(不是客户端再去访问其他节点)

从这可以看出,一个node要处理两种请求,一个是来自客户端的外部请求,一个是来自其他远端节点的内部请求

为了清晰,划分职责,我们可以在一个node中启动两种HTTP服务,一个处理客户端请求(APIServer), 一个处理节点之间的请求(CacheServer)

2.HTTP客户端

之前我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来先实现客户端的功能。这个客户端是节点作为客户端去访问其他节点

  • baseURL 表示将要访问的远程节点的地址,例如 http://example.com/geecache/
type httpGetter struct {baseURL string
}func (h *httpGetter) Get(group string, key string) ([]byte, error) {//QueryEscape 对字符串进行转义,以便可以将其安全地放置在 URL 查询中。u := fmt.Sprintf("%v/%v/%v", h.baseURL,url.QueryEscape(group),url.QueryEscape(key))res, err := http.Get(u)if err != nil {return nil, err}defer res.Body.Close()if res.StatusCode != http.StatusOK {return nil, fmt.Errorf("server returned: %v", res.Status)}bytes, err := io.ReadAll(res.Body)if err != nil {return nil, fmt.Errorf("reading response body: %v", err)}return bytes, nil
}

3.回顾上一章节实现的单节点的访问流程

func (g *Group) Get(key string) (ByteView, error) {//现在本地查询if v, ok := g.mainCache.get(key); ok {return v, nil}return g.load(key)   
}func (g *Group) load(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneByte(bytes)}g.mainCache.add(key, value)return value, nil
}

那很明显是需要修改load方法,让其可以去访问远程节点。

在load方法中,伪代码如下。

func func (g *Group) load(key string) (ByteView, error){if 有远程节点 {if 找到key所在的远程节点 {本地作为客户端去访问该远程节点}}没有远程节点,只能在本地调用回调函数去源地方获取
}

要想在Group中访问节点,那么就要在Group中存储节点集合。

节点结合结构体Peers

那节点集合是不是又要创建一个结构体?那先试试创建一个结构体Peers。

因为 hash 环的 map 不是线程安全的,所以这里要加锁。

成员变量 httpGetters,映射远程节点与对应的 httpGetter。(httpGetter就是个客户端,是一个节点作为客户端),每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关,map的key是远程节点的地址,比如"http://localhost:10000"

type Peers struct {addr          string //这个是用于进行选择节点时用来判断是不是本地节点basePath      stringmutex         sync.Mutex    //guards peersHashRing and httpGetterspeersHashRing *consistenthash.HashRinghttpGetters   map[string]*httpGetter
}//这是HTTP服务端章节的HTTPPool,这是很相似的
type HTTPPool struct {addr     stringbasePath string
}

那么该结构体Peers就要有添加远程节点和通过key去获取远程节点的方法。

增添远程节点方法Set

通过该方法可以知道其map的key是远程节点的地址。

// 使用用例:Set("http://localhost:8001","http://localhost:8002")
func (p *Peers) Set(peers ...string) {p.mutex.Lock()defer p.mutex.Unlock()p.peersHashRing = consistenthash.NewHash(50, nil)p.peersHashRing.Add(peers...) //在 hash 环上添加真实节点和虚拟节点//存储远端节点信息p.httpGetters = make(map[string]*httpGetter)for _, peer := range peers {p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}}
}

通过key去获取远程节点的方法PickPeer

Peers结构体中的变量addr在这里派上用场了,返回的地址要是等于本身addr,那就返回false,不用自己作为客户端再去访问自己。

func (p *Peers) PickPeer(key string) (*httpGetter, bool) {p.mutex.Lock()defer p.mutex.Unlock()//这里返回的peer是个地址,可以查看(Peers).Set函数中的参数if peer := p.peersHashRing.Get(key); peer != "" && peer != p.addr {fmt.Println("pick peer ", peer)return p.httpGetters[peer], true}return &httpGetter{}, false
}

Peers这个结构体就实现了,可以看到其与HTTPPool是很相似的。对比HTTPPool,就是成员变量添加了一些,方法也添加了一些,也没有改变HTTPPool原有的逻辑,只是扩张了。所以可以把Peers的内容添加到HTTPPool中去,具体的代码就不在这里显示了。

type HTTPPool struct {addr     stringbasePath string//新添加的,把Peers内容增添到HTTPPool中mutex         sync.MutexpeersHashRing *consistenthash.HashRinghttpGetters   map[string]*httpGetter
}

4.集成,实现主流程

最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。

在Group结构体中有改变。

新增 RegisterPeers() 方法,将 peers 注入到 Group 中。

type Group struct {name      stringmainCache cachegetter    Getterpeers *Peers //添加了节点集合
}// 往分组内注册节点集合
func (g *Group) RegisterPeers(peers *Peers) {if g.peers != nil {panic("RegisterPeerPicker called more than once")}g.peers = peers
}

最终再回到load函数,这个函数是需要修改的。

func (g *Group) load(key string) (value ByteView, err error) {if g.peers != nil {    //有远程节点的情况if peer, ok := g.peers.PickPeer(key); ok {    //通过key找到该远程节点if value, err = g.getFromPeer(peer, key); err == nil {return value, nil        //找到值}log.Println("[GeeCache] Failed to get from peer", err)}}return g.getLocally(key)    //回到本地处理
}func (g *Group) getFromPeer(peer *httpGetter, key string) (ByteView, error) {bytes, err := peer.Get(g.name, key)if err != nil {return ByteView{}, err}return ByteView{b: bytes}, nil
}func (g *Group) getLocally(key string) (ByteView, error) {bytes, err := g.getter.Get(key)if err != nil {return ByteView{}, err}value := ByteView{b: cloneByte(bytes)}g.mainCache.add(key, value)return value, nil
}
  • 新增 getFromPeer() 方法,使用httpGetter 访问远程节点,获取缓存值。
  • 修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()

5. 测试

总结——缓存节点启动的流程

  1. 创建 Group 对象.(用于存储我们的缓存数据)
  2. 启动缓存 http 服务.(创建 HTTPPool,添加节点信息,注册到缓存分组中)
  3. 启动 API 服务.(用于与客户端进行交互)

 测试代码:

var db = map[string]string{"Tom":  "630","Jack": "589","Sam":  "567",
}func main() {var port intvar api boolflag.IntVar(&port, "port", 8001, "Geecache server port")flag.BoolVar(&api, "api", false, "Start a api server?")flag.Parse()apiAddr := "http://localhost:9999"addrMap := map[int]string{8001: "http://localhost:8001",8002: "http://localhost:8002",8003: "http://localhost:8003",}var addrs []stringfor _, v := range addrMap {addrs = append(addrs, v)}gee := createGroup()if api {go startAPIServer(apiAddr, gee)}startCacheServer(addrMap[port], addrs, gee)time.Sleep(time.Second * 1000)
}func createGroup() *cache.Group {return cache.NewGroup("scores", 2<<10, cache.GetterFunc(func(key string) ([]byte, error) {if v, ok := db[key]; ok {return []byte(v), nil}return nil, fmt.Errorf("%s not exit", key)}))
}func startCacheServer(addr string, addrs []string, groups *cache.Group) {//HTTPPool是节点结合和HTTP服务端peers := cache.NewHTTPPool(addr, cache.DefaultBasePath)peers.Set(addrs...)         //添加节点groups.RegisterPeers(peers) //注册节点集合log.Println("geecache is running at", addr)http.ListenAndServe(addr[7:], peers)
}func startAPIServer(apiAddr string, groups *cache.Group) {http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {key := r.URL.Query().Get("key")view, err := groups.Get(key)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}w.Header().Set("Content-Type", "application/octet-stream")w.Write(view.ByteSlice())})log.Println("fontend server is running at", apiAddr)http.ListenAndServe(apiAddr[7:], nil)
}

为了方便,我们将启动的命令封装为一个 shell 脚本:

我们开启了三个节点(都是在同一个台机器上的,只是用不同端口来当做一个节点,进行区分)。

在端口8003的节点上开启APIServer,用户去访问时候,都是访问端口8003的那个节点。

#!/bin/bash#trap 命令用于在 shell 脚本退出时,删掉临时文件,结束在该shell脚本运行的后台程序
trap "rm server;kill 0" EXITgo build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &wait

结果

测试的时候,我们并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。

但是会有一个问题,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这个问题下一次解决。

6.多节点的访问流程图

完整代码:https://github.com/liwook/Go-projects/tree/main/go-cache/5-multi-nodes

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/215083.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬取酷我音乐

&#x1f388; 博主&#xff1a;一只程序猿子 &#x1f388; 博客主页&#xff1a;一只程序猿子 博客主页 &#x1f388; 个人介绍&#xff1a;爱好(bushi)编程&#xff01; &#x1f388; 创作不易&#xff1a;喜欢的话麻烦您点个&#x1f44d;和⭐&#xff01; &#x1f388;…

pytest + yaml 框架 -59.用例失败重跑机制pytest-rerunfailures

前言 有些接口可能不太稳定&#xff0c;第一次跑的时候由于网络原因或者其它原因失败&#xff0c;但是重新跑2次又成功了。 对于这种需要重新跑几次的场景&#xff0c;可以使用用例失败重跑机制&#xff0c;需安装pytest-rerunfailures 插件。 场景示例 失败重跑需要依赖 py…

【Axure原型分享】3D多柱状图_中继器版

今天和大家分享3D多柱状图_中继器版的原型模板&#xff0c;鼠标移入时&#xff0c;对应区域的背景会高亮变色&#xff0c;并且显示对应柱状体的数据。那这个原型是用Axure原生元件制作的&#xff0c;样式交互都可以自行修改&#xff0c;图表数据在中继器表格里填写&#xff0c;…

【二者区别】cuda和cudatoolkit

Pytorch 使用不同版本的 cuda 由于课题的原因&#xff0c;笔者主要通过 Pytorch 框架进行深度学习相关的学习和实验。在运行和学习网络上的 Pytorch 应用代码的过程中&#xff0c;不少项目会标注作者在运行和实验时所使用的 Pytorch 和 cuda 版本信息。由于 Pytorch 和 cuda 版…

R语言,table()函数实现统计每个元素出现的频数+并将最终统计频数结果转换成dataframe数据框形式

在 R中&#xff0c;要统计dataframe数据框中每个元素出现的频数&#xff0c;可以使用table()函数。以下是一个示例&#xff1a; 目录 一、创建数据 二、统计第一列每个元素出现的频数 三、统计第二列每个元素出现的频数 四、将频数结果转换为数据框&#xff0c;并改列名 一…

Cannot find cache named ‘‘ for Builder Redis

当引入 Redissson 时&#xff0c;springCache 缓存机制失效 原因&#xff1a;springCache 默认使用本地缓存 Redisson 使用redis 缓存 最后都转成redis了。。。 总感觉哪不对 两者居然不共存

nodejs+vue+微信小程序+python+PHP的外卖数据分析-计算机毕业设计推荐django

构建一种完全可实现、可操作的开放源代码信息收集系统&#xff0c;帮助记者完成工作任务。采编人员仅需输入所收集到的网址及题目即可迅速启动收集工作并进行信息归类。 2.根据新的数据收集要求&#xff0c;采用云计算技术实现新的收集器的迅速部署。对于资料采集点的改版&…

基于Qt的登录页面设计

题目&#xff1a; 完善对话框&#xff0c;点击登录对话框&#xff0c;如果账号和密码匹配&#xff0c;则弹出信息对话框&#xff0c;给出提示”登录成功“&#xff0c;提供一个Ok按钮&#xff0c;用户点击Ok后&#xff0c;关闭登录界面&#xff0c;跳转到其他界面 如果账号和…

音乐制作工具 Ableton Live 12中文最新 for Mac

Ableton Live 12 Mac具有直观的界面和强大的功能&#xff0c;使得音乐制作变得更加简单和高效。它支持实时录制、编辑和混音&#xff0c;用户可以在创作过程中随时进行修改和调整。此外&#xff0c;该软件还提供了各种音频效果、虚拟乐器和采样器&#xff0c;使用户可以创建出更…

Springboot入门篇

一、概述 Spring是一个开源框架&#xff0c;2003 年兴起的一个轻量级的Java 开发框架&#xff0c;作者Rod Johnson 。Spring是为了解决企业级应用开发的复杂性而创建的&#xff0c;简化开发。 1.1对比 对比一下 Spring 程序和 SpringBoot 程序。如下图 坐标 Spring 程序中的…

深入理解模板引擎:解锁 Web 开发的新境界(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

TrustZone之完成器:外围设备和内存

到目前为止,在本指南中,我们集中讨论了处理器,但TrustZone远不止是一组处理器功能。要充分利用TrustZone功能,我们还需要系统其余部分的支持。以下是一个启用了TrustZone的系统示例: 本节探讨了该系统中的关键组件以及它们在TrustZone中的作用。 完成器:外围设备…

点评项目——秒杀优化

2023.12.11 上一张的秒杀券下单还可以进行优化&#xff0c;先来回顾一下下单流程&#xff1a; 可以看出流程设计多次查询和操作数据库的操作&#xff0c;并且执行顺序是一个线程串行执行&#xff0c;执行性能是比较低的。 优化方案&#xff1a;我们将判断秒杀库存和校验一人一单…

从零开始实现神经网络(三)_RNN循环神经网络

参考文章&#xff1a;rnn循环神经网络介绍 循环神经网络 &#xff08;RNN&#xff09; 是一种专门处理序列的神经网络。它们通常用于自然语言处理 &#xff08;NLP&#xff09; 任务&#xff0c;因为它们在处理文本方面很有效。在这篇文章中&#xff0c;我们将探讨什么是 RNN&a…

图文教程:从0开始安装stable-diffusion

现在AI绘画还是挺火&#xff0c;Midjourney虽然不错&#xff0c;但是对于我来说还是挺贵的。今天我就来安一下开源的AI绘画stable-diffusion,它的缺点就是对电脑的要求比较高&#xff0c;尤其是显卡。 话不多说开搞。 访问sd的github&#xff0c;https://github.com/AUTOMATIC…

〖大前端 - 基础入门三大核心之JS篇(51)〗- 面向对象之认识上下文与上下文规则

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;哈哥撩编程&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xff0c;目前在公司…

qt-C++笔记之addAction和addMenu的区别以及QAction的使用场景

qt-C笔记之addAction和addMenu的区别以及QAction的使用场景 code review! 文章目录 qt-C笔记之addAction和addMenu的区别以及QAction的使用场景1.QMenu和QMenuBar的关系与区别2.addMenu和addAction的使用场景区别3.将QAction的信号连接到槽函数4.QAction的使用场景5.将例1修改…

基于单片机智能浇花控制系统设计

**单片机设计介绍&#xff0c;基于单片机智能浇花控制系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的智能浇花控制系统可以通过水泵、传感器和单片机等硬件组件实现自动浇水&#xff0c;减轻人工浇花的工作…

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含python、JS工程源码)+数据集+模型(三)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 模块实现1. 数据预处理1&#xff09;爬取功能2&#xff09;下载功能 2. 创建模型并编译1&#xff09;定义模型结构2&#xff09;优化…