极简的MapReduce实现

目录

1. MapReduce概述

2. 极简MapReduce内存版

3. 复杂MapReduce磁盘版

4. MapReduce思想的总结


1. MapReduce概述

       以前写过一篇 MapReduce思想 ,这次再深入一点,简单实现一把单机内存的。MapReduce就是把它理解成高阶函数,需要传入map和reduce所需的函数,即可对一个集合的键值对进行按需变换。

我们按Python的map和reduce逻辑,接收这两个高阶函数所需的函数形态,而不是像Java那样写一个Mapper/Reducer,当然如果需要复杂一点map功能,可以通过闭包来实现。

2. 极简MapReduce内存版

       如果是效仿Python,很容易想到实现MapReduce 就是map + 分组 + reduce。一头一尾的map和reduce都可以直接用py的内置函数,中间分组也容易用groupby实现。因此首先约定map和reduce的自定义函数写法就是直接按内置函数要求来写,只不过map输出是键值对,要求用tuple/list 来区分key和value;reduce时候只对一组的values做自定义聚合,不能操作key。到这里很容易实现严格的MapReduce,你用起来大概就和Spark提供的mapreduce一样了。如果想直接做wordcount好像不太容易,因为一行文本得输出一个词表数组,因此可以宽松一点,实现类似flatMap的效果,也就是即使map只输出一个,也要放进列表里,由框架去展开。

def wc_map(line):return [(x,1) for x in line.strip().split()]

reduce就是严格的两个参数和一个返回值形式(其中y表示上一轮迭代的结果,x表示每次便利数组拿到的值,这种严格的reduce函数并容易设计)

def wc_reduce(y, x):return y+x

剩下要做的事就是把map出来的数据做 拉平和排序分组,最后对每一组做reduce。

from itertools import groupby
from functools import reduce
def map_reduce(f_map, f_reduce, seq):map_result = map(f_map, seq)flattened = sum(map_result, []) #拉平shuffled = sorted(flattened, key= lambda x:x[0]) #排序mr_result = [(key, reduce(f_reduce, (x[1] for x in data))) for key, data in groupby( shuffled, key=lambda x:x[0])] #groupby分组return mr_result

3. 复杂MapReduce磁盘版

如果内存不够,就必须不断把数据写到磁盘上,也就是过去大数据面试题最喜欢考察的点。那些题若是有单机版MapReduce,基本都可以直接做出来。大体结构如下图:

其中partition只有一个,就不用单独设计了。过程也简化很多,也就是每当buffer满了,就排序写出到成一个数据块,最后将所有的数据块merge起来。merge过程写起来不是很容易,需要维持所有数据块的句柄数组有序:每次读取所有句柄中最小的那一条,随后数组需要重新按最小值排序,直到所有数据块读取完。

另外就是因为中间数据需要落地,所以这一版的MapReduce框架直接从文件到文件。那么让写map和reduce函数直接对着文件输出,类似Java那样提供一个context的参数呢?我选择允许用yield来输出内容,这样相当于也宽松了输出多个键值对的要求。

def wc_map(line):for t in line.strip().split():yield (t, 1)

最后同样宽松reduce的写法,不用写那么复杂的两参数一返回,而是像Java那样,拿到一组key和可迭代的values,甚至可以这一组key输出多个结果。

def reduce_func(key , kv_pairs):s = 0for k,v in kv_pairs :   s += vyield (key, s)

很显然我们要补充很多东西,要读取文件、要设计缓存、要做merge,因此设计了一个类,过程就不解释了

