自动配置 Kafka
整理服务器内容时,发现一个测试 Kafka 的的一个脚本,它可以自动部署 Kafka ,指定三个参数,完成 Kafka 的配置过程。
basePath=$1
brokerId=$2
zookeeperConnect=$3
localIp=`ifconfig |grep inet| awk '{print $2}'| head -1`
echo $localIp
cd $basePath
tar -xvf kafka_2.12-2.3.0.tgz
mv kafka_2.12-2.3.0 kafka
cd kafka/configsed -i -c "s/broker.id=.*/broker.id=${brokerId}/i" server.properties
sed -i -c "s/zookeeper.connect=.*/zookeeper.connect=${zookeeperConnect}/i" server.properties
echo "listeners=PLAINTEXT://$localIp:9092" >> server.properties
Kafka Java 测试代码
```typescript
public static void main(String[] args) {Properties kafkaProps = new Properties();kafkaProps.put(DataShareConstant.ACKS, DataShareConstant.DEFAULT_ACKS);kafkaProps.put(DataShareConstant.KAFKA_PRODUCER_TYPE, DataShareConstant.SYNC);//Avro mapkafkaProps.put(DataShareConstant.VALUE_SERIALIZER, DataShareConstant.AVRO_MAP_SERIALIZER);kafkaProps.put(DataShareConstant.KEY_SERIALIZER, DataShareConstant.DEFAULT_SERIALIZER);kafkaProps.put(DataShareConstant.BOOTSTRAP_SERVERS, "localhost:9092");//默认是30000mskafkaProps.put(DataShareConstant.REQUEST_TIMEOUT, "5000");kafkaProps.put("transaction.timeout.ms", "5000");kafkaProps.put("max.block.ms", "6000"); // 该属性决定连接超时的kafkaProps.put(DataShareConstant.BATCH_SIZE, "1048576");kafkaProps.put(DataShareConstant.LINGER, "10");kafkaProps.put(DataShareConstant.BUFFER_MEMORY, "33554432");KafkaProducer kafkaProducer = null;try{kafkaProducer = new KafkaProducer("mytestkafka", kafkaProps);} catch (Exception e) {logger.info("Construct producer error {}", e.getMessage());}Map<String, Object> testMapData = new HashMap<>(1);testMapData.put("DATA", "Kafka测试");testMapData.put("TIME", DateFormatUtils.format(System.currentTimeMillis(), CommonConstant.EsIndexDayFormat + " HH:mm:ss"));Future send = kafkaProducer2.send(testMapData);assert send != null;try {send.get();System.out.println("send ok.");} catch (Exception e) {String errorMsg = e.getMessage();System.out.println(errorMsg);} finally {kafkaProducer2.close();}}