java SASL_SSL
帐号密码 方式访问 kafka
Producer Java Sample java生产者:
Properties props = new Properties();
props.put("bootstrap.servers",
"*******:9092,*******:9092");
props.put("acks", "all");//
props.put("retries", 3);
props.put("batch.size", 106384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_SSL");
props.put("ssl.truststore.location",
"D:/client_truststore.jks");
props.put("ssl.truststore.password", "WSO2_sp440");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required
username='kaf_crm' password='xxxxxxx';");
//注意passwod结尾的分号一定不要漏
props.put("ssl.endpoint.identification.algorithm", "");
long sys = System.currentTimeMillis();
String contractId=CRM_ContractID
String payload = "payload";
Producer producer = new KafkaProducer<>(props);
//Synchronized Mode, Producer will wait and block until Kafka
Server return response
try{
Future future =producer.send(new
ProducerRecord<>("CRM_Contract", contractId, payload));//
(topic, key, payload),the second parameter is the key
future.get();//。 If not care whether success or failure , no
need this code
producer.close();
} catch(Exception e) {
e.printStackTrace();// Connection, No Leader error can be
resolved by retry; but too large message error will not re-try and
throw exception immediately
}
//Asynchronized mode, Producer not wait for response,
Background process of Producer submit message to Kafka server by
Batch size. It need callback to handle whether message is sent to
Kafka Server. If error happen ,need to log the exception.
try{
producer.send(new ProducerRecord<>("CRM_Contract",
contractId, payload),new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e)
{
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is:
" + metadata.offset());}}});
}catch(Exception e) {
e.printStackTrace();
}
Consumer Java Sample java消费者:
Properties props = new Properties();
props.put("bootstrap.servers", "*******:9092");
props.put("group.id", "wso2_sp");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
"G:\\client_truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
"WSO2_sp440");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required
username='kaf_xxx' password='xxxxx';");//注意passwod结尾的分号一定不要漏
props.put("ssl.endpoint.identification.algorithm", "");
KafkaConsumer consumer = new
KafkaConsumer<>(props);
String topic = "file_poc";
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("partition= %d, offset = %d, key = %s, value
= %s\n", record.partition(), record.offset(), record.key(),
record.value());
}
consumer.commitSync();
}