//A bounded BlockingQueue backed by an array.  
//This queue orders elements FIFO (first-in-first-out).  
//The head of the queue is that element that has been on the queue the longest time.  
//The tail of the queue is that element that has been on the queue the shortest time. 
//New elements are inserted at the tail of the queue, 
//and the queue retrieval operations obtain elements at the head of the queue.
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, { //The queued itemsfinal Object[] items;//items index for next take, poll, peek or removeint takeIndex;//items index for next put, offer, or addint putIndex;//Number of elements in the queueint count;//Main lock guarding all accessfinal ReentrantLock lock;//Condition for waiting takesprivate final Condition notEmpty;//Condition for waiting putsprivate final Condition notFull;//Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.public ArrayBlockingQueue(int capacity) {this(capacity, false);}//Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0) {throw new IllegalArgumentException();}this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}//Inserts the specified element at the tail of this queue, //waiting for space to become available if the queue is full.public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {notFull.await();}enqueue(e);} finally {lock.unlock();}}//Inserts element at current put position, advances, and signals.//Call only when holding lock.private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length) {putIndex = 0;}count++;notEmpty.signal();}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {notEmpty.await();}return dequeue();} finally {lock.unlock();}}//Returns the number of elements in this queue.public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}...



//An optionally-bounded BlockingQueue based on linked nodes.
//This queue orders elements FIFO (first-in-first-out).
//The head of the queue is that element that has been on the queue the longest time.
//The tail of the queue is that element that has been on the queue the shortest time. 
//New elements are inserted at the tail of the queue, 
//and the queue retrieval operations obtain elements at the head of the queue.
//Linked queues typically have higher throughput than array-based queues 
//but less predictable performance in most concurrent applications.
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, {...//The capacity bound, or Integer.MAX_VALUE if noneprivate final int capacity;//Current number of elementsprivate final AtomicInteger count = new AtomicInteger();//Head of linked list.transient Node<E> head;//Tail of linked list.private transient Node<E> last;//Lock held by take, poll, etcprivate final ReentrantLock takeLock = new ReentrantLock();//Lock held by put, offer, etcprivate final ReentrantLock putLock = new ReentrantLock();//Wait queue for waiting takesprivate final Condition notEmpty = takeLock.newCondition();//Wait queue for waiting putsprivate final Condition notFull = putLock.newCondition();//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {last = = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}private E dequeue() {Node<E> h = head;Node<E> first =; = h; // help GChead = first;E x = first.item;first.item = null;return x;}public int size() {return count.get();}...




//An unbounded BlockingQueue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations.  
//While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). 
//This class does not permit null elements.
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, { //Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, //if comparator is null: For each node n in the heap and each descendant d of n, n <= d.//The element with the lowest value is in queue[0], assuming the queue is nonempty.private transient Object[] queue;//The number of elements in the priority queue.private transient int size;//The comparator, or null if priority queue uses elements' natural ordering.private transient Comparator<? super E> comparator;//Lock used for all public operationsprivate final ReentrantLock lock;//Condition for blocking when emptyprivate final Condition notEmpty;//Spinlock for allocation, acquired via CAS.private transient volatile int allocationSpinLock;//Creates a PriorityBlockingQueue with the default initial capacity (11) that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that //orders its elements according to their Comparable natural ordering.public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}//Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {if (initialCapacity < 1) {throw new IllegalArgumentException();}this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never block.public void put(E e) {offer(e); // never need to block}//Inserts the specified element into this priority queue.//As the queue is unbounded, this method will never return false.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length)) {tryGrow(array, cap);}try {Comparator<? super E> cmp = comparator;if (cmp == null) {siftUpComparable(n, e, array);} else {siftUpUsingComparator(n, e, array, cmp);}size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}//Tries to grow array to accommodate at least one more element (but normally expand by about 50%), //giving up (allowing retry) on contention (which we expect to be rare). Call only while holding lock.private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE) {throw new OutOfMemoryError();}newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array) {newArray = new Object[newCap];}} finally {allocationSpinLock = 0;}}if (newArray == null) {// back off if another thread is allocatingThread.yield();}lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = key;}private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {while(k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (, (T) e) >= 0) {break;}array[k] = e;k = parent;}array[k] = x;}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null) {notEmpty.await();}} finally {lock.unlock();}return result;}public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}...








//An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired.
//The head of the queue is that Delayed element whose delay expired furthest in the past.
//If no delay has expired there is no head and poll will return null. 
//Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
//Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. 
//For example, the size method returns the count of both expired and unexpired elements.
//This queue does not permit null elements.
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();//Thread designated to wait for the element at the head of the queue.//When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.//The leader thread must signal some other thread before returning from take() or poll(...), //unless some other thread becomes leader in the interim.  //Whenever the head of the queue is replaced with an element with an earlier expiration time, //the leader field is invalidated by being reset to null, and some waiting thread, //but not necessarily the current leader, is signalled. //So waiting threads must be prepared to acquire and lose leadership while waiting.private Thread leader = null;//Condition signalled when a newer element becomes available at the head of the queue or a new thread may need to become leader.private final Condition available = lock.newCondition();//Creates a new {@code DelayQueue} that is initially empty.public DelayQueue() {}//Inserts the specified element into this delay queue. //As the queue is unbounded this method will never block.public void put(E e) {offer(e);}//Inserts the specified element into this delay queue.public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}//Retrieves and removes the head of this queue, //waiting if necessary until an element with an expired delay is available on this queue.public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await();} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) {return q.poll();}  first = null; // don't retain ref while waitingif (leader != null) {available.await();} else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread) {leader = null;}}}}}} finally {if (leader == null && q.peek() != null) {available.signal();}lock.unlock();}}...
}public class PriorityQueue<E> extends AbstractQueue<E> implements {//Priority queue represented as a balanced binary heap: the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)].//The priority queue is ordered by comparator, or by the elements' natural ordering, if comparator is null: //For each node n in the heap and each descendant d of n, n <= d.  //The element with the lowest value is in queue[0], assuming the queue is nonempty.transient Object[] queue; //The number of elements in the priority queue.private int size = 0;//The comparator, or null if priority queue uses elements' natural ordering.private final Comparator<? super E> comparator;public E peek() {return (size == 0) ? null : (E) queue[0];}//Inserts the specified element into this priority queue.public boolean offer(E e) {if (e == null) {throw new NullPointerException();}modCount++;int i = size;if (i >= queue.length) {grow(i + 1);}size = i + 1;if (i == 0) {queue[0] = e;} else {siftUp(i, e);}return true;}//Increases the capacity of the array.private void grow(int minCapacity) {int oldCapacity = queue.length;// Double size if small; else grow by 50%int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1));// overflow-conscious codeif (newCapacity - MAX_ARRAY_SIZE > 0) {newCapacity = hugeCapacity(minCapacity);}queue = Arrays.copyOf(queue, newCapacity);}private void siftUp(int k, E x) {if (comparator != null) {siftUpUsingComparator(k, x);} else {siftUpComparable(k, x);}}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (key.compareTo((E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = key;}@SuppressWarnings("unchecked")private void siftUpUsingComparator(int k, E x) {while (k > 0) {int parent = (k - 1) >>> 1;Object e = queue[parent];if (, (E) e) >= 0) {break;}queue[k] = e;k = parent;}queue[k] = x;}...






public class Executors {...//Creates a thread pool that creates new threads as needed, //but will reuse previously constructed threads when they are available.//These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.//Calls to execute will reuse previously constructed threads if available. //If no existing thread is available, a new thread will be created and added to the pool. //Threads that have not been used for sixty seconds are terminated and removed from the cache. //Thus, a pool that remains idle for long enough will not consume any resources. //Note that pools with similar properties but different details (for example, timeout parameters)//may be created using ThreadPoolExecutor constructors.public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}...




























然后使用ReentrantLock.lockInterruptibly()方法来获取一个可被中断的锁,加锁的目的是保证数据添加到队列过程中的安全性 + 避免队列长度超阈值。




public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}//Inserts the specified element at the tail of this queue, //waiting if necessary for space to become available.public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;//将添加的元素封装成一个Node对象Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//当前队列元素的数量putLock.lockInterruptibly();//加可被中断的锁try {//注意:这里用到了一个很重要的属性notFull,它是一个Condition对象,用来阻塞和唤醒生产者线程//如果阻塞队列当前的元素个数等于最大容量,就调用notFull.await()方法来阻塞生产者线程while (count.get() == capacity) {notFull.await();//阻塞当前线程,并释放锁}//把封装的Node对象存储到链表中enqueue(node);//通过AtomicInteger来递增当前阻塞队列中的元素个数,用于后续判断是否已超阻塞队列的最大容量c = count.getAndIncrement();//根据AtomicInteger类型的变量判断队列元素是否已超阈值if (c + 1 < capacity) {notFull.signal();}} finally {putLock.unlock();//释放锁}if (c == 0) {signalNotEmpty();}}//Links node at end of queue.private void enqueue(Node<E> node) {//node先成为当前last的next//然后last又指向last的next(即node)last = = node;}//Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock takeLock.)private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}...







public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, {...private final int capacity;//阻塞队列的最大容量,默认是Integer.MAX_VALUEprivate final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁//使用两个Condition,分别阻塞和唤醒出队时的线程和入队时的线程private final Condition notEmpty = takeLock.newCondition();//出队的等待队列conditionprivate final Condition notFull = putLock.newCondition();//入队的等待队列condition//Creates a LinkedBlockingQueue with a capacity of Integer#MAX_VALUE.public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}//Creates a LinkedBlockingQueue with the given (fixed) capacity.public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//获取一个可中断的锁takeLock.lockInterruptibly();try {//判断元素个数是否为0while (count.get() == 0) {notEmpty.await();//阻塞当前线程并释放锁}//调用dequeue()方法从链表中获取一个元素x = dequeue();//通过AtomicInteger来递减当前阻塞队列中的元素个数c = count.getAndDecrement();//判断阻塞队列中的元素个数是否大于1//如果是,则调用notEmpty.signal()唤醒被阻塞的消费者消除if (c > 1) {notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity) {signalNotFull();}return x;}//首先获取链表的头结点head//然后拿到头结点的下一个结点first//然后把原来的头结点从队列中移除,设置first结点的数据为null,并将first结点设置为新的头结点//最后返回first结点的数据private E dequeue() {Node<E> h = head;//h指向headNode<E> first =;//first指向h的 = h;// help GChead = first;E x = first.item;first.item = null;return x;}...













public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, {...private final AtomicInteger count = new AtomicInteger();//阻塞队列的元素个数transient Node<E> head;//链表的头结点private transient Node<E> last;//链表的尾结点//使用两把锁,使put()和take()的锁分离,提升并发性能private final ReentrantLock takeLock = new ReentrantLock();//出队的锁private final ReentrantLock putLock = new ReentrantLock();//入队的锁public int size() {return count.get();}public Iterator<E> iterator() {return new Itr();}private class Itr implements Iterator<E> {private Node<E> current;private Node<E> lastRet;private E currentElement;Itr() {fullyLock();try {current =;if (current != null) {currentElement = current.item;}} finally {fullyUnlock();}}...}void fullyLock() {putLock.lock();takeLock.lock();}void fullyUnlock() {takeLock.unlock();putLock.unlock();}...






























此外还要估算有界队列LinkedBlockingQueue的队列长度应设置多少才合适。假如每一条需要同步给集群其他实例的请求信息,有6个字段,占30字节。平均每一条batch信息会包含100条请求信息,也就是会占3000字节 = 3KB。那么1000条batch消息,才占用3000KB = 3MB。因此可以设置第二个内存队列LinkedBlockingQueue的长度为1000。


public class PeersReplicator {//集群同步生成batch的间隔时间:500msprivate static final long PEERS_REPLICATE_BATCH_INTERVAL = 500;private static final PeersReplicator instance = new PeersReplicator();private PeersReplicator() {//启动接收请求和打包batch的线程AcceptorBatchThread acceptorBatchThread = new AcceptorBatchThread();acceptorBatchThread.setDaemon(true); acceptorBatchThread.start();//启动同步发送batch的线程PeersReplicateThread peersReplicateThread = new PeersReplicateThread();peersReplicateThread.setDaemon(true);peersReplicateThread.start(); }public static PeersReplicator getInstance() {return instance;}//第一个内存队列:处理高并发的服务请求,所以存在高并发的写入情况,无界队列private ConcurrentLinkedQueue<AbstractRequest> acceptorQueue = new ConcurrentLinkedQueue<AbstractRequest>();//第二个内存队列:有界队列,用于同步batch消息到其他集群实例private LinkedBlockingQueue<PeersReplicateBatch> replicateQueue = new LinkedBlockingQueue<PeersReplicateBatch>(10000);  //同步服务注册请求public void replicateRegister(RegisterRequest request) {request.setType(AbstractRequest.REGISTER_REQUEST); //将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步服务下线请求public void replicateCancel(CancelRequest request) {request.setType(AbstractRequest.CANCEL_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//同步发送心跳请求public void replicateHeartbeat(HeartbeatRequest request) {request.setType(AbstractRequest.HEARTBEAT_REQUEST);//将请求消息放入第一个内存队列acceptorQueue.offer(request);}//负责接收数据以及打包为batch的后台线程class AcceptorBatchThread extends Thread {long latestBatchGeneration = System.currentTimeMillis();@Overridepublic void run() {while(true) {try {//每隔500ms生成一个batchPeersReplicateBatch batch = new PeersReplicateBatch();long now = System.currentTimeMillis();if (now - latestBatchGeneration >= PEERS_REPLICATE_BATCH_INTERVAL) {//已经到了500ms的时间间隔//将batch消息放入第二个内存队列replicateQueue.offer(batch);//更新latestBatchGenerationlatestBatchGeneration = System.currentTimeMillis();//重置batchbatch = new PeersReplicateBatch();} else {//还没到500ms的时间间隔//从第一层队列获取数据,然后batch放入到第二层队列中AbstractRequest request = acceptorQueue.poll();if (request != null) {batch.add(request);  } else {Thread.sleep(100);}            }} catch (Exception e) {e.printStackTrace(); }}}}//集群同步线程class PeersReplicateThread extends Thread {@Overridepublic void run() {while(true) {try {PeersReplicateBatch batch = replicateQueue.take();if (batch != null) {//遍历其他的register-server地址//给每个地址的register-server都发送一个http请求同步batchSystem.out.println("给其他的register-server发送请求,同步batch......");      }} catch (Exception e) {e.printStackTrace(); }}}}
public class PeersReplicateBatch {private List<AbstractRequest> requests = new ArrayList<AbstractRequest>();public void add(AbstractRequest request) {this.requests.add(request);}public List<AbstractRequest> getRequests() {return requests;}public void setRequests(List<AbstractRequest> requests) {this.requests = requests;}
public class RegisterServerController {//服务注册表private ServiceRegistry registry = ServiceRegistry.getInstance();//服务注册表的缓存private ServiceRegistryCache registryCache = ServiceRegistryCache.getInstance();//集群同步组件private PeersReplicator peersReplicator = PeersReplicator.getInstance();//服务注册public RegisterResponse register(RegisterRequest registerRequest) {RegisterResponse registerResponse = new RegisterResponse();try {//在注册表中加入这个服务实例ServiceInstance serviceInstance = new ServiceInstance();serviceInstance.setHostname(registerRequest.getHostname()); serviceInstance.setIp(registerRequest.getIp()); serviceInstance.setPort(registerRequest.getPort()); serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId()); serviceInstance.setServiceName(registerRequest.getServiceName());registry.register(serviceInstance);//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() + 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateRegister(registerRequest);registerResponse.setStatus(RegisterResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); registerResponse.setStatus(RegisterResponse.FAILURE);}return registerResponse;}//服务下线  public void cancel(CancelRequest cancelRequest) {//从服务注册中摘除实例registry.remove(cancelRequest.getServiceName(), cancelRequest.getServiceInstanceId());//更新自我保护机制的阈值synchronized (SelfProtectionPolicy.class) {SelfProtectionPolicy selfProtectionPolicy = SelfProtectionPolicy.getInstance();selfProtectionPolicy.setExpectedHeartbeatRate(selfProtectionPolicy.getExpectedHeartbeatRate() - 2);selfProtectionPolicy.setExpectedHeartbeatThreshold((long)(selfProtectionPolicy.getExpectedHeartbeatRate() * 0.85));}//过期掉注册表缓存registryCache.invalidate();//进行集群同步peersReplicator.replicateCancel(cancelRequest);  }//发送心跳public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse = new HeartbeatResponse();try {//获取服务实例ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId());if (serviceInstance != null) {serviceInstance.renew();}//记录一下每分钟的心跳的次数HeartbeatCounter heartbeatMessuredRate = HeartbeatCounter.getInstance();heartbeatMessuredRate.increment();//进行集群同步peersReplicator.replicateHeartbeat(heartbeatRequest);heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) {e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); }return heartbeatResponse;}//同步batch数据public void replicateBatch(PeersReplicateBatch batch) {for (AbstractRequest request : batch.getRequests()) {if (request.getType().equals(AbstractRequest.REGISTER_REQUEST)) {register((RegisterRequest) request);} else if (request.getType().equals(AbstractRequest.CANCEL_REQUEST)) {cancel((CancelRequest) request);} else if (request.getType().equals(AbstractRequest.HEARTBEAT_REQUEST)) {heartbeat((HeartbeatRequest) request); }}}//拉取全量注册表public Applications fetchFullRegistry() {return (Applications) registryCache.get(CacheKey.FULL_SERVICE_REGISTRY);}//拉取增量注册表public DeltaRegistry fetchDeltaRegistry() {return (DeltaRegistry) registryCache.get(CacheKey.DELTA_SERVICE_REGISTRY); }




