kafka为什么有些属性没有配置却能正常工作,那是因为kafka-clients对有些消费者设置了默认值,具体看下ConsumerConfig类的静态模块,具体如下所示:
kafka为什么有些属性没有配置却能正常工作,那是因为kafka-clients对有些消费者设置了默认值,具体看下ConsumerConfig类的静态模块,具体如下所示:static {CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,Type.LIST,Collections.emptyList(),new ConfigDef.NonNullValidator(),Importance.HIGH,CommonClientConfigs.BOOTSTRAP_SERVERS_DOC).define(CLIENT_DNS_LOOKUP_CONFIG,Type.STRING,ClientDnsLookup.USE_ALL_DNS_IPS.toString(),in(ClientDnsLookup.DEFAULT.toString(),ClientDnsLookup.USE_ALL_DNS_IPS.toString(),ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),Importance.MEDIUM,CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC).define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC).define(GROUP_INSTANCE_ID_CONFIG,Type.STRING,null,Importance.MEDIUM,GROUP_INSTANCE_ID_DOC).define(SESSION_TIMEOUT_MS_CONFIG,Type.INT,10000,Importance.HIGH,SESSION_TIMEOUT_MS_DOC).define(HEARTBEAT_INTERVAL_MS_CONFIG,Type.INT,3000,Importance.HIGH,HEARTBEAT_INTERVAL_MS_DOC).define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Type.LIST,Collections.singletonList(RangeAssignor.class),new ConfigDef.NonNullValidator(),Importance.MEDIUM,PARTITION_ASSIGNMENT_STRATEGY_DOC).define(METADATA_MAX_AGE_CONFIG,Type.LONG,5 * 60 * 1000,atLeast(0),Importance.LOW,CommonClientConfigs.METADATA_MAX_AGE_DOC).define(ENABLE_AUTO_COMMIT_CONFIG,Type.BOOLEAN,true,Importance.MEDIUM,ENABLE_AUTO_COMMIT_DOC).define(AUTO_COMMIT_INTERVAL_MS_CONFIG,Type.INT,5000,atLeast(0),Importance.LOW,AUTO_COMMIT_INTERVAL_MS_DOC).define(CLIENT_ID_CONFIG,Type.STRING,"",Importance.LOW,CommonClientConfigs.CLIENT_ID_DOC).define(CLIENT_RACK_CONFIG,Type.STRING,"",Importance.LOW,CommonClientConfigs.CLIENT_RACK_DOC).define(MAX_PARTITION_FETCH_BYTES_CONFIG,Type.INT,DEFAULT_MAX_PARTITION_FETCH_BYTES,atLeast(0),Importance.HIGH,MAX_PARTITION_FETCH_BYTES_DOC).define(SEND_BUFFER_CONFIG,Type.INT,128 * 1024,atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND),Importance.MEDIUM,CommonClientConfigs.SEND_BUFFER_DOC).define(RECEIVE_BUFFER_CONFIG,Type.INT,64 * 1024,atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND),Importance.MEDIUM,CommonClientConfigs.RECEIVE_BUFFER_DOC).define(FETCH_MIN_BYTES_CONFIG,Type.INT,1,atLeast(0),Importance.HIGH,FETCH_MIN_BYTES_DOC).define(FETCH_MAX_BYTES_CONFIG,Type.INT,DEFAULT_FETCH_MAX_BYTES,atLeast(0),Importance.MEDIUM,FETCH_MAX_BYTES_DOC).define(FETCH_MAX_WAIT_MS_CONFIG,Type.INT,500,atLeast(0),Importance.LOW,FETCH_MAX_WAIT_MS_DOC).define(RECONNECT_BACKOFF_MS_CONFIG,Type.LONG,50L,atLeast(0L),Importance.LOW,CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC).define(RECONNECT_BACKOFF_MAX_MS_CONFIG,Type.LONG,1000L,atLeast(0L),Importance.LOW,CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC).define(RETRY_BACKOFF_MS_CONFIG,Type.LONG,100L,atLeast(0L),Importance.LOW,CommonClientConfigs.RETRY_BACKOFF_MS_DOC).define(AUTO_OFFSET_RESET_CONFIG,Type.STRING,"latest", //in("latest", "earliest", "none"),Importance.MEDIUM,AUTO_OFFSET_RESET_DOC).define(CHECK_CRCS_CONFIG,Type.BOOLEAN,true,Importance.LOW,CHECK_CRCS_DOC).define(METRICS_SAMPLE_WINDOW_MS_CONFIG,Type.LONG,30000,atLeast(0),Importance.LOW,CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC).define(METRICS_NUM_SAMPLES_CONFIG,Type.INT,2,atLeast(1),Importance.LOW,CommonClientConfigs.METRICS_NUM_SAMPLES_DOC).define(METRICS_RECORDING_LEVEL_CONFIG,Type.STRING,Sensor.RecordingLevel.INFO.toString(),in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),Importance.LOW,CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC).define(METRIC_REPORTER_CLASSES_CONFIG,Type.LIST,Collections.emptyList(),new ConfigDef.NonNullValidator(),Importance.LOW,CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC).define(KEY_DESERIALIZER_CLASS_CONFIG,Type.CLASS,Importance.HIGH,KEY_DESERIALIZER_CLASS_DOC).define(VALUE_DESERIALIZER_CLASS_CONFIG,Type.CLASS,Importance.HIGH,VALUE_DESERIALIZER_CLASS_DOC).define(REQUEST_TIMEOUT_MS_CONFIG,Type.INT,30000,atLeast(0),Importance.MEDIUM,REQUEST_TIMEOUT_MS_DOC).define(DEFAULT_API_TIMEOUT_MS_CONFIG,Type.INT,60 * 1000,atLeast(0),Importance.MEDIUM,CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC).define(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,Type.LONG,CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,Importance.MEDIUM,CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC).define(SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,Type.LONG,CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS,Importance.MEDIUM,CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC)/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,Type.LONG,9 * 60 * 1000,Importance.MEDIUM,CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC).define(INTERCEPTOR_CLASSES_CONFIG,Type.LIST,Collections.emptyList(),new ConfigDef.NonNullValidator(),Importance.LOW,INTERCEPTOR_CLASSES_DOC).define(MAX_POLL_RECORDS_CONFIG,Type.INT,500,atLeast(1),Importance.MEDIUM,MAX_POLL_RECORDS_DOC).define(MAX_POLL_INTERVAL_MS_CONFIG,Type.INT,300000,atLeast(1),Importance.MEDIUM,MAX_POLL_INTERVAL_MS_DOC).define(EXCLUDE_INTERNAL_TOPICS_CONFIG,Type.BOOLEAN,DEFAULT_EXCLUDE_INTERNAL_TOPICS,Importance.MEDIUM,EXCLUDE_INTERNAL_TOPICS_DOC).defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,Type.BOOLEAN,true,Importance.LOW).defineInternal(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED,Type.BOOLEAN,false,Importance.LOW).define(ISOLATION_LEVEL_CONFIG,Type.STRING,DEFAULT_ISOLATION_LEVEL,in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),Importance.MEDIUM,ISOLATION_LEVEL_DOC).define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,Type.BOOLEAN,DEFAULT_ALLOW_AUTO_CREATE_TOPICS,Importance.MEDIUM,ALLOW_AUTO_CREATE_TOPICS_DOC)// security support.define(SECURITY_PROVIDERS_CONFIG,Type.STRING,null,Importance.LOW,SECURITY_PROVIDERS_DOC).define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,Type.STRING,CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,Importance.MEDIUM,CommonClientConfigs.SECURITY_PROTOCOL_DOC).withClientSslSupport().withClientSaslSupport();
}
像auto.offset.reset这个配置默认值为latest一样,再看下ConsumerConfig的几个构造方法
public ConsumerConfig(Properties props) {super(CONFIG, props);}public ConsumerConfig(Map<String, Object> props) {super(CONFIG, props);}
是的,所有的ConsumerConfig构造方法都将上面的默认配置CONFIG传入了构造方法,将下来的处理就是如果显式配置了对应的配置项就使用显式配置数据,没有则使用CONFIG里面的默认配置。
PS:
上面的默认配置除了有一些配置的默认配置,一些枚举属性还有其可选值,比如
auto.offset.reset的可选项