spark-stream 访问 Redis

最近在spark-stream上写了一些流计算处理程序,程序架构如下

clipboard.png

程序运行在Spark-stream上,我的目标是kafka、Redis的参数都支持在启动时指定。

在写代码时参考了这篇文章 https://www.iteblog.com/archi...,该文讲的比较清楚,但是有两个问题:

  1. 用scala实现的

  2. Redis服务器的地址是写死的,我的程序要挪个位置,要重新改代码编译。

当时倒腾了一些时间,现在写出来和大家分享,提高后来者的效率。

clipboard.png

如上图Spark是分布式引擎,Driver中创建的Redis Pool,在Worker上又得重新创建,参考文章中是定义一个Redis连接池管理类,Redis Pool是类的静态变量,类加载时由JVM自动创建。这个和我的预期有差距。

在Driver中创建Redis管理对象,然后将该对象广播,然后在Worker上获取该广播对象,从而实现参数可变,但是Redis管理对象在每个Worker上又只实例化了一次。

Driver

Driver 指定序列化方式,Spark支持两种序列化方式,Java 和 Kryo,Kryo更高效。

资料上说Kryo方式需要注册类,但是我没有注册也能成功运行。

public static void main(String[] args) {if (args.length < 3) {System.err.println("Usage: kafka_spark_redis <brokers> <topics> <redisServer>\n" +"  <brokers> Kafka broker列表\n" +"  <topics> 要消费的topic列表\n" +" <redisServer> redis 服务器地址 \n\n");System.exit(1);}/* 解析参数 */String brokers = args[0];String topics = args[1];String redisServer = args[2];// 创建stream context,两秒钟的数据算一批SparkConf sparkConf = new SparkConf().setAppName("kafka_spark_redis");
//        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");//java的序列号速度没有Kryo速度快sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//        sparkConf.set("spark.kryo.registrator", "MyRegistrator");JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));JavaSparkContext sc = jssc.sparkContext();HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));HashMap<String, String> kafkaParams = new HashMap<String, String>();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("group.id","kakou-test");//Redis连接池管理类RedisClient redisClient = new RedisClient(redisServer);//创建redis连接池管理类//广播Reids连接池管理对象final Broadcast<RedisClient> broadcastRedis = sc.broadcast(redisClient);// 创建流处理对象JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,String.class,               /* kafka key class */String.class,               /* kafka value class */StringDecoder.class,        /* key 解码类 */StringDecoder.class,        /* value 解码类 */kafkaParams,                /* kafka 参数,如设置kafka broker */topicsSet                   /* 待消费的topic名称 */);// 将行分拆为单词JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {//@Override// kafka传来key-value对public String call(Tuple2<String, String> tuple2) {// 取value值return tuple2._2();}});/* 大量省略 */........}

RedisClient

RedisClient 是自己实现的类,在类中重载write/read这两个序列化和反序列化函数,需要注意的是如果是Java Serializer 需要实现其它的接口。

在Driver广播时会触发调用write序列化函数。

public class RedisClient implements KryoSerializable {public static JedisPool jedisPool;public String host;public RedisClient(){Runtime.getRuntime().addShutdownHook(new CleanWorkThread());}public RedisClient(String host){this.host=host;Runtime.getRuntime().addShutdownHook(new CleanWorkThread());jedisPool = new JedisPool(new GenericObjectPoolConfig(), host);}static class CleanWorkThread extends Thread{@Overridepublic void run() {System.out.println("Destroy jedis pool");if (null != jedisPool){jedisPool.destroy();jedisPool = null;}}}public Jedis getResource(){return jedisPool.getResource();}public void returnResource(Jedis jedis){jedisPool.returnResource(jedis);}public void write(Kryo kryo, Output output) {kryo.writeObject(output, host);}public void read(Kryo kryo, Input input) {host=kryo.readObject(input, String.class);this.jedisPool =new JedisPool(new GenericObjectPoolConfig(), host) ;}
}

Worker

在foreachRDD中获取广播变量,由广播变量触发先调用RedisClient的无参反序列化函数,然后再调用反序列化函数,我们的做法是在反序列化函数中创建Redis Pool。

        //标准输出,对车辆的车牌和黑名单进行匹配,对与匹配成功的,保存到redis上。paircar.foreachRDD(new Function2<JavaRDD<HashMap<String, String>>, Time, Void>() {public Void call(JavaRDD<HashMap<String, String>> rdd, Time time) throws Exception {Date now=new Date();rdd.foreachPartition(new VoidFunction<Iterator<HashMap<String, String>>>() {public void call(Iterator<HashMap<String, String>> it) throws Exception {String tmp1;String tmp2;Date now=new Date();RedisClient redisClient=broadcastRedis.getValue();Jedis jedis=redisClient.getResource();......redisClient.returnResource(jedis);}});

结语

Spark对分布式计算做了封装,但很多场景下还是要了解它的工作机制,很多问题和性能优化都和Spark的工作机制紧密相关。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/397871.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

东软睿云用户认证_【硬件资讯】尘埃落定!11代酷睿规格曝光!i7、i9难分差距,退回8核16线程!...

新闻①&#xff1a;Intel第11代酷睿处理器规格曝光&#xff0c;旗舰i9-11900K与i7-11700K同为8核16线程Intel的代号为Rocket Lake-S的第11代酷睿台式机CPU阵容将于明年推出&#xff0c;其中四个型号的规格现已曝光。拥有8个Cypress Cove核心、5.3GHz、PL2功耗限制250W的酷睿i9-…

环上的游戏

环上的游戏&#xff08;cycle&#xff09;有一个取数的游戏。初始时&#xff0c;给出一个环&#xff0c;环上的每条边上都有一个非负整数。这些整数中至少有一个0。然后&#xff0c;将一枚硬币放在环上的一个节点上。两个玩家就是以这个放硬币的节点为起点开始这个游戏&#xf…

python基础课程_2学习笔记3:图形用户界面

图形用户界面 丰富的平台 写作Python GUI程序前&#xff0c;须要决定使用哪个GUI平台。 简单来说&#xff0c;平台是图形组件的一个特定集合。能够通过叫做GUI工具包的给定Python模块进行訪问。工具包 描写叙述 Tkinter 使用Tk平台。非常easy得到。半标准。 wxpython 基于…

idea ssm打war包_IDEA下从零开始搭建SpringBoot工程

SpringBoot的具体介绍可以参看其他网上介绍&#xff0c;这里就不多说了&#xff0c;就这几天的学习&#xff0c;个人理解&#xff0c;简而言之&#xff1a;如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring&#xff0c;MyBatis&#xff0c;Netty源码分析的朋友…

dataframe 众数的方法_学习数据分析数据方法论 [描述性统计分析]

数理统计&#xff1a;数理统计是以概率论为基础&#xff0c;研究社会和自然界中大量随机现象数量变化基本规律的一种方法。分为&#xff1a;描述统计(描述统计的任务是搜集资料&#xff0c;进行整理、分组&#xff0c;编制次数分配表&#xff0c;绘制次数分配曲线&#xff0c;计…

c语言高级语言期中测试答案,上海理工大学C语言2011期中试题和答案

C语言2010/2011学年 第二学期 期中测试高级语言程序设计(C)试卷 A □BA1. 输入一行字符&#xff0c;统计其中的英文字母个数。#include void main(){ char ch;int n0;printf(“Input a string:\n”);while(1){ chgetchar();if (ch \n ) break;if (ch> a && ch< z…

如何构建ASP.NET MVC4JQueryAJaxJSon示例

背景&#xff1a; 博客中将构建一个小示例&#xff0c;用于演示在ASP.NET MVC4项目中&#xff0c;如何使用JQuery Ajax。 直接查看JSon部分 步骤&#xff1a; 1&#xff0c;添加控制器(HomeController)和动作方法(Index),并为Index动作方法添加视图(Index.cshtml),视图中HTML如…

echarts 有引导线和内部文字_点、线、面构图的异同以及相互转化

点、线、面构图既有相似性&#xff0c;又有差异性。相似的是都有对齐、强调、群组、重复、突出层次的作用&#xff0c;不同的是点的特点是聚焦、线的特点是运动和方向性&#xff0c;面的特性是体量感、稳定性。点的情感最弱&#xff0c;线、面的情感要比点丰富。一、点、线、面…

《Python黑帽子:黑客与渗透测试编程之道》 Web攻击

Web的套接字函数库&#xff1a;urllib2 一开始以urllib2.py命名脚本&#xff0c;在Sublime Text中运行会出错&#xff0c;纠错后发现是重名了&#xff0c;改过来就好&#xff1a; #!/usr/bin/python #codingutf-8 import urllib2url "http://www.baidu.com"headers …

vCenter Converter Standalone使用文档

文档目的能够使用vCenter Converter Standalone 将物理机操作系统迁移到虚拟机操作系统基础知识vCenter Converter Standalone 能将物理机上的操作系统、VMware虚拟机上的操作系统或者Hype-V 上的虚拟机操作系统迁移到VMware上。系统环境操作系统&#xff1a;Windows Server 20…

1093芯片做正弦波逆变器_正弦波逆变器中的SPWM调制(钟任生)

欢迎加入技术交流QQ群(2000人)&#xff1a;电力电子技术与新能源 905723370高可靠新能源行业顶尖自媒体在这里有电力电子、新能源干货、行业发展趋势分析、最新产品介绍、众多技术达人与您分享经验&#xff0c;欢迎关注我们&#xff0c;搜索微信公众号&#xff1a;电力电子技术…

android 手机短信恢复,安卓手机短信删除了怎么恢复?简单恢复的方法

原标题&#xff1a;安卓手机短信删除了怎么恢复&#xff1f;简单恢复的方法安卓手机短信删除了怎么恢复&#xff1f;手机短信是生活中不经常使用到&#xff0c;但是依然是十分重要的存在&#xff0c;因为我们现在比较喜欢用社交软件与别人进行交流&#xff0c;但是在一些相对重…

bash下个人习惯的一些文件设置

2019独角兽企业重金招聘Python工程师标准>>> bash_profile export PATH/usr/local/bin:$PATH export EDITORviinputrc set editing-mode vi #set editing-mode emacs set show-all-if-ambiguous on set completion-ignore-case on set meta-flag on set conver…

docker打包镜像上传_Jenkins | 一键打包部署Spring Boot 应用的Docker镜像

一、前言1、本文主要内容将在项目中实际使用到的相关东西整理记录一波&#xff0c;同时可以方便其他同学在使用到的时候参考一下(自己也备忘)&#xff0c;有不对的地方&#xff0c;欢迎指出~~Docker部署SpringBoot 项目通用Dockerfile文件、脚本Jenkins新建任务图文详解3、本文…

android 发送短信 广播 demo,向Android模拟器打电话发短信的简单方法

在开发android应用程序时&#xff0c;有时候需要测试一下向android手机拨打电话发送短信时该应用程序的反应。譬如编写一个广播接收器&#xff0c;来提示用户有短信收到或者处理短信&#xff0c;就需要向该手机发送短信来进行测试。这里介绍一种简单的向android模拟器打电话发短…

android 隐藏键盘时ui延迟恢复,android 软键盘的显示与隐藏问题的研究

在android中&#xff0c;常常会和输入法的软件键盘交互。在Manifest文件中&#xff0c;系统给activity的一个属性-windowSoftInputMode来控制输入法的显示方式。该属性提供了Activity的window与软键盘的window交互的方式。这里的属性设置有双方面的影响&#xff1a;1.软键盘的显…

天才基本法_《天才基本法》强推!年度神仙小说,看完这本书我竟然爱上了数学...

《天才基本法》——长洱小说文案元宝的书评这个真的是本年度的神仙小说&#xff0c;讲述了女主回到少女时代&#xff0c;可以和他一直暗恋的数学天才重来一遍。女主让男主改变了觉得她碌碌无为的看法&#xff0c;也改变了自己的人生。本书最大的主角其实是数学&#xff01;天知…

千年鸿蒙 盼尔来兮是什么意思,鸿蒙是什么意思_鸿蒙的意思和出处_我爱历史网...

鸿蒙&#xff0c;是一个汉语词语&#xff0c;亦作“鸿濛”。中国神话传说的远古时代&#xff0c;传说在盘古开天辟地之前&#xff0c;世界是一团混沌状&#xff0c;因此把那个时代称作鸿蒙时代&#xff0c;后来该词也常被用来泛指远古时代。引证解释亦作“鸿濛”。1、宇宙形成前…

在VS2015中用C++创建DLL并用C#调用且同时实现对DLL的调试

from:http://m.blog.csdn.net/article/details?id51075023 在VS2015中先创建C#项目&#xff0c;然后再创建要编写的动态库DLL项目&#xff0c;这样做的好处是整个解决方案的编程环境是C#模式&#xff0c;这样就可以有很多智能的提示或快捷的编程方式在整个解决方案中都可以使用…

(转)CocosCreator零基础制作游戏《极限跳跃》二、制作游戏开始场景

CocosCreator零基础制作游戏《极限跳跃》二、制作游戏开始场景 我们刚刚分析了《极限跳跃》这款游戏&#xff0c;下面我们开始制作第一个游戏场景&#xff0c;也就是游戏的开始场景。 首先&#xff0c;打开CocosCreator&#xff0c;新建HelloWorld项目。选择项目路径&#xff0…