p2p、分布式,区块链笔记: 通过libp2p的Kademlia网络协议实现kv-store

Kademlia 网络协议

  • Kademlia 是一种分布式哈希表协议和算法,用于构建去中心化的对等网络,核心思想是通过分布式的网络结构来实现高效的数据查找和存储。在这个学习项目里,Kademlia 作为 libp2p 中的 NetworkBehaviour的组成。

  • 以下这些函数或方法是根据 Kademlia 网络协议设计的,它们实现了基本的网络操作,包括获取数据记录、获取数据提供者、存储数据记录和开始提供数据等功能(这里只展示了项目中用到的函数,常用函数可以看libp2p Kademlia DHT 规范,更多函数可见如下图中的源码部分)。

在这里插入图片描述

1. get_record

kademlia.get_record(key, Quorum::One);
  • 作用: 从 Kademlia 网络中获取与指定 key 相关的记录。
  • 参数:
    • key: 要获取记录的键。
    • Quorum::One: 获取记录时所需的一致性要求,这里是指只需要从一个节点获取记录即可。
  • 实现逻辑:
    • 根据 Kademlia 协议,节点首先根据 key 计算出其对应的 K-bucket 或者具体的节点 ID,然后向网络中查找负责该 key 的节点。
    • 节点通过网络查询和消息传递机制,从负责节点处获取存储的记录。
    • 返回获取到的记录或者执行相应的处理逻辑。

2. get_providers

kademlia.get_providers(key);
  • 作用: 获取能够提供与指定 key 相关数据的节点信息(即数据的提供者)。
  • 参数:
    • key: 要获取提供者信息的数据的键。
  • 实现逻辑:
    • 类似于 get_record,节点根据 key 计算出其对应的 K-bucket 或者节点 ID。
    • 节点向网络发送查询请求,询问哪些节点能够提供与 key 相关的数据。
    • 返回能够提供数据的节点列表或者执行相应的处理逻辑。

3. put_record

let record = Record {key,value,publisher: None,expires: None,
};
kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
  • 作用: 将指定的记录存储到 Kademlia 网络中。
  • 参数:
    • record: 包含要存储的数据信息的记录对象,包括 key(键)、value(值)、publisher(发布者,可能为空)、expires(过期时间,可能为空)等字段。
    • Quorum::One: 存储记录时的一致性要求,这里是指只需要将记录存储在一个节点即可。
  • 实现逻辑:
    • 节点根据 key 计算出对应的 K-bucket 或节点 ID。
    • 节点将 record 发送给负责存储该 key 的节点,并根据指定的一致性要求存储副本。
    • 返回存储成功或失败的结果,或者执行相应的处理逻辑。

4. start_providing

kademlia.start_providing(key).expect("Failed to start providing key");
  • 作用: 在 Kademlia 网络中开始提供指定 key 的数据。
  • 参数:
    • key: 要开始提供的数据的键。
  • 实现逻辑:
    • 节点将 key 注册为它可以提供的数据标识。
    • 当其他节点查询或需要该 key 的数据时,该节点将响应并提供相应的数据。
    • 返回启动提供成功或失败的结果,或者执行相应的处理逻辑。

kv数据库主体代码及注释

