1.broker文件存储机制
去查看真正的存储文件:
在/opt/module/kafka/datas/ 路径下
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
如果是6415那么这个会存储在563的log文件之中,因为介于6410和10090之间。
2.文件清除策略
日志压缩用的比较少,比如针对用户,18岁19岁,只需要保存最新的就行。
3.高效读写数据
4.kafka消费者
5.消费者组初始化流程
6.独立消费者案例
package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}