kafka复习:(25)kafka stream

一、java代码:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class KafkaTest25 {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}

二、启动console producer:

bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input

三、启动console consumer:

./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

四、在producer端输入字符串(空格分割),看consumer输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72463.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决Microsoft Edge无法正常运行的有效方案分享!

Microsoft Edge打不开或不能加载网页是用户在Windows 10、Android、Mac和iOS设备上的网络浏览器上遇到的许多错误之一。其他Microsoft Edge问题可能包括浏览器窗口和选项卡冻结、网站崩溃、互联网连接错误消息以及丢失Microsoft Edge书签、收藏夹、密码和收藏。 Microsoft Edg…

spring 面试题总结

spring boot 变更jackson的json序列化方式 要变更Jackson库的JSON序列化方式&#xff0c;你可以使用自定义的JsonSerializer&#xff0c;并在需要进行自定义序列化的属性或类上使用JsonSerialize注解来指定自定义的JsonSerializer。下面是一个简单的示例&#xff1a; 首先&am…

什么是Lambda表达式?

Lambda表达式是Java 8引入的一个重要特性&#xff0c;用于简化函数式编程中的匿名函数的定义和使用。它可以被视为一种轻量级的匿名函数&#xff0c;可以作为参数传递给方法或存储在变量中。 Lambda表达式的语法形式如下&#xff1a; (parameters) -> expression 或 (para…

mysql中慢sql处理方案

前言 Mysql的慢查询日志是MySql提供的一种日志记录&#xff0c;它用来记录在Mysql中响应时间超过阈值的SQL语句&#xff0c;具体是指运行时间超过 long_query_time 值的sql会被记录到慢查询日志中。 开启慢查询 Mysql默认情况下&#xff0c;是没有开启慢查询日志的&#xff0c…

微信小程序实现数值监听(页面和组件属性)

简介 目前文章主要介绍对页面属性值的监听以及组件属性值的监听。需要异页面监听数据&#xff0c;请跳转至另一个文章介绍 为什么需要监听属性值 当需要通过一个属性变化时候&#xff0c;需要计算相应的方法等。pc网站经常需要监听属性&#xff0c;那么小程序应该怎么去实现…

【启扬方案】启扬多尺寸安卓屏一体机,助力仓储物料管理系统智能化管理

随着企业供应链管理的不断发展&#xff0c;对仓储物料管理的要求日益提高。企业需要实时追踪和管理物料的流动&#xff0c;提高物流效率、降低库存成本和减少库存的风险。因此&#xff0c;仓储物料管理系统的实现成为必要的手段。 仓储物料管理系统一体机作为一种新型的物料管理…

算法面试-深度学习基础面试题整理-AIGC相关(2023.9.01)

1、stable diffusion和GAN哪个好&#xff1f;为什么 &#xff1f; Stable diffusion是一种基于随机微分方程的生成方法&#xff0c;它通过逐步增加噪声来扰动原始图像&#xff0c;直到完全随机化。然后&#xff0c;它通过逐步减少噪声来恢复图像&#xff0c;同时使用一个神经网…

管理类联考——数学——汇总篇——知识点突破——数据分析——1. 计数原理——排列组合——公式

排列组合 排列与组合的推导: 从n个不同的元素中取出m(m≤n)个元素做排列为 A n m A_n^m An

Linux:工具(vim,gcc/g++,make/Makefile,yum,git,gdb)

目录 ---工具功能 1. vim 1.1 vim的模式 1.2 vim常见指令 2. gcc/g 2.1 预备知识 2.2 gcc的使用 3.make,Makefile make.Makefile的使用 4.yum --yum三板斧 5.git --git三板斧 --Linux下提交代码到远程仓库 6.gdb 6.1 gdb的常用指令 学习目标&#xff1a; 1.知道…

Android jni引用第三方so动态库和.a静态库并且调用(c)方法

最近花了一周时间来入门学习 Android JNI方面的知识,因为后续的工作很多需要用到c c++库,我需要用jni来包装一下c函数,来提供给上次java调用。总之多学点知识对自己有好处。 案例效果: 上文我们讲解了 android studio cmake生成.a文件(静态库)及调用(c c++)静态库.a 本文…