use async_std::io;
use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,Quorum, Record,
};
use libp2p::{development_transport, identity,mdns::{Mdns, MdnsConfig, MdnsEvent},swarm::SwarmEvent,NetworkBehaviour, PeerId, Swarm,
};
use std::error::Error;#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {env_logger::init();//  创建本地密钥,本地peer id和传输控制组件let local_key = identity::Keypair::generate_ed25519();let local_peer_id = PeerId::from(local_key.public());let transport = development_transport(local_key).await?;// 事件行为控制// We create a custom network behaviour that combines Kademlia and mDNS.#[derive(NetworkBehaviour)]// https://docs.rs/libp2p/latest/libp2p/swarm/trait.NetworkBehaviour.html#[behaviour(out_event = "MyBehaviourEvent")]//这个 "MyBehaviourEvent" 定义在下边的代码中// NetworkBehaviour这个trait将对所描述的结构体中的每个成员依次进行操作,例如 NetworkBehavior::poll它将首先轮询第一个结构成员,直到返回poll::Pending,然后再转到后面的成员。// 关于 #[behaviour(out_event = "MyBehaviourEvent")]中的out_event :The final out event. If we find a `#[behaviour(out_event = "Foo")]` attribute on the struct, we set `Foo` as the out event. Otherwise we use `()`.struct MyBehaviour {kademlia: Kademlia<MemoryStore>,mdns: Mdns,}#[allow(clippy::large_enum_variant)] //  #[allow()为Lint语法属性检查控制,https://doc.rust-lang.org/reference/attributes/diagnostics.html#lint-check-attributes    //关于large_enum_variant 详见https://rust-lang.github.io/rust-clippy/master/index.html#/large_enum_variantenum MyBehaviourEvent {Kademlia(KademliaEvent),Mdns(MdnsEvent),}// 实现(impl)块,用于为类型KademliaEvent实现了From trait,使其能够被转换为类型MyBehaviourEvent。impl From<KademliaEvent> for MyBehaviourEvent {fn from(event: KademliaEvent) -> Self {MyBehaviourEvent::Kademlia(event)}}// 实现(impl)块,用于为类型  MdnsEvent   实现了From trait,使其能够被转换为类型MyBehaviourEvent。impl From<MdnsEvent> for MyBehaviourEvent {fn from(event: MdnsEvent) -> Self {MyBehaviourEvent::Mdns(event)}}// Create a swarm to manage peers and events.let mut swarm = {// Create a Kademlia behaviour.let store = MemoryStore::new(local_peer_id);let kademlia = Kademlia::new(local_peer_id, store);let mdns = Mdns::new(MdnsConfig::default()).await?;let behaviour = MyBehaviour { kademlia, mdns };Swarm::new(transport, behaviour, local_peer_id)};// 从命令行读取指令并赋值给可变变量"stdin"let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();// Listen on all interfaces and whatever port the OS assigns.swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;// Kick it off.loop {select! {line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),event = swarm.select_next_some() => match event { // swarm.select_next_some() 是一个方法,用于从一个事件流中获取下一个事件,后续送到match进行匹配SwarmEvent::NewListenAddr { address, .. } => {//当发生新的监听地址事件时println!("Listening in {:?}", address);},SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {// 发生mDNS服务发现事件时for (peer_id, multiaddr) in list {swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);}}SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {// 当发出的 Kademlia 查询完成时handle_query_result(&result);}_ => {} // 通配符模式,执行一个空的代码块}}}
}// 下面是两个辅助函数,一个根据不同的查询结果类型执行不同的逻辑,另一个处理从命令行输入的命令
fn handle_query_result(result: &QueryResult) {match result {...}
}fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {let mut args = line.split(' ');match args.next() {...}
}

两个辅助函数

