前言
配置中心通过 key=value 的形式存储环境变量。配置中心的属性被修改后,项目可以通过配置中心的依赖(SDK)立即感知到。本文要解决的问题是:当属性发生变化时,如何同步更新带有 @ConfigurationProperties 注解的 bean 的相关属性。
配置中心
在读配置中心源码的时候发现,里面维护了一个Environment,以及ZookeeperPropertySource。当配置中心属性发生变化的时候,清空ZookeeperPropertySource,并放入最新的属性值。
public class ZookeeperPropertySource extends EnumerablePropertySource<Properties>
ZookeeperPropertySource 重写了 equals 和 hashCode 方法,根据这两个方法可以判定配置中心是否修改了属性。
配置中心定义的属性变量
message.center.channels[0].type=HELIUYAN
message.center.channels[0].desc=和留言系统
message.center.channels[1].type=EC_BACKEND
message.center.channels[1].desc=电商后台
message.center.channels[2].type=BILL_FLOW
message.center.channels[2].desc=话费和流量提醒
message.center.channels[3].type=INTEGRATED_CASHIER
message.center.channels[3].desc=综合收银台

message.center.businesses[0].type=BIZ_EXP_REMINDER
message.center.businesses[0].desc=业务到期提醒
message.center.businesses[0].topic=message-center-biz-expiration-reminder-topic
message.center.businesses[1].type=RECHARGE_TRANSACTION_PUSH
message.center.businesses[1].desc=充值交易实时推送
message.center.businesses[1].topic=message-center-recharge-transaction-push-topic

message.center.businesses2Channels[BIZ_EXP_REMINDER]=EC_BACKEND
message.center.businesses2Channels[RECHARGE_TRANSACTION_PUSH]=INTEGRATED_CASHIER

