kafka大数据采集技术实验(未完待续)

Kafka环境搭建

  1. 下载地址:https://link.zhihu.com/?target=https%3A//kafka.apache.org/downloads
  2. 解压
  3. 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

需要注意的是 : " c o n f i g / z o o k e e p e r . p r o p e r t i e s " 目录和 " / c o n f i g / z o o k e e p e r . p r o p e r t i e s " 目录是不同的 . 前者指当前目录中 c o n f i g 目录下的 z o o k e e p e r . p r o p e r t i e s 文件, 后者代表根目录中 c o n f i g 目录下的 z o o k e e p e r . p r o p e r t i e s 文件。 \color{red}需要注意的是:\\ "config/zookeeper.properties"目录和 "/config/zookeeper.properties"目录是不同的.\\ 前者指当前目录中config目录下的zookeeper.properties文件,\\ 后者代表根目录中config目录下的zookeeper.properties文件。 需要注意的是:"config/zookeeper.properties"目录和"/config/zookeeper.properties"目录是不同的.前者指当前目录中config目录下的zookeeper.properties文件,后者代表根目录中config目录下的zookeeper.properties文件。

若启动不成功,需要将zookeeper.properties中的admin.EnableServer=false修改为admin.EnableServer=true
或者关闭zookeeper并重新启动:

bin/zookeeper-server-stop.sh 
  1. 启动kafka
bin/kafka-server-start.sh config/server.properties
  1. 创建topic
kafka-topics.sh --create --zookeeper cluster1:9092,cluster2: 9092,cluster3: 9092--replication-factor 3 --partitions 1 --topic ljg

若发生错误:”zookeeper is not a recognized option”则将参数换成“—BOOTSTRAP-SERVER”,即:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ljg

上述两者的区别是,–zookeeper 和cluster都是老版本的命令参数,新版本可能不再支持。

  1. 创建生产者
kafka-console-producer.sh --broker-list cluster1:9092 --topic ljg

或者:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic ljg

上述两者的区别是,–broker-list 和cluster都是老版本的命令参数,新版本可能不再支持。

  1. 创建消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ljg

此时生产者即可进入等待输入,并将消息发送给消费者。

在这里插入图片描述

2181端口用于管理Kafka集群的元数据信息,包括Kafka的配置信息、分区信息、消费者信息等。而9092端口是Kafka Broker的默认端口,用于接收和处理生产者和消费者的消息,以及进行数据的存储和传输。

参考链接:https://www.cnblogs.com/anquing/p/14523046.html

maven下载安装、设置

  1. 下载解压
  2. 设置工作目录
  3. 设置镜像
  4. 编译java项目

mave命令:

mvn clean:清理
mvn compile:编译主程序
mvn test-compile:编译测试程序
mvn test:执行测试
mvn package:打包
mvn install:安装

maven项目目录结构:
在这里插入图片描述

Hello.java内容:

package com.maven.test;public class hello {public String sayHello(String name){return "Hello "+name+"!";}
}

注意pom.xml中的name 和artifactId字段的区别。
pom.xml例子:

<?xml version="1.0" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.maven.test</groupId><artifactId>hello</artifactId><version>0.0.1-SNAPSHOT</version><name>hello</name><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.0</version><scope>test</scope></dependency></dependencies>
</project>

数据库安装和操作

安装mabiadb数据库

yum install mariadb-server 
systemctl start mariadb
systemctl enable mariadb
mysql_secure_installation

创建数据表:

mysql -uroot -p
create database kafkaTestDB;
use kafkaTestDB;
create table kafkaTestTable(tickcount varchar(64), value  varchar(64),time varchar(64));

java连接和操作数据库:

