基于C语言实现内存型数据库(kv存储)

基于C语言实现内存型数据库(kv存储)

文章目录

  • 基于C语言实现内存型数据库(kv存储)
  • 1. 项目背景
    • 1.1 Redis介绍
    • 1.2 项目预期及基本架构
  • 2. 服务端原理及代码框架
    • 2.1 网络数据回环的实现
    • 2.2 array的实现
    • 2.3 rbtree的实现
    • 2.4 btree的实现
    • 2.5 hash的实现
    • 2.6 dhash的实现
    • 2.7 skiplist的实现
    • 2.8 kv存储协议的实现
  • 3. 性能测试
  • 4. 项目总结及改进思路

  • 源代码仓库见Github:kv-store仓库
  • 参考视频:“零声教育”的“linux基础架构-Kv存储”。
  • 其他源码:协程。

1. 项目背景

1.1 Redis介绍

  本项目主要想仿照Redis的交互方式,实现一个基本的“内存型数据库”,所以首先来介绍一下Redis。随着互联网的普及,只要是上网的APP基本上都需要和相应的服务器请求数据,通常来说,这些数据被服务器保存在“磁盘”上的文件中,称之为“磁盘型数据库”。但是面对海量用户时(比如秒杀活动),磁盘IO的读写速率不够快从而导致用户体验下降,并且服务器数据库的压力也非常大。鉴于很多请求只是读取数据,这就启发我们将一些热点数据存放在内存中,以便快速响应请求、并且减轻磁盘的读写压力。

当然,上述只是一个初步的想法,后续如何清理内存数据、分布式存储等可以参考B站的科普视频,讲的非常简洁易懂:

  • 【趣话Redis第一弹】我是Redis,MySQL大哥被我坑惨了!—“缓存穿透、缓存击穿、缓存雪崩”、“定时删除、惰性删除、内存淘汰”
  • 【趣话Redis第二弹】Redis数据持久化AOF和RDB原理一次搞懂!—“RDB+AOF”
  • 【趣话Redis第三弹】Redis的高可用是怎么实现的?哨兵是什么原理?—“主观下线、客观下线”、“哨兵选举”、“故障转移”
  • 趣话Redis:Redis集群是如何工作的?—“哈希桶”、“集群工作+主从复制”

下面是一些典型的面试题:

  • 为什么要使用Redis?
  1. 从高并发上来说:直接操作缓存能够承受的请求是远远大于直接访问数据库的,所以我们可以考虑把数据库中的部分数据转移到缓存中去。这样用户的一部分请求会直接到缓存,而不用经过数据库。
  2. 从高性能上来说:用户第一次访问数据库中的某些数据,因为是从硬盘上读取的,所以这个过程会比较慢。将该用户访问的数据存在缓存中,下一次再访问这些数据的时候就可以直接从缓存中获取了。操作缓存就是直接操作内存,所以速度相当快。
  • 为什么要使用Redis而不是其他的,例如Java自带的map或者guava?

  缓存分为本地缓存和分布式缓存,像map或者guava就是本地缓存。本地缓存最主要的特点是轻量以及快速,生命周期随着jvm的销毁而结束。在多实例的情况下,每个实例都需要各自保存一份缓存,缓存不具有一致性。redis或memcached之类的称为分布式缓存,在多实例的情况下,各实例共用一份缓存数据,缓存具有一致性。

  • Redis应用场景有哪些?
  1. 缓存热点数据,缓解数据库的压力。
  2. 利用Redis原子性的自增操作,可以实现计数器的功能。比如统计用户点赞数、用户访问数等。
  3. 分布式锁。在分布式场景下,无法使用单机环境下的锁来对多个节点上的进程进行同步。可以使用Redis自带的SETNX命令实现分布式锁,除此之外,还可以使用官方提供的RedLock分布式锁实现。
  4. 简单的消息队列。可以使用Redis自身的发布/订阅模式或者List来实现简单的消息队列,实现异步操作。
  5. 限速器。可用于限制某个用户访问某个接口的频率,比如秒杀场景用于防止用户快速点击带来不必要的压力。
  6. 好友关系。利用集合的一些命令,比如交集、并集、差集等,实现共同好友、共同爱好之类的功能。
  • 为什么Redis这么快?
  1. Redis是基于内存进行数据操作的Redis使用内存存储,没有磁盘IO上的开销,数据存在内存中,读写速度快。
  2. 采用IO多路复用技术。Redis使用单线程来轮询描述符,将数据库的操作都转换成了事件,不在网络I/O上浪费过多的时间。
  3. 高效的数据结构。Redis每种数据类型底层都做了优化,目的就是为了追求更快的速度。
  • 参考视频:为什么要使用Redis?、Redis的应用场景有哪些?、Redis,好快!

1.2 项目预期及基本架构

2:OK
1:SET name humu
服务端
tcp server
kv_protocol
store-engine
array/rbtree/btree/hash/dhash/skiplist
客户端
图1 项目框架

  于是我们现在就来实现这个“内存型数据库”,本项目使用C语言,默认键值对key-value都是char*类型。如上图所示,我们希望“客户端”可以和“服务端”通讯,发送相应的指令并得到相应的信息。比如“客户端”插入一个新的键值对“(name: humu)”,那么就发送“SET name humu”;“服务端”接收到这个数据包后,执行相应的操作,再返回“OK”给“客户端”。鉴于kv存储需要强查找的数据结构,我们可以使用rbtree、btree、b+tree、hash、dhash、array(数据量不多,比如http头)、skiplist、list(性能低不考虑)。最终,下表列出了我们要实现的所有数据结构及其对应的指令:

表1 kv存储协议对应的数据结构及命令
操作/
数据结构
插入查找删除计数存在
arraySET key valueGET keyDELETE keyCOUNTEXIST key
rbtreeRSET key valueRGET keyRDELETE keyRCOUNTREXIST key
btreeBSET key valueBGET keyBDELETE keyBCOUNTBEXIST key
hashHSET key valueHGET keyHDELETE keyHCOUNTHEXIST key
dhashDHSET key valueDHGET keyDHDELETE keyDHCOUNTDHEXIST key
skiplistZSET key valueZGET keyZDELETE keyZCOUNTZEXIST key
备注返回OK/Fail,
表示插入键值对是否成功
返回对应的value返回OK/Fail,
删除对应的键值对
返回当前数据结构中
存储的键值对数量
返回True/False,
判断是否存在对应的键值对

  进一步,由于网络编程中的“Hello,World!程序”就是实现一个echo,收到什么数据就原封不动的发送回去。所以我们希望在此基础上,将kv存储写成一个独立的进程,和网络收发相关代码隔离开,进而提升代码的可维护性。另外在网络协议的选择中,由于我们的键值对设置通常较短只有十几个字符(比如set key value),而http协议的协议头就有几十个字符,有效数据占比太低;udp协议只能在底层网卡确认对方收到,但没法在应用层确认,所以不可控;于是我们网络通信协议选择tcp。于是对于“服务端”,我们就可以有如下的架构设计:

图2 “服务端”程序架构
  1. 网络层:负责收发数据。本项目中都是“字符串”。
  2. 协议层:将“网络层”传输过来的字符串进行拆解,若为无效指令直接返回相应的提示信息;若为有效指令则传递给“引擎层”进行进一步的处理,根据“引擎层”的处理结果给出相应的返回信息。
  3. 引擎层:分为6种存储引擎,每种存储引擎都可以进行具体的增、删、查等操作,也就是实现上表给出的5种命令。
  4. 存储层:注意“内存型数据库”的数据在内存中,但若后续需要“持久化”也会将数据备份到磁盘中。

2. 服务端原理及代码框架

2.1 网络数据回环的实现

  在使用原生的socket库函数进行网络通信时,会一直阻塞等待客户端的连接/通信请求,这个线程就做不了其他的事情,非常浪费资源。于是“reactor模式”应运而生,也被称为“基于事件驱动”,核心点在于:注册事件、监听事件、处理事件。也就是说,线程找了一个“秘书”专门负责去监听网络端口是否有“网络通信”的发生,线程就可以去做其他的事情;等到线程想处理“网络通信”的时候一起全部通知给该线程,然后这个“秘书”继续监听。显然,有这样一个“秘书”存在,可以将“网络通信”、“业务处理”分隔开,一个线程可以同时处理多个客户端的请求/通信,也就实现了“IO多路复用一个线程”。下面是常见的三种reactor模式:

  1. reactor单线程模型:只分配一个线程。显然若线程的“业务处理”时间过长,会导致“秘书”积压的事件过多,甚至可能会丢弃一些事件。本模型不适合计算密集型场景,只适合业务处理非常快的场景(本项目就是业务处理非常快)。
  2. reactor多线程模型:分配一个主线程和若干子线程。主线程只负责处理“网络通信”,“业务处理”则交给子线程处理。本模式的好处是可以充分利用多核CPU性能,但是带来了线程安全的问题。并且只有一个线程响应“网络通信”,在瞬时高并发的场景下容易成为性能瓶颈。
  3. 主从reactor多线程模型:在上述多线程模型的基础上,再额外开辟出新的子线程专门负责“与客户端通信”,主线程则只负责“连接请求”。

参考B站视频:【Java面试】介绍一下Reactor模式

下面使用epoll作为“秘书”,采用“reactor单线程模型”,完成网路数据回环(echo),也就是“服务端”程序框架的“网络层”:

main.c–共356行