message.center.bizTypeForMsgType[RECHARGE_TRANSACTION_PUSH]=data.type:pay-finish,data.type:rechr-finish,data.type:refund-finish
java属性配置映射类
import org.springframework.boot.context.properties.ConfigurationProperties;import java.util.List; import java.util.Map; import java.util.Objects;/*** @author hujunzheng* @create 2018-06-28 11:37**/ @ConfigurationProperties(prefix = "message.center") public class MessageCenterConstants {private List<Business> businesses;private List<Channel> channels;private Map<String, String> businesses2Channels;private Map<String, String> bizTypeForMsgType;public void setBusinesses(List<Business> businesses) {this.businesses = businesses;}public void setChannels(List<Channel> channels) {this.channels = channels;}public List<Business> getBusinesses() {return businesses;}public List<Channel> getChannels() {return channels;}public Map<String, String> getBusinesses2Channels() {return businesses2Channels;}public void setBusinesses2Channels(Map<String, String> businesses2Channels) {this.businesses2Channels = businesses2Channels;}public Map<String, String> getBizTypeForMsgType() {return bizTypeForMsgType;}public void setBizTypeForMsgType(Map<String, String> bizTypeForMsgType) {this.bizTypeForMsgType = bizTypeForMsgType;}public static class Business implements Comparable<Business> {//业务类型private String type;//业务描述private String desc;//对应 kafka 的 topicprivate String topic;public String getType() {return type;}public void setType(String type) {this.type = type;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}@Overridepublic int compareTo(Business o) {if (type.compareTo(o.type) == 0 || topic.compareTo(o.topic) == 0) {return 0;}return Objects.hash(type, topic);}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;Business business = (Business) o;return Objects.equals(type, business.type) ||Objects.equals(topic, business.topic);}@Overridepublic int hashCode() {return 
Objects.hash(type, topic);}@Overridepublic String toString() {return "Business{" +"type='" + type + '\'' +", desc='" + desc + '\'' +", topic='" + topic + '\'' +'}';}}public static class Channel implements Comparable<Channel> {//渠道类型private String type;//渠道描述private String desc;public String getType() {return type;}public void setType(String type) {this.type = type;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic int compareTo(Channel o) {return this.type.compareTo(o.type);}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;Channel channel = (Channel) o;return Objects.equals(type, channel.type);}@Overridepublic int hashCode() {return Objects.hash(type);}@Overridepublic String toString() {return "Channel{" +"type='" + type + '\'' +", desc='" + desc + '\'' +'}';}} }
属性刷新方案
@Bean public MergedProperties kafkaMessageMergedProperties() {return ConfigCenterUtils.createToRefreshPropertiesBean(MergedProperties.class); }public static class MergedProperties {private Map<String, MessageCenterConstants.Business> businesses;private Map<String, MessageCenterConstants.Channel> channels;//业务映射渠道private Map<String, String> businesses2Channels;//消息类型映射业务类型private Map<String, String> msgType2BizType;public MergedProperties() throws GeneralException {this.refreshProperties();}private void refreshProperties() throws GeneralException {
//获取到配置中心最新的propertySourceZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource();MessageCenterConstants messageCenterConstants = null;
//判断属性是否刷新if (ConfigCenterUtils.propertySourceRefresh(propertySource)) {
//将属性binding到带有@ConfigurationProperties注解的类中messageCenterConstants =RelaxedConfigurationBinder.with(MessageCenterConstants.class).setPropertySources(propertySource).doBind();}
//以下是自定义处理,可忽略if (!Objects.isNull(messageCenterConstants)) {//Business.type <-> Businessthis.setBusinesses(Maps.newHashMap(Maps.uniqueIndex(Sets.newHashSet(messageCenterConstants.getBusinesses()), business -> business.getType())));//Channel.type <-> Channelthis.setChannels(Maps.newHashMap(Maps.uniqueIndex(Sets.newHashSet(messageCenterConstants.getChannels()), channel -> channel.getType())));//business <-> channelsthis.setBusinesses2Channels(messageCenterConstants.getBusinesses2Channels());//消息类型映射业务类型this.setMsgType2BizType(messageCenterConstants.getBizTypeForMsgType().entrySet().stream().map(entry -> {Map<String, String> tmpMap = Maps.newHashMap();if (StringUtils.isBlank(entry.getValue())) {return tmpMap;}Arrays.stream(entry.getValue().split(",")).forEach(value -> tmpMap.put(value, entry.getKey()));return tmpMap;}).flatMap(map -> map.entrySet().stream()).collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));}}
//刷新方法private void catchRefreshProperties() {try {this.refreshProperties();} catch (Exception e) {LOGGER.error("KafkaMessageConfig 配置中心属性刷新失败", e);}}
//get方法上指定刷新属性@ToRefresh(method = "catchRefreshProperties")public Map<String, MessageCenterConstants.Business> getBusinesses() {return businesses;}public void setBusinesses(Map<String, MessageCenterConstants.Business> businesses) {this.businesses = businesses;}@ToRefresh(method = "catchRefreshProperties")public Map<String, MessageCenterConstants.Channel> getChannels() {return channels;}public void setChannels(Map<String, MessageCenterConstants.Channel> channels) {this.channels = channels;}@ToRefresh(method = "catchRefreshProperties")public Map<String, String> getBusinesses2Channels() {return businesses2Channels;}public void setBusinesses2Channels(Map<String, String> businesses2Channels) {this.businesses2Channels = businesses2Channels;}@ToRefresh(method = "catchRefreshProperties")public Map<String, String> getMsgType2BizType() {return msgType2BizType;}public void setMsgType2BizType(Map<String, String> msgType2BizType) {this.msgType2BizType = msgType2BizType;} }
工具类
ConfigCenterUtils
import com.cmos.cfg.core.ConfigHelper; import com.cmos.cfg.zookeeper.ZookeeperPropertySource; import org.apache.commons.lang3.StringUtils; import org.springframework.cglib.proxy.Enhancer; import org.springframework.cglib.proxy.MethodInterceptor; import org.springframework.cglib.proxy.MethodProxy; import org.springframework.core.BridgeMethodResolver; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.util.ReflectionUtils;import java.lang.reflect.Method; import java.util.Objects;/*** @author hujunzheng* @create 2018-07-04 15:45**/ public class ConfigCenterUtils {private static ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource();
//判断配置中心属性是否刷新public synchronized static boolean propertySourceRefresh(ZookeeperPropertySource newPropertySource) {if (propertySource.equals(newPropertySource)) {return false;}if (propertySource.hashCode() == newPropertySource.hashCode()) {return false;}propertySource = newPropertySource;return true;}
//创建代理类,代理@ToRefresh注解的方法,调用相应的刷新方法public static <T> T createToRefreshPropertiesBean(Class<T> clazz) {Enhancer enhancer = new Enhancer();// 设置代理对象父类 enhancer.setSuperclass(clazz);// 设置增强enhancer.setCallback(new MethodInterceptor() {@Overridepublic Object intercept(Object target, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {ToRefresh toRefresh = AnnotationUtils.findAnnotation(method, ToRefresh.class);if (Objects.isNull(toRefresh) || StringUtils.isBlank(toRefresh.method())) {return methodProxy.invokeSuper(target, args);}Method refreshMethod = ReflectionUtils.findMethod(target.getClass(), toRefresh.method());if (Objects.isNull(refreshMethod)) {return methodProxy.invokeSuper(target, args);}refreshMethod = BridgeMethodResolver.findBridgedMethod(refreshMethod);refreshMethod.setAccessible(true);refreshMethod.invoke(target, null);return methodProxy.invokeSuper(target, args);}});return (T) enhancer.create();// 创建代理对象 } }
import org.apache.commons.lang3.StringUtils;import java.lang.annotation.Documented; import java.lang.annotation.Retention; import java.lang.annotation.Target;import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME;/*** @author hujunzheng* @create 2018-07-06 9:59**/ @Target({METHOD}) @Retention(RUNTIME) @Documented public @interface ToRefresh {//刷新方法String method() default StringUtils.EMPTY; }
RelaxedConfigurationBinder
动态将propertysource绑定到带有@ConfigurationProperties注解的bean中
参考:org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor
import com.cmos.common.exception.GeneralException; import org.springframework.boot.bind.PropertiesConfigurationFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.core.env.*; import org.springframework.validation.Validator; import org.springframework.validation.beanvalidation.SpringValidatorAdapter;import javax.validation.Validation;import static org.springframework.core.annotation.AnnotatedElementUtils.getMergedAnnotation;/*** @author hujunzheng* @create 2018-07-03 18:01** 不强依赖ConfigurationProperties,进行配置注入**/ public class RelaxedConfigurationBinder<T> {private final PropertiesConfigurationFactory<T> factory;public RelaxedConfigurationBinder(T object) {this(new PropertiesConfigurationFactory<>(object));}public RelaxedConfigurationBinder(Class<T> type) {this(new PropertiesConfigurationFactory<>(type));}public static <T> RelaxedConfigurationBinder<T> with(T object) {return new RelaxedConfigurationBinder<>(object);}public static <T> RelaxedConfigurationBinder<T> with(Class<T> type) {return new RelaxedConfigurationBinder<>(type);}public RelaxedConfigurationBinder(PropertiesConfigurationFactory<T> factory) {this.factory = factory;ConfigurationProperties properties = getMergedAnnotation(factory.getObjectType(), ConfigurationProperties.class);javax.validation.Validator validator = Validation.buildDefaultValidatorFactory().getValidator();factory.setValidator(new SpringValidatorAdapter(validator));factory.setConversionService(new DefaultConversionService());if (null != properties) 
{factory.setIgnoreNestedProperties(properties.ignoreNestedProperties());factory.setIgnoreInvalidFields(properties.ignoreInvalidFields());factory.setIgnoreUnknownFields(properties.ignoreUnknownFields());factory.setTargetName(properties.prefix());factory.setExceptionIfInvalid(properties.exceptionIfInvalid());}}public RelaxedConfigurationBinder<T> setTargetName(String targetName) {factory.setTargetName(targetName);return this;}public RelaxedConfigurationBinder<T> setPropertySources(PropertySource<?>... propertySources) {MutablePropertySources sources = new MutablePropertySources();for (PropertySource<?> propertySource : propertySources) {sources.addLast(propertySource);}factory.setPropertySources(sources);return this;}public RelaxedConfigurationBinder<T> setPropertySources(Environment environment) {factory.setPropertySources(((ConfigurableEnvironment) environment).getPropertySources());return this;}public RelaxedConfigurationBinder<T> setPropertySources(PropertySources propertySources) {factory.setPropertySources(propertySources);return this;}public RelaxedConfigurationBinder<T> setConversionService(ConversionService conversionService) {factory.setConversionService(conversionService);return this;}public RelaxedConfigurationBinder<T> setValidator(Validator validator) {factory.setValidator(validator);return this;}public RelaxedConfigurationBinder<T> setResolvePlaceholders(boolean resolvePlaceholders) {factory.setResolvePlaceholders(resolvePlaceholders);return this;}public T doBind() throws GeneralException {try {return factory.getObject();} catch (Exception ex) {throw new GeneralException("配置绑定失败!", ex);}} }