处理从命令行输入的命令

  • 这段 Rust 代码定义了一个函数 handle_input_line,用于处理从命令行读取的输入 line,并根据命令执行相应的操作。函数通过分割输入行来解析命令和参数,处理缺少参数的错误情况,并根据命令调用传入的 Kademlia 网络实例 (kademlia) 的相应方法。
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {// 将输入行按空格分割为多个参数let mut args = line.split(' ');// 匹配第一个参数(命令)match args.next() {Some("GET") => {// 如果命令是 "GET"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 调用 Kademlia 网络的 get_record 方法,传入指定的键和 Quorum::Onekademlia.get_record(key, Quorum::One);}Some("GET_PROVIDERS") => {// 如果命令是 "GET_PROVIDERS"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 调用 Kademlia 网络的 get_providers 方法,传入指定的键kademlia.get_providers(key);}Some("PUT") => {// 如果命令是 "PUT"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};let value = {// 尝试获取下一个参数作为值match args.next() {Some(value) => value.as_bytes().to_vec(), // 将值转换为字节向量None => {// 如果未提供值,则打印错误并从函数返回eprintln!("缺少值");return;}}};// 创建一个包含指定键、值及可选字段的 Record 对象let record = Record {key,value,publisher: None,expires: None,};// 在 Kademlia 网络中以 Quorum::One 一致性存储记录kademlia.put_record(record, Quorum::One).expect("本地存储记录失败。");}Some("PUT_PROVIDER") => {// 如果命令是 "PUT_PROVIDER"let key = {// 尝试获取下一个参数作为键match args.next() {Some(key) => Key::new(&key), // 从字符串创建 Key 对象None => {// 如果未提供键,则打印错误并从函数返回eprintln!("缺少键");return;}}};// 在 Kademlia 网络中开始提供指定的键kademlia.start_providing(key).expect("启动提供键失败");}_ => {// 如果命令不匹配预期的任何命令eprintln!("期望命令为 GET、GET_PROVIDERS、PUT 或 PUT_PROVIDER");}}

根据不同的查询结果类型执行不同的逻辑

fn handle_query_result(result: &QueryResult) {match result {QueryResult::GetProviders(Ok(ok)) => {for peer in &ok.providers {println!("Peer {:?} provides key {:?}",peer,std::str::from_utf8(ok.key.as_ref()).unwrap());}}QueryResult::GetProviders(Err(err)) => {eprintln!("Failed to get providers: {:?}", err);}QueryResult::GetRecord(Ok(ok)) => {for PeerRecord {record: Record { key, value, .. },..} in &ok.records{println!("Got record {:?} {:?}",std::str::from_utf8(key.as_ref()).unwrap(),std::str::from_utf8(&value).unwrap(),);}}QueryResult::GetRecord(Err(err)) => {eprintln!("Failed to get record: {:?}", err);}QueryResult::PutRecord(Ok(PutRecordOk { key })) => {println!("Successfully put record {:?}",std::str::from_utf8(key.as_ref()).unwrap());}QueryResult::PutRecord(Err(err)) => {eprintln!("Failed to put record: {:?}", err);}QueryResult::StartProviding(Ok(AddProviderOk { key })) => {println!("Successfully put provider record {:?}",std::str::from_utf8(key.as_ref()).unwrap());}QueryResult::StartProviding(Err(err)) => {eprintln!("Failed to put provider record: {:?}", err);}_ => {}}
}fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {let mut args = line.split(' ');match args.next() {Some("GET") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.get_record(key, Quorum::One);}Some("GET_PROVIDERS") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.get_providers(key);}Some("PUT") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};let value = {match args.next() {Some(value) => value.as_bytes().to_vec(),None => {eprintln!("Expected value");return;}}};let record = Record {key,value,publisher: None,expires: None,};kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");}Some("PUT_PROVIDER") => {let key = {match args.next() {Some(key) => Key::new(&key),None => {eprintln!("Expected key");return;}}};kademlia.start_providing(key).expect("Failed to start providing key");}_ => {eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");}}
}

运行示例

PS C:\Users\kingchuxing\Documents\learning-libp2p-main\rust> cargo run --example 04-kv-store
Listening in "/ip4/172.23.118.182/tcp/65055"
Listening in "/ip4/192.168.0.104/tcp/65055"
Listening in "/ip4/127.0.0.1/tcp/65055"
GET 123
Failed to get record: NotFound { key: Key(b"123"), closest_peers: [] }
PUT 123
缺少值
PUT 123 123456789
Failed to put record: QuorumFailed { key: Key(b"123"), success: [], quorum: 1 }
GET 123     
Got record "123" "123456789"
PUT_PROVIDER 234 //输入提供者
Successfully put provider record "234"
GET_PROVIDERS 234 //获取提供者
Peer PeerId("12D3KooWB7CFnrmeH5gzRxA4CYR2YTg2K3NMvNHP5dWDPFwAHY38") provides key "234"
GET 234
Failed to get record: NotFound { key: Key(b"234"), closest_peers: [] }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/39851.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 输入系统 InputStage

