文章目录
- 概念
- 基于ServiceDiscovery实现服务自动注册和发现
- Service:服务基本信息
- InstanceDetails:封装服务实例信息,用于保存到zk中
- ServiceProvider:服务提供者
- ServiceConsumer:服务消费者
- 运行
- 基于ServiceDiscovery、ServiceCache实现服务自动注册和发现
- Registry:服务注册Registry接口
- ZookeeperRegistry:基于zk 实现服务注册发现
- ServerInfo:注册信息
- ServiceProviderByCache:服务提供者
- ServiceComsumerByCache:服务发现者
概念
学习文章:https://blog.csdn.net/qq_34021712/article/details/82887942
基于ServiceDiscovery实现服务自动注册和发现
Service:服务基本信息
package com.demo.rpc.curator;import java.util.List;/*** @author: weijie* @Date: 2020/9/27 16:32* @Description: Service:方法、方法描述、方法参数*/
public class Service {

    // Plain data holder for one service method: its name, a description
    // and the list of parameter names.

    private String methodName;

    private String desc;

    private List<String> params;

    /**
     * @return the method name, or {@code null} if it has not been set
     */
    public String getMethodName() {
        return methodName;
    }

    /**
     * @param methodName the method name to store
     */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * @return the method description, or {@code null} if it has not been set
     */
    public String getDesc() {
        return desc;
    }

