Table of Contents
- 1 Avro and Schema Registry
- 2 Setting Up Schema Registry
- 2.1 Download and Extract Confluent
- 2.2 Set Environment Variables
- 2.3 Edit the Configuration
- 2.4 Start the Service
- 3 API Reference
1 Avro and Schema Registry
Apache Avro is an efficient data serialization system for transferring and storing data across different applications and platforms. It provides a compact, efficient binary encoding; compared with other common serialization formats, Avro achieves faster serialization and a smaller storage footprint.
Confluent Schema Registry is an open-source component from Confluent that addresses schema evolution and compatibility in distributed systems. Built on top of Apache Avro, it is a service for centrally managing and storing the schemas of Avro data, ensuring data consistency and compatibility across a distributed system. It is widely used in event-streaming platforms such as Kafka, where it underpins the reliability and interoperability of data streams.
2 Setting Up Schema Registry
2.1 Download and Extract Confluent
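An Avro schema is itself a JSON document, and the registry's REST API wraps it as an escaped JSON string inside the request body (which is why the curl examples later in this article contain `\"` escapes). A minimal sketch using only Python's standard library; the `Product` record and its fields are illustrative, not taken from the official API:

```python
import json

# A hypothetical Avro record schema for a product event (illustrative only)
product_schema = {
    "type": "record",
    "name": "Product",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double"},
    ],
}

# The registry expects {"schema": "<schema as a JSON string>"}, so the
# schema is serialized twice: first to a string, then embedded in the body.
request_body = json.dumps({"schema": json.dumps(product_schema)})
print(request_body)
```

Letting `json.dumps` do the double encoding avoids hand-escaping the quotes the way the curl examples must.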
curl -O https://packages.confluent.io/archive/7.4/confluent-community-7.4.3.tar.gz
sudo tar xvf confluent-community-7.4.3.tar.gz -C /usr/local/bin
2.2 Set Environment Variables
vim ~/.bashrc
Add:
export SCHEMA_REG_HOME=/usr/local/bin/confluent-7.4.3
export PATH=$PATH:${SCHEMA_REG_HOME}/bin
source ~/.bashrc
2.3 Edit the Configuration
Get the local IP address:
ip addr
Edit the configuration file:
vim /usr/local/bin/confluent-7.4.3/etc/schema-registry/schema-registry.properties
listeners=http://172.26.143.96:8081
kafkastore.bootstrap.servers=PLAINTEXT://172.26.143.96:9092
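Beyond the two lines above, the stock schema-registry.properties ships with a few other commonly adjusted settings. The entries below reflect the defaults as I recall them and are worth double-checking against your own copy of the file:

```properties
# Kafka topic that backs the registry's schema storage (default: _schemas)
kafkastore.topic=_schemas

# Enable verbose request logging while troubleshooting
debug=false
```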
2.4 Start the Service
schema-registry-start $SCHEMA_REG_HOME/etc/schema-registry/schema-registry.properties
Verify by calling the API:
curl -X GET http://172.26.143.96:8081/subjects
curl -X GET http://172.26.143.96:8081/subjects/product-value/versions
curl -X GET http://172.26.143.96:8081/schemas/ids/3
3 API Reference
For more information, see:
Confluent Schema Registry Developer Guide
# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key/versions
{"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-value/versions
{"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
[1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
{"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
{"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
[1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-key
{"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
{"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
{"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config
{"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://localhost:8081/config/Kafka-value
{"compatibility":"BACKWARD"}
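The same REST calls can also be scripted without any client library. A minimal sketch using Python's standard library; the registry URL and subject names are assumptions to replace with your own values:

```python
import json
import urllib.request

REGISTRY_URL = "http://localhost:8081"  # assumed address; use your listener


def register_schema(subject: str, schema: dict) -> int:
    """POST a schema under a subject and return its global schema id."""
    body = json.dumps({"schema": json.dumps(schema)}).encode()
    req = urllib.request.Request(
        f"{REGISTRY_URL}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())["id"]


def list_subjects() -> list:
    """GET the list of registered subjects."""
    with urllib.request.urlopen(f"{REGISTRY_URL}/subjects") as resp:
        return json.loads(resp.read())
```

For example, `register_schema("product-value", {"type": "string"})` mirrors the first curl call above, and `list_subjects()` mirrors the `GET /subjects` call.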