from operator import itemgetter
import itertools
import sys
class MapReduce:class Buffer:def __init__(self, buffer_size = 80000):self.buffer_size = buffer_sizeself.buffer = []self.current_size = 0def add_data(self, kv) -> bool :self.current_size += len(str(kv))self.buffer.append(kv)return True if self.current_size  > self.buffer_size else False          def spill(self, file_path):if self.buffer:self.buffer.sort(key = lambda x:x[0]) with open(file_path, mode='w') as f:for kv in self.buffer:f.write(str(kv) + '\n')self.buffer = []self.current_size = 0def __init__(self, buffer_size,temp_dir):self.buffer = self.Buffer(buffer_size)self.temp_dir = temp_dirself.block_id = 0def __fetch_kvs_from_map(self, map_f, infile):with open(infile, mode='r', encoding='utf-8') as f:for line in f:for k,v in map_f(line):yield (k,v)def __run_map(self, map_f, infile):block_files = [x for x in os.listdir(self.temp_dir) if x.startswith('block.') and x.split('.',1)[1].isdigit() ]for block in block_files:os.remove(f'{self.temp_dir}/{block}')for kv in self.__fetch_kvs_from_map(map_f, infile):if self.buffer.add_data(kv):self.buffer.spill(f'{self.temp_dir}/block.{self.block_id}')self.block_id += 1self.buffer.spill(f'{self.temp_dir}/block.{self.block_id}')def __merge_sort_from_files(self):block_files = [x for x in os.listdir(self.temp_dir) if x.startswith('block.') and x.split('.',1)[1].isdigit() ]block_files.sort(key = lambda filename: int(filename.split('.',1)[1]))ffs = [open(f'{self.temp_dir}/{block}', 'r') for block in block_files]first_kvs = [ eval(f.readline()) for f in ffs ]shuffled_files = [[fk, ff]  for fk, ff in  zip(first_kvs, ffs)]shuffled_files.sort(key = lambda x:x[0][0], reverse=True) with open(f'{self.temp_dir}/final_one.dat', mode='w') as fw:while len(shuffled_files ) > 1:first_keys = [x[0][0] for x in shuffled_files]min_key = first_keys[-1]min_idx_bound = first_keys.index(min_key)sffs = shuffled_files[min_idx_bound:]for sff in sffs:fw.write(str(sff[0]) + '\n')n = sff[1].readline()if n :sff[0] = eval(n)else:sff[0] = ''sff[1].close()shuffled_files = sorted(filter(lambda x:x[0], shuffled_files), key = lambda x:x[0][0], reverse=True)if shuffled_files:fw.write(str(shuffled_files[0][0]) + '\n')  #already existfor line in shuffled_files[0][1]:fw.write(line)shuffled_files[0][1].close()def __run_reduce(self, reduce_f, outfile):def read_mapper_output(file):for line in file:yield eval(line.rstrip())with open(f'{self.temp_dir}/final_one.dat', encoding='utf-8') as f , \open(f'{self.temp_dir}/{outfile}', encoding='utf-8',mode='w') as fw:stdin_generator=read_mapper_output(f)for key, kv_pairs in itertools.groupby(stdin_generator, itemgetter(0) ):for key,result in reduce_f(key, kv_pairs):fw.write(f"{key}\t{result}\n")def run_mrjob(self, map_f, reduce_f, infile, outfile):#====mapself.__run_map(map_f, infile)#====shuffle===self.__merge_sort_from_files()#====reduceself.__run_reduce(reduce_f, outfile)def map_func(x):for t in x.strip().split():yield (t, 1)def reduce_func(key , kv_pairs):s = 0for k,v in kv_pairs :   s += vyield (key, s)if __name__ == "__main__":mr = MapReduce(30, "./")mr.run_mrjob(map_func, reduce_func, "words.txt", "result.txt")

代码明显复杂了很多,其中reduce读取有序文件还借鉴了Hadoop streaming的处理方式。

4. MapReduce思想的总结

       这个系列到这算完结了,为了讲解MapReduce我都是先讲解高阶函数map和reduce,深入思考以后发现这些个东西确实还有一些值得补充的地方。但应该不止于此,对于没有完全学过函数式编程的我来说就纯个人观点一下。

写map有什么用?在你写了很多独立地循环处理逻辑以后, 你发现你可以把循环与处理分离了。反过来更多使用map会习惯这种分离的思维,甚至提升这种分离。 这种分离意味着一种共性的抽象。