整体流程如上所说&#xff0c;简要归纳如下&#xff1a; 输入法之前的处理 输入法处理 输入法之后处理 综合处理 InputStage将输入事件的处理分成若干个阶段&#xff08;Stage&#xff09;, 如果当前有输入法窗口&#xff0c;则事件处理从 NativePreIme 开始&#xff0c;否…

主流国产服务器操作系统技术分析

主流国产服务器操作系统 信创 "信创"&#xff0c;即信息技术应用创新&#xff0c;作为科技自立自强的核心词汇&#xff0c;在我国信息化建设的进程中扮演着至关重要的角色。自2016年起步&#xff0c;2020年开始蓬勃兴起&#xff0c;信创的浪潮正席卷整个信息与通信技…

GNeRF代码复现

https://github.com/quan-meng/gnerf 之前一直去复现这个代码总是文件不存在&#xff0c;我就懒得搞了&#xff08;实际上是没能力哈哈哈&#xff09; 最近突然想到这篇论文重新试试复现 一、按步骤创建虚拟环境安装各种依赖等 二、安装好之后下载数据&#xff0c;可以用Blen…

virtualbox+Ubuntu部分窗口显示错乱

如下图&#xff1a; 窗口标题显示错乱&#xff0c;跟一般乱码不一样。 解决办法&#xff1a; 在virtualbox设置中&#xff0c;显示选项卡&#xff0c;取消勾选启用3D加速 也可参考此链接&#xff1a;linux ubuntu 中vscode中央窗口显示出现异常/显示错误_开发工具-CSDN问答

打卡第一天

今天是参加算法训练营的第一天&#xff0c;希望我能把这个训练营坚持下来&#xff0c;希望我的算法编程题的能力有所提升&#xff0c;不再面试挂了&#xff0c;面试总是挂编程题&#xff0c;记录我leetcode刷题数量&#xff1a; 希望我通过这个训练营能够实现两份工作的无缝衔接…

自动化任务工具 -- zTasker v1.94 绿色版

软件简介 zTasker 是一款功能强大的自动化任务管理软件&#xff0c;以其简洁易用、一键式操作而著称。软件体积小巧&#xff0c;启动迅速&#xff0c;提供了超过100种任务类型和30多种定时/条件执行方法&#xff0c;能够满足用户在自动化方面的多样化需求。 zTasker 支持定时任…

从全连接到卷积

一、全连接到卷积 1、卷积具有两个原则&#xff1a; 平移不变性&#xff1a;无论作用在哪个部分&#xff0c;它都要有相同的作用&#xff0c;而不会随着位置的改变而改变 局部性&#xff1a;卷积核作用处&#xff0c;作用域应该是核作用点的周围一小部分而不作用于更大的部分 …

OBD诊断(ISO15031) 04服务

文章目录 功能简介ISO 9141-2、ISO 14230-4和SAE J1850的诊断服务定义1、清除/重置与排放相关的诊断信息请求消息定义2、请求与排放相关的DTC响应消息定义3、报文示例 ISO 15765-4的诊断服务定义1、请求与排放相关的DTC请求消息定义2、请求与排放相关的DTC响应消息定义3、否定响…

专题二:Spring源码编译

目录 下载源码 配置Gradle 配置环境变量 配置setting文件 配置Spring源码 配置文件调整 问题解决 完整配置 gradel.properties build.gradle settiings.gradel 在专题一&#xff1a; Spring生态初探中我们从整体模块对Spring有个整体的印象&#xff0c;现在正式从最…

AI产品经理需要哪些必备技能?如何成为AI产品经理?

