hadoop streaming编程小demo(python版)

大数据团队搞数据质量评测。自动化质检和监控平台是用django,MR也是通过python实现的。(后来发现有orc压缩问题,python不知道怎么解决,正在改成java版本)

这里展示一个python编写MR的例子吧。

抄一句话:Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer。

 

1、首先,先介绍一下背景,我们的数据是存放在hive里的。hive建表语句如下:

我们将会解析元数据,和HDFS上的数据进行merge,方便处理。这里的partition_key用的是year/month/day。

hive (gulfstream_ods)> desc g_order;
OK
col_name        data_type       comment
order_id                bigint                  订单id                
driver_id               bigint                  司机id,司机抢单前该值为0      
driver_phone            string                  司机电话                
passenger_id            bigint                  乘客id                
passenger_phone         string                  乘客电话                
car_id                  int                     接驾车辆id              
area                    int                     城市id                
district                string                  城市区号                
type                    int                     订单时效,0 实时  1预约      
current_lng             decimal(19,6)           乘客发单时的经度            
current_lat             decimal(19,6)           乘客发单时的纬度            
starting_name           string                  起点名称                
starting_lng            decimal(19,6)           起点经度                
starting_lat            decimal(19,6)           起点纬度                
dest_name               string                  终点名称                
dest_lng                decimal(19,6)           终点经度                
dest_lat                decimal(19,6)           终点纬度                
driver_start_distance   int                     司机与出发地的路面距离,单位:米    
start_dest_distance     int                     出发地与终点的路面距离,单位:米    
departure_time          string                  出发时间(预约单的预约时间,实时单为发单时间)
strive_time             string                  抢单成功时间              
consult_time            string                  协商时间                
arrive_time             string                  司机点击‘我已到达’的时间       
setoncar_time           string                  上车时间(暂时不用)          
begin_charge_time       string                  司机点机‘开始计费’的时间       
finish_time             string                  完成时间                
year                    string                                      
month                   string                                      
day                     string                                      # Partition Information          
# col_name              data_type               comment             year                    string                                      
month                   string                                      
day                     string              

 

2、我们解析元数据

这里是解析元数据的过程。之后我们把元数据序列化后存入文件desc.gulfstream_ods.g_order,我们将会将此配置文件连同MR脚本一起上传到hadoop集群。

import subprocess
from subprocess import Popendef desc_table(db, table):process = Popen('hive -e "desc %s.%s"' % (db, table),shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)stdout, stderr = process.communicate()is_column = Truestructure_list = list()column_list = list()for line in stdout.split('\n'):value_list = list()if not line or len(line.split()) < 2:breakif is_column:column_list = line.split()is_column = Falsecontinueelse:value_list = line.split()structure_dict = dict(zip(column_list, value_list))structure_list.append(structure_dict)return structure_list

 

3、下面是hadoop streaming执行脚本。

#!/bin/bash
source /etc/profile
source ~/.bash_profile

#hadoop目录
echo "HADOOP_HOME: "$HADOOP_HOME
HADOOP="$HADOOP_HOME/bin/hadoop"

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo $DB--$TABLE--$YEAR--$MONTH--$DAY

if [ "$DB" = "gulfstream_ods" ]
then
DB_NAME="gulfstream"
else
DB_NAME=$DB
fi
TABLE_NAME=$TABLE

#输入路径
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
#标记文件后缀名
input_mark="_SUCCESS"
echo $input_path
#输出路径
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path
#性能约束参数
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
#启动job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-jobconf mapred.job.name="$job_name" \
-jobconf mapred.job.queue.name=$queue_name \
-jobconf mapred.map.tasks=$map_num \
-jobconf mapred.reduce.tasks=$reducer_num \
-jobconf mapred.map.capacity=$capacity_mapper \
-jobconf mapred.reduce.capacity=$capacity_reducer \
-input $input_path \
-output $output_path \
-file ./mapper.py \
-file ./reducer.py \
-file ./utils.py \
-file ./"desc.${DB}.${TABLE_NAME}" \
-mapper "$mapper" \
-reducer "$reducer"
if [ $? -ne 0 ]; then
echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run faild"
fi
$HADOOP fs -touchz "${output_path}/$output_mark"
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}

 

 4、这里是Wordcount的进阶版本,第一个功能是分区域统计订单量,第二个功能是在一天中分时段统计订单量。