reduce被设计出来的意义何在? 不使用全局变量,而是部分变量传递,以完成全局功能的设计。全局功能线性拆分成N个独立的有部分依赖的功能总和。其本质也是一种分离或拆分思想。反过来使用reduce实现功能,习惯这种等价的整体拆分成部分的思维,强化这种拆分,意味着进行带依赖的共性的提炼

最后概括一下MapReduce的学习意义:证明了对任务的工序拆分能有效实现并行加速。即对任务抽象拆分出同质(独立或依赖)的步骤,这些同质工作能并行/自动化。而工序拆分,靠的是前面高阶函数训练出来的思维。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/104661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Candence报错】Discrepancy #i in TASK

问题描述 Candence LVS仿真提示网络不匹配 问题解决 检查原理图和Layout 注意:

css 星星闪烁加载框

今天带来的是普灵普灵的loader闪烁加载框 效果如下 开源精神给我们带来了源码 ,源码如下 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, in…

SpringMVC的请求处理

请求映射路径的配置 请求映射路径的配置主要是通过RequestMapping注解实现的 相关注解作用使用位置RequestMapping设置控制器方法访问路径的资源&#xff0c;可以接收任何请求方法和类上GetMapping设置控制器方法访问路径的资源&#xff0c;可接收GET请求方法和类上PostMappin…

Step 1 搭建一个简单的渲染框架

Step 1 搭建一个简单的渲染框架 万事开头难。从萌生到自己到处看源码手抄一个mini engine出来的想法&#xff0c;到真正敲键盘去抄&#xff0c;转眼过去了很久的时间。这次大概的确是抱着认真的想法&#xff0c;打开VS从零开始抄代码。不知道能坚持多久呢。。。 本次的主题是搭…

idea(添加jsp文件模板)

初始jsp模板代码&#xff1a; <%-- Created by IntelliJ IDEA. User: ${USER} Date: ${DATE} Time: ${TIME} To change this template use File | Settings | File Templates. --%> <% page contentType"text/html;charsetUTF-8" language&q…

局域网上IP多播与IP单播关于MAC地址的区别

IP单播进行到局域网上的时候&#xff1a; 网际层使用IP地址进行寻址&#xff0c;各路由器收到IP数据报后&#xff0c;根据其首部中的目的IP地址的网络号部分&#xff0c;基于路由表进行查表转发。 查表转发的结果可指明IP数据报的下一跳路由器的IP地址&#xff0c;但无法指明…

SpringCloud小项目——订单积分商城 使用Nacos、Open Feign、Gateway、Sentinel技术栈

目录 引出小项目要求创建极简数据库表订单表&#xff0c;订单明细表商品表积分表 相关微服务积分微服务产品微服务订单微服务调用积分和订单微服务 网关微服务登陆认证通过网关实现对外提供接口API走网关功能 sentinel相关使用Sentinel限流&#xff0c;流量整形Sentinel降级服务…

Python正则表达式

正则表达式 当处理文本数据时&#xff0c;正则表达式是一种强大的工具&#xff0c;它允许我们根据特定的模式来匹配、搜索和处理字符串。 正则表达式由一系列字符和特殊字符组成&#xff0c;用于描述文本模式。这些模式可以包含普通字符&#xff08;如字母、数字和标点符号&a…

kafka 开启认证授权

前言 1、前面自己写了一篇关于各个环境各个模式的安装的文章&#xff0c;大家可以去看看 kafka各种环境安装(window,linux,docker,k8s),包含KRaft模式 2、使用版本 kafka_2.13-3.4.1 3、kafka验证方式&#xff0c;有两大类如下&#xff0c;文档内容在 kafka官方文档的 第七节…

蓝桥杯每日一题2023.10.14

