

  • 1、点对点测试
    • 1.1、获取 kafka-consumer-groups.sh 的帮助信息
    • 1.2、列出所有的 消费者组
    • 1.3、创建消费者1并指定组 my_group1
    • 1.4、创建消费者2并指定组 my_group1
    • 1.5、创建消费者3并指定组 my_group1
    • 1.6、创建生产者发送消息到 my_topic1 主题
      • 1.6.1、发送第一条消息romantic
      • 1.6.2、发送第二条消息access
      • 1.6.3、发送第三条消息ability
      • 1.6.4、发送第四条消息accompany
      • 1.6.5、发送第五条消息abandon


1.1、获取 kafka-consumer-groups.sh 的帮助信息

[root@localhost ~]# kafka-consumer-groups.sh --help
Missing required argument "[bootstrap-server]"
Option                                  Description                            
------                                  -----------                            
--all-groups                            Apply to all consumer groups.          
--all-topics                            Consider all topics assigned to a      group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to. connect to>                                                                  
--by-duration <String: duration>        Reset offsets to offset by duration    from current timestamp. Format:      'PnDTnHnMnS'                         
--command-config <String: command       Property file containing configs to be config property file>                   passed to Admin Client and Consumer. 
--delete                                Pass in groups to delete topic         partition offsets and ownership      information over the entire consumer group. For instance --group g1 --    group g2                             
--delete-offsets                        Delete offsets of consumer group.      Supports one consumer group at the   time, and multiple topics.           
--describe                              Describe consumer group and list       offset lag (number of messages not   yet processed) related to given      group.                               
--dry-run                               Only show results without executing    changes on Consumer Groups.          Supported operations: reset-offsets. 
--execute                               Execute operation. Supported           operations: reset-offsets.           
--export                                Export operation execution to a CSV    file. Supported operations: reset-   offsets.                             
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV file.                                
--group <String: consumer group>        The consumer group we wish to act on.  
--help                                  Print usage information.               
--list                                  List all consumer groups.              
--members                               Describe members of the group. This    option may be used with '--describe' and '--bootstrap-server' options     only.                                Example: --bootstrap-server localhost: 9092 --describe --group group1 --    members                              
--offsets                               Describe the group and list all topic  partitions in the group along with   their offset lag. This is the        default sub-action of and may be     used with '--describe' and '--       bootstrap-server' options only.      Example: --bootstrap-server localhost: 9092 --describe --group group1 --    offsets                              
--reset-offsets                         Reset offsets of consumer group.       Supports one consumer group at the   time, and instances should be        inactive                             Has 2 execution options: --dry-run     (the default) to plan which offsets  to reset, and --execute to update    the offsets. Additionally, the --    export option is used to export the  results to a CSV format.             You must choose one of the following   reset specifications: --to-datetime, --by-period, --to-earliest, --to-    latest, --shift-by, --from-file, --  to-current.                          To define the scope use --all-topics   or --topic. One scope must be        specified unless you use '--from-    file'.                               
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset  by 'n', where 'n' can be positive or negative.                            
--state [String]                        When specified with '--describe',      includes the state of the group.     Example: --bootstrap-server localhost: 9092 --describe --group group1 --    state                                When specified with '--list', it       displays the state of all groups. It can also be used to list groups with specific states.                     Example: --bootstrap-server localhost: 9092 --list --state stable,empty     This option may be used with '--       describe', '--list' and '--bootstrap-server' options only.                
--timeout <Long: timeout (ms)>          The timeout that can be set for some   use cases. For example, it can be    used when describing the group to    specify the maximum amount of time   in milliseconds to wait before the   group stabilizes (when the group is  just created, or is going through    some changes). (default: 5000)       
--to-current                            Reset offsets to current offset.       
--to-datetime <String: datetime>        Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'    
--to-earliest                           Reset offsets to earliest offset.      
--to-latest                             Reset offsets to latest offset.        
--to-offset <Long: offset>              Reset offsets to a specific offset.    
--topic <String: topic>                 The topic whose consumer group         information should be deleted or     topic whose should be included in    the reset offset process. In `reset- offsets` case, partitions can be     specified using this format: `topic1:0,1,2`, where 0,1,2 are the          partition to be included in the      process. Reset-offsets also supports multiple topic inputs.               
--verbose                               Provide additional information, if     any, when describing the group. This option may be used with '--          offsets'/'--members'/'--state' and   '--bootstrap-server' options only.   Example: --bootstrap-server localhost: 9092 --describe --group group1 --    members --verbose                    
--version                               Display Kafka version.  

1.2、列出所有的 消费者组

console-consumer-92430 是没有指定组的消费者,默认自己是一组如果没有指定组的消费者长时间关闭,会自动删除组
同一个消费者组内的多个消费者 消费一个主题的消息时就是点对点,每个消息消费一次

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --list

1.3、创建消费者1并指定组 my_group1

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server --topic my_topic1 --group my_group1
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       0          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1

1.4、创建消费者2并指定组 my_group1

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server --topic my_topic1 --group my_group1
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       0          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1

1.5、创建消费者3并指定组 my_group1

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server --topic my_topic1 --group my_group1
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1
my_group1       my_topic1       0          0               0               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 / consumer-my_group1-1

1.6、创建生产者发送消息到 my_topic1 主题



[root@localhost ~]# kafka-console-producer.sh --bootstrap-server --topic my_topic1


发送一个消息romantic后发现消费者3中的 current-offset 和 log-end-offset 都变成了1

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          0               0               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1
my_group1       my_topic1       0          1               1               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 / consumer-my_group1-1



发送一个消息access后发现消费者2中的 current-offset 和 log-end-offset 都变成了1

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          1               1               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1
my_group1       my_topic1       0          1               1               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 / consumer-my_group1-1



发送一个消息ability后发现消费者3中的 current-offset 和 log-end-offset 都变成了2,因为第一次消息也是消费者3接收

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          1               1               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1
my_group1       my_topic1       0          2               2               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 / consumer-my_group1-1



发送一个消息accompany后发现消费者2中的 current-offset 和 log-end-offset 都变成了2,因为第二次消息也是消费者2接收

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          0               0               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          2               2               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1
my_group1       my_topic1       0          2               2               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 / consumer-my_group1-1



发送一个消息abandon后发现消费者1中的 current-offset 和 log-end-offset 都变成了1,因为消费者1是第一次接收到消息

[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server --group my_group1 --describeGROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
my_group1       my_topic1       1          1               1               0               consumer-my_group1-1-c2ff5a19-af5c-47fc-9ad9-d6028844f86c / consumer-my_group1-1
my_group1       my_topic1       2          2               2               0               consumer-my_group1-1-c6a31cdb-c924-49bb-99da-cf45ffdbefb6 / consumer-my_group1-1
my_group1       my_topic1       0          2               2               0               consumer-my_group1-1-19852a4a-9a4e-4b41-b605-0a78530d0cd8 / consumer-my_group1-1