mapper脚本

# -*- coding:utf-8 -*-
#!/usr/bin/env python
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')# 将字段和元数据匹配, 返回迭代器
def read_from_input(file, separator, columns):for line in file:if line is None or line == '':continuedata_list = mapper_input(line, separator)if not data_list:continueitem = None# 最后3列, 年月日作为partitionkey, 无用if len(data_list) == len(columns) - 3:item = dict(zip(columns, data_list))elif len(data_list) == len(columns):item = dict(zip(columns, data_list))if not item:continueyield itemdef index_columns(db, table):with open('desc.%s.%s' % (db, table), 'r') as fr:structure_list = deserialize(fr.read())return [column.get('col_name') for column in structure_list]# map入口
def main(separator, columns):items = read_from_input(sys.stdin, separator, columns)mapper_result = {}for item in items:mapper_plugin_1(item, mapper_result)mapper_plugin_2(item, mapper_result)

def mapper_plugin_1(item, mapper_result):# key在现实中可以是不同appkey, 是用来分发到不同的reducer上的, 相同的route用来分发到相同的reducerkey = 'route1'area = item.get('area')district = item.get('district')order_id = item.get('order_id')if not area or not district or not order_id:returnmapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})def mapper_plugin_2(item, mapper_result):key = 'route2'strive_time = item.get('strive_time')order_id = item.get('order_id')if not strive_time or not order_id:returntry:day_hour = strive_time.split(':')[0]mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})except Exception, ex:passdef serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def mapper_input(line, separator='\t'):try:return line.split(separator)except Exception, ex:return Nonedef mapper_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s%s%s' % (key, separator, data)# print >> sys.stderr, '%s%s%s' % (key, separator, data)if __name__ == '__main__':db = sys.argv[1]table = sys.argv[2]columns = index_columns(db, table)main('||', columns)

reducer脚本

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetterdef read_from_mapper(file, separator):for line in file:yield reducer_input(line)def main(separator='\t'):reducer_result = {}line_list = read_from_mapper(sys.stdin, separator)for route_key, group in groupby(line_list, itemgetter(0)):if route_key is None:continuereducer_result.setdefault(route_key, {})if route_key == 'route1':reducer_plugin_1(route_key, group, reducer_result)reducer_output(route_key, reducer_result[route_key])if route_key == 'route2':reducer_plugin_2(route_key, group, reducer_result)reducer_output(route_key, reducer_result[route_key])

def reducer_plugin_1(route_key, group, reducer_result):for _, data in group:if data is None or len(data) == 0:continueif not data.get('area') or not data.get('district') or not data.get('count'):continuekey = '_'.join([data.get('area'), data.get('district')])reducer_result[route_key].setdefault(key, 0)reducer_result[route_key][key] += int(data.get('count'))# print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])def reducer_plugin_2(route_key, group, reducer_result):for _, data in group:if data is None or len(data) == 0:continueif not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):continuekey = data.get('day_hour')reducer_result[route_key].setdefault(key, {})reducer_result[route_key][key].setdefault('count', 0)reducer_result[route_key][key].setdefault('order_list', [])reducer_result[route_key][key]['count'] += int(data.get('count'))if len(reducer_result[route_key][key]['order_list']) < 100:reducer_result[route_key][key]['order_list'].append(data.get('order_id'))# print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])
def serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def reducer_input(data, separator='\t'):data_list = data.strip().split(separator, 2)key = data_list[0]data = deserialize(data_list[1])return [key, data]def reducer_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s\t%s' % (key, data)# print >> sys.stderr, '%s\t%s' % (key, data)if __name__ == '__main__':main()

 

5、上一个版本,遭遇了reduce慢的情况,原因有两个:一是因为route的设置,所有相同的route都将分发到同一个reducer,造成单个reducer处理压力大,性能下降。二是因为集群是搭建在虚拟机上的,性能本身就差。可以对这个问题进行改进。改进版本如下,方案是在mapper阶段先对数据进行初步的统计,缓解reducer的计算压力。

mapper脚本