年号字串 - 蓝桥云课 (lanqiao.cn) 题目描述 我们发现每个字母都与26紧密相关&#xff0c;其%26的位置就是最后一个字母&#xff0c;由于最开始将0做为了1故在写答案时需要注意细节问题 #include<bits/stdc.h> using namespace std; char s[] "ABCDEFGHIJKLMNOPQ…

编译linux的设备树

使用make dtbs命令时 在arch/arm 的目录Makefile文件中有 boot : arch/arm/boot prepare 和scripts是空的 在文件scripts/Kbuild.include中 变量build : -f $(srctree)/scripts/Makefile.build obj 在顶层Makefile中 $(srctree)&#xff1a;. 展开后-f ./scripts/Mak…

Linux:mongodb数据库源码包安装(4.4.25版本)

环境 系统&#xff1a;centos7 本机ip&#xff1a;192.168.254.1 准备的mongodb包 版本 &#xff1a; 4.4.25 全名称&#xff1a;mongodb-linux-x86_64-rhel70-4.4.25.tgz 下载源码包 Download MongoDB Community Server | MongoDBhttps://www.mongodb.com/try/downloa…

论文学习——Class-Conditioned Latent Diffusion Model For DCASE 2023

文章目录 引言正文AbstractIntroductionSystem Overview2.1 Latent Diffusion with sound-class-based conditioning以声音类别为条件的潜在扩散模型2.2 Variational Autoencoder and neural vocoder变分自编码器和神经声码器FAD-oriented Postprocessing filter&#xff08;专…

数据中心机房供电配电及能效管理系统设计

安科瑞虞佳豪壹捌柒陆壹伍玖玖零玖叁 摘要&#xff1a;现代的数据中心中都包括大量的计算机&#xff0c;对于这种场所的电力供应&#xff0c;都要求供电系统需要在所有的时间都有效&#xff0c;这就不同于一般建筑的供配电系统&#xff0c;它是一个交叉的系统&#xff0c;涉及…

使用图像处理跟踪瞳孔(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

HTTPS 加密全过程

加密协议以前是SSL,现在都是TLS, 而证书现在大多数都是SSL证书 抓包流程: TCP三次握手过后, 客户端发送Client Hello 服务器相应Server Hello 服务器再次响应发送证书: 服务器再发送公钥:

国内常用源开发环境换源(flutter换源,python换源,Linux换源,npm换源)

flutter换源 使用环境变量:PUB_HOSTED_URL FLUTTER_STORAGE_BASE_URL&#xff0c; upgrade出问题时可能会提示设置FLUTTER_GIT_URL变量。 flutter中国 PUB_HOSTED_URLhttps://pub.flutter-io.cn FLUTTER_STORAGE_BASE_URLhttps://storage.flutter-io.cn FLUTTER_GIT_URLhtt…

Python3无法调用Sqlalchemy解决(mysqldb)

原因 在安装Sqlalchemy后运行程序报错 无法导入mysqldb&#xff0c;缺失模块 ImportError: No module named ‘MySQLdb’ 既然缺少 MySQLdb 这个模块&#xff0c;尝试按照正常的想法执行 pip install MySQLdbpip install mysql-python 应该能解决&#xff0c;但是却找不到…

Zookeeper【Curator客户端Java版】从0到1——万字学习笔记

目录 初识Zookeeper Zookeeper作用 维护配置信息 分布式锁服务 集群管理 生产分布式唯一ID Zookeeper的设计目标 Zookeeper 工作机制 数据模型 ZooKeeper 命令操作 服务端常用命令 客户端常用命令 ZooKeeper JavaAPI操作 Curator 介绍 Curator API 常用操作 导入依赖 建立连接 …

SQL Server修改表结构

在SQL Server中修改的关键字是 ALTER(改变;(使)更改;修改(衣服使更合身);改动&#xff09; 列操作 添加列 添加列操作 alter tabel 表名 add 列名 数据类型--给员工表添加一个邮箱 alter的翻译是&#xff08;改变&#xff09; alter table people add PeopleMail varchar(2…