java网络编程,套接字socket

目录 一 网络概述 二 网络的类型分类 三 网络体系结构 四 网络通信协议概述 五 网络通信协议种类 六 Socket简介 七 Socket路径 八 java网络编程三要素 九 基于UDP协议的Socket编程 十 基于TCP协议的Socket编程 十一 基于TCP协议和UDP的区别 一 网络概述 多台相互连…

Python网络编程详解

概要 Python作为一种强大的编程语言&#xff0c;拥有丰富的网络编程库和框架&#xff0c;能够方便地进行各种网络编程任务。本文将介绍Python网络编程的基础知识&#xff0c;包括socket编程和HTTP协议&#xff0c;然后深入探讨一些流行的Python Web框架&#xff0c;包括Flask和…

安卓绘制原理概览

绘制原理 Android 程序员都知道 Android 的绘制流程分为 Measure、Layout、Draw 三步骤&#xff0c;其中 Measure 负责测量 View 的大小Layout 负责确定 View 的位置Draw 负责将 View 画在屏幕上 由 ViewRootImpl 实现的 performTraversal 方法是 Measure、layout、draw 的真正…

漏洞复现【目录集合】

Apache Apache_HTTPD_换行解析漏洞(CVE-2017-15715) Apache_HTTPD_多后缀解析漏洞 Apache HTTPD-未知后缀名解析 Apache_HTTP_2.4.50_路径穿越漏洞(CVE-2021-42013) Apache_HTTP_2.4.49_路径穿越漏洞(CVE-2021-41773) Aapache_Tomcat_AJP协议_文件包含漏洞(CVE-2020-1938)…

2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策

# 1 赛题 在生鲜商超中&#xff0c;一般蔬菜类商品的保鲜期都比较短&#xff0c;且品相随销售时间的增加而变差&#xff0c; 大部分品种如当日未售出&#xff0c;隔日就无法再售。因此&#xff0c; 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…

Matplotlib:Python数据可视化的全面指南

数据可视化是数据分析的一个重要方面&#xff0c;可以帮助我们有效地传达数据中的洞察和模式。Python提供了几个用于数据可视化的库&#xff0c;其中最突出和广泛使用的是Matplotlib。在本文中&#xff0c;我们将探索Matplotlib的基本概念和功能&#xff0c;并学习如何创建各种…

jemalloc 5.3.0源码总结

注意&#xff1a;jemalloc 的最新版本里没有所谓的 huge class&#xff0c;bin 中slab外面也不再套一个run的概念了&#xff0c;看其它人分享的文章时需要注意。 简述 用户侧通过 tcache 来访问&#xff0c;tcache 是一个线程的申请又释放的对象的缓存&#xff0c;它绑定了一…

JavaScript 中的 every() 方法和some() 方法

1.every()方法的定义与用法 every()方法用于检测数组中的所有元素是否都满足指定条件every()方法会遍历数组中的每一项&#xff0c;如果有一项不满足条件&#xff0c;则表达式返回false&#xff0c;剩余的项将不会进行检测&#xff1b;如果遍历完数组后&#xff0c;每一项都符…

Maven 的其它插件

文章目录 Maven 的其它插件dockerfile 插件Apache Maven Checkstyle Pluginp3c-pmd Maven 的其它插件 dockerfile 插件 dockerfile-maven-plugin 是 spotify 公司新提供的、用以替代 docker-maven-plugin 的插件&#xff0c;它同样是用于在 maven 中将当前项目打成一个 docke…

Jenkins 持续集成:Linux 系统 两台机器互相免密登录

背景知识 我们把public key放在远程系统合适的位置&#xff0c;然后从本地开始进行ssh连接。 此时&#xff0c;远程的sshd会产生一个随机数并用我们产生的public key进行加密后发给本地&#xff0c;本地会用private key进行解密并把这个随机数发回给远程系统。 最后&#xf…