1.AI产品经理是什么 回答这个问题前我们首先得理清楚什么是AI产品经理&#xff0c;它和传统的互联网产品经理有什么区别。 1.1 AI产品经理职责 主要职责一方面是规划如何将成熟的AI技术应用在各个领域不同场景中&#xff0c;提升原有场景的效率或效果等&#xff1b;另一方面…

基于蜉蝣优化的聚类算法(MATLAB)

优化问题广泛存在于人们的日常生活和工程领域&#xff0c;其解决如何寻找使目标值达到最优的可行解的问题。伴随着科技发展&#xff0c;优化问题在生产调度、神经网络训练、图像处理、能源系统等领域起到举足轻重的作用&#xff0c;有助于提高系统效率。优化问题依据不同标准可…

Docker安装PostgreSQL详细教程

本章教程,使用Docker安装PostgreSQL具体步骤。 一、拉取镜像 docker pull postgres二、启动容器 docker run -it --name postgres --restart always -e POSTGRES_PASSWORD=123456 -e

EXCEL返回未使用数组元素(未使用值)

功能简介&#xff1a; 在我们工作中&#xff0c;需要在EXCEL表列出哪些元素&#xff08;物品或订单&#xff09;已经被使用了&#xff08;或使用了多少次&#xff09;&#xff0c;哪些没有被使用。 当数量过于庞大时人工筛选或许不是好办法&#xff0c;我们可以借助公式&…

偏微分方程笔记

极小位能原理&#xff1a; C 2 C^2 C2 是一个集合符号&#xff0c;表示所有二阶连续可微函数的集合 弱导数 C 2 C^2 C2 是一个集合符号&#xff0c;表示所有二阶连续可微函数的集合。 C 0 ∞ ( I ) C^{\infty}_0(I) C0∞​(I)表示于 I I I上无穷可微&#xff0c;且在端点a&…

2024 年人工智能和数据科学的五个主要趋势

引言 2023年&#xff0c;人工智能和数据科学登上了新闻头条。生成性人工智能的兴起无疑是这一显著提升曝光度的驱动力。那么&#xff0c;在2024年&#xff0c;该领域将如何继续占据头条&#xff0c;并且这些趋势又将如何影响企业的发展呢&#xff1f; 在过去几个月&#xff0c;…

Vue3实现点击按钮实现文字变色

1.动态样式实现 1.1核心代码解释&#xff1a; class"power-station-perspective-item-text"&#xff1a; 为这个 span 元素添加了一个 CSS 类&#xff0c;以便对其样式进行定义。 click"clickItem(item.id)"&#xff1a; 这是一个 Vue 事件绑定。当用户点…

SpringBoot 整合 SpringSecurity

1. 项目目录 2. pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.6.3</version> </dependency> <dependency><groupId>org.sprin…

基于Datax开发支持瀚高数据库的插件_插件开发_以及部署---国产瀚高数据库工作笔记006

如果想直接使用,开发好的插件,那么可以去下载笔者上传的,打包好的插件,直接放入到 datax安装目录的./datax/plugin/reader 或者writer中就可以了 https://download.csdn.net/download/lidew521/89495306 https://download.csdn.net/download/lidew521/89495301这两个一个…

Unity之创建与导出PDF

内容将会持续更新&#xff0c;有错误的地方欢迎指正&#xff0c;谢谢! Unity之创建与导出PDF TechX 坚持将创新的科技带给世界&#xff01; 拥有更好的学习体验 —— 不断努力&#xff0c;不断进步&#xff0c;不断探索 TechX —— 心探索、心进取&#xff01; 助力快速…

PyPDF2指定范围拆分PDF文件为单个页面

本文目录 前言一、指定范围拆分PDF1、过程讲解2、拆分效果图3、完整代码二、其他问题1、更改页码索引值前言 上一篇文章讲解了怎么讲一个PDF文档分割为多个单页面PDF,本文来讲解一下进阶,就是指定范围拆分PDF页面,有的时候,我们只想把PDF文档中的某几页拆分出来,而不是全…