# -*- coding:utf-8 -*-
#!/usr/bin/env python
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')# 将字段和元数据匹配, 返回迭代器
def read_from_input(file, separator, columns):for line in file:if line is None or line == '':continuedata_list = mapper_input(line, separator)if not data_list:continueitem = None# 最后3列, 年月日作为partitionkey, 无用if len(data_list) == len(columns) - 3:item = dict(zip(columns, data_list))elif len(data_list) == len(columns):item = dict(zip(columns, data_list))if not item:continueyield itemdef index_columns(db, table):with open('desc.%s.%s' % (db, table), 'r') as fr:structure_list = deserialize(fr.read())return [column.get('col_name') for column in structure_list]# map入口
def main(separator, columns):items = read_from_input(sys.stdin, separator, columns)mapper_result = {}for item in items:mapper_plugin_1(item, mapper_result)mapper_plugin_2(item, mapper_result)for route_key, route_value in mapper_result.iteritems():for key, value in route_value.iteritems():ret_dict = dict()ret_dict['route_key'] = route_keyret_dict['key'] = keyret_dict.update(value)mapper_output('route_total', ret_dict)def mapper_plugin_1(item, mapper_result):# key在现实中可以是不同appkey, 是用来分发到不同的reducer上的, 相同的route用来分发到相同的reducerkey = 'route1'area = item.get('area')district = item.get('district')order_id = item.get('order_id')if not area or not district or not order_id:returntry:# total统计
        mapper_result.setdefault(key, {})mapper_result[key].setdefault('_'.join([area, district]), {})mapper_result[key]['_'.join([area, district])].setdefault('count', 0)mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])mapper_result[key]['_'.join([area, district])]['count'] += 1if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)except Exception, ex:passdef mapper_plugin_2(item, mapper_result):key = 'route2'strive_time = item.get('strive_time')order_id = item.get('order_id')if not strive_time or not order_id:returntry:day_hour = strive_time.split(':')[0]# total统计
        mapper_result.setdefault(key, {})mapper_result[key].setdefault(day_hour, {})mapper_result[key][day_hour].setdefault('count', 0)mapper_result[key][day_hour].setdefault('order_id', [])mapper_result[key][day_hour]['count'] += 1if len(mapper_result[key][day_hour]['order_id']) < 10:mapper_result[key][day_hour]['order_id'].append(order_id)except Exception, ex:passdef serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def mapper_input(line, separator='\t'):try:return line.split(separator)except Exception, ex:return Nonedef mapper_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s%s%s' % (key, separator, data)# print >> sys.stderr, '%s%s%s' % (key, separator, data)if __name__ == '__main__':db = sys.argv[1]table = sys.argv[2]columns = index_columns(db, table)main('||', columns)

reducer脚本

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetterdef read_from_mapper(file, separator):for line in file:yield reducer_input(line)def main(separator='\t'):reducer_result = {}line_list = read_from_mapper(sys.stdin, separator)for route_key, group in groupby(line_list, itemgetter(0)):if route_key is None:continuereducer_result.setdefault(route_key, {})if route_key == 'route_total':reducer_total(route_key, group, reducer_result)reducer_output(route_key, reducer_result[route_key])def reducer_total(route_key, group, reducer_result):for _, data in group:if data is None or len(data) == 0:continueif data.get('route_key') == 'route1':reducer_result[route_key].setdefault(data.get('route_key'), {})reducer_result[route_key][data.get('key')].setdefault('count', 0)reducer_result[route_key][data.get('key')].setdefault('order_id', [])reducer_result[route_key][data.get('key')]['count'] += data.get('count')for order_id in data.get('order_id'):if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:reducer_result[route_key][data.get('key')]['order_id'].append(order_id)elif data.get('route_key') == 'route2':reducer_result[route_key].setdefault(data.get('route_key'), {})reducer_result[route_key][data.get('key')].setdefault('count', 0)reducer_result[route_key][data.get('key')].setdefault('order_id', [])reducer_result[route_key][data.get('key')]['count'] += data.get('count')for order_id in data.get('order_id'):if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:reducer_result[route_key][data.get('key')]['order_id'].append(order_id)else:passdef serialize(data, type='json'):if type == 'json':try:return json.dumps(data)except Exception, ex:return ''elif type == 'pickle':try:return pickle.dumps(data)except Exception, ex:return ''else:return ''def deserialize(data, type='json'):if type == 'json':try:return json.loads(data)except Exception, ex:return []elif type == 'pickle':try:return pickle.loads(data)except Exception, ex:return []else:return []def reducer_input(data, separator='\t'):data_list = data.strip().split(separator, 2)key = data_list[0]data = deserialize(data_list[1])return [key, data]def reducer_output(key, data, separator='\t'):key = str(key)data = serialize(data)print '%s\t%s' % (key, data)# print >> sys.stderr, '%s\t%s' % (key, data)if __name__ == '__main__':main()

 

