加油,新时代打工人!
1、导入Maven Kafka POM依赖
<repositories><repository><id>central</id><url>http://maven.aliyun.com/nexus/content/groups/public//</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled><updatePolicy>always</updatePolicy><checksumPolicy>fail</checksumPolicy></snapshots></repository>
</repositories><dependencies><!-- kafka客户端工具 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><!-- 工具类 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency><!-- SLF桥接LOG4J日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.6</version></dependency><!-- SLOG4J日志 --><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.7.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins>
</build>
2、代码
- 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- 创建一个生产者对象KafkaProducer
- 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
- 再调用一个Future.get()方法等待响应
- 关闭生产者
参考代码:
public class KafkaProducerTest {public static void main(String[] args) {// 1. 创建用于连接Kafka的Properties配置Properties props = new Properties();props.put("bootstrap.servers", "192.168.88.100:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建一个生产者对象KafkaProducerKafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);// 3. 调用send发送1-100消息到指定Topic testfor(int i = 0; i < 100; ++i) {try {// 获取返回值Future,该对象封装了返回值Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));// 调用一个Future.get()方法等待响应future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}// 5. 关闭生产者producer.close();}
}