    /**
     * @param desc the method description to store
     */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * @return the stored parameter list (the same instance that was passed to
     *         {@link #setParams(List)}), or {@code null} if it has not been set
     */
    public List<String> getParams() {
        return params;
    }

    /**
     * @param params the parameter list to store; kept by reference, not copied
     */
    public void setParams(List<String> params) {
        this.params = params;
    }
}
InstanceDetails:封装服务实例信息,用于保存到zk中
package com.demo.rpc.curator;import java.util.HashMap;
import java.util.Map;public class InstanceDetails {//zk根节点路径public static final String ROOT_PATH = "/service";//该服务拥有哪些方法public Map<String, Service> services = new HashMap<>();//服务描述private String serviceDesc;public InstanceDetails() {}public InstanceDetails(String serviceDesc, Map<String, Service> services) {this.services = services;this.serviceDesc = serviceDesc;}public static String getRootPath() {return ROOT_PATH;}public Map<String, Service> getServices() {return services;}public void setServices(Map<String, Service> services) {this.services = services;}public String getServiceDesc() {return serviceDesc;}public void setServiceDesc(String serviceDesc) {this.serviceDesc = serviceDesc;}
}
ServiceProvider:服务提供者
package com.demo.rpc.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.*;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;public class ServiceProvider {public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",2000,2000, new ExponentialBackoffRetry(1000, 3));client.start();client.blockUntilConnected();//服务构造器ServiceInstanceBuilder<InstanceDetails> serviceInstanceBuilder = ServiceInstance.builder();//该服务中所有接口Map<String, Service> services = new HashMap<>();Service addOrderService = new Service();addOrderService.setDesc("添加订单");addOrderService.setMethodName("addOrder");addOrderService.setParams(Arrays.asList("uid", "createTime", "sid", "status"));services.put("addOrder", addOrderService);//添加删除订单服务接口Service delOrderService = new Service();delOrderService.setDesc("删除订单");delOrderService.setMethodName("delOrder");delOrderService.setParams(Arrays.asList("sid"));services.put("delOrder", delOrderService);InstanceDetails payload = new InstanceDetails("订单服务", services);//将服务添加到ServiceInstanceServiceInstance<InstanceDetails> orderService = serviceInstanceBuilder.address("127.0.0.1").port(8080).name("OrderService").payload(payload).uriSpec(new UriSpec("{scheme}://{address}:{port}")).build();//构建ServiceDiscovery用来做服务发现ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).serializer(new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class)).basePath(InstanceDetails.ROOT_PATH).build();//服务注册serviceDiscovery.registerService(orderService);serviceDiscovery.start();System.out.println("服务注册成功");//创建钩子方法正常关闭Runtime.getRuntime().addShutdownHook(new Thread(() -> {try {serviceDiscovery.close();client.close();} catch (IOException e) {e.printStackTrace();}System.out.println("shutdown hook...");}));//保持服务运行状态ServiceProvider serviceProvider = new ServiceProvider();synchronized (serviceProvider){serviceProvider.wait();}}}
ServiceConsumer:服务消费者
package com.demo.rpc.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;public class ServiceConsumer {public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));client.start();client.blockUntilConnected();//构建ServiceDiscovery用来做服务发现ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(InstanceDetails.ROOT_PATH).serializer(new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class)).build();serviceDiscovery.start();//模拟请求调用服务while (true) {Collection<ServiceInstance<InstanceDetails>> orderServices = serviceDiscovery.queryForInstances("OrderService");if (orderServices.size() == 0) {System.out.println("当前没有发现服务");TimeUnit.SECONDS.sleep(10);continue;}for (ServiceInstance<InstanceDetails> service : orderServices) {//获取请求的scheme 例如:http://127.0.0.1:8080String uriSpec = service.buildUriSpec();//获取服务的其他信息InstanceDetails payload = service.getPayload();//服务描述String serviceDesc = payload.getServiceDesc();//获取该服务下的所有接口Map<String, Service> allService = payload.getServices();Set<Map.Entry<String, Service>> entries = allService.entrySet();for (Map.Entry<String, Service> entry : entries) {System.out.println(serviceDesc + uriSpec+ "/" + service.getName()+ "/" + entry.getKey()+ " 该方法需要的参数为:"+ entry.getValue().getParams().toString());}}System.out.println("---------------------");Thread.sleep(5 * 1000);}}}
运行
基于ServiceDiscovery、ServiceCache实现服务自动注册和发现
Registry:服务注册Registry接口
package com.demo.rpc.curator;

import org.apache.curator.x.discovery.ServiceInstance;

import java.util.List;

/**
 * Service registry abstraction: register, unregister and look up
 * {@link ServiceInstance}s.
 *
 * @param <T> payload type carried by the service instances
 */
public interface Registry<T> {

    /**
     * Registers a service instance.
     *
     * @param service the instance to register
     * @throws Exception if registration fails
     */
    void registerService(ServiceInstance<T> service) throws Exception;

    /**
     * Removes a previously registered service instance.
     *
     * @param service the instance to unregister
     * @throws Exception if unregistration fails
     */
    void unRegisterService(ServiceInstance<T> service) throws Exception;

    /**
     * Looks up the instances registered under a service name.
     *
     * @param name the service name
     * @return the matching instances
     * @throws Exception if the lookup fails
     */
    List<ServiceInstance<T>> queryForInstances(String name) throws Exception;
}
ZookeeperRegistry:基于zk 实现服务注册发现
package com.demo.rpc.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;import java.util.List;
import java.util.stream.Collectors;public class ZookeeperRegistry<T> implements Registry{private InstanceSerializer serializer = new JsonInstanceSerializer(ServerInfo.class);private ServiceDiscovery<T> serviceDiscovery;private ServiceCache<T> serviceCache;private String ROOT = "/dubbo-demo";private String serviceName = "demoService";private String host;public ZookeeperRegistry(String host) {this.host = host;}public void start() throws Exception{CuratorFramework client = CuratorFrameworkFactory.newClient(this.host, new ExponentialBackoffRetry(1000, 5));client.start();serviceDiscovery = ServiceDiscoveryBuilder.builder(ServerInfo.class).client(client).basePath(ROOT).serializer(serializer).build();serviceDiscovery.start();//监听zk节点信息变化,方便后续的读取serviceCache = serviceDiscovery.serviceCacheBuilder()//多个serviceName如何监听??.name(serviceName).build();serviceDiscovery.start();serviceCache.start();}@Overridepublic void registerService(ServiceInstance service) throws Exception {serviceDiscovery.registerService(service);}@Overridepublic void unRegisterService(ServiceInstance service) throws Exception {serviceDiscovery.unregisterService(service);}/*** @param name* @return 根据name查询缓存数据* @throws Exception*/@Overridepublic List<ServiceInstance> queryForInstances(String name) throws Exception {return serviceCache.getInstances().stream().filter(s -> s.getName().equals(name)).collect(Collectors.toList());}
}
ServerInfo:注册信息
package com.demo.rpc.curator;import java.io.Serializable;public class ServerInfo implements Serializable {private String host;private int port;public ServerInfo() {}public ServerInfo(String host, int port) {this.host = host;this.port = port;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}@Overridepublic String toString() {return "ServerInfo{" +"host='" + host + '\'' +", port=" + port +'}';}
}
ServiceProviderByCache:服务提供者
package com.demo.rpc.curator;

import org.apache.curator.x.discovery.ServiceInstance;

/**
 * Demo provider for the {@link ZookeeperRegistry}: registers one
 * "demoService" instance carrying a {@link ServerInfo} payload and then blocks
 * so the registration stays alive.
 */
public class ServiceProviderByCache {

    /**
     * Starts the registry, registers the instance and parks the main thread.
     *
     * @param args unused
     * @throws Exception if the registry fails to start or register, or if the
     *                   main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        ZookeeperRegistry<ServerInfo> zookeeperRegistry = new ZookeeperRegistry<>("127.0.0.1:2181");
        zookeeperRegistry.start();

        ServerInfo serverInfo = new ServerInfo("127.0.0.1", 8080);
        // Explicit type witness so the built instance is ServiceInstance<ServerInfo>
        // rather than the ServiceInstance<Object> a bare builder() chain infers.
        zookeeperRegistry.registerService(
                ServiceInstance.<ServerInfo>builder()
                        .name("demoService")
                        .payload(serverInfo)
                        .build());

        // Keep the process alive. Object.wait() may return spuriously, so it is
        // re-entered in a loop; only an interrupt or JVM shutdown ends the wait.
        synchronized (ServiceProviderByCache.class) {
            while (true) {
                ServiceProviderByCache.class.wait();
            }
        }
    }
}
ServiceComsumerByCache:服务发现者
package com.demo.rpc.curator;import org.apache.curator.x.discovery.ServiceInstance;import java.util.List;public class ServiceComsumerByCache {public static void main(String[] args) throws Exception {ZookeeperRegistry<ServerInfo> zookeeperRegistry = new ZookeeperRegistry<>("127.0.0.1:2181");zookeeperRegistry.start();// while (true){List<ServiceInstance> serviceInstances = zookeeperRegistry.queryForInstances("demoService");for (ServiceInstance serviceInstance : serviceInstances){System.out.println(serviceInstance.getPayload().toString());}
// }}}