Kafka environment setup
- Download: https://kafka.apache.org/downloads
- Extract the archive
- Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
Note: "config/zookeeper.properties" and "/config/zookeeper.properties" are different paths. The former is the zookeeper.properties file in the config directory under the current directory; the latter is the one in the config directory under the filesystem root.
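The difference can be reproduced with java.nio.file: a relative path is resolved against the working directory, while an absolute path is used unchanged. A minimal stdlib sketch, assuming a Unix-like system and a hypothetical extraction directory /opt/kafka:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class PathNote {
    // Resolve a config path the way the shell does: a relative path is joined
    // onto the working directory, an absolute path is returned unchanged.
    static Path resolveAgainst(String workingDir, String configPath) {
        return Paths.get(workingDir).resolve(configPath).normalize();
    }

    public static void main(String[] args) {
        String kafkaHome = "/opt/kafka"; // hypothetical directory where Kafka was extracted
        System.out.println(resolveAgainst(kafkaHome, "config/zookeeper.properties"));
        System.out.println(resolveAgainst(kafkaHome, "/config/zookeeper.properties"));
    }
}
```

This is why the start scripts are run from the Kafka directory: only then does the relative path point at the bundled config file.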
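If ZooKeeper fails to start, change admin.enableServer=false to admin.enableServer=true in zookeeper.properties (when enabled, the AdminServer listens on admin.serverPort, 8080 by default, so that port must be free).
Alternatively, stop ZooKeeper and start it again: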
bin/zookeeper-server-stop.sh
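To check what the AdminServer setting currently is, zookeeper.properties can be read with java.util.Properties. This sketch assumes ZooKeeper's documented defaults when a key is missing (AdminServer enabled, port 8080); Kafka's bundled file sets admin.enableServer=false to avoid port conflicts. The sample configuration string is illustrative:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class AdminServerCheck {
    // Summarizes the AdminServer settings of a zookeeper.properties file.
    // Assumed defaults when a key is absent: admin.enableServer=true, admin.serverPort=8080.
    static String describe(Reader config) {
        Properties props = new Properties();
        try {
            props.load(config);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        boolean enabled = Boolean.parseBoolean(props.getProperty("admin.enableServer", "true").trim());
        int port = Integer.parseInt(props.getProperty("admin.serverPort", "8080").trim());
        return enabled ? "AdminServer enabled on port " + port : "AdminServer disabled";
    }

    public static void main(String[] args) {
        // For the real file, pass new java.io.FileReader("config/zookeeper.properties") instead
        String sample = "dataDir=/tmp/zookeeper\nclientPort=2181\nadmin.enableServer=false\n";
        System.out.println(describe(new StringReader(sample)));
    }
}
```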
- Start Kafka
bin/kafka-server-start.sh config/server.properties
- Create a topic
kafka-topics.sh --create --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --replication-factor 3 --partitions 1 --topic ljg
If this fails with the error "zookeeper is not a recognized option", replace the option with --bootstrap-server, which points at the broker port 9092:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ljg
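The difference: --zookeeper is an option of older Kafka versions (it was removed from kafka-topics.sh in Kafka 3.0), so newer versions only accept --bootstrap-server. cluster1, cluster2 and cluster3 are not options but the hostnames of a three-node cluster; on a single machine, use localhost and a replication factor of 1.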
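The choice between the two forms can be written down as a small Java sketch that assembles the kafka-topics.sh arguments. The version rule is an assumption (Kafka 3.0+ accepts only --bootstrap-server; older releases are given --zookeeper with ZooKeeper addresses); check bin/kafka-topics.sh --help for the installed version. The sketch also rejects a replication factor larger than the number of brokers, since each replica of a partition lives on a different broker:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopicArgs {
    // Builds kafka-topics.sh arguments for creating a topic.
    // Assumption: Kafka 3.0+ only accepts --bootstrap-server (broker host:port, e.g. localhost:9092);
    // for older releases this sketch emits --zookeeper, so servers must then be ZooKeeper host:port (e.g. :2181).
    static List<String> createArgs(int kafkaMajorVersion, String servers, int brokerCount,
                                   int replicationFactor, int partitions, String topic) {
        if (replicationFactor > brokerCount) {
            throw new IllegalArgumentException("replication factor " + replicationFactor
                    + " is larger than the number of brokers (" + brokerCount + ")");
        }
        List<String> args = new ArrayList<>();
        args.add("--create");
        args.add(kafkaMajorVersion >= 3 ? "--bootstrap-server" : "--zookeeper");
        args.add(servers);
        args.addAll(Arrays.asList(
                "--replication-factor", String.valueOf(replicationFactor),
                "--partitions", String.valueOf(partitions),
                "--topic", topic));
        return args;
    }

    public static void main(String[] args) {
        // Single local broker, matching the localhost command
        System.out.println("bin/kafka-topics.sh " + String.join(" ", createArgs(3, "localhost:9092", 1, 1, 1, "ljg")));
    }
}
```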
- Start a console producer
kafka-console-producer.sh --broker-list cluster1:9092 --topic ljg
or:
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic ljg
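Likewise, --broker-list is the console producer's older option; newer versions use --bootstrap-server and may no longer accept --broker-list. Again, cluster1 is a hostname, not an option.
- Start a console consumer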
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ljg
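The producer now waits for input: each line typed into it is sent to topic ljg and printed by the consumer.
Port 2181 is ZooKeeper's default client port. ZooKeeper holds the Kafka cluster's metadata, such as configuration, partition information and, for older consumers, consumer information. Port 9092 is the Kafka broker's default port: the broker receives and handles messages from producers and consumers, and stores and transfers the data.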
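Whether ZooKeeper (2181) and the broker (9092) are actually listening can be checked with a plain TCP connect. A minimal stdlib sketch, assuming both run on localhost; a successful connect only shows that some process listens on the port, not that it is ZooKeeper or Kafka:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("ZooKeeper 2181: " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka     9092: " + isListening("localhost", 9092, 1000));
    }
}
```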
Reference: https://www.cnblogs.com/anquing/p/14523046.html
Maven download, installation and setup
- Download and extract
- Set the working directory
- Configure a mirror
- Compile a Java project
Maven commands:
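mvn clean: remove the build output (the target directory)
mvn compile: compile the main sources
mvn test-compile: compile the test sources
mvn test: run the tests
mvn package: package the project (e.g. into a jar)
mvn install: install the package into the local repository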
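These commands are phases of Maven's build lifecycles. Invoking a phase of the default lifecycle runs every earlier phase first (so mvn package also compiles and runs the tests), while clean belongs to a separate lifecycle. A simplified Java model listing only the phases named here plus validate, verify and deploy (the real default lifecycle has more phases in between, e.g. process-resources):

```java
import java.util.Arrays;
import java.util.List;

class MavenLifecycle {
    // Simplified default lifecycle, in execution order.
    static final List<String> DEFAULT = Arrays.asList(
            "validate", "compile", "test-compile", "test", "package", "verify", "install", "deploy");

    // Phases executed by "mvn <phase>": the phase itself and every phase before it.
    static List<String> phasesRunBy(String phase) {
        int idx = DEFAULT.indexOf(phase);
        if (idx < 0) {
            throw new IllegalArgumentException(phase + " is not in the default lifecycle (clean has its own)");
        }
        return DEFAULT.subList(0, idx + 1);
    }

    public static void main(String[] args) {
        System.out.println("mvn package runs: " + phasesRunBy("package"));
    }
}
```

This is also why mvn clean package is a common combination: the first word triggers the clean lifecycle, the second the default lifecycle up to package.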
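Maven project directory structure (the standard layout: pom.xml in the project root, main sources under src/main/java and test sources under src/test/java, so Hello.java sits at src/main/java/com/maven/test/Hello.java):
Hello.java: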
package com.maven.test;

// A public class must have the same name as its file (Hello.java)
public class Hello {
    public String sayHello(String name) {
        return "Hello " + name + "!";
    }
}
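Note the difference between the name and artifactId fields in pom.xml: groupId, artifactId and version together identify the artifact, and by default the packaged file is named artifactId-version.jar, whereas name is only a human-readable display name.
Example pom.xml: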
<?xml version="1.0" ?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.maven.test</groupId>
    <artifactId>hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hello</name>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
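To make the name/artifactId difference concrete, this stdlib sketch reads both fields from a pom with the JDK's DOM parser and derives the default jar name. It assumes jar packaging and no custom finalName; the display name "Hello Maven Demo" is a hypothetical value chosen to differ from artifactId:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

class PomInfo {
    // Text of the first direct child <tag> of <project>; direct children only,
    // so the artifactId/version of dependencies are not picked up.
    static String child(Element project, String tag) {
        for (Node n = project.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n instanceof Element && tag.equals(((Element) n).getTagName())) {
                return n.getTextContent().trim();
            }
        }
        return null;
    }

    // Reports the display name and the default jar file name (artifactId-version.jar).
    static String summarize(String pomXml) {
        try {
            Element project = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(pomXml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
            String jar = child(project, "artifactId") + "-" + child(project, "version") + ".jar";
            return "name=" + child(project, "name") + ", jar=" + jar;
        } catch (Exception e) {
            throw new IllegalStateException("could not parse pom", e);
        }
    }

    public static void main(String[] args) {
        // Abbreviated pom with a hypothetical display name
        String pom = "<project xmlns=\"http://maven.apache.org/POM/4.0.0\">"
                + "<groupId>com.maven.test</groupId><artifactId>hello</artifactId>"
                + "<version>0.0.1-SNAPSHOT</version><name>Hello Maven Demo</name></project>";
        System.out.println(summarize(pom));
    }
}
```

Changing name leaves the jar name untouched; changing artifactId renames the jar and also changes the coordinates other projects use to depend on it.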
Database installation and usage
Install the MariaDB database:
yum install mariadb-server
systemctl start mariadb
systemctl enable mariadb
mysql_secure_installation
Create the database and table:
mysql -uroot -p
create database kafkaTestDB;
use kafkaTestDB;
create table kafkaTestTable(tickcount varchar(64), value varchar(64),time varchar(64));
Connecting to and querying the database from Java:
package Main;

import java.sql.*;

public class JDBC {
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        // 1. Load the driver
        Class.forName("com.mysql.cj.jdbc.Driver");
        // 2. User credentials and URL
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true";
        String username = "root";
        String password = "root";
        // 3. Connect; the Connection object represents the database
        Connection connection = DriverManager.getConnection(url, username, password);
        // 4. Create a Statement, the object that executes SQL
        Statement statement = connection.createStatement();
        // 5. Execute the SQL and iterate over the returned result set
        String sql = "SELECT * FROM studentinfo;";
        ResultSet resultSet = statement.executeQuery(sql);
        while (resultSet.next()) {
            System.out.println("SNo=" + resultSet.getString("SNo"));
            System.out.println("SName=" + resultSet.getString("SName"));
            System.out.println("Birth=" + resultSet.getString("Birth"));
            System.out.println("SPNo=" + resultSet.getString("SPNo"));
            System.out.println("Major=" + resultSet.getString("Major"));
            System.out.println("Grade=" + resultSet.getString("Grade"));
            System.out.println("SInstructor=" + resultSet.getString("SInstructor"));
            System.out.println("SPwd=" + resultSet.getString("SPwd"));
        }
        // 6. Release the resources
        resultSet.close();
        statement.close();
        connection.close();
    }
}
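Step 6 closes the ResultSet, Statement and Connection in the reverse order of their creation. Java's try-with-resources does this automatically, even when an exception is thrown, e.g. try (Connection c = ...; Statement s = ...; ResultSet rs = ...) { ... }. The stdlib-only sketch below swaps the JDBC objects for hypothetical stand-in resources so it runs without a database and records the closing order:

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderDemo {
    // Stand-in for a JDBC resource: records its name when closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    // Opens connection, statement and result set in that order; returns the order they were closed in.
    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        try (Tracked connection = new Tracked("connection", log);
             Tracked statement = new Tracked("statement", log);
             Tracked resultSet = new Tracked("resultSet", log)) {
            // ... the query would run here ...
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(closeOrder()); // [resultSet, statement, connection]
    }
}
```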
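Building a producer and consumer in Java (Maven)
Project layout (standard Maven layout; both classes are in package com.bjtu.kafkaTest under src/main/java):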
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bjtu.kafkaTest</groupId>
    <artifactId>kafkaTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.8</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20180130</version>
        </dependency>
    </dependencies>
</project>
Java source code:
Consumer (ConsumerDemo.java):
package com.bjtu.kafkaTest;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    // The database name is part of the URL, so no separate "use kafkaTestDB;" is needed.
    // Connector/J 8 registers its driver automatically (JDBC 4), so Class.forName is not required.
    private static final String URL = "jdbc:mysql://localhost:3306/kafkaTestDB";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "1234";
    // Placeholders let the driver quote the values (no hand-built SQL strings)
    private static final String INSERT_SQL = "insert into kafkaTestTable values(?, ?, ?)";

    // Standalone helper: opens its own connection and inserts one row (not called by main)
    public static void mysqlAccess(String tick, String value, String t) {
        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
             PreparedStatement ps = connection.prepareStatement(INSERT_SQL)) {
            ps.setString(1, tick);
            ps.setString(2, value);
            ps.setString(3, t);
            ps.executeUpdate(); // INSERT must use executeUpdate, not executeQuery
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }

    public static void main(String[] args) {
        System.out.println("consumer start\r\n");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "zabbix_perf");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        /*
         * auto.offset.reset:
         * earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
         * latest:   if a partition has a committed offset, resume from it; otherwise consume only newly produced records
         * none:     resume from the committed offsets if every partition has one; throw an exception if any partition has none
         */
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        // Deserializers: convert the binary messages from the Kafka cluster into the given type
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
             PreparedStatement insert = connection.prepareStatement(INSERT_SQL);
             KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Arrays.asList("ljg"));
            while (true) {
                // Wait at most 100 ms for new records (poll(long) is deprecated)
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                    try {
                        JSONObject jo = new JSONObject(record.value());
                        insert.setString(1, jo.getString("tickcount"));
                        insert.setString(2, jo.getString("value"));
                        insert.setString(3, jo.getString("time"));
                        insert.executeUpdate();
                    } catch (Exception e) {
                        // Log and skip a malformed message or failed insert
                        System.out.println(e.toString());
                    }
                }
            }
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}
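The auto.offset.reset rules in the comment amount to a per-partition decision. The following is a simplified stdlib model, not Kafka's implementation: committed is null when the group has no committed offset for the partition, and logStart/logEnd stand for the partition's earliest offset and the offset of the next record to be written. The real consumer also applies the policy when a committed offset is out of range, which this sketch ignores:

```java
class OffsetReset {
    // Starting offset for one partition under the auto.offset.reset rules.
    static long startOffset(Long committed, long logStart, long logEnd, String policy) {
        if (committed != null) {
            return committed; // a committed offset always takes precedence
        }
        switch (policy) {
            case "earliest":
                return logStart; // read the partition from the beginning
            case "latest":
                return logEnd;   // only records produced from now on
            case "none":
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..9 (logEnd = 10)
        System.out.println(startOffset(null, 0, 10, "earliest")); // 0
        System.out.println(startOffset(null, 0, 10, "latest"));   // 10
        System.out.println(startOffset(5L, 0, 10, "latest"));     // 5
    }
}
```

Because the consumer commits offsets automatically (enable.auto.commit=true), earliest only matters the first time group zabbix_perf reads topic ljg; later runs resume from the committed offsets.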
Producer (ProducerDemo.java):
package com.bjtu.kafkaTest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class ProducerDemo {
    public static void main(String[] args) {
        System.out.println("producer start\r\n");
        Properties properties = new Properties();
        // bootstrap.servers: host/port pairs for the initial connection to the Kafka cluster; separate multiple brokers with commas
        properties.put("bootstrap.servers", "localhost:9092");
        /*
         * acks:
         * acks=0   do not wait for any acknowledgement; there is no guarantee the broker received the record
         * acks=1   the leader writes the record to its local log and responds without waiting for the followers
         * acks=all the leader waits until all in-sync replicas have acknowledged the record, so it is not lost
         *          as long as at least one in-sync replica stays alive
         */
        properties.put("acks", "all");
        // A value greater than 0 makes the client resend records whose send failed
        properties.put("retries", 0);
        // 16384 bytes is the default batch buffer size
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        /*
         * Serializers: records are sent to the Kafka cluster as key/value pairs (the key is optional).
         * Before a message is sent, the producer converts the key and value into bytes.
         */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes (and flushes) the producer, and avoids a NullPointerException
        // in finally if the constructor itself fails
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            Random r = new Random();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            for (int i = 0; i < 1000; i++) {
                // Current time plus a random offset, formatted as yyyyMMddHHmmss
                int rv = r.nextInt(0x10000000);
                Date date = new Date(System.currentTimeMillis() + rv);
                String formattedDate = sdf.format(date);
                String msg = "{\"tickcount\":\"" + formattedDate + "\",\"value\":\"" + formattedDate
                        + "\",\"time\":\"" + formattedDate + "\"}";
                producer.send(new ProducerRecord<>("ljg", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("producer end\r\n");
    }
}
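batch.size and linger.ms work together: the producer groups records into batches and sends a batch when it is full or when it has waited linger.ms. A deliberately simplified, single-partition model of that rule (the class is hypothetical; the real producer measures bytes including record overhead, keeps one batch per partition and does the timing in a background sender thread):

```java
import java.util.ArrayList;
import java.util.List;

class BatchModel {
    private final int batchSize;   // plays the role of batch.size (bytes)
    private final long lingerMs;   // plays the role of linger.ms
    private final List<Integer> sentBatches = new ArrayList<>(); // sizes of the batches "sent"
    private int openBytes = 0;     // bytes in the batch currently being filled
    private long openedAtMs = 0;   // when the open batch received its first record

    BatchModel(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    // Add a record of recordBytes at time nowMs.
    void append(int recordBytes, long nowMs) {
        poll(nowMs);
        if (openBytes > 0 && openBytes + recordBytes > batchSize) {
            send(); // the record does not fit: the open batch is full and goes out
        }
        if (openBytes == 0) {
            openedAtMs = nowMs;
        }
        openBytes += recordBytes;
        if (openBytes >= batchSize) {
            send(); // exactly full, or a single oversized record
        }
    }

    // Send the open batch once it has waited at least linger.ms.
    void poll(long nowMs) {
        if (openBytes > 0 && nowMs - openedAtMs >= lingerMs) {
            send();
        }
    }

    private void send() {
        sentBatches.add(openBytes);
        openBytes = 0;
    }

    List<Integer> sent() {
        return sentBatches;
    }

    public static void main(String[] args) {
        BatchModel m = new BatchModel(100, 5);
        m.append(40, 0);
        m.append(40, 1);
        m.append(40, 2); // would exceed 100 bytes: the 80-byte batch is sent
        m.poll(7);       // the remaining 40-byte batch has waited 5 ms: sent
        System.out.println(m.sent()); // [80, 40]
    }
}
```

With linger.ms=1 as in ProducerDemo, a batch waits at most about 1 ms for more records, trading a little latency for fewer, larger requests.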
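Download the dependencies (mvn idea:module generates IntelliJ IDEA module files and resolves the dependencies along the way):
mvn idea:module
Compile (exec:java runs the compiled classes but does not compile them itself):
mvn compile
Run the consumer and the producer, each in its own terminal: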
mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ConsumerDemo"
mvn exec:java -Dexec.mainClass="com.bjtu.kafkaTest.ProducerDemo"
Reference:
https://www.cnblogs.com/qqran/p/14772713.html