大数据学习——SparkStreaming整合Kafka完成网站点击流实时统计

1.安装并配置zk

2.安装并配置Kafka

3.启动zk

4.启动Kafka

5.创建topic

[root@mini3 kafka]# bin/kafka-console-producer.sh --broker-list mini1:9092 --topic cyf-test

 

程序代码

package org.apache.sparkimport java.net.InetSocketAddressimport org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.kafka.KafkaUtilsobject KafkaWordCount {val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(v => (x, v)) }}def main(args: Array[String]) {val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")val ssc = new StreamingContext(conf, Seconds(5))//回滚点设置在本地
//    ssc.checkpoint("./")//将回滚点写到hdfsssc.checkpoint("hdfs://mini1:9000/kafkatest")//val Array(zkQuorum, groupId, topics, numThreads) = argsval Array(zkQuorum, groupId, topics, numThreads) = Array[String]("mini1:2181,mini2:2181,mini3:2181", "g1", "cyf-test", "2")val topicMap = topics.split(",").map((_, numThreads.toInt)).toMapval lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap).map(_._2)val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)results.print()ssc.start()ssc.awaitTermination()}}

 

 记一次遇到的问题 https://www.cnblogs.com/feifeicui/p/11018761.html

转载于:https://www.cnblogs.com/feifeicui/p/11018774.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/362376.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

日期/时间格式/解析,Java 8样式

自Java 几乎 开始以来,Java开发人员就通过java.util.Date类(自JDK 1.0起)和java.util.Calendar类(自JDK 1.1起 )来处理日期和时间。 在这段时间内,成千上万(甚至可能数百万)的Java开…

无废话WPF系列5:控件派生图

1. WPF类控件的派生关系图,紫色的部分开始才算是进入WPF的框架里。 2. WPF控件图 WPF的UI控件主要有以下类型,ContentControl, HeaderedContentControl, ItemsControl, HeaderedItemsControl, Panel, Adorner(文字点缀元素), Flow Text(流式文本元素), T…

安装ipython和jupyter

本节内容; 安装ipython安装jupyterPycharm介绍 Python软件包管理一、安装ipython 1. python的交互式环境2. 安装ipython 可以使用pip命令安装。如果你是用pyenv安装的python的话,pip命令已经有了。 当需要安装包的时候,最好进入虚拟环境&…

vue 图片资源应该如何存放并引入(public、assets)?

全局cli配置:vue.config.js 之前写项目就想着怎么简单怎么来,图片要用了,就直接在要用图片的页面,新建一个跟页面同等级的 imgs 文件夹,然后在页面中直接 “./imgs/图片.png”,不得不说这样写很方便。 但是…

layui 树形组件下拉框

采用 layui 树形组件,版本:V2.6.8。只需要更新layui版本,不需要下载tableSelect。 原作者博客:https://blog.csdn.net/m0_67402588/article/details/123526860。 从 官网 更新日志可以看到,树形组件在2.5.7版本还在更新…

layui table表格的复选框checkbox设置部分为不可选

需求:如上图,某些数据禁用删除功能,那么全选时,这些数据前面的复选框也不能选。 实现:在layui数据表格中设置了字段为 type:checkbox 但是想要实现部分不显示,不可选的功能。layui内置没有该功能&#xff…

layui upload阻止文件上传问题,及多选文件上传

1、效果展示: 2、需求: 下拉框及月份都为不空,且有文件数据才能提交上传。 3、环境: 目前项目中引用的 layui 版本是 2.4.5。在 before 中进行判断,使用 return false 想要阻止文件上传没反应,文件仍然会…

2019.06.17课件:[洛谷P1310]表达式的值 题解

P1310 表达式的值 题目描述 给你一个带括号的布尔表达式,其中表示或操作|,*表示与操作&,先算*再算。但是待操作的数字(布尔值)不输入。 求能使最终整个式子的值为0的方案数。 题外话 不久之前我在codewars上做过一…

vue+element 封装日期范围组件(双向绑定)

像这样的日期组件,在后台管理项目中是比较多的,而且加了快捷选项,代码量较多,因此封装成组件。 封装这一类型的组组件,主要是了解输入框双向绑定 v-model 的过程。 1、了解输入框双向绑定的过程: 官网&am…

用Hystrix保护您的应用程序

在之前的帖子http://www.javacodegeeks.com/2014/07/rxjava-java8-java-ee-7-arquillian-bliss.html中,我们讨论了微服务以及如何使用(RxJava)的Reactive Extensions编排微服务。 但是,如果一项或多项服务由于已被暂停或引发异常而…

若依 从下载到成功运行及打包

官网:http://www.ruoyi.vip/ 目录 一、下载并运行项目 二、关于 若依 接口地址配置 2.1 若依的跨域代理介绍 2.2 配置跨域代理,调用后台接口 2.2.1 配置 后台 ip 地址 2.2.2 页面报“系统接口404”错误 三、打包配置 3.1 打包之后静态资源404…

uniAPP小程序 子组件使用watch不生效,H5正常,小程序不正常(其实是子组件model选项的问题)

第一次用 uniapp 写小程序,还是遇到挺多问题的。写了一个下拉多选组件,发现同样的代码,在H5上运行效果正常,在小程序上压根不走 watch 。 uniapp官网:【全局配置 | uni-app官网】 看文档 watch 是支持H5、小程序的&…

jQuery EasyUI/TopJUI创建日期时间输入框

jQuery EasyUI/TopJUI创建日期时间输入框 日期时间输入框组件 HTML 和日期输入框类似&#xff0c;日期时间输入框允许用户选择日期和指定的时间并按照指定的输出格式显示。相比日期输入框&#xff0c;它在下拉面板中添加了一个时间微调器。 <div class"topjui-containe…

table 设置边框

本文引自&#xff1a;https://www.cnblogs.com/leona-d/p/6125896.html 示例代码&#xff1a; <!DOCTYPE html><html lang"zh"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width…

uniapp小程序 设置自定义导航栏

如下截图&#xff0c;通过 wx.getSystemInfoSync 计算得到的整个导航栏高度&#xff0c;其实是有3个部分的&#xff1a; 黄色&#xff1a;状态栏高度&#xff0c;uniapp文档中有给出&#xff1b;红色&#xff1a;胶囊高度&#xff0c;可以计算得出&#xff1b;绿色&#xff1a;…

Akka Notes –演员记录和测试

在前两部分&#xff08; 一 &#xff0c; 二 &#xff09;中&#xff0c;我们简要讨论了Actor以及消息传递的工作方式。 在这一部分中&#xff0c;让我们看一下如何修复并记录我们的TeacherActor 。 概括 这就是我们上一部分中的Actor的样子&#xff1a; class TeacherActor …

vue笔记(二)Vue-class与style、事件、计算属性、数据监听、指令+自定义指令、过滤器

vue官网 一 、class、style操作 二、事件 三、计算属性 四、数据监听、观测 五、指令自定义指令 六、过滤器 一 、class、style操作 官网 1. class使用&#xff1a; &#xff08;1&#xff09;v-bind:class“数据|属性|变量|表达式” &#xff08;2&#xff09;v-bind:class“…

Nsum问题

题目 题解 暴力法 class Solution:def fourSum(self, nums: List[int], target: int) -> List[List[int]]:if len(nums) < 4:return []nums.sort()N len(nums)res []for i in range(N-3):for j in range(i1, N-2):for k in range(j1, N-1):for m in range(k1, N):tmp…

js笔记(一)js基础、程序结构、函数

大标题小节一、js 基础1. javascript的组成&#xff1b;2. 运行js&#xff1b;3. 打印信息&#xff1b;4. 关键字var&#xff1b;5. js中的数据类型&#xff1b;6. NaN&#xff08;not a number&#xff09;&#xff1b;7. js运算符&#xff1b;8. 数据类型转换&#xff1b;9. …

DB2 9 底子(730 考试)认证指南,第 3 局部: 拜访 DB2 数据(3)

建树第一个数据库First Steps在 DB2 的安顿进程中&#xff0c;会表示 First Steps 面板&#xff0c;它答运用户生成要操作的示例数据库&#xff1a; 选择 Database Creation 选项将表示一个附加菜单&#xff0c;可以建树 SAMPLE 数据库。 大大都用户希冀建树 SAMPLE 数据库并运…