/** zv开头的变量是zvnet异步网络库(epoll)。* kv开头的变量是kv存储协议解析。
*/
#include<stdio.h>
#include<stdlib.h>
#include<string.h>#include<errno.h>
#include<unistd.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<fcntl.h>
#include<sys/epoll.h>#include"kvstore.h"/*-------------------------------------------------------*/
/*-----------------------异步网路库-----------------------*/
/*-------------------------------------------------------*/
/*-----------------------函数声明-------------------------*/
#define max_buffer_len      1024    // 读写buffer长度
#define epoll_events_size   1024    // epoll就绪集合大小
#define connblock_size      1024    // 单个连接块存储的连接数量
#define listen_port_count   1       // 监听端口总数// 有如下参数列表和返回之类型的函数都称为 CALLBACK
// 回调函数,方便在特定epoll事件发生时执行相应的操作
typedef int (*ZV_CALLBACK)(int fd, int events, void *arg);
// 回调函数:建立连接
int accept_cb(int fd, int event, void* arg);
// 回调函数:接收数据
int recv_cb(int clientfd, int event, void* arg);
// 回调函数:发送数据
int send_cb(int clientfd, int event, void* arg);// 单个连接
typedef struct zv_connect_s{// 本连接的客户端fdint fd;// 本连接的读写bufferchar rbuffer[max_buffer_len];size_t rcount;  // 读buffer的实际大小char wbuffer[max_buffer_len];size_t wcount;  // 写buffer的实际大小size_t next_len;  // 下一次读数据长度(读取多个包会用到)// 本连接的回调函数--accept()/recv()/send()ZV_CALLBACK cb;
}zv_connect;// 连接块的头
typedef struct zv_connblock_s{struct zv_connect_s *block;  // 指向的当前块,注意大小为 connblock_sizestruct zv_connblock_s *next;  // 指向的下一个连接块的头
}zv_connblock;// 反应堆结构体
typedef struct zv_reactor_s{int epfd;   // epoll文件描述符// struct epoll_event events[epoll_events_size];  // 就绪事件集合struct zv_connblock_s *blockheader;  // 连接块的第一个头int blkcnt;  // 现有的连接块的总数
}zv_reactor;// reactor初始化
int init_reactor(zv_reactor *reactor);
// reator销毁
void destory_reactor(zv_reactor* reactor);
// 服务端初始化:将端口设置为listen状态
int init_sever(int port);
// 将本地的listenfd添加进epoll
int set_listener(zv_reactor *reactor, int listenfd, ZV_CALLBACK cb);
// 创建一个新的连接块(尾插法)
int zv_create_connblock(zv_reactor* reactor);
// 根据fd从连接块中找到连接所在的位置
// 逻辑:整除找到所在的连接块、取余找到在连接块的位置
zv_connect* zv_connect_idx(zv_reactor* reactor, int fd);
// 运行kv存储协议
int kv_run_while(int argc, char *argv[]);
/*-------------------------------------------------------*//*-----------------------函数定义-------------------------*/
// reactor初始化
int init_reactor(zv_reactor *reactor){if(reactor == NULL) return -1;// 初始化参数memset(reactor, 0, sizeof(zv_reactor));reactor->epfd = epoll_create(1);if(reactor->epfd <= 0){printf("init reactor->epfd error: %s\n", strerror(errno));return -1;}// 为链表集合分配内存reactor->blockheader = (zv_connblock*)calloc(1, sizeof(zv_connblock));if(reactor->blockheader == NULL) return -1;reactor->blockheader->next = NULL;// 为链表集合中的第一个块分配内存reactor->blockheader->block = (zv_connect*)calloc(connblock_size, sizeof(zv_connect));if(reactor->blockheader->block == NULL) return -1;reactor->blkcnt = 1;return 0;
}// reator销毁
void destory_reactor(zv_reactor* reactor){if(reactor){close(reactor->epfd);  // 关闭epollzv_connblock* curblk = reactor->blockheader;zv_connblock* nextblk = reactor->blockheader;do{curblk = nextblk;nextblk = curblk->next;if(curblk->block) free(curblk->block);if(curblk) free(curblk);}while(nextblk != NULL);}
}// 服务端初始化:将端口设置为listen状态
int init_sever(int port){// 创建服务端int sockfd = socket(AF_INET, SOCK_STREAM, 0);  // io// fcntl(sockfd, F_SETFL, O_NONBLOCK);  // 非阻塞// 设置网络地址和端口struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(struct sockaddr_in));servaddr.sin_family = AF_INET;  // IPv4servaddr.sin_addr.s_addr = htonl(INADDR_ANY);  // 0.0.0.0,任何地址都可以连接本服务器servaddr.sin_port = htons(port);  // 端口// 将套接字绑定到一个具体的本地地址和端口if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){printf("bind failed: %s", strerror(errno));return -1;}// 将端口设置为listen(并不会阻塞程序执行)listen(sockfd, 10);  // 等待连接队列的最大长度为10printf("listen port: %d, sockfd: %d\n", port, sockfd);return sockfd;
}// 将本地的listenfd添加进epoll
int set_listener(zv_reactor *reactor, int listenfd, ZV_CALLBACK cb){if(!reactor || !reactor->blockheader) return -1;// 将服务端放进连接块reactor->blockheader->block[listenfd].fd = listenfd;reactor->blockheader->block[listenfd].cb = cb;  // listenfd的回调函数应该是accept()// 将服务端添加进epoll事件struct epoll_event ev;ev.data.fd = listenfd;ev.events = EPOLLIN;epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, listenfd, &ev);return 0;
}// 创建一个新的连接块(尾插法)
int zv_create_connblock(zv_reactor* reactor){if(!reactor) return -1;// 初始化新的连接块zv_connblock* newblk = (zv_connblock*)calloc(1, sizeof(zv_connblock));if(newblk == NULL) return -1;newblk->block = (zv_connect*)calloc(connblock_size, sizeof(zv_connect));if(newblk->block == NULL) return -1;newblk->next = NULL;// 找到最后一个连接块zv_connblock* endblk = reactor->blockheader;while(endblk->next != NULL){endblk = endblk->next;}// 添加上新的连接块endblk->next = newblk;reactor->blkcnt++;return 0;
}// 根据fd从连接块中找到连接所在的位置
// 逻辑:整除找到所在的连接块、取余找到在连接块的位置
zv_connect* zv_connect_idx(zv_reactor* reactor, int fd){if(!reactor) return NULL;// 计算fd应该在的连接块int blkidx = fd / connblock_size;while(blkidx >= reactor->blkcnt){zv_create_connblock(reactor);// printf("create a new connblk!\n");}// 找到这个连接块zv_connblock* blk = reactor->blockheader;int i = 0;while(++i < blkidx){blk = blk->next;}return &blk->block[fd % connblock_size];
}// 回调函数:建立连接
// fd:服务端监听端口listenfd
// event:没用到,但是回调函数的常用格式
// arg:应该是reactor*
int accept_cb(int fd, int event, void* arg){// 与客户端建立连接struct sockaddr_in clientaddr;  // 连接到本服务器的客户端信息socklen_t len_sockaddr = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len_sockaddr);if(clientfd < 0){printf("accept() error: %s\n", strerror(errno));return -1;}// 将连接添加进连接块zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);conn->fd = clientfd;conn->cb = recv_cb;conn->next_len = max_buffer_len;conn->rcount = 0;conn->wcount = 0;// 将客户端添加进epoll事件struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLIN;  // 默认水平触发(有数据就触发)epoll_ctl(reactor->epfd, EPOLL_CTL_ADD, clientfd, &ev);printf("connect success! sockfd:%d, clientfd:%d\n", fd, clientfd);
}// 回调函数:接收数据
int recv_cb(int clientfd, int event, void* arg){zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);int recv_len = recv(clientfd, conn->rbuffer+conn->rcount, conn->next_len, 0);  // 由于当前fd可读所以没有阻塞if(recv_len < 0){printf("recv() error: %s\n", strerror(errno));close(clientfd);// return -1;exit(0);}else if(recv_len == 0){// 重置对应的连接块conn->fd = -1;conn->rcount = 0;conn->wcount = 0;// 从epoll监听事件中移除epoll_ctl(reactor->epfd, EPOLL_CTL_DEL, clientfd, NULL);// 关闭连接close(clientfd);printf("close clientfd:%d\n", clientfd);}else if(recv_len > 0){conn->rcount += recv_len;// conn->next_len = *(short*)conn->rbuffer;  // 从tcp协议头中获取数据长度,假设前两位是长度// 处理接收到的字符串,并将需要发回的信息存储在缓冲区中printf("recv clientfd:%d, len:%d, mess: %s\n", clientfd, recv_len, conn->rbuffer);// conn->rcount = kv_protocal(conn->rbuffer, max_buffer_len);// 将kv存储的回复消息(rbuffer)拷贝给wbuffer// printf("msg:%s len:%ld\n", msg, strlen(msg));memset(conn->wbuffer, '\0', max_buffer_len);memcpy(conn->wbuffer, conn->rbuffer, conn->rcount);conn->wcount = conn->rcount;memset(conn->rbuffer, 0, max_buffer_len);conn->rcount = 0;// 将本连接更改为epoll写事件conn->cb = send_cb;struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLOUT;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, clientfd, &ev);}return 0;
}// 回调函数:发送数据
int send_cb(int clientfd, int event, void* arg){zv_reactor* reactor = (zv_reactor*)arg;zv_connect* conn = zv_connect_idx(reactor, clientfd);int send_len = send(clientfd, conn->wbuffer, conn->wcount, 0);if(send_len < 0){printf("send() error: %s\n", strerror(errno));close(clientfd);return -1;}memset(conn->wbuffer, 0, conn->next_len);conn->wcount -= send_len;// 发送完成后将本连接再次更改为读事件conn->cb = recv_cb;struct epoll_event ev;ev.data.fd = clientfd;ev.events = EPOLLIN;epoll_ctl(reactor->epfd, EPOLL_CTL_MOD, clientfd, &ev);return 0;
}// 运行kv存储协议
int kv_run_while(int argc, char *argv[]){// 创建管理连接的反应堆// zv_reactor reactor;zv_reactor *reactor = (zv_reactor*)malloc(sizeof(zv_reactor));init_reactor(reactor);// 服务端初始化int start_port = atoi(argv[1]);for(int i=0; i<listen_port_count; i++){int sockfd = init_sever(start_port+i);set_listener(reactor, sockfd, accept_cb);  // 将sockfd添加进epoll}printf("init finish, listening connet...\n");// 开始监听事件struct epoll_event events[epoll_events_size] = {0};  // 就绪事件集合while(1){// 等待事件发生int nready = epoll_wait(reactor->epfd, events, epoll_events_size, -1);  // -1等待/0不等待/正整数则为等待时长if(nready == -1){printf("epoll_wait error: %s\n", strerror(errno));break;}else if(nready == 0){continue;}else if(nready > 0){// printf("process %d epoll events...\n", nready);// 处理所有的就绪事件int i = 0;for(i=0; i<nready; i++){int connfd = events[i].data.fd;zv_connect* conn = zv_connect_idx(reactor, connfd);// 回调函数和下面的的逻辑实现了数据回环if(EPOLLIN & events[i].events){conn->cb(connfd, events[i].events, reactor);}if(EPOLLOUT & events[i].events){conn->cb(connfd, events[i].events, reactor);} }}}destory_reactor(reactor);return 0;
}int main(int argc, char *argv[]){if(argc != 2){printf("please enter port! e.x. 9999.\n");return -1;}// 初始化存储引擎// kv_engine_init();// 运行kv存储kv_run_while(argc, argv);// 销毁存储引擎// kv_engine_desy();return 0;
}
/*-------------------------------------------------------*/

注:只有251行、346行、352行是后续和kv存储有关的接口函数。可以发现“网络层”和“协议层”被很好的隔离开来。

图3 验证网络数据回环

可以看到上述实现了网络数据回环的功能。并且进一步来说,假如客户端使用浏览器(http协议)对该端口进行访问,那么对接收到的数据包进行http协议拆包,根据其请求的内容返回相应的信息(如html文件),那么就是我们所熟知的“web服务器”了。为什么是“烂大街”啊,一代比一代卷是吧😭。

2.2 array的实现

  现在使用“数组”来存储“键值对节点”。首先我们可以想到的是,直接定义一个长度足够大的数组(如1024),然后每次“插入”就直接找第一个空节点,“查找”和“删除”都是遍历所有节点。但是数组长度固定是一件很危险的事情,所以我们可以借鉴“内存池”的思想,来自动进行扩容和缩容。

图4 array结构自动扩容/缩容的示意图

如上图所示,首先申请一个固定大小的“数组”存储元素,当在某次“插入”元素发现没有空节点时,可以直接再申请一块“数组”,并将当前数组指向这个新数组;同理,当我们“删除”一个元素时,若发现删除后当前数组块为空,可以直接free掉这块内存,然后将其前后的内存块连起来。注意到,为了能帮助我们快速判断某个数组块是否为空,还需要在其结构体中定义当前数组块的有效元素数量count

array结构的增/删/查操作还是比较简单的,可以直接参考项目源码中的“array.h”、“array.c”。

2.3 rbtree的实现

图5 红黑树示意图

  红黑树是一种“自平衡的二叉搜索树”,可谓是耳熟能详,其增删查改的操作都已经非常成熟。通俗来说,红黑树本质上是一个二叉树,而一个满二叉树的查找性能近似于 O ( log ⁡ N ) O(\log N) O(logN),于是我们便希望每次插入/删除元素时,红黑树都能把自己调整成近乎于满二叉树的状态。具体来说,就是在普通二叉树的基础上,只需要为每个节点额外添加一个“颜色信息”,再额外规定一些必须满足的性质,就可以保证红黑树始终平衡。下面是红黑树的性质–来源中文维基百科“红黑树”:

  1. 每个节点是红的或者黑的。
  2. 根结点是黑的。
  3. 所有叶子节点都是黑的(叶子节点是nil节点)。
  4. 红色节点的子节点必须是黑色。(或者说红色节点不相邻)
  5. 从任一节点到其每个叶子节点的所有简单路径,都包含相同数目的黑色节点。

“nil节点”:是一个黑色的空节点,作为红黑树的叶子节点。
性质口诀:左根右,根叶黑,不红红,黑路同
注:零声教育提供了能显示红黑树生成步骤的本地HTML文件–“红黑树生成步骤.html(39KB)”。

  红黑树的查找操作只需要从根节点不断比较即可,而红黑树的插入/删除操作则有点说法。下面是我按照编程逻辑进行简化的插入/删除原理。具体来说,就是每次插入/删除后都需要进行调整,调整的逻辑如下:

红黑树插入:刚插入的位置一定是叶子节点,颜色为红,然后按照如下方式调整。

// 父节点是黑色:无需操作。
while(父节点是红色){if(叔节点为红色){变色:祖父变红/父变黑/叔变黑、祖父节点成新的当前节点。进入下一次循环。}else(叔节点是黑色,注意叶子节点也是黑色){判断“祖父节点->父节点->当前节点”的左右走向:LL:祖父变红/父变黑、祖父右旋。最后的当前节点应该是原来的当前节点。LR:祖父变红/父变红/当前变黑、父左旋/祖父右旋。最后的当前节点应该是原来的祖父节点/父节点。RL:祖父变红/父变红/当前变黑、父右旋/祖父左旋。最后的当前节点应该是原来的祖父节点/父节点。RR:祖父变红/父变黑、祖父左旋。最后的当前节点应该是原来的当前节点。然后进入下一次循环。}
}

红黑树删除:要删除的位置一定是 没有/只有一个 孩子。也就是说,如果要删除的元素有两个孩子,那就和其后继节点交换键值对,然后实际删除这个后继节点。实际删除的节点del_r为黑色,则将“孩子节点”(没有孩子就是左侧的叶子节点)作为“当前节点”按照如下方式调整。

while(当前节点是黑色 && 不是根节点){if(兄弟节点为红色){if(当前节点是左孩子)父变红/兄弟变黑、父左旋,当前节点下一循环。else(当前节点是右孩子)父变红/兄弟变黑、父右旋,当前节点下一循环。}else(兄弟节点为黑色){if(兄弟节点没有红色子节点){if(父为黑)父变黑/兄弟变红,父节点成新的当前节点进入下一循环。else(父为红)父变黑/兄弟变红,结束循环(当前节点指向根节点)。}else(兄弟节点有红色子节点){判断“父节点->兄弟节点->兄弟的红色子节点(两个子节点都是红色则默认是左)”的走向:LL:红子变黑/兄弟变父色、父右旋、父变黑,结束循环。LR:红子变父色、兄弟左旋/父右旋、父变黑,结束循环。RR:红子变黑/兄弟变父色、父左旋、父变黑,结束循环。RL:红子变父色、兄弟右旋/父左旋、父变黑,结束循环。“结束循环”等价于当前节点指向根节点。}}
}
注意出了循环别忘了将根节点调整成黑色。

根据上述原理,我使用C语言实现了红黑树完整的增删查操作,并增加了检验有效性打印红黑树图的代码,以及测试代码。值得注意的是,为了加快开发速度,下面的代码预设“键值对”的类型为int keyvoid* value,也就是只关心int型的key、不关心value,后续将红黑树添加进“kv存储协议”中时会进一步修改:

rbtree_int.c-共905行

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>// 编译指令:gcc -o main rbtree_int.c
// 本代码实现红黑树,存储int型key,未指定value。#define RBTREE_DEBUG 1  // 是否运行测试代码typedef int KEY_TYPE;  // 节点的key类型
#define RED   1
#define BLACK 0// 定义红黑树单独节点
typedef struct _rbtree_node {KEY_TYPE key;      // 键void *value;  // 值,可以指向任何类型struct _rbtree_node *left;struct _rbtree_node *right;struct _rbtree_node *parent;unsigned char color;  // 不同编译器的无符号性质符号不同,这里加上unsigned减少意外。/* 对于32位系统,上述只有color是1个字节,其余都是4个字节,所以color放在最后可以节省内存。 */
} rbtree_node;// 定义整个红黑树
typedef struct _rbtree{struct _rbtree_node *root_node; // 根节点struct _rbtree_node *nil_node; // 空节点,也就是叶子节点、根节点的父节点
} rbtree;// 存储打印红黑树所需的参数
typedef struct _disp_parameters{// 打印缓冲区char **disp_buffer;// 打印缓冲区的深度,宽度,当前打印的列数int disp_depth;int disp_width;int disp_column;// 树的深度int max_depth;// 最大的数字位宽int max_num_width;// 单个节点的显示宽度int node_width;
}disp_parameters;/*----初始化及释放内存----*/
// 红黑树初始化,注意调用完后释放内存rbtree_free
rbtree *rbtree_malloc(void);
// 红黑树释放内存
void rbtree_free(rbtree *T);/*----插入操作----*/
// 红黑树插入
void rbtree_insert(rbtree *T, KEY_TYPE key, void *value);
// 调整插入新节点后的红黑树,使得红色节点不相邻(平衡性)
void rbtree_insert_fixup(rbtree *T, rbtree_node *cur);/*----删除操作----*/
// 红黑树删除
void rbtree_delete(rbtree *T, rbtree_node *del);
// 调整删除某节点后的红黑树,使得红色节点不相邻(平衡性)
void rbtree_delete_fixup(rbtree *T, rbtree_node *cur);/*----查找操作----*/
// 红黑树查找
rbtree_node* rbtree_search(rbtree *T, KEY_TYPE key);/*----打印信息----*/
// 中序遍历整个红黑树,依次打印节点信息
void rbtree_traversal(rbtree *T);
// 以图的形式展示红黑树
void rbtree_display(rbtree *T);
// 先序遍历,打印红黑树信息到字符数组指针
void set_display_buffer(rbtree *T, rbtree_node *cur, disp_parameters *p);/*----检查有效性----*/
// 检查当前红黑树的有效性:根节点黑色、红色不相邻、所有路径黑高相同
bool rbtree_check_effective(rbtree *T);/*----其他函数----*/
// 在给定节点作为根节点的子树中,找出key最小的节点
rbtree_node* rbtree_min(rbtree *T, rbtree_node *cur);
// 在给定节点作为根节点的子树中,找出key最大的节点
rbtree_node* rbtree_max(rbtree *T, rbtree_node *cur);
// 找出当前节点的前驱节点
rbtree_node* rbtree_precursor_node(rbtree *T, rbtree_node *cur);
// 找出当前节点的后继节点
rbtree_node* rbtree_successor_node(rbtree *T, rbtree_node *cur);
// 红黑树节点左旋,无需修改颜色
void rbtree_left_rotate(rbtree *T, rbtree_node *x);
// 红黑树节点右旋,无需修改颜色
void rbtree_right_rotate(rbtree *T, rbtree_node *y);
// 计算红黑树的深度
int rbtree_depth(rbtree *T);
// 递归计算红黑树的深度(不包括叶子节点)
int rbtree_depth_recursion(rbtree *T, rbtree_node *cur);/*-----------------------------下面为函数定义-------------------------------*/
// 红黑树初始化,注意调用完后释放内存rbtree_free()
rbtree *rbtree_malloc(void){rbtree *T = (rbtree*)malloc(sizeof(rbtree));if(T == NULL){printf("rbtree malloc failed!");}else{T->nil_node = (rbtree_node*)malloc(sizeof(rbtree_node));T->nil_node->color = BLACK;T->nil_node->left = T->nil_node;T->nil_node->right = T->nil_node;T->nil_node->parent = T->nil_node;T->root_node = T->nil_node;}return T;
}// 红黑树释放内存
void rbtree_free(rbtree *T){free(T->nil_node);free(T);
}// 在给定节点作为根节点的子树中,找出key最小的节点
rbtree_node* rbtree_min(rbtree *T, rbtree_node *cur){  while(cur->left != T->nil_node){cur = cur->left;}return cur;
}// 在给定节点作为根节点的子树中,找出key最大的节点
rbtree_node* rbtree_max(rbtree *T, rbtree_node *cur){  while(cur->right != T->nil_node){cur = cur->right;}return cur;
}// 找出当前节点的前驱节点
rbtree_node* rbtree_precursor_node(rbtree *T, rbtree_node *cur){// 若当前节点有左孩子,那就直接向下找if(cur->left != T->nil_node){return rbtree_max(T, cur->left);}// 若当前节点没有左孩子,那就向上找rbtree_node *parent = cur->parent;while((parent != T->nil_node) && (cur == parent->left)){cur = parent;parent = cur->parent;}return parent;// 若返回值为空节点,则说明当前节点就是第一个节点
}// 找出当前节点的后继节点
rbtree_node* rbtree_successor_node(rbtree *T, rbtree_node *cur){// 若当前节点有右孩子,那就直接向下找if(cur->right != T->nil_node){return rbtree_min(T, cur->right);}// 若当前节点没有右孩子,那就向上找rbtree_node *parent = cur->parent;while((parent != T->nil_node) && (cur == parent->right)){cur = parent;parent = cur->parent;}return parent;// 若返回值为空节点,则说明当前节点就是最后一个节点
}// 红黑树节点左旋,无需修改颜色
void rbtree_left_rotate(rbtree *T, rbtree_node *x){// 传入rbtree*是为了判断节点node的左右子树是否为叶子节点、父节点是否为根节点。rbtree_node *y = x->right;// 注意红黑树中所有路径都是双向的,两边的指针都要改!// 另外,按照如下的修改顺序,无需存储额外的节点。x->right = y->left;if(y->left != T->nil_node){y->left->parent = x;}y->parent = x->parent;if(x->parent == T->nil_node){  // x为根节点T->root_node = y;}else if(x->parent->left == x){x->parent->left = y;}else{x->parent->right = y;}y->left = x;x->parent = y;
}// 红黑树节点右旋,无需修改颜色
void rbtree_right_rotate(rbtree *T, rbtree_node *y){rbtree_node *x = y->left;y->left = x->right;if(x->right != T->nil_node){x->right->parent = y;}x->parent = y->parent;if(y->parent == T->nil_node){T->root_node = x;}else if(y->parent->left == y){y->parent->left = x;}else{y->parent->right = x;}x->right = y;y->parent = x;
}// 调整插入新节点后的红黑树,使得红色节点不相邻(平衡性)
void rbtree_insert_fixup(rbtree *T, rbtree_node *cur){// 父节点是黑色,无需调整。// 父节点是红色,则有如下八种情况。while(cur->parent->color == RED){// 获取叔节点rbtree_node *uncle;if(cur->parent->parent->left == cur->parent){uncle = cur->parent->parent->right;}else{uncle = cur->parent->parent->left;}// 若叔节点为红,只需更新颜色(隐含了四种情况)// 循环主要在这里起作用if(uncle->color == RED){// 叔节点为红色:祖父变红/父变黑/叔变黑、祖父节点成新的当前节点。if(uncle->color == RED){cur->parent->parent->color = RED;cur->parent->color = BLACK;uncle->color = BLACK;cur = cur->parent->parent;}}// 若叔节点为黑,需要变色+旋转(当前节点相当于祖父节点位置包括四种情况:LL/RR/LR/RL)// 下面对四种情况进行判断:都是只执行一次else{if(cur->parent->parent->left == cur->parent){// LL:祖父变红/父变黑、祖父右旋。最后的当前节点应该是原来的当前节点。if(cur->parent->left == cur){cur->parent->parent->color = RED;cur->parent->color = BLACK;rbtree_right_rotate(T, cur->parent->parent);}// LR:祖父变红/父变红/当前变黑、父左旋、祖父右旋。最后的当前节点应该是原来的祖父节点/父节点。else{cur->parent->parent->color = RED;cur->parent->color = RED;cur->color = BLACK;cur = cur->parent;rbtree_left_rotate(T, cur);rbtree_right_rotate(T, cur->parent->parent);}}else{// RL:祖父变红/父变红/当前变黑、父右旋、祖父左旋。最后的当前节点应该是原来的祖父节点/父节点。if(cur->parent->left == cur){cur->parent->parent->color = RED;cur->parent->color = RED;cur->color = BLACK;cur = cur->parent;rbtree_right_rotate(T, cur);rbtree_left_rotate(T, cur->parent->parent);}// RR:祖父变红/父变黑、祖父左旋。最后的当前节点应该是原来的当前节点。else{cur->parent->parent->color = RED;cur->parent->color = BLACK;rbtree_left_rotate(T, cur->parent->parent);}}}}// 将根节点变为黑色T->root_node->color = BLACK;
}// 插入
// void rbtree_insert(rbtree *T, rbtree_node *new){
void rbtree_insert(rbtree *T, KEY_TYPE key, void *value){// 创建新节点rbtree_node *new = (rbtree_node*)malloc(sizeof(rbtree_node));new->key = key;new->value = value;// 寻找插入位置(红黑树中序遍历升序)rbtree_node *cur = T->root_node;rbtree_node *next = T->root_node;// 刚插入的位置一定是叶子节点while(next != T->nil_node){cur = next;if(new->key > cur->key){next = cur->right;}else if(new->key < cur->key){next = cur->left;}else if(new->key == cur->key){// 红黑树本身没有明确如何处理key相同节点,所以取决于业务。// 场景1:统计不同课程的人数,相同就+1。// 场景2:时间戳,若相同则稍微加一点// 其他场景:覆盖、丢弃...printf("Already have the same key=%d!\n", new->key);free(new);return;}}if(cur == T->nil_node){// 若红黑树本身没有节点T->root_node = new;}else if(new->key > cur->key){cur->right = new;}else{cur->left = new;}new->parent = cur;new->left = T->nil_node;new->right = T->nil_node;new->color = RED;// 调整红黑树,使得红色节点不相邻rbtree_insert_fixup(T, new);
}// 调整删除某节点后的红黑树,使得红色节点不相邻(平衡性)
void rbtree_delete_fixup(rbtree *T, rbtree_node *cur){// child是黑色、child不是根节点才会进入循环while((cur->color == BLACK) && (cur != T->root_node)){// 获取兄弟节点rbtree_node *brother = T->nil_node;if(cur->parent->left == cur){brother = cur->parent->right;}else{brother = cur->parent->left;}// 兄弟节点为红色:父变红/兄弟变黑、父单旋、当前节点下一循环if(brother->color == RED){cur->parent->color = RED;brother->color = BLACK;if(cur->parent->left == cur){rbtree_left_rotate(T, cur->parent);}else{rbtree_right_rotate(T, cur->parent);}}// 兄弟节点为黑色else{ // 兄弟节点没有红色子节点:父变黑/兄弟变红、看情况是否结束循环if((brother->left->color == BLACK) && (brother->right->color == BLACK)){// 若父原先为黑,父节点成新的当前节点进入下一循环;否则结束循环。if(brother->parent->color == BLACK){cur = cur->parent;}else{cur = T->root_node;}brother->parent->color = BLACK;brother->color = RED;}// 兄弟节点有红色子节点:LL/LR/RR/RLelse if(brother->parent->left == brother){// LL:红子变黑/兄弟变父色/父变黑、父右旋,结束循环if(brother->left->color == RED){brother->left->color = BLACK;brother->color = brother->parent->color;brother->parent->color = BLACK;rbtree_right_rotate(T, brother->parent);cur = T->root_node;}// LR:红子变父色/父变黑、兄弟左旋/父右旋,结束循环else{brother->right->color = brother->parent->color;cur->parent->color = BLACK;rbtree_left_rotate(T, brother);rbtree_right_rotate(T, cur->parent);cur = T->root_node;}}else{// RR:红子变黑/兄弟变父色/父变黑、父左旋,结束循环if(brother->right->color == RED){brother->right->color = BLACK;brother->color = brother->parent->color;brother->parent->color = BLACK;rbtree_left_rotate(T, brother->parent);cur = T->root_node;}// RL:红子变父色/父变黑、兄弟右旋/父左旋,结束循环else{brother->left->color = brother->parent->color;brother->parent->color = BLACK;rbtree_right_rotate(T, brother);rbtree_left_rotate(T, cur->parent);cur = T->root_node;}}}}// 下面这行处理情况2/3cur->color = BLACK;
}// 红黑树删除
void rbtree_delete(rbtree *T, rbtree_node *del){if(del != T->nil_node){/* 红黑树删除逻辑:1. 标准的BST删除操作(本函数):最红都会转换成删除只有一个子节点或者没有子节点的节点。2. 若删除节点为黑色,则进行调整(rebtre_delete_fixup)。*/rbtree_node *del_r = T->nil_node;        // 实际删除的节点rbtree_node *del_r_child = T->nil_node;  // 实际删除节点的子节点// 找出实际删除的节点// 注:实际删除的节点最多只有一个子节点,或者没有子节点(必然在最后两层中,不包括叶子节点那一层)if((del->left == T->nil_node) || (del->right == T->nil_node)){// 如果要删除的节点本身就只有一个孩子或者没有孩子,那实际删除的节点就是该节点del_r = del;}else{// 如果要删除的节点有两个孩子,那就使用其后继节点(必然最多只有一个孩子)del_r = rbtree_successor_node(T, del);}// 看看删除节点的孩子是谁,没有孩子就是空节点if(del_r->left != T->nil_node){del_r_child = del_r->left;}else{del_r_child = del_r->right;}// 将实际要删除的节点删除del_r_child->parent = del_r->parent;  // 若child为空节点,最后再把父节点指向空节点if(del_r->parent == T->nil_node){T->root_node = del_r_child;}else if(del_r->parent->left == del_r){del_r->parent->left = del_r_child;}else{del_r->parent->right = del_r_child;}// 替换替换键值对if(del != del_r){del->key = del_r->key;del->value = del_r->value;}// 最后看是否需要调整if(del_r->color == BLACK){rbtree_delete_fixup(T, del_r_child);}// 调整空节点的父节点if(del_r_child == T->nil_node){del_r_child->parent = T->nil_node;}free(del_r);}
}// 查找
rbtree_node* rbtree_search(rbtree *T, KEY_TYPE key){rbtree_node *cur = T->root_node;while(cur != T->nil_node){if(cur->key > key){cur = cur->left;}else if(cur->key < key){cur = cur->right;}else{return cur;}}printf("There is NO key=%d in rbtree!\n", key);return T->nil_node;
}// 中序遍历给定结点为根节点的子树(递归)
void rbtree_traversal_node(rbtree *T, rbtree_node *cur){if(cur != T->nil_node){rbtree_traversal_node(T, cur->left);if(cur->color == RED){printf("Key:%d\tColor:Red\n", cur->key);}else{printf("Key:%d\tColor:Black\n", cur->key);}rbtree_traversal_node(T, cur->right);}
}// 中序遍历整个红黑树
void rbtree_traversal(rbtree *T){rbtree_traversal_node(T, T->root_node);
}// 递归计算红黑树的深度(不包括叶子节点)
int rbtree_depth_recursion(rbtree *T, rbtree_node *cur){if(cur == T->nil_node){return 0;}else{int left = rbtree_depth_recursion(T, cur->left);int right = rbtree_depth_recursion(T, cur->right);return ((left > right) ? left : right) + 1;}
}// 计算红黑树的深度
int rbtree_depth(rbtree *T){return rbtree_depth_recursion(T, T->root_node);
}// 获取输入数字的十进制显示宽度
int decimal_width(int num_in){int width = 0;while (num_in != 0){num_in = num_in / 10;width++;}return width;
}// 先序遍历,打印红黑树信息到字符数组指针
void set_display_buffer(rbtree *T, rbtree_node *cur, disp_parameters *p){if(cur != T->nil_node){// 输出当前节点p->disp_depth++;// 输出数字到缓冲区char num_char[20];char formatString[20];int cur_num_width = decimal_width(cur->key);int num_space = (p->node_width - 2 - cur_num_width) >> 1;  // 数字后面需要补充的空格数量strncpy(formatString, "|%*d", sizeof(formatString));int i = 0;for(i=0; i<num_space; i++){strncat(formatString, " ", 2);}strncat(formatString, "|", 2);snprintf(num_char, sizeof(num_char), formatString, (p->node_width-2-num_space), cur->key);i = 0;while(num_char[i] != '\0'){p->disp_buffer[(p->disp_depth-1)*3][p->disp_column+i] = num_char[i];i++;}// 输出颜色到缓冲区char color_char[20];if(cur->color == RED){num_space = (p->node_width-2-3)>>1;strncpy(color_char, "|", 2);for(i=0; i<(p->node_width-2-3-num_space); i++){strncat(color_char, " ", 2);}strncat(color_char, "RED", 4);for(i=0; i<num_space; i++){strncat(color_char, " ", 2);}strncat(color_char, "|", 2);}else{num_space = (p->node_width-2-5)>>1;strncpy(color_char, "|", 2);for(i=0; i<(p->node_width-2-5-num_space); i++){strncat(color_char, " ", 2);}strncat(color_char, "BLACK", 6);for(i=0; i<num_space; i++){strncat(color_char, " ", 2);}strncat(color_char, "|", 2);}// strcpy(color_char, (cur->color == RED) ? "| RED |" : "|BLACK|");i = 0;while(color_char[i] != '\0'){p->disp_buffer[(p->disp_depth-1)*3+1][p->disp_column+i] = color_char[i];i++;}// 输出连接符到缓冲区if(p->disp_depth>1){char connector_char[10];strcpy(connector_char, (cur->parent->left == cur) ? "/" : "\\");p->disp_buffer[(p->disp_depth-1)*3-1][p->disp_column+(p->node_width>>1)] = connector_char[0];}// 下一层需要前进/后退的字符数int steps = 0;if(p->disp_depth+1 == p->max_depth){steps = (p->node_width>>1)+1;}else{steps = (1<<(p->max_depth - p->disp_depth - 2)) * p->node_width;}// 输出左侧节点p->disp_column -= steps;set_display_buffer(T, cur->left, p);p->disp_column += steps;// 输出右侧节点if(p->disp_depth+1 == p->max_depth){steps = p->node_width-steps;}p->disp_column += steps;set_display_buffer(T, cur->right, p);p->disp_column -= steps;p->disp_depth--;}
}// 以图的形式展示红黑树
void rbtree_display(rbtree *T){// 红黑树为空不画图if(T->root_node == T->nil_node){printf("rbtree DO NOT have any key!\n");return;}// 初始化参数结构体disp_parameters *para = (disp_parameters*)malloc(sizeof(disp_parameters));if(para == NULL){printf("disp_parameters struct malloc failed!");return;}rbtree_node *max_node = rbtree_max(T, T->root_node);para->max_num_width = decimal_width(max_node->key);    para->max_depth = rbtree_depth(T);para->node_width = (para->max_num_width<=5) ? 7 : (para->max_num_width+2);  // 边框“||”宽度2 + 数字宽度para->disp_depth = 0;para->disp_width = para->node_width * (1 << (para->max_depth-1)) + 1;para->disp_column = ((para->disp_width-para->node_width)>>1);int height = (para->max_depth-1)*3 + 2;// 根据树的大小申请内存para->disp_buffer = (char**)malloc(sizeof(char*)*height);int i = 0;for(i=0; i<height; i++){para->disp_buffer[i] = (char*)malloc(sizeof(char)*para->disp_width);memset(para->disp_buffer[i], ' ', para->disp_width);para->disp_buffer[i][para->disp_width-1] = '\0';}// 打印内容set_display_buffer(T, T->root_node, para);for(i=0; i<height; i++){printf("%s\n", para->disp_buffer[i]);}// 释放内存for(i=0; i<height; i++){free(para->disp_buffer[i]);}free(para->disp_buffer);free(para);
}// 检查当前红黑树的有效性:根节点黑色、红色不相邻、所有路径黑高相同
bool rbtree_check_effective(rbtree *T){bool rc_flag = true;  // 根节点黑色bool rn_flag = true;  // 红色不相邻bool bh_flag = true;  // 所有路径黑高相同if(T->root_node->color == RED){printf("ERROR! root-node's color is RED!\n");rc_flag = false;}else{int depth = rbtree_depth(T);int max_index_path = 1<<(depth-1);  // 从根节点出发的路径总数// 获取最左侧路径的黑高int black_height = 0;rbtree_node *cur = T->root_node;while(cur != T->nil_node){if(cur->color == BLACK) black_height++;cur = cur->left;// printf("bh = %d\n", black_height);}// 遍历每一条路径int i_path = 0;for(i_path=1; i_path<max_index_path; i_path++){int dir = i_path;int bh = 0;  // 当前路径的黑高cur = T->root_node;while(cur != T->nil_node){// 更新黑高if(cur->color == BLACK){bh++;}// 判断红色节点不相邻else{if((cur->left->color == RED) || (cur->right->color == RED)){printf("ERROR! red node %d has red child!\n", cur->key);rn_flag = false;}}// 更新下一节点// 0:left, 1:rightif(dir%2) cur = cur->right;else      cur = cur->left;dir = dir>>1;}if(bh != black_height){printf("ERROR! black height is not same! path 0 is %d, path %d is %d.\n", black_height, i_path, bh);bh_flag = false;}}}return (rc_flag && rn_flag && bh_flag);
}
/*------------------------------------------------------------------------*//*-----------------------------下面为测试代码-------------------------------*/
#if RBTREE_DEBUG
#include<time.h>  // 使用随机数
#include<sys/time.h>  // 计算qps中获取时间
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define ENABLE_QPS  1  // 是否开启qps性能测试
#define continue_test_len  1000000  // 连续测试的长度
// 冒泡排序
void bubble_sort(int arr[], int len) {int i, j, temp;for (i = 0; i < len - 1; i++)for (j = 0; j < len - 1 - i; j++)if (arr[j] > arr[j + 1]) {temp = arr[j];arr[j] = arr[j + 1];arr[j + 1] = temp;}
}
int main(){/* --------------------定义数组-------------------- */// 预定义的数组// int KeyArray[20] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};  // 正着插// int KeyArray[20] = {20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1};  // 倒着插// int KeyArray[20] = {1,2,3,4,5,10,9,8,7,6,11,12,13,14,15,20,19,18,17,16};  // 乱序插// int KeyArray[31] = {11,12,13,14,15,16,17,18,19,20,1,2,3,4,5,6,7,8,9,10,21,22,23,24,25,26,27,28,29,30,31};  // 乱序插// 顺序增长的数组int len_array = 18;int KeyArray[len_array];int i_array = 0;for(i_array=0; i_array<len_array; i_array++){KeyArray[i_array] = i_array + 1;}// // 随机生成固定大小的随机数组// int len_array = 18;// int KeyArray[len_array];// srand(time(NULL));// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){//     KeyArray[i_array] = rand() % 9999999999;// }/* ------------------以下测试代码------------------ */printf("-------------------红黑树插入测试------------------\n");// 先给输入的数组排个序int len_max = sizeof(KeyArray)/sizeof(int);printf("测试数组长度: %d\n", len_max);// 将原先的数组深拷贝并升序排序int *KeyArray_sort = (int*)malloc(sizeof(KeyArray));printf("RAND_MAX: %d\n", RAND_MAX);int i = 0;for(i = 0; i < len_max; i++){KeyArray_sort[i] = KeyArray[i];}bubble_sort(KeyArray_sort, len_max);// 申请红黑树内存rbtree *T = rbtree_malloc();// 依次插入数据for(i = 0; i < len_max; i++){rbtree_insert(T, KeyArray[i], NULL);}// 遍历数据,查看是否符合红黑树性质// rbtree_display(T);if(rbtree_check_effective(T)){printf("PASS---->插入测试\n");}else{printf("FAIL---->插入测试\n");}// rbtree_display(T);printf("-------------------红黑树前驱节点测试------------------\n");int pass_flag = 1;if(rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[0])) != T->nil_node){printf("search first key %d's precursor error! get %d, expected nil_node\n", len_max, rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[0]))->key);pass_flag = 0;}for(i = 1; i<len_max; i++){rbtree_node *precursor = rbtree_precursor_node(T, rbtree_search(T, KeyArray_sort[i]));if(precursor->key != KeyArray_sort[i-1]){printf("search key %d error! get %d, expected %d\n", KeyArray_sort[i], precursor->key, KeyArray_sort[i-1]);pass_flag = 0;}}if(pass_flag){printf("PASS---->前驱节点测试\n");}else{printf("FAIL---->前驱节点测试\n");}printf("-------------------红黑树后继节点测试------------------\n");pass_flag = 1;if(rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[len_max-1])) != T->nil_node){printf("search last key %d's successor error! get %d, expected nil_node\n",\KeyArray_sort[len_max-1],\rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[len_max-1]))->key);pass_flag = 0;}for(i = 0; i<len_max-1; i++){rbtree_node *successor = rbtree_successor_node(T, rbtree_search(T, KeyArray_sort[i]));if(successor->key != KeyArray_sort[i+1]){printf("search key %d error! get %d, expected %d\n", KeyArray_sort[i], successor->key, KeyArray_sort[i+1]);pass_flag = 0;}}if(pass_flag){printf("PASS---->后继节点测试\n");}else{printf("FAIL---->后继节点测试\n");}printf("-------------------红黑树删除测试------------------\n");// 依次删除所有元素for(i=0; i<len_max; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[i]));if(!rbtree_check_effective(T)){rbtree_display(T);printf("FAIL---->删除测试%d\n", i+1);break;}else{printf("PASS---->删除测试%d\n", i+1);}}printf("-------------------红黑树打印测试------------------\n");// 先插入数据1~18,再删除16/17/18,即可得到4层的满二叉树for(i = 0; i < len_max; i++){rbtree_insert(T, KeyArray[i], NULL);}for(i=0; i<3; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[len_max-i-1]));if(!rbtree_check_effective(T)){printf("FAIL---->删除测试%d\n", KeyArray_sort[len_max-i-1]);break;}else{printf("PASS---->删除测试%d\n", KeyArray_sort[len_max-i-1]);}}// 打印看看结果rbtree_display(T);// 清空红黑树for(i=0; i<len_max; i++){rbtree_delete(T, rbtree_search(T, KeyArray_sort[i]));}#if ENABLE_QPSprintf("---------------红黑树连续插入性能测试---------------\n");// 定义时间结构体struct timeval tv_begin;struct timeval tv_end;gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){rbtree_insert(T, i+1, NULL);}gettimeofday(&tv_end, NULL);int time_ms = TIME_SUB_MS(tv_end, tv_begin);float qps = (float)continue_test_len / (float)time_ms * 1000;printf("total INSERTs:%d  time_used:%d(ms)  qps:%.2f(INSERTs/sec)\n", continue_test_len, time_ms, qps);printf("---------------红黑树连续查找性能测试---------------\n");gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){// rbtree_search(T, i+1);if(rbtree_search(T, i+1)->key != i+1){printf("continue_search error!\n");return 0;}}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total SEARCHs:%d  time_used:%d(ms)  qps:%.2f(SEARCHs/sec)\n", continue_test_len, time_ms, qps);printf("---------------红黑树连续删除性能测试---------------\n");gettimeofday(&tv_begin, NULL);for(i = 0; i < continue_test_len; i++){rbtree_delete(T, rbtree_search(T, i+1));}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total DELETEs:%d  time_used:%d(ms)  qps:%.2f(DELETEs/sec)\n", continue_test_len, time_ms, qps);
#endifprintf("--------------------------------------------------\n");rbtree_free(T); // 别忘了释放内存free(KeyArray_sort);return 0;
}
#endif

测试代码输出结果

lyl@ubuntu:~/Desktop/kv-store/code_init$ gcc -o main rbtree_int.c 
lyl@ubuntu:~/Desktop/kv-store/code_init$ ./main
-------------------红黑树插入测试------------------
测试数组长度: 18
RAND_MAX: 2147483647
PASS---->插入测试
-------------------红黑树前驱节点测试------------------
PASS---->前驱节点测试
-------------------红黑树后继节点测试------------------
PASS---->后继节点测试
-------------------红黑树删除测试------------------
PASS---->删除测试1
PASS---->删除测试2
PASS---->删除测试3
PASS---->删除测试4
PASS---->删除测试5
PASS---->删除测试6
PASS---->删除测试7
PASS---->删除测试8
PASS---->删除测试9
PASS---->删除测试10
PASS---->删除测试11
PASS---->删除测试12
PASS---->删除测试13
PASS---->删除测试14
PASS---->删除测试15
PASS---->删除测试16
PASS---->删除测试17
PASS---->删除测试18
-------------------红黑树打印测试------------------
PASS---->删除测试18
PASS---->删除测试17
PASS---->删除测试16|  8  |                        |BLACK|                        /                           \             |  4  |                     |  12 |          | RED |                     | RED |          /             \             /             \      |  2  |       |  6  |       |  10 |       |  14 |   |BLACK|       |BLACK|       |BLACK|       |BLACK|   /      \      /      \      /      \      /      \   
|  1  ||  3  ||  5  ||  7  ||  9  ||  11 ||  13 ||  15 |
|BLACK||BLACK||BLACK||BLACK||BLACK||BLACK||BLACK||BLACK|
There is NO key=16 in rbtree!
There is NO key=17 in rbtree!
There is NO key=18 in rbtree!
---------------红黑树连续插入性能测试---------------
total INSERTs:1000000  time_used:295(ms)  qps:3389830.50(INSERTs/sec)
---------------红黑树连续查找性能测试---------------
total SEARCHs:1000000  time_used:118(ms)  qps:8474576.00(SEARCHs/sec)
---------------红黑树连续删除性能测试---------------
total DELETEs:1000000  time_used:95(ms)  qps:10526315.00(DELETEs/sec)
--------------------------------------------------

编程感想:

  1. 每一次旋转都是一次谋权篡位。
  2. 双旋的时候,最后的“当前节点”应该是原来的“祖父节点/父节点”,若还保留当前身份,那么会造成错误。

红黑树插入:

  • 参考B站:【neko算法课】红黑树 插入【11期】

红黑树删除:

  • 参考B站:【neko算法课】红黑树 删除【12期】
  • 参考微信图文:图解:什么是二叉排序树?–介绍了标准BST删除操作
  • 参考知乎:图解:红黑树删除篇(一文读懂)–里面的v对应del_r、u对应del_r_child

红黑树打印:

  • 参考CSDN:二叉树生成与打印显示 c语言实现

2.4 btree的实现

图6 6阶B树示意图——依次插入26个英文字母

  一般来说,B树也是一个自平衡的二叉搜索树。但与红黑树不同的是,B树的节点可以存储多个元素, m m m阶B树的单个节点,最多有 m − 1 m-1 m1 个元素、 m m m 个子节点。并且B树只有孩子节点、没有父节点(没有向上的指针)。也就是说,对于插入/删除操作,红黑树可以先从上往下寻找插入位置,再从下往上进行调整;而B树要先从上往下调整完(“分裂、合并/借位”),最后在叶子节点进行插入/删除,而没有从下往上的过程。即进行插入/删除时,B树从上往下只走一次。下面给出一个 m m m阶B树应该满足的条件(判断一棵B树是否有效的依据):

  1. 每个结点至多拥有 m m m颗子树。
  2. 根结点至少拥有两颗子树。
  3. 除了根结点以外,其余每个分支结点至少拥有 m / 2 m/2 m/2棵子树。
  4. 所有的叶结点都在同一层上。
  5. k k k 棵子树的分支结点则存在 k − 1 k-1 k1 个元素,元素按照递增顺序进行排序。
  6. 单个节点的元素数量 n n n 满足 ceil ( m / 2 ) − 1 ≤ n ≤ m − 1 \text{ceil}(m/2)-1 \le n \le m-1 ceil(m/2)1nm1

B树可视化网站:https://www.cs.usfca.edu/~galles/visualization/BTree.html

同样的,B树的查找操作只需要从根节点不断比较即可,而B树的插入/删除逻辑如下:

B树插入:从上往下寻找要插入的叶子节点,过程中要下去的孩子若是满节点,则进行“分裂”。

  • 分裂:取孩子的中间节点(第 ceil m 2 \text{ceil}\frac{m}{2} ceil2m 个)放上来,剩下的元素列变成两个子节点。
  • 新元素必然添加到叶子节点上。
  • 注意:只有根节点分裂会增加层高,其余的不会。

B树删除:从上往下寻找要删除元素的所在节点,过程中看情况进行“合并/借位”。若所在节点不是叶子节点,就将其换到叶子节点中。最后在叶子节点删除元素。

  • 合并:从当前节点下放一个元素,然后该元素对应的两个子节点合并成一个子节点。团结就是力量。
  • 借位:从当前节点下方一个元素到元素较少的孩子,然后从当前元素的另一个孩子节点拉上来一个元素取代位置,注意大小顺序。损有余而补不足。
// 遍历到叶子节点
while(不是叶子节点){// 1. 确定下一节点和其兄弟节点if(当前节点有要删除的元素) 哪边少哪边就是下一节点,当前元素对应的另一边就是兄弟节点。else(当前节点没有要删除的元素) 确定好要去的下一节点后,左右两边谁多谁是兄弟节点。// 2. 看是否需要调整if(下一节点元素少){if(孩子的兄弟节点元素多) 借位,进入下一节点。else(孩子的兄弟节点元素少) 合并,进入合并后的节点。}else if(下一节点元素多 && 当前节点有要删除元素){if(下一节点是删除元素的左节点) 删除元素和其前驱元素换位,进入下一节点。else(下一节点是删除元素的右节点) 删除元素和其后继元素换位,进入下一节点。}else{直接进入下一节点。}
}
// 然后在叶子节点删除元素

注:判断孩子节点的元素少的条件是 元素数量 ≤ ceil m 2 − 1 \le \text{ceil}\frac{m}{2}-1 ceil2m1,判断元素多的条件是 元素数量 ≥ ceil m 2 \ge\text{ceil}\frac{m}{2} ceil2m

根据上述原理,我使用C语言实现了B树完整的增删查操作,并增加了检验有效性打印B树的代码,以及测试代码(终端显示进度条)。同样为了加快开发速度,预设“键值对”的类型为int keyvoid* value,后续将B树添加进“kv存储协议”中时会进一步修改:

btree_int.c-共989行

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<stdbool.h>// 编译指令:gcc -o main 1-1btree.c
// 本代码实现M阶B树,存储int型key,未定义value。#define BTREE_DEBUG 1  // 是否运行测试代码typedef struct _btree_node{int *keys;void *values;struct _btree_node **children;int num;  // 当前节点的实际元素数量int leaf; // 当前节点是否为叶子节点
}btree_node;typedef struct _btree{int m;  // m阶B树struct _btree_node *root_node;
}btree;/*
下面是所有的函数声明,排列顺序与源代码调用相同,最外层的函数放在最下面。
*/
/*----初始化分配内存----*/
// 创建单个节点,leaf表示是否为叶子节点
btree_node *btree_node_create(btree *T, int leaf);
// 初始化m阶B树:分配内存,最后记得销毁B树btree_destroy()
btree *btree_init(int m);/*----释放内存----*/
// 删除单个节点
void btree_node_destroy(btree_node *cur);
// 递归删除给定节点作为根节点的子树
void btree_node_destroy_recurse(btree_node *cur);
// 删除所有节点,释放btree内存
btree *btree_destroy(btree *T);/*----插入操作----*/
// 根节点分裂
btree_node* btree_root_split(btree *T);
// 索引为idx的孩子节点分裂
btree_node* btree_child_split(btree *T, btree_node* cur, int idx);
// btree插入元素:先分裂,再插入,必然在叶子节点插入
void btree_insert_key(btree *T, int key);/*----删除操作----*/
// 借位:将cur节点的idx_key元素下放到idx_dest孩子
btree_node *btree_borrow(btree_node *cur, int idx_key, int idx_dest);
// 合并:将cur节点的idx元素向下合并
btree_node *btree_merge(btree *T, btree_node *cur, int idx);
// 找出当前节点索引为idx_key的元素的前驱节点
btree_node* btree_precursor_node(btree *T, btree_node *cur, int idx_key);
// 找出当前节点索引为idx_key的元素的后继节点
btree_node* btree_successor_node(btree *T, btree_node *cur, int idx_key);
// btree删除元素:先合并/借位,再删除,必然在叶子节点删除
void btree_delete_key(btree *T, int key);/*----查找操作----*/
// 查找key
btree_node* btree_search_key(btree *T, int key);/*----打印信息----*/
// 打印当前节点信息
void btree_node_print(btree_node *cur);
// 先序遍历给定节点为根节点的子树(递归)
void btree_traversal_node(btree *T, btree_node *cur);
// btree遍历
void btree_traversal(btree *T);/*----检查有效性----*/
// 获取B树的高度
int btree_depth(btree *T);
// 检查给定节点的有效性
// 键值:根节点至少有一个key,其余节点至少有ceil(m/2)-1个key
// 分支:所有节点数目子树为当前节点元素数量+1
bool btree_node_check_effective(btree *T, btree_node *cur);
// 遍历所有路径检查m阶B树的有效性
// 平衡性:所有叶节点都在同一层(所有路径高度相等)
// 有序性:所有元素升序排序
// 键值:根节点至少有一个key,其余节点至少有ceil(m/2)-1个key
// 分支:所有节点数目子树为当前节点元素数量+1
bool btree_check_effective(btree *T);/*-----------------------------下面为函数定义-------------------------------*/
// 创建单个节点,leaf表示是否为叶子节点
btree_node *btree_node_create(btree *T, int leaf){btree_node *new = (btree_node*)malloc(sizeof(btree_node));if(new == NULL){printf("btree node malloc failed!\n");return NULL;}new->keys = (int*)calloc(T->m-1, sizeof(int));new->values = (void*)calloc(T->m-1, sizeof(void));new->children = (btree_node **)calloc(T->m, sizeof(btree_node*));new->num = 0;new->leaf = leaf;return new;
}// 删除单个节点
void btree_node_destroy(btree_node *cur){free(cur->keys);free(cur->values);free(cur->children);free(cur);
}// 初始化m阶B树:分配内存,最后记得销毁B树btree_destroy()
btree *btree_init(int m){btree *T = (btree*)malloc(sizeof(btree));if(T == NULL){// 只有内存不够时才会分配失败printf("rbtree malloc failed!\n");return NULL;}T->m = m;T->root_node = NULL;
}// 递归删除给定节点作为根节点的子树
void btree_node_destroy_recurse(btree_node *cur){int i = 0;if(cur->leaf == 1){btree_node_destroy(cur);}else{for(i=0; i<cur->num+1; i++){btree_node_destroy_recurse(cur->children[i]);}}
}// 释放btree内存
btree *btree_destroy(btree *T){// 删除所有节点if(T->root_node != NULL){btree_node_destroy_recurse(T->root_node);}// 删除btreefree(T);
}// 根节点分裂
btree_node* btree_root_split(btree *T){// 创建兄弟节点btree_node *brother = btree_node_create(T, T->root_node->leaf);int i = 0;for(i=0; i<((T->m-1)>>1); i++){brother->keys[i] = T->root_node->keys[i+(T->m>>1)];T->root_node->keys[i+(T->m>>1)] = 0;brother->children[i] = T->root_node->children[i+(T->m>>1)];T->root_node->children[i+(T->m>>1)] = NULL;brother->num++;T->root_node->num--;}// 还需要复制最后一个指针brother->children[brother->num] = T->root_node->children[T->m-1];T->root_node->children[T->m-1] = NULL;// 创建新的根节点btree_node *new_root = btree_node_create(T, 0);new_root->keys[0] = T->root_node->keys[T->root_node->num-1];T->root_node->keys[T->root_node->num-1] = 0;T->root_node->num--;new_root->num = 1;new_root->children[0] = T->root_node;new_root->children[1] = brother;T->root_node = new_root;return T->root_node;
}// 索引为idx的孩子节点分裂
btree_node* btree_child_split(btree *T, btree_node* cur, int idx){// 创建孩子的兄弟节点btree_node *full_child = cur->children[idx];btree_node *new_child = btree_node_create(T, cur->children[idx]->leaf);int i = 0;for(i=0; i<((T->m-1)>>1); i++){new_child->keys[i] = full_child->keys[i+(T->m>>1)];full_child->keys[i+(T->m>>1)] = 0;new_child->children[i] = full_child->children[i+(T->m>>1)];full_child->children[i+(T->m>>1)] = NULL;new_child->num++;full_child->num--;}new_child->children[new_child->num] = full_child->children[T->m-1];full_child->children[T->m-1] = NULL;// 把孩子的元素拿上来// 调整自己的key和childrenfor(i=cur->num; i>idx; i--){cur->keys[i] = cur->keys[i-1];cur->children[i+1] = cur->children[i];}cur->children[idx+1] = new_child;cur->keys[idx] = full_child->keys[full_child->num-1];full_child->keys[full_child->num-1] = 0;cur->num++;full_child->num--;return cur;
}// btree插入元素:先分裂,再插入,必然在叶子节点插入
void btree_insert_key(btree *T, int key){btree_node *cur = T->root_node;if(key <= 0){// printf("illegal insert: key=%d!\n", key);}else if(cur == NULL){btree_node *new = btree_node_create(T, 1);new->keys[0] = key;new->num = 1;T->root_node = new;}else{// 函数整体逻辑:从根节点逐步找到元素要插入的叶子节点,先分裂、再添加// 先查看根节点是否需要分裂if(cur->num == T->m-1){cur = btree_root_split(T);}// 从根节点开始寻找要插入的叶子节点while(cur->leaf == 0){// 找到下一个要比较的孩子节点int next_idx = 0;  // 要进入的孩子节点的索引int i = 0;for(i=0; i<cur->num; i++){if(key == cur->keys[i]){// printf("insert failed! already has key=%d!\n", key);return;}else if(key < cur->keys[i]){next_idx = i;break;}else if(i == cur->num-1){next_idx = cur->num;}}// 查看孩子是否需要分裂,不需要就进入if(cur->children[next_idx]->num == T->m-1){cur = btree_child_split(T, cur, next_idx);}else{cur = cur->children[next_idx];}}// 将新元素插入到叶子节点中int i = 0;int pos = 0;  // 要插入的位置for(i=0; i<cur->num; i++){if(key == cur->keys[i]){// printf("insert failed! already has key=%d!\n", key);return;}else if(key < cur->keys[i]){pos = i;break;}else if(i == cur->num-1){pos = cur->num;}}// 插入元素if(pos == cur->num){cur->keys[cur->num] = key;}else{for(i=cur->num; i>pos; i--){cur->keys[i] = cur->keys[i-1];}cur->keys[pos] = key;}cur->num++;}
}// 借位:将cur节点的idx_key元素下放到idx_dest孩子
btree_node *btree_borrow(btree_node *cur, int idx_key, int idx_dest){int idx_sour = (idx_key == idx_dest) ? idx_dest+1 : idx_key;btree_node *node_dest = cur->children[idx_dest];  // 目的节点btree_node *node_sour = cur->children[idx_sour];  // 源节点if(idx_key == idx_dest){// 自己下去作为目的节点的最后一个元素node_dest->keys[node_dest->num] = cur->keys[idx_key];node_dest->children[node_dest->num+1] = node_sour->children[0];node_dest->num++;// 把源节点的第一个元素请上来cur->keys[idx_key] = node_sour->keys[0];for(int i=0; i<node_sour->num-1; i++){node_sour->keys[i] = node_sour->keys[i+1];node_sour->children[i] = node_sour->children[i+1];}node_sour->children[node_sour->num-1] = node_sour->children[node_sour->num];node_sour->children[node_sour->num] = NULL;node_sour->keys[node_sour->num-1] = 0;node_sour->num--;}else{// 自己下去作为目的节点的第一个元素node_dest->children[node_dest->num+1] = node_dest->children[node_dest->num];for(int i=node_dest->num; i>0; i--){node_dest->keys[i] = node_dest->keys[i-1];node_dest->children[i] = node_dest->children[i-1];}node_dest->keys[0] = cur->keys[idx_key];node_dest->children[0] = node_sour->children[node_sour->num];node_dest->num++;// 把源节点的最后一个元素请上来cur->keys[idx_key] = node_sour->keys[node_sour->num-1];node_sour->keys[node_sour->num-1] = 0;node_sour->children[node_sour->num] = NULL;node_sour->num--;}return node_dest;
}// 合并:将cur节点的idx元素向下合并
btree_node *btree_merge(btree *T, btree_node *cur, int idx){btree_node *left = cur->children[idx];btree_node *right = cur->children[idx+1];// 自己下去左孩子,调整当前节点left->keys[left->num] = cur->keys[idx];left->num++;for(int i=idx; i<cur->num-1; i++){cur->keys[i] = cur->keys[i+1];cur->children[i+1] = cur->children[i+2];}cur->keys[cur->num-1] = 0;cur->children[cur->num] = NULL;cur->num--;// 右孩子复制到左孩子for(int i=0; i<right->num; i++){left->keys[left->num] = right->keys[i];left->children[left->num] = right->children[i];left->num++;}left->children[left->num] = right->children[right->num];// 删除右孩子btree_node_destroy(right);// 更新根节点if(T->root_node == cur){btree_node_destroy(cur);T->root_node = left;}return left;
}// 找出当前节点索引为idx_key的元素的前驱节点
btree_node* btree_precursor_node(btree *T, btree_node *cur, int idx_key){if(cur->leaf == 0){cur = cur->children[idx_key];while(cur->leaf == 0){cur = cur->children[cur->num];}}return cur;
}// 找出当前节点索引为idx_key的元素的后继节点
btree_node* btree_successor_node(btree *T, btree_node *cur, int idx_key){if(cur->leaf == 0){cur = cur->children[idx_key+1];while(cur->leaf == 0){cur = cur->children[0];}}return cur;
}// btree删除元素:先合并/借位,再删除,必然在叶子节点删除
void btree_delete_key(btree *T, int key){if(T->root_node!=NULL && key>0){btree_node *cur = T->root_node;// 在去往叶子节点的过程中不断调整(合并/借位)while(cur->leaf == 0){// 看看要去哪个孩子int idx_next = 0; //下一个要去的孩子节点索引int idx_bro = 0;int idx_key = 0;if(key < cur->keys[0]){idx_next = 0;idx_bro = 1;}else if(key > cur->keys[cur->num-1]){idx_next = cur->num;idx_bro = cur->num-1;}else{for(int i=0; i<cur->num; i++){if(key == cur->keys[i]){// 哪边少去哪边if(cur->children[i]->num <= cur->children[i+1]->num){idx_next = i;idx_bro = i+1;}else{idx_next = i+1;idx_bro = i;}break;}else if((i<cur->num-1) && (key > cur->keys[i]) && (key < cur->keys[i+1])){idx_next = i + 1;// 谁多谁是兄弟if(cur->children[i]->num > cur->children[i+2]->num){idx_bro = i;}else{idx_bro = i+2;}break;}}}idx_key = (idx_next < idx_bro) ? idx_next : idx_bro;// 依据孩子节点的元素数量进行调整if(cur->children[idx_next]->num <= ((T->m>>1)-1)){// 借位:下一孩子的元素少,下一孩子的兄弟节点的元素多if(cur->children[idx_bro]->num >= (T->m>>1)){cur = btree_borrow(cur, idx_key, idx_next);}// 合并:两个孩子都不多else{cur = btree_merge(T, cur, idx_key);}}else if(cur->keys[idx_key] == key){// 若当前元素就是要删除的节点,那一定要送下去// 但是不能借位,而是将前驱元素搬上来btree_node* pre;int tmp;if(idx_key == idx_next){// 找到前驱节点pre = btree_precursor_node(T, cur, idx_key);// 交换 当前元素 和 前驱节点的最后一个元素tmp = pre->keys[pre->num-1];pre->keys[pre->num-1] = cur->keys[idx_key];cur->keys[idx_key] = tmp;}else{// 找到后继节点pre = btree_successor_node(T, cur, idx_key);// 交换 当前元素 和 后继节点的第一个元素tmp = pre->keys[0];pre->keys[0] = cur->keys[idx_key];cur->keys[idx_key] = tmp;}cur = cur->children[idx_next];// cur = btree_borrow(cur, idx_key, idx_next);}else{cur = cur->children[idx_next];}}// 叶子节点删除元素for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){if(cur->num == 1){// 若B树只剩最后一个元素btree_node_destroy(cur);T->root_node = NULL;}else{if(i != cur->num-1){for(int j=i; j<(cur->num-1); j++){cur->keys[j] = cur->keys[j+1];}}cur->keys[cur->num-1] = 0;cur->num--;}}// else if(i == cur->num-1){//     printf("there is no key=%d\n", key);// }}}
}// 打印当前节点信息
void btree_node_print(btree_node *cur){if(cur == NULL){printf("NULL\n");}else{printf("leaf:%d, num:%d, key:|", cur->leaf, cur->num);for(int i=0; i<cur->num; i++){printf("%d|", cur->keys[i]);}printf("\n");}
}// 先序遍历给定节点为根节点的子树(递归)
void btree_traversal_node(btree *T, btree_node *cur){// 打印当前节点信息btree_node_print(cur);// 依次打印所有子节点信息if(cur->leaf == 0){int i = 0;for(i=0; i<cur->num+1; i++){btree_traversal_node(T, cur->children[i]);}}
}// btree遍历
void btree_traversal(btree *T){if(T->root_node != NULL){btree_traversal_node(T, T->root_node);}else{// printf("btree_traversal(): There is no key in B-tree!\n");}
}// 查找key
btree_node* btree_search_key(btree *T, int key){if(key > 0){btree_node *cur = T->root_node;// 先寻找是否为非叶子节点while(cur->leaf == 0){if(key < cur->keys[0]){cur = cur->children[0];}else if(key > cur->keys[cur->num-1]){cur = cur->children[cur->num];}else{for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){return cur;}else if((i<cur->num-1) && (key > cur->keys[i]) && (key < cur->keys[i+1])){cur = cur->children[i+1];}}}}// 在寻找是否为叶子节点if(cur->leaf == 1){for(int i=0; i<cur->num; i++){if(cur->keys[i] == key){return cur;}}}}// 都没找到返回NULLreturn NULL;
}// 获取B树的高度
int btree_depth(btree *T){int depth = 0;btree_node *cur = T->root_node;while(cur != NULL){depth++;cur = cur->children[0];}return depth;
}// 检查给定节点的有效性
// 键值:根节点至少有一个key,其余节点至少有ceil(m/2)-1个key
// 分支:所有节点数目子树为当前节点元素数量+1
bool btree_node_check_effective(btree *T, btree_node *cur){bool eff_flag = true;// 统计键值和子节点数量int num_kvs = 0, num_child = 0;int i = 0;while(cur->keys[i] != 0){// 判断元素是否递增if(i>=1 && (cur->keys[i] <= cur->keys[i-1])){printf("ERROR! the following node DOT sorted!\n");btree_node_print(cur);eff_flag = false;break;}// 统计数量num_kvs++;i++;}i = 0;while(cur->children[i] != NULL){// 子节点和当前节点的有序性if(i<num_kvs){if(cur->keys[i] <= cur->children[i]->keys[cur->children[i]->num]){printf("ERROR! the follwing node's child[%d] has bigger key=%d than %d\n", i, cur->children[i]->keys[cur->children[i]->num], cur->keys[i]);printf("follwing node--");btree_node_print(cur);printf("  error child--");btree_node_print(cur->children[i]);eff_flag = false;}else if(cur->keys[i] >= cur->children[i+1]->keys[0]){printf("ERROR! the follwing node's child[%d] has smaller key=%d than %d\n", i+1, cur->children[i+1]->keys[0], cur->keys[i]);printf("follwing node--");btree_node_print(cur);printf("  error child--");btree_node_print(cur->children[i+1]);eff_flag = false;}}// 统计数量num_child++;i++;}// 判断元素数量是否合理if(cur->num >= T->m){printf("ERROR! the follwing node has too much keys:%d(at most %d)\n", cur->num, T->m-1);btree_node_print(cur);eff_flag = false;}if((cur != T->root_node) && (num_kvs<((T->m>>1)-1))){printf("ERROR! the follwing node has too few keys:%d(at least %d)\n", num_kvs, (T->m>>1)-1);btree_node_print(cur);eff_flag = false;}if(num_kvs != cur->num){printf("ERROR! the follwing node has %d keys but num=%d\n", num_kvs, cur->num);btree_node_print(cur);eff_flag = false;}if((cur->leaf == 0) && (num_child != cur->num+1)){printf("ERROR! the follwing node has %d keys but %d child(except keys+1=child)\n", num_kvs, num_child);btree_node_print(cur);eff_flag = false;}return eff_flag;
}// 遍历所有路径检查m阶B树的有效性
// 平衡性:所有叶节点都在同一层(所有路径高度相等)
// 有序性:所有元素升序排序
// 键值:根节点至少有一个key,其余节点至少有ceil(m/2)-1个key
// 分支:所有节点数目子树为当前节点元素数量+1
bool btree_check_effective(btree *T){bool effe_flag = true;int depth = btree_depth(T);if(depth == 0){// printf("btree_check_effective(): There is no key in B-tree!\n");}else if(depth == 1){// 只有一个根节点effe_flag = btree_node_check_effective(T, T->root_node);}else{// 最大的可能路径数量int max_path = 1;int depth_ = depth-1;while(depth_ != 0){max_path *= T->m;depth_--;}// 遍历所有路径(每个路径对应一个叶子节点)btree_node *cur = T->root_node;int i_path = 0;for(i_path=0; i_path<max_path; i_path++){int dir = i_path;  // 本次路径的方向控制int i_height = 0;  // 本次路径的高度int i_effe = 1; // 指示是否存在本路径cur = T->root_node;while(cur != NULL){// 当前节点的有效性effe_flag = btree_node_check_effective(T, cur);if(!effe_flag) break;// 更新高度i_height++;// 更新下一节点if(cur->children[dir%T->m]==NULL && !cur->leaf){i_effe = 0;break;}cur = cur->children[dir%T->m];dir /= T->m;}// if(btree_node_check_effective(T, cur))// 判断本路径节点数(高度)if(i_height != depth && i_effe){printf("ERROR! not all leaves in the same layer! the leftest path's height=%d, while the %dst path's height=%d.\n",depth, i_path, i_height);effe_flag = false;}if(!effe_flag) break;}}return effe_flag;
}/*-----------------------------下面为测试代码-------------------------------*/
#if BTREE_DEBUG
#include<time.h>  // 使用随机数
#include<sys/time.h>  // 计算qps中获取时间
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define ENABLE_QPS  1  // 是否开启qps性能测试
#define continue_test_len  10000000  // 连续测试的长度
// 冒泡排序
void bubble_sort(int arr[], int len) {int i, j, temp;for (i = 0; i < len - 1; i++)for (j = 0; j < len - 1 - i; j++)if (arr[j] > arr[j + 1]) {temp = arr[j];arr[j] = arr[j + 1];arr[j + 1] = temp;}
}
// 打印当前数组
void print_int_array(int* KeyArray, int len_array){printf("测试数组为KeyArray[%d] = {", len_array);for(int i=0; i<len_array; i++){if(i == len_array-1){printf("%d", KeyArray[i]);}else{printf("%d, ", KeyArray[i]);}}printf("}\n");
}
int main(){// 定义/* --------------------定义数组-------------------- */// 预定义的数组// int KeyArray[20] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};  // 正着插// int KeyArray[20] = {20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1};  // 倒着插// int KeyArray[20] = {1,2,3,4,5,10,9,8,7,6,11,12,13,14,15,20,19,18,17,16};  // 乱序插// int KeyArray[31] = {11,12,13,14,15,16,17,18,19,20,1,2,3,4,5,6,7,8,9,10,21,22,23,24,25,26,27,28,29,30,31};  // 乱序插// int KeyArray[18] = {18,8,13,9,13,0,7,13,14,7,1,7,19,7,9,18,17,18};  // 乱序插// // 顺序增长的数组// int len_array = 26;// int KeyArray[len_array];// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){//     KeyArray[i_array] = i_array + 1;// }// // 随机生成固定大小的随机数组// int len_array = 18;// int KeyArray[len_array];// srand(time(NULL));// int i_array = 0;// for(i_array=0; i_array<len_array; i_array++){//     // KeyArray[i_array] = rand() % 9999999999;//     KeyArray[i_array] = rand() % 20;// }// printf("RAND_MAX: %d\n", RAND_MAX);/* ------------------以下测试代码------------------ */// printf("-------------------B树插入测试------------------\n");// // 先给输入的数组排个序// int len_max = sizeof(KeyArray)/sizeof(int);// printf("测试数组长度: %d\n", len_max);// // 将原先的数组深拷贝并升序排序// int *KeyArray_sort = (int*)malloc(sizeof(KeyArray));// int i = 0;// for(i = 0; i < len_max; i++){//     KeyArray_sort[i] = KeyArray[i];// }// bubble_sort(KeyArray_sort, len_max);int max_test = 100;         // 测试的总次数int len_array = 1000;      // 单次测试的数组长度bool detail_flag = false;  // 是否打印详细信息bool pass_flag = true;printf("---------------------常规测试---------------------\n");for(int i_test=0; i_test<max_test; i_test++){// 随机生成固定大小的随机数组int KeyArray[len_array];srand(time(NULL));for(int i_array=0; i_array<len_array; i_array++){// KeyArray[i_array] = rand() % 9999999999;KeyArray[i_array] = rand() % len_array;}// int KeyArray[10000] = {};// 申请红黑树内存btree *T = btree_init(6);btree *T_old = btree_init(6);if(detail_flag){printf("--------------------开始测试--------------------\n");print_int_array(KeyArray, len_array);}if(detail_flag){printf("-------------------B树插入测试------------------\n");}/*-------------------B树插入测试------------------*/// 依次插入数据,并检查有效性for(int i=0; i<len_array; i++){if(i>0){btree_insert_key(T_old, KeyArray[i-1]);}btree_insert_key(T, KeyArray[i]);// 方便打印调试// if(i==65){//     printf("Before insert the %2d's key=%d:\n", i+1, KeyArray[i]);//     btree_traversal(T_old);//     printf("After insert the %2d's key=%d:\n", i+1, KeyArray[i]);//     btree_traversal(T);// }if(btree_check_effective(T)==false){printf("after insert KeyArray[%d]=%d error!\n", i, KeyArray[i]);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->插入测试%d/%d\n", i_test+1, max_test);btree_insert_key(T_old, KeyArray[len_array]);}else{if(detail_flag) printf("FAIL---->插入测试%d/%d\n", i_test+1, max_test);// printf("Before insert:\n");// btree_traversal(T_old);// printf("After insert:\n");// btree_traversal(T);btree_destroy(T_old);btree_destroy(T);break;}if(detail_flag){btree_traversal(T);printf("\n");}if(detail_flag){printf("-------------------B树查找测试------------------\n");}/*-------------------B树查找测试------------------*/btree_node* sear = NULL;for(int i=0; i<len_array; i++){if(KeyArray[i] > 0){sear = btree_search_key(T, KeyArray[i]);pass_flag = false;if(sear != NULL){for(int j=0; j<sear->num; j++){if(sear->keys[j] == KeyArray[i]){pass_flag = true;break;}}}if(detail_flag){printf("search KeyArray[%d]=%d  ---->  ", i, KeyArray[i]);btree_node_print(sear);}}if(pass_flag == false){print_int_array(KeyArray, len_array);  // 打印当前数组printf("following node DOT has KeyArray[%d]=%d!\n", i, KeyArray[i]);btree_node_print(sear);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->查找测试%d/%d\n", i_test+1, max_test);}else{printf("FAIL---->查找测试%d/%d\n", i_test+1, max_test);break;}if(detail_flag){printf("-------------------B树删除测试------------------\n");}/*-------------------B树删除测试------------------*/for(int i=0; i<len_array; i++){// if(i==496){//     // 加一句打印方便调试暂停//     printf("Now delete KeyArray[%d]=%d:\n", i, KeyArray[i]);// }if(i>0){btree_delete_key(T_old, KeyArray[i-1]);}btree_delete_key(T, KeyArray[i]);if(detail_flag){printf("delete KeyArray[%d]=%d:\n", i, KeyArray[i]);btree_traversal(T);} if(btree_check_effective(T) == false){print_int_array(KeyArray, len_array);  // 打印当前数组printf("after delete KeyArray[%d]=%d error!\n", i, KeyArray[i]);pass_flag = false;break;}}if(pass_flag){if(detail_flag) printf("PASS---->删除测试%d/%d\n", i_test+1, max_test);}else{printf("FAIL---->删除测试%d/%d\n", i_test+1, max_test);// printf("Before delete:\n");// btree_traversal(T_old);// printf("After delete:\n");// btree_traversal(T);btree_destroy(T_old);btree_destroy(T);break;}if(detail_flag){printf("--------------------------------------------------\n");}btree_destroy(T_old);btree_destroy(T);// 整点进度条看看if(pass_flag){// printf("PASS----> WHOLE TEST %d/%d!\r", i_test+1, max_test);int bar_process;         // 编译器初始化为0bool already_print_txt;  // 编译器初始化为falsebool already_print_bar;  // 编译器初始化为falseconst int len_bar = 20;  // 完整进度条的长度// 打印进度条前面的说明if(!already_print_txt){printf("PASS TEST PROCESS: ");fflush(stdout);}already_print_txt = true;// 打印进度条if(len_bar*(i_test+1)/max_test > bar_process){// 光标往前回退if(already_print_bar){printf("\033[4D");  // ANSI转义序列}// 画出进度条for(int i=0; i<(len_bar*(i_test+1)/max_test - bar_process); i++){printf("█");fflush(stdout);}// 显示进度范围printf(" %d%%", 100*(i_test+1)/max_test);fflush(stdout);already_print_bar = true;bar_process = len_bar*(i_test+1)/max_test;if(i_test+1 == max_test) printf("\n");}}}// 只是为了最后一行判断用if(pass_flag){// printf("\r\033[K");  // 清除本行printf("PASS----> ALL %d TEST!\n", max_test);}printf("--------------------------------------------------\n");printf("---------------------性能测试---------------------\n");btree* bT = btree_init(16);  // 初始化16阶B树// 定义时间结构体struct timeval tv_begin;struct timeval tv_end;// 插入性能测试gettimeofday(&tv_begin, NULL);for(int i=0; i<continue_test_len; i++){btree_insert_key(bT, i+1);}gettimeofday(&tv_end, NULL);int time_ms = TIME_SUB_MS(tv_end, tv_begin);float qps = (float)continue_test_len / (float)time_ms * 1000;printf("total INSERTs:%d  time_used:%d(ms)  qps:%.2f(INSERTs/sec)\n", continue_test_len, time_ms, qps);// 查找性能测试gettimeofday(&tv_begin, NULL);for(int i=0; i<continue_test_len; i++){btree_node* node = btree_search_key(bT, i+1);int idx = 0;for(idx=0; idx<node->num; idx++){if(node->keys[idx] == i+1){break;}}if(idx == node->num){printf("continue_search error!\n");return 0;}}gettimeofday(&tv_end, NULL);time_ms = TIME_SUB_MS(tv_end, tv_begin);qps = (float)continue_test_len / (float)time_ms * 1000;printf("total SEARCHs:%d  time_used:%d(ms)  qps:%.2f(SEARCHs/sec)\n", continue_test_len, time_ms, qps);// // 删除性能测试// gettimeofday(&tv_begin, NULL);// for(int i=0; i<continue_test_len; i++){//     btree_delete_key(bT, i+1);// }// gettimeofday(&tv_end, NULL);// time_ms = TIME_SUB_MS(tv_end, tv_begin);// qps = (float)continue_test_len / (float)time_ms * 1000;// printf("total DELETEs:%d  time_used:%d(ms)  qps:%.2f(DELETEs/sec)\n", continue_test_len, time_ms, qps);// 销毁B树btree_destroy(bT);printf("--------------------------------------------------\n");return 0;
}#endif

测试代码输出结果----记得测一下连续读、查、删的速度

lyl@ubuntu:~/Desktop/kv-store/code_init$ gcc -o main btree_int.c 
lyl@ubuntu:~/Desktop/kv-store/code_init$ ./main
---------------------常规测试---------------------
PASS TEST PROCESS: ███████████████████ 100%
PASS----> ALL 100 TEST!
--------------------------------------------------
---------------------性能测试---------------------
total INSERTs:10000000  time_used:2379(ms)  qps:4203447.00(INSERTs/sec)
total SEARCHs:10000000  time_used:1007(ms)  qps:9930486.00(SEARCHs/sec)
total DELETEs:10000000  time_used:18(ms)  qps:555555584.00(DELETEs/sec)
--------------------------------------------------

未解决bug:测试代码的最后如果多加上一个“删除性能测试”,就不显示进度条了?很奇怪

参考内容:

  • B站:【CS61B汉化】七海讲数据结构-平衡树&B树–B树的插入–超可爱的视频
  • 知乎:从B树中删除一个关键字–主要参考了孩子元素多的时候怎么删除
  • B站:可视化数据结构-B+树–插入数据的动画很清晰

2.5 hash的实现

图7 链式哈希结构示意图——固定长度为6

  终于度过了本项目所有最难的部分,下面的内容都比较简单。链式哈希的增删查操作简洁明了。链式哈希首先会声明一个固定长度的哈希表(如1024),若需要插入新元素时,首先计算哈希值作为索引,若有冲突则直接在当前位置使用“头插法”即可。注意以下几点:

  1. 哈希值计算:对应int型,key % table_size就可以直接当作哈希值。对于char*,则可以sum(key) % table_size当作哈希值。当前也有专门的哈希函数,但是由于需要频繁计算哈希值,在简单情况下,就采用上述处理即可。
  2. 单个索引上的链表没有大小关系。所以查找/删除时需要遍历这个索引对应的整条链表

就不单独写int型的代码并测试了,可以直接参考项目源码中的“hash.h”、“hash.c”。

2.6 dhash的实现

图8 动态哈希的插入和删除过程

  显然上述hash有个很大问题,就是“哈希表的大小”是固定的。如果声明哈希表大小为1024,却要插入10w个元素,那每个所有都会对应一个很长的链表,最坏的情况下和直接遍历一遍没什么区别!这显然失去了哈希的意义,于是在上面的基础上,我们使用“空间换时间”,自动增加/缩减哈希表的大小,也就是“动态哈希表”dhash:

  1. 插入元素时,首先判断是否需要扩容。若当前元素超过哈希表的1/2(自定义),则将哈希表翻倍(自定义),并将原来的元素重新映射到新的哈希表。若遇到冲突,则将新元素顺延插入到下一个空节点
  2. 删除元素时,首先判断是否需要缩容。若当前元素小于哈希表的1/4(自定义),则将哈希表缩小一半(自定义),并将原来的元素重新映射到新的哈希表。若当前节点不是待删除元素,则需要从当前索引开始遍历完所有节点,才能说哈希表不存在此元素

同样,不单独写int型的代码测试了,可以直接参考项目源码中的“dhash.h”、“dhash.c”。

2.7 skiplist的实现

图9 理想跳表的结构示意图

  跳表本质上是一个有序链表。红黑树每次比较都能排除一半的节点,这启发我们,要是每次都能找到链表最中间的节点,不就可以实现 O ( log ⁡ N ) O(\log N) O(logN)的查找时间复杂度了嘛。于是如上图所示,我们不妨规定跳表的每个节点都有一组指针,跳表还有一个额外的空节点作为“跳表头”,那么每次都从顶层依次底层进行“跳”,就可以实现“每次比较都能排除剩下一半的节点”。但是还有个大问题,那就是上述理想跳表需要插入/删除一个元素时,元素的调整会非常麻烦,甚至还需要遍历整个链表来调整所有节点的指向!

所以在实际应用中,不会直接使用上述理想跳表的结构。而是在每次插入一个新元素时,按照一定概率计算其高度。统计学证明,当存放元素足够多的时候,该实际跳表性能无限趋近于理想跳表。

图10 实际跳表的结构示意图

同样,代码直接见项目源码中的“skiplist.h”、“skiplist.c”。

2.8 kv存储协议的实现

  如“1.2节-项目预期及基本架构”给出的“服务端程序架构”。现在我们实现了网络收发功能(网络层)、所有存储引擎的增删查改操作(引擎层),还差最后一个“kv存储协议”(协议层)就可以实现完整的服务端程序。“kv存储协议”的主要功能有:

  1. 初始化/销毁所有的存储引擎。这个直接调用各引擎的初始化/销毁函数即可。
  2. 拆解网络层接收的数据,若为有效指令则传递给相应的存储引擎函数处理,并根据存储引擎的处理结果返回相应的信息给网络层。

值得注意的是,“引擎层”的接口函数应该统一封装命名,并在各存储引擎中实现,“引擎层”的头文件中也只有这些接口函数暴露给“协议层”。这样保证了“协议层”和“引擎层”的隔离性,即使后续“引擎层”代码需要进行修改,也不会干扰到接口函数的调用、无需修改协议层。整个“服务端”的“网络层”、“协议层”、“引擎层”的函数调用关系如下:

图11 “服务端”函数调用关系

同样,代码直接见项目源码中的“kvstore.h”、“kvstore.c”。

3. 性能测试

图12 使用网络连接助手进行简单的功能验证

  上述我们将“服务端”的代码实现完毕,并且可以使用“网络连接助手”进行正常的收发数据。如上图所示,依次发送5条指令后都得到预期的回复。但是我们要想测试客户端的极限性能,显然需要写一个“客户端”测试程序。该测试程序目标如下:

  1. 测试单个键值对(name:humu)能否正常实现所有功能。
  2. 测试多个键值对能否正常实现所有功能。这一步主要是为了验证各引擎能正常扩容/缩容。
  3. 测试连续进行10w次插入、查找、删除的总耗时,计算出服务器的相应速度qps。

注:测试代码直接见项目源码中的“tb_kvstore.c”。

如下图所示,开启两个Ubuntu虚拟机,分别运行“服务端”、“客户端”程序,得到如下的测试数据:

图13 性能测试结果
表2 连续5次的性能测试结果
次数操作qps(trans/s)
arrayrbtreebtreehashdhashskiplist
1插入1461.33 1918.24 1981.65 1727.24 1924.93 1778.44
查找1604.18 1906.07 1895.09 1741.52 1887.68 1755.71
删除1722.71 1959.13 1909.60 1713.88 1964.98 1887.65
2插入1570.13 1972.93 1949.62 1710.54 1926.63 1780.18
查找1707.30 1970.40 1883.49 1709.46 2021.55 1747.89
删除1739.92 1918.80 1922.82 1730.28 1917.32 1912.74
3插入1103.35 1898.25 1923.71 1737.59 1948.86 1794.24
查找1316.59 1867.80 1933.38 1733.31 1953.74 1768.44
删除1774.78 1873.89 1978.98 1771.73 1966.76 1933.19
4插入1134.22 1887.79 1888.72 1734.21 1909.53 1802.00
查找1394.54 1908.80 1910.40 1726.43 1932.37 1764.76
删除1778.54 1897.97 1937.98 1719.25 1935.40 1928.57
5插入1029.79 1981.02 1912.16 1718.33 1916.52 1777.62
查找1236.52 1917.58 1947.00 1732.17 1934.39 1754.66
删除1703.87 1955.19 1899.59 1724.97 1915.42 1905.71

结果分析

  1. array引擎的指标都是最差的,这是因为每次插入/删除都需要遍历所有元素。
  2. hash引擎(链式哈希)的性能是倒数第二差的,这是因为虽然哈希函数可以快速定位到索引,但当键值对远大于哈希表大小时,冲突元素会形成一个链表,增/删/查操作也需要遍历整条链表。
  3. btree的插入性能好于rbtree,只是因为btree的单个节点可以容纳多个元素,节省了很多增加新结点、调整结构的时间;但是btree的查找/删除性能不如rbtree,也同样是因为单个节点还需要进行遍历。
  4. 综合来看,最优秀的存储引擎是rbtree。

注:上述测试结果可以通过“相对值”比较各数据结构间的差异;“绝对值”则受限于虚拟机内存、运行频率等物理特性,并且软件上使用“协程”等也可以大幅提升“绝对值”。

4. 项目总结及改进思路

  C/C++适合做服务器,但不适合做业务。因为可能会因为一行代码有问题,导致整个进程退出。虽然也能做,但维护成本高,并且对工程师要求高。比如“腾讯课堂”中课程、图片、价格等参数很适合用C语言做“kv存储”,但是显示网页等业务功能使用Jc语言更加合适。所以VUE框架(Java)等适合做前端业务;C/C++适合做基础架构、高性能组件、中间件。比如在量化交易中,底层的高频组件、低延迟组件适合用C/C++,上层的交易业务、交易策略没必要C/C++。

下面是对本项目的一些 改进思路:

  • 持久化和日志:将数据都存储到(磁盘)文件中,并将接收到的指令记录下来成为日志。持久化的思路有很多种,比如直接分配一个大的“内存池”(见下一条)存放所有引擎的存储数据,然后直接对这个大的内存池进行持久化。
  • 内存池:现在存储的数据都是零散的,比如rbtree每次插入新元素都会申请一个节点内存,时间长了就会出现很多内存碎片,不利于内存管理。理想状态是和数组结构体一样,直接预先声明一个内存池,需要新节点的时候就取出一个节点内存。这样,每次申请和释放的内存都是一整块,不会出现大量的内存碎片。
  • 分布式锁:kv存储内部是单线程的,对于变量的改变无需额外加锁。但是外部的网络收发指令,需要引入分布式锁,防止多台客户端同时对同一个键值对进行修改。
  • 主从模式:所有数据都存储在一台服务器上,会有丢失数据的风险,所以需要使用“主从模式”来对数据进行备份。关键点在于合适的备份策略。
  • 分布式存储:所有数据都存储在一台服务器上,会导致内存压力过大,多台服务器结合起来做成“分布式存储”,所有服务器分摊数据。

编程感想:

  1. 字符串拷贝:C语言中,使用strncpysnprintf拷贝字符串时,注意目的字符串不能只是声明为char*,而是需要malloc/calloc分配内存才可以。另外也不要忘了释放内存free
  2. 天坑:解析指令时层层传递的是epoll的读缓冲rbuffer,然后使用strtok/strtok_r进行分割指令并存储在char* tokens[]中,注意这个char* tokens[]的元素指向的就是读缓冲本身!!!而snprintf是逐字符进行拷贝的,也就是说,此时使用snprintftokens的内容再写回读缓冲就会导致读缓冲错乱。如果不额外分配内存很难解决该问题。所以建议不要使用snprintf拷贝自己的格式化字符串。
  3. 良好的内存管理习惯:free()之前先判断是否为NULLfree()之后一定要指向NULL
  4. 层层传递初始化:

【可行方法1】如果最顶层需要创建实例对象(不如全局变量),那就需要传地址给最底层,且最底层无需再重新为这个对象malloc空间(因为最顶层已经创建对象了),只需要malloc好这个实例对象的所有指针即可,或者先定义成NULL后期插入时再分配。
【可行方法2】若最顶层无需创建全局的实例对象,那么也可以不传参数给最底层,最底层直接创建一个对象指针,并malloc/NULL这个对象指针的所有参数,最后直接返回这个对象指针就行。
【不可行方法】顶层创建了全局的实例对象,然后传地址给最底层,最底层重新malloc一个新的对象指针,初始化这个对象指针的所有参数,最后让传递下来的地址指向这个指针。最后在顶层就会发现所有参数都没初始化,都是空!
【关键点】:对谁进行malloc非常重要,一定要对顶层传下来的结构体指针的元素直接malloc,而不是malloc一个新的结构体,在赋值给这个结构体指针。

  1. 关于strcmp():在使用strcmp()时一定要先判断不为空,才能使用。这是因为strcmp()的底层使用while(*des++==*src**),所以若比较的双方有一方为空,就会直接报错。
  2. 注意项目的include关系,是在编译指令中指定的。当然也可以将“kv_store.c”中的case语句封装到各自的数据结构中,然后以动态库的方式进行编译。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/719143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

XV4001KC数字输出 车载用(piezoman)

EPSON的XV4001KC角速度传感器是为满足汽车行业对高精度和高可靠性需求而设计的。它不仅提供了高级的运动监测特性&#xff0c;高精度的角速度测量和温度监测功能&#xff0c;而且其紧凑的设计6.04.83.3mm尺寸对于空间受限的车载环境来说&#xff0c;是一大优势&#xff0c;使得…

政务浏览器——打通信创闭环最后一公里

当前&#xff0c;信创建设工作主要集中在芯片、操作系统、数据库以及pc整机&#xff0c;这些领域基本可用&#xff0c;或者达到了市场主流水平。但是&#xff0c;政务办事场景下的信创落地仍然困难重重&#xff0c;很多地方不得不装双系统或买两台设备来来平衡日常业务和信创考…

STM32CubeIDE基础学习-软件安装,环境搭建

STM32CubeIDE基础学习-软件介绍及环境搭建步骤 文章目录 STM32CubeIDE基础学习-软件介绍及环境搭建步骤前言第1章 STM32CubeIDE 介绍1.1 软件描述1.2 软件支持的功能及特点 第2章 STM32CubeIDE 软件安装2.1 STM32CubeIDE 软件获取方法2.2 STM32CubeIDE 软件安装步骤2.2.1 错误安…

C++模板完整版

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、泛型编程 如何实现一个通用的交换函数呢&#xff1f; void Swap(int& left, int& right) {int temp left…

抖店入驻费用是多少?新手入驻都有哪些要求?2024费用明细!

我是电商珠珠 我做电商做了将近五年&#xff0c;做抖店做了三年多&#xff0c;期间还带着学员一起做店。 今天&#xff0c;就来给大家详细的讲一下在抖音开店&#xff0c;需要多少费用&#xff0c;最低需要投入多少。 1、营业执照200元左右 就拿个体店举例&#xff0c;在入…

hook函数——useReducer

目录 1.useReducer定义2.useReducer用法3.useState和useReducer区别 1.useReducer定义 const [state, dispatch] useReducer(reducer, initialArg, init?) reducer&#xff1a;用于更新 state 的纯函数。参数为 state 和 action&#xff0c;返回值是更新后的 state。state …

这波操作看麻了!十亿行数据,从71s到1.7s的优化之路。

节期间关注到了一个关于 Java 方面的比赛&#xff0c;很有意思。由于是开源的&#xff0c;我把项目拉下来试图学&#xff08;白&#xff09;习&#xff08;嫖&#xff09;别人的做题思路&#xff0c;在这期间一度让我产生了一个自我怀疑&#xff1a; 他们写的 Java 和我会的 Ja…

每日一题——LeetCode1576.替换所有的问号

方法一 3个字母原则 把&#xff1f;替换为和他左右都不相等的字符&#xff0c;那么找3个字符abc&#xff0c;&#xff1f;总能替换为abc中的一个字符&#xff0c;遍历字符串找到所有&#xff1f;&#xff0c;再遍历abc把&#xff1f;替换为abc中的一个字符 var modifyString …

解析 openGauss 的 AutoVacuum 机制及优化策略

前言 在 openGauss 数据库中&#xff0c;AutoVacuum 机制是一个关键的自动化功能&#xff0c;用于管理表的空间和性能。AutoVacuum 通过定期清理过时数据和更新统计信息&#xff0c;帮助数据库管理员维护数据库的性能和稳定性。 为什么需要 AutoVacuum&#xff1f; 了解AutoV…

JAVA内存模型与JVM内存结构

注意区分Java内存模型&#xff08;Java Memory Model&#xff0c;简称JMM&#xff09;与Jvm内存结构&#xff0c;前者与多线程相关&#xff0c;后者与JVM内部存储相关。本文会对两者进行简单介绍。 一、JAVA内存模型(JMM) 1. 概念 说来话长&#xff0c;由于在不同硬件厂商和…

No matching version found for @babel/traverse@^7.24.0.

问题&#xff1a; npm安装 依赖失败&#xff0c;找不到所需依赖。 原因&#xff1a; npm镜像源中没有该依赖。&#xff08;大概率是因为依赖最近刚更新&#xff0c;当前镜像源没有同步&#xff09; 解决&#xff1a; 查看自己的npm镜像&#xff1a;npm config get registry…

机器学习-面经(part2)

3. 验证方式 3.1什么是过拟合?产生过拟合原因? 定义:指模型在训练集上的效果很好,在测试集上的预测效果很差 数据有噪声 训练数据不足,有限的训练数据 训练模型过度导致模型非常复杂3.2 如何避免过拟合问题? 3.3 什么是机器学习的欠拟合?产生原…

D4890可应用在对讲机上,采用 SOP8/MSOP8两种封装形式

D4890 目前客户主要使用在对讲机上&#xff0c;电压范围2.2V &#xff5e; 5.5V之间&#xff0c;输出功率&#xff08;THDN1%&#xff09;1.0W/8Ω 5.0V。采用 SOP8/MSOP8两种封装形式。 2、推荐的应用线路图如下&#xff1a; 3、实际测试输出波形如下&#xff08;VCC4.5V&…

解决android studio build Output中文乱码

1.效果如下所示&#xff1a; 代码运行报错的时候&#xff0c;Build Output报的错误日志中中文部分出现乱码&#xff0c;导致看不到到底报的什么错。 2.解决办法如下&#xff1a; 点击Android studio开发工具栏的Help-Edit Custom VM Options....&#xff0c;Android studio会…

AutoGPT实现原理

AutoGPT是一种利用GPT-4模型的自动化任务处理系统&#xff0c;其主要特点包括任务分配、多模型协作、互联网访问和文件读写能力以及上下文联动记忆性。其核心思想是通过零样本学习&#xff08;Zero Shot Learning&#xff09;让GPT-4理解人类设定的角色和目标&#xff0c;并通过…

端口号被占用时的解决办法

1、查看端口占用的进程号 netstat -ano |findstr 8080 2、 找到占用端口的程序 tasklist |findstr 2264 3、kill端口 taskkill /pid 2264 /f

文物预防性保护方案整体结构及软件介绍

​文物预防性保护监测与调控系统整体是构架在商业级技术平台上的多层综合性应用,采用分布式部署的模块化设计,以智能监测终端及高精传感器为核心的感知系统。系统通过以下的层次结构协同工作完成全面的监控与调控功能&#xff1a; 1)系统依靠文物监测调控模型作为运行核心&…

基于springboot+vue的校园爱心捐赠互助管理系统(源码+论文)

目录 前言 一、功能设计 二、功能实现 三、库表设计 四、论文 前言 随着经济水平和生活水平的提高在校大学生在校需要处理的物品也在不断增加&#xff0c;同时校园内还存在很多贫困生&#xff0c;可以通过线上平台实现资源的整合和二次利用&#xff0c;通过线上平台求助信…

护眼灯有效果吗怎么样?推荐五款值得入手的护眼台灯

随着护眼台灯被越来越多的人解锁新的护眼攻略&#xff0c;它的产品热度也越来越高&#xff0c;而且光线柔和&#xff0c;是一款非常不错的照明用具。但是也有不少用户反馈买到的护眼台灯效果不好&#xff0c;有时候还会觉得刺眼&#xff0c;有些不合格的台灯使用时间一久还会散…

动态IP代理技术在网络爬虫中的实际使用

目录 一、动态IP代理技术概述 二、动态IP代理技术的优势 三、动态IP代理技术的实际应用 四、注意事项 五、案例分析 六、结论 随着互联网的迅猛发展&#xff0c;网络爬虫成为了获取信息、分析数据的重要工具。然而&#xff0c;在进行大规模爬取时&#xff0c;爬虫常常面临…