package Main;import java.sql.*;public class JDBC {public static void main(String[] args) throws SQLException, ClassNotFoundException {
//        1.加载驱动Class.forName("com.mysql.cj.jdbc.Driver");
//        2.用户信息和urlString url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true";String username="root";String password="root";
//        3.连接成功,数据库对象 ConnectionConnection connection = DriverManager.getConnection(url,username,password);
//        4.执行SQL对象Statement,执行SQL的对象Statement statement = connection.createStatement();
//        5.执行SQL的对象去执行SQL,返回结果集String sql = "SELECT *FROM studentinfo;";ResultSet resultSet = statement.executeQuery(sql);while(resultSet.next()){System.out.println("SNo="+resultSet.getString("SNo"));System.out.println("SName="+resultSet.getString("SName"));System.out.println("Birth="+resultSet.getString("Birth"));System.out.println("SPNo="+resultSet.getString("SPNo"));System.out.println("Major="+resultSet.getString("Major"));System.out.println("Grade="+resultSet.getString("Grade"));System.out.println("SInstructor="+resultSet.getString("SInstructor"));System.out.println("SPwd="+resultSet.getString("SPwd"));}
//        6.释放连接resultSet.close();statement.close();connection.close();}
}

JAVA代码(maven)构建生产者消费者

工程目录:

在这里插入图片描述

pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bjtu.kafkaTest</groupId><artifactId>kafkaTest</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.1</version></dependency><dependency><groupId> org.apache.cassandra</groupId><artifactId>cassandra-all</artifactId><version>0.8.1</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version>
</dependency><dependency><groupId>joda-time</groupId><artifactId>joda-time</artifactId><version>2.9.8</version>
</dependency><dependency><groupId>org.json</groupId><artifactId>json</artifactId><version>20180130</version>
</dependency></dependencies></project>

java源码:

consumer:

package com.bjtu.kafkaTest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.sql.*;
import java.util.Properties;
import org.json.*;public class ConsumerDemo {public static void mysqlAccess(String tick,String value,String t)  {try{Class.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://localhost:3306/";String username="root";String password="1234";Connection connection = DriverManager.getConnection(url,username,password);Statement statement = connection.createStatement();String sql = "use kafkaTestDB;";ResultSet resultSet = statement.executeQuery(sql);//sql = "SELECT *FROM kafkaTestTable;";//resultSet = statement.executeQuery(sql);//while(resultSet.next()){//    System.out.println("tickcount:"+resultSet.getString("tickcount"));//}sql = "insert into kafkaTestTable values(" + tick +"," + value + "," + t + ")";resultSet = statement.executeQuery(sql);System.out.println(sql);resultSet.close();statement.close();connection.close();}catch(Exception e){System.out.println(e.toString());}}public static void main(String[] args){System.out.println("consumer start\r\n");Statement statement = null;Connection connection = null;ResultSet resultSet = null;try{Class.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://localhost:3306/";String username="root";String password="1234";connection = DriverManager.getConnection(url,username,password);statement = connection.createStatement();String sql = "use kafkaTestDB;";resultSet = statement.executeQuery(sql);//sql = "SELECT *FROM kafkaTestTable;";//resultSet = statement.executeQuery(sql);//while(resultSet.next()){//    System.out.println("tickcount:"+resultSet.getString("tickcount"));//}}catch(Exception e){System.out.println(e.toString());return;}Properties properties = new Properties();properties.put("bootstrap.servers", "0.0.0.0:9092");properties.put("group.id", "zabbix_perf");properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000");/*** earliest*   当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费*   latest*   当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据*   none*   topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常**/properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");/*** 反序列化* 把kafka集群二进制消息反序列化指定类型。*/properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Arrays.asList("ljg"));while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//100是超时时间for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, value = %s", record.offset(), record.value());JSONObject jo = new JSONObject(record.value());try{String tick = jo.getString("tickcount");String value = jo.getString("value");String t = jo.getString("time");//mysqlAccess(tick,value,t);//System.out.println("tick:"+tick + ",value:"+value + ",time:"+t);String sql = "insert into kafkaTestTable values(\"" + tick + "\",\"" + value + "\",\"" + t + "\")";System.out.println(sql);int result = statement.executeUpdate(sql);}catch(Exception e){System.out.println(e.toString());break;}}}//try{//resultSet.close();//statement.close();//connection.close();//}catch(Exception e){//	System.out.println(e.toString());//}}}

producer:

package com.bjtu.kafkaTest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;
import java.util.Date;import java.time.LocalDate;
import org.joda.time.DateTime;import java.text.SimpleDateFormat;public class ProducerDemo {public static void main(String[] args){System.out.println("producer start\r\n");Properties properties = new Properties();/***bootstrap.server用于建立到Kafka集群的初始连接的主机/端口对的列表,如果有两台以上的机器,逗号分隔*/properties.put("bootstrap.servers", "0.0.0.0:9092");/*** acks有三种状态* acks=0 不等待服务器确认直接发送消息,无法保证服务器收到消息数据* acks=1 把消息记录写到本地,但不会保证所有的消息数据被确认记录的情况下进行释放* acks=all 确认所有的消息数据被同步副本确认,这样保证了记录不会丢失**/properties.put("acks", "all");/*** 设置成大于0将导致客户端重新发送任何发送失败的记录**/properties.put("retries", 0);/***16384字节是默认设置的批处理的缓冲区*/properties.put("batch.size", 16384);properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);/*** 序列化类型。* kafka是以键值对的形式发送到kafka集群的,key是可选的,value可以是任意类型,Message再被发送到kafka之前,Producer需要* 把不同类型的消息转化成二进制类型。*/properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = null;try {producer = new KafkaProducer<String, String>(properties);Random r = new Random();for (int i = 0; i < 1000; i++) {int rv = r.nextInt(0x10000000);long timestamp = System.currentTimeMillis();Date date = new Date(timestamp + rv);SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");String formattedDate = sdf.format(date);//System.out.println("格式化后的日期:" + formattedDate);String msg = "{\"tickcount\":\"" + formattedDate + "\",\"value\":\"" +formattedDate +"\",\"time\":\"" 			+ formattedDate + "\"}" ;producer.send(new ProducerRecord<String, String>("ljg", msg));System.out.println("Sent:" + msg);//Thread.sleep(1);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}System.out.println("producer end\r\n");}
}

安装依赖包:

mvn idea:module

编译运行:

在这里插入图片描述

执行:

mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ConsumerDemo"
mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ProducerDemo"

参考链接:
https://www.cnblogs.com/qqran/p/14772713.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/3253.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解密Java线程池源码

一、线程池中的保活和回收源码分析 1、线程池中线程的创建时机 1、核心线程创建时机 在研究线程池的源码前首先想一个问题 public class Main {public static void main(String[] args) {ThreadPoolExecutor executor new ThreadPoolExecutor(10, 20, 0l, TimeUnit.MILLIS…

从Linux角度具体理解程序翻译过程-----预处理、编译、汇编、链接

目录 前言&#xff1a; 翻译过程 1.预处理 2.编译 3.汇编 4.链接 Linux下对其理解&#xff1a; 1.预处理 拓展&#xff1a; Linux下文件信息&#xff1a; 文件类型&#xff1a; 硬链接数&#xff1a; 文件拥有者&#xff1a; 文件所属组&#xff1a; other&#x…

区块链安全应用-------压力测试

基于已有的链进行测试&#xff08;build_chain默认建的链 四个节 点&#xff09;&#xff1a; 第一步&#xff1a;搭链 1. 安装依赖 在ubuntu操作系统中&#xff0c;操作步骤如下&#xff1a; sudo apt install -y openssl curl 2. 创建操作目录, 下载安装脚本 ## 创建操作…

3个比较不错的Linux云音乐应用程序整理

在现代音乐流媒体时代&#xff0c;基于云的音乐应用程序因其便利性和可访问性而变得非常流行。Linux 用户尤其寻求可靠且功能丰富的音乐播放器来无缝地享受他们喜爱的音乐。 在这里&#xff0c;我们探讨了三个最好的基于云的音乐应用程序&#xff0c;每个应用程序都提供专为 L…

Java Web 网页设计(1)

不要让追求之舟停泊在幻想的港湾 而应扬起奋斗的风帆 驶向现实生活的大海 网页设计 1.首先 添加框架支持 找到目录右键添加 找到Web Application选中 点击OK 然后 编辑设置 找到Tomcat--local 选中 点击OK 名称可以自己设置 找到对应文件夹路径 把Tomcat添加到项目里面 因为…

【Hadoop】-HDFS的Shell操作[3]

目录 前言 一、HDFS集群启停命令 1.一键启停脚本可用 2.独立进程启停可用 二、文件系统操作命令 1、创建文件夹 2、查看指定目录下内容 3、上传文件到HDFS指定目录下 4、查看HDFS文件内容 5、下载HDFS文件 6、拷贝HDFS文件 7、追加数据到HDFS文件中 8、HDFS数据移…

哪吒汽车把最后的翻身筹码,全压在了这辆新车上

正如比亚迪王传福所说&#xff0c;新能源车市场已进入惨烈淘汰赛环节。 近几年国内新能源车销量增长势头迅猛&#xff0c;仅过去的 2023 年产销便分别达 958.7 万辆和 949.5 万辆&#xff0c;同比增长 35.8% 和 37.9%。 销量高速增长背后自然也带来了越来越激烈的竞争。 过去…

Footprint Analytics 与 GalaChain 达成战略合作

​ Footprint Analytics 宣布与 GalaChain 达成战略合作。GalaChain 是 Gala 旗下的 Layer 1 区块链。此次合作标志着双方在游戏&#xff08;包括 Gala Games) 、娱乐和金融等多个行业的区块链生态系统革新方面迈出了重要的一步。 GalaChain 致力于满足企业级项目的广泛需求&…

算法-栈操作

1047. 删除字符串中的所有相邻重复项 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:string removeDuplicates(string s) {string stack;for(char& ch:s){if(stack.size()>0&&chstack.back()){stack.pop_back();}else{stack.push_back(ch);}…

AI大模型实现软件智能化落地实践

1、什么是大模型 大型语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff1b;Large Language Models&#xff0c;LLMs)。 大语言模型是一种深度学习模型&#xff0c;特别是属于自然语言处理&#xff08;NLP&#xff09;的领域&#xff0c;一般是指包含数干亿&…

Pandas 模块-操纵数据(11)-二元运算--超级add、sub、mul、div、mod、pow等等

目录 1. DataFrame.add 1.1 DataFrame.add 语法结构 1.2 DataFrame.add 参数说明 1.3 DataFrame.add 用法示例 1.3.1 正常的使用 1.3.2 需要注意类型相符合 2. DataFrame.sub 2.1 DataFrame.sub 语法结构 2.2 DataFrame.sub 参数说明 2.3 DataFrame.sub 用法示例 3.…

传媒论坛编辑部传媒论坛杂志社传媒论坛杂志2024年第7期目录

专题│场景传播研究 场景传播&#xff1a;一场遮盖自我与寻找自我的博弈 胡沈明; 3 基于CiteSpace的中国场景传播研究热点分析 管倩;粟银慧; 4-610《传媒论坛》投稿&#xff1a;cnqikantg126.com 数字世界的美与危&#xff1a;场景传播的失范与应对之举 王依晗;章洁…

分布式-知识体系

分布式系统 本质就是一堆机器的协同&#xff0c;要做的就是用各种手段来让机器的运行达到预期 分布式业务场景 分布式四纵四横说 基于 MSA&#xff08;微服务架构&#xff09;的分布式知识体系 相关概念 – 【摘自网络原文】 节点与网络 节点 传统的节点也就是一台单体的物…

MySQL数据类型:字符串类型详解

MySQL数据类型&#xff1a;字符串类型详解 在MySQL数据库中&#xff0c;字符串数据类型用于存储各种文本信息。这些数据类型主要包括CHAR、VARCHAR、TEXT和BLOB等。 CHAR与VARCHAR CHAR CHAR类型用于存储固定长度的字符串。它的长度在创建表时就已确定&#xff0c;长度范围…

QJ71C24N-R2 三菱Q系列串行通信模块

三菱Q系列串行通信模块是通过串行通信用的RS-232、RS-422/485线路将对方设备与Q系列可编程控制器CPU相连接,以实现如下所示的数据通信的模块。通过使用调制解调器/终端适配器,可以利用公共线路(模拟/数字)实现与远程设备间的数据通信。 QJ71C24N-R2参数说明&#xff1a;串行RS-…

为什么36KbRAM会配置为32K×1,少的那4Kb去哪了?

首先我们需要了解BRAM的相关知识&#xff0c;可以参考下面两篇文章&#xff1a; Xinlinx FPGA内的存储器BRAM全解-CSDN博客 为何有时简单双口RAM是真双口RAM资源的一半-CSDN博客 本问题的背景是&#xff1a; 每个36Kb块RAM也可以配置成深度宽度为64K 1(当与相邻的36KB块RA…

淘宝新店没有流量和访客怎么办

淘宝新店没有流量和访客时&#xff0c;可以采取以下措施来提升店铺的流量和吸引更多的访客&#xff1a; 3an推客是给商家提供的营销工具&#xff0c;3an推客CPS推广模式由商家自主设置佣金比例&#xff0c;以及设置商品优惠券&#xff0c;激励推广者去帮助商家推广商品链接&…

SVG 绘制微信订阅号icon

效果 代码 <!DOCTYPE html> <html> <body><svg xmlns"http://www.w3.org/2000/svg" version"1.1" width"600" height"600"><rect x"0" y"0" rx"0" ry"0" width&…

JavaEE 初阶篇-深入了解 UDP 通信与 TCP 通信(综合案例:实现 TCP 通信群聊)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 UDP 通信 1.1 DatagramSocket 类 1.2 DatagramPacket 类 1.3 实现 UDP 通信&#xff08;一发一收&#xff09; 1.3.1 客户端的开发 1.3.2 服务端的开发 1.4 实现 …

Arm功耗管理精讲与实战

安全之安全(security)博客目录导读 思考 1、为什么要功耗管理&#xff1f;SOC架构中功耗管理示例&#xff1f;功耗管理挑战&#xff1f; 2、从单核->多核->big.LITTLE->DynamIQ&#xff0c;功耗管理架构演进? 3、什么是电压域&#xff1f;什么是电源域&#xff1f…