遇到的问题:

1、The DiskSpace /user/bigdata/qa quota of  is exceeded

在reducer结束后,遭遇如上问题,是因为HDFS  路径下的disk容量已经被沾满,释放容量即可;

 

转载于:https://www.cnblogs.com/kangoroo/p/6151104.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/255780.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Asp.net在IE10、IE11下事件丢失经验总结

asp.net4.0出生得比IE10早&#xff0c;所以asp.net4.0以前版本不认识IE10 的 User-Agent 标头&#xff0c;导致的后果就是ASP.NET 特定功能失效&#xff0c;例如&#xff1a;页面报错__doPostBack找不到&#xff0c;不支援 Cookies 功能等等。这属于.net的Bug&#xff0c;微软也…

第6章 循环结构

循环语句: 可以让一部分代码,反复执行 1.1 循环语句while while循环: 编写格式:while(条件){ 循环体 } 条件: 当条件是true,就执行循环体,执行完循环体后 程序再次执行while中的条件,如果条件还是true,继续执行循环体 直到条件是false的时候,循环就结束 public class WhileDem…

【深度学习】——pytorch搭建模型及相关模型

目录 1、搭建模型的流程 1&#xff09;步骤 2&#xff09;完整代码——手写minist数据集为例&#xff08;这里使用的数据集是自带的&#xff09; 2、搭建模型的四种方法 1&#xff09;方法一——利用nn.Sequential&#xff08;&#xff09; 2&#xff09;方法二——利用co…

ABB robot 与 Fronius 设备 IO

ABB robot 与 Fronius 设备 IO

初次使用cocoapods注意事项

在仅仅用cocoapods时可能会遇到各种各样的错误和问题 这里中总结下: 1.首先使用cocoapods有非常多优点,在github上非常多优秀的开源项目都用到了它;假设你不会使用它,那么非常多优秀的开源项目你下载下来了也发现跑不起来,假设发现有Profile,Profile.lock,Pods等cocoapods相关…

MongoDB复制集技术

为什么使用MongogDB复制集技术? mysql中:一主一从&#xff0c;一主多从结构存在的问题 1、 fileover&#xff08;故障转移&#xff09;a) 选主投票b) 切换 2、 是否对就用透明化 3、 数据补偿的问题a) 两阶段数据补偿 4、 解决方法 mysql中使用MHAVIP b…

Linux文件系统的实现 (图文并茂,比较好)

作者&#xff1a;Vamei 出处&#xff1a;http://www.cnblogs.com/vamei 欢迎转载&#xff0c;也请保留这段声明。谢谢&#xff01; Linux文件管理从用户的层面介绍了Linux管理文件的方式。Linux有一个树状结构来组织文件。树的顶端为根目录(/)&#xff0c;节点为目录&#xff0…

【深度学习】——如何处理输入图像大小不一样的情况

这里一般有常见的几种方法&#xff1a; 1&#xff09;将图像缩放成大小一致后再输入&#xff0c;如RCNN算法 2&#xff09;roi pooling&#xff1a;这里允许输入图像的大小不一样&#xff0c;后续根据指定的固定大小来求解池化的核大小&#xff0c;以此来得到相同大小的特征图&…

ROS探索总结(一)——ROS简介

随着机器人领域的快速发展和复杂化&#xff0c;代码的复用性和模块化的需求原来越强烈&#xff0c;而已有的开源机器人系统又不能很好的适应需求。2010年Willow Garage公司发布了开源机器人操作系统ROS&#xff08;robot operating system&#xff09;&#xff0c;很快在机器人…

微信浏览器取消缓存的方法

摘要:做微信公家号以及调试手机页面的时辰&#xff0c;防止不了页面要跳转到微信阅读器打开&#xff0c;调试阶段&#xff0c;android版微信阅读器一直都默许缓存html静态资本&#xff0c;每一次静态资本变革乃至新内容发布的时辰在微信阅读器上都极有可能不克不及更新&#xf…

【机器视觉】——裂纹检测笔记

目录 传统算法处理裂缝的基本思路&#xff1a; 第一种思路 第二种思路&#xff1a; 第三种思路 CPP代码 halcon代码 python代码 Matlab代码 深度学习缺陷检测 裂缝检测文献 传统算法处理裂缝的基本思路&#xff1a; 第一种思路 1.先转换彩色图为灰度图 2.进行自适应…

利用union判断系统的大小端

int checkCPUendian()//返回1&#xff0c;为小端&#xff1b;反之&#xff0c;为大端&#xff1b; { union{ unsigned int a; unsigned char b; }c; c.a 1; return 1 c.b; }大端模式(Big-endian)&#xff0c;是指数据的高字节保存在内存的低地址中&#xff0c;而数据…

Filter(过滤器) 和 interceptor(拦截器)的区别

Filter&#xff08;过滤器&#xff09; 和 interceptor&#xff08;拦截器&#xff09;的区别 1.拦截器是基于java反射机制的&#xff0c;而过滤器是基于函数回调的。 2.过滤器依赖于Servlet容器&#xff0c;而拦截器不依赖于Servlet容器。 3.拦截器只对Action请求起作用&#…

ROS探索总结(二)——ROS总体框架

一、 总体结构 根据ROS系统代码的维护者和分布来标示&#xff0c;主要有两大部分&#xff1a;&#xff08;1&#xff09;main&#xff1a;核心部分&#xff0c;主要由Willow Garage公司和一些开发者设计、提供以及维护。它提供了一些分布式计算的基本工具&#xff0c;以及整个…

python 阿狸的进阶之路(4)

装饰器 #1、开放封闭原则&#xff1a;对扩展开放&#xff0c;对修改是封闭#2、装饰器&#xff1a;装饰它人的&#xff0c;器指的是任意可调用对象&#xff0c;现在的场景装饰器-》函数&#xff0c;被装饰的对象也是-》函数#原则&#xff1a;1、不修改被装饰对象的源代码 2、不修…

【深度学习】——利用pytorch搭建一个完整的深度学习项目(构建模型、加载数据集、参数配置、训练、模型保存、预测)

目录 一、深度学习项目的基本构成 二、实战&#xff08;猫狗分类&#xff09; 1、数据集下载 2、dataset.py文件 3、model.py 4、config.py 5、predict.py 一、深度学习项目的基本构成 一个深度学习模型一般包含以下几个文件&#xff1a; datasets文件夹&#xff1a;存放…

二叉树的序遍历

时间限制: 1 s空间限制: 32000 KB题目等级 : 白银 Silver题目描述 Description求一棵二叉树的前序遍历&#xff0c;中序遍历和后序遍历 输入描述 Input Description第一行一个整数n&#xff0c;表示这棵树的节点个数。 接下来n行每行2个整数L和R。第i行的两个整数Li和Ri代表编号…

GUI登录界面

在这次的作业中&#xff0c;我先使用单选按钮&#xff0c;输入框&#xff0c;复选框设计了一个简单地登录界面。接着我使用了MouseListener将登陆按钮与下一个“查询界面”连接起来。最后我使用了我们本周所学的JFrame框架与事件处理机制设计了一个简单地界面。我所设计的登录界…

浅谈ROS操作系统及其应用趋势

ROS操作系统是最先由斯坦福开发的开源机器人操作系统&#xff0c;目前由willowgarage公司开发和维护&#xff0c;相关的开发社区也很成熟&#xff08; http://www.ros.org &#xff0c; http://answers.ros.org, http://www.willowgarage.com), 经过几年的发展API也逐渐稳定&a…

Raft学习传送门

Raft官网 官方可视化动画1 官方可视化动画2 论文中文翻译 论文英文地址 Paxos Made Simple论文翻译 Raft理解 技术分享 《分布式一致性raft算法实现原理》 状态机 MIT&#xff1a; raft实现 分布式系统学习2-Raft算法分析与实现 分布式系统MIT 6.824学习资源 知乎大神的&#…