文章目录
- 一、观察者模式
- 1. 观察者模式基本概念
- 2. 观察者模式的应用场景
- 3. 观察者模式的类图
- 二、设计异步多渠道群发框架
- 2.1. 定义消息观察者抽象接口
- 2.2. 创建观察者
- 2.3. 主题通知所有观察者
- 2.4. 观察者注册
- 2.5. 自定义线程池
- 2.6. 签单通知入口
- 2.6. 异步通知接口测试
- 2.7. 依赖
- 三、Spring事件通知
- 3.1. 定义消息实体类
- 3.2. 定义(邮件)事件通知
- 3.3. 定义(短信)事件通知
- 3.4. 签单同步通知入口
- 3.5. 测试效果
- 3.6. 开源项目
一、观察者模式
1. 观察者模式基本概念
一个对象状态改变,通知给其他所有的对象
2. 观察者模式的应用场景
Zk的事件监听、分布式配置中心刷新配置文件、业务中群发不同渠道消息
3. 观察者模式的类图
二、设计异步多渠道群发框架
2.1. 定义消息观察者抽象接口
package com.gblfy.observer;import com.alibaba.fastjson.JSONObject;/*** 定义消息观察者(ObServer)抽象接口** @author gblfy* @date 2022-03-15*/
public interface GblfyObServer {void sendMsg(JSONObject jsonObject);
}
2.2. 创建观察者
短信观察者
package com.gblfy.observer.impl;import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfyObServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;/*** 定义(短信)事件通知** @author gblfy* @date 2022-03-15*/
@Slf4j
@Component
public class SmsObServer implements GblfyObServer {@Override@Async("customAsyncThreadPool")public void sendMsg(JSONObject jsonObject) {log.info("观察者模式发送->短信-->{}", jsonObject.toJSONString());}
}
邮件观察者
package com.gblfy.observer.impl;import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfyObServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;/*** 定义(邮件)事件通知** @author gblfy* @date 2022-03-15*/
@Slf4j
@Component
public class EmailObServer implements GblfyObServer {@Override@Async("customAsyncThreadPool")public void sendMsg(JSONObject jsonObject) {log.info("观察者模式发送->邮件-->{}",jsonObject.toJSONString());}
}
2.3. 主题通知所有观察者
package com.gblfy.observer;import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;/*** 定义消注册中心,主题通知所有观察者** @author gblfy* @date 2022-03-15*/
@Component
public class GblfySubject {//观察者容器private List<GblfyObServer> obServerList = new ArrayList<>();/*** 添加观察者** @param gblfyObServer*/public void addObServer(GblfyObServer gblfyObServer) {obServerList.add(gblfyObServer);}/*** 通知所有的观察者** @param jsonObject*/public void notifyObServer(JSONObject jsonObject) {obServerList.stream().forEach(p -> p.sendMsg(jsonObject));}
}
2.4. 观察者注册
项目启动自动注册观察者
package com.gblfy.start;import com.gblfy.observer.GblfyObServer;
import com.gblfy.observer.GblfySubject;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;import java.util.Map;/*** 项目启动注册观察者** @author gblfy* @date 2022-03-15*/
@Component
public class StartService implements ApplicationRunner, ApplicationContextAware {@Autowiredprivate GblfySubject gblfySubject;//初始化上下文对象private ApplicationContext applicationContext;/*** 项目启动成功注册在观察着集合(ObServer的子类实例)** @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {/*** 根据接口类获取相应bena对象,自动注册观察者* 1.使用spring获取ObServer下所有子类的bean对象* 2.将bean对象依次添加到gblfySubject观察者集合中即可*/Map<String, GblfyObServer> map = applicationContext.getBeansOfType(GblfyObServer.class);for (String key : map.keySet()) {GblfyObServer gblfyObServer = map.get(key);gblfySubject.addObServer(gblfyObServer);}}//获取上下文@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
}
2.5. 自定义线程池
package com.gblfy.config;import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;/*** @Deacription 自定义异步线程池* @Author gblfy* @Date 2022-03-15 10:38**/
@Component
@EnableAsync
public class AsyncScheduledTaskConfig {@Bean("customAsyncThreadPool")public Executor customAsyncThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//最大线程数executor.setMaxPoolSize(100);//核心线程数executor.setCorePoolSize(10);//任务队列的大小executor.setQueueCapacity(10);//线程池名的前缀executor.setThreadNamePrefix("gblfy-signpolicy-asynnotify-");//允许线程的空闲时间30秒executor.setKeepAliveSeconds(30);//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Beanexecutor.setWaitForTasksToCompleteOnShutdown(true);//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住executor.setAwaitTerminationSeconds(60);/*** 拒绝处理策略* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。* AbortPolicy():直接抛出异常。* DiscardPolicy():直接丢弃。* DiscardOldestPolicy():丢弃队列中最老的任务。*//*** 特殊说明:* 1. 这里演示环境,拒绝策略咱们采用抛出异常* 2.真实业务场景会把缓存队列的大小会设置大一些,* 如果,提交的任务数量超过最大线程数量或将任务环缓存到本地、redis、mysql中,保证消息不丢失* 3.如果项目比较大的话,异步通知种类很多的话,建议采用MQ做异步通知方案*/executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//线程初始化executor.initialize();return executor;}
}
2.6. 签单通知入口
package com.gblfy.controller;import com.alibaba.fastjson.JSONObject;
import com.gblfy.observer.GblfySubject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** 签单通知入口** @author gblfy* @date 2022-03-15*/
@Slf4j
@RestController
public class SignPolicyController {@Autowiredprivate GblfySubject gblfySubject;/*** 签单异步通知(短信、邮件等多种方式)** @return*/@GetMapping("/signPolicyToAsynNotify")public String signPolicy() {log.info("将签单信息保存数据库处理");JSONObject jsonObject = new JSONObject();jsonObject.put("sms", "1766666666");jsonObject.put("email", "1766@163.com");gblfySubject.notifyObServer(jsonObject);return "success";}
}
2.6. 异步通知接口测试
http://localhost:8080/signPolicyToAsynNotify
2.7. 依赖
<!--字符串工具类--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><!--数据json处理--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><!--SpringMVC--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
三、Spring事件通知
3.1. 定义消息实体类
package com.gblfy.entity;import org.springframework.context.ApplicationEvent;/*** 定义消息实体类** @author gblfy* @date 2022-03-15*/
public class SignPolicyMsgEntity extends ApplicationEvent {private String email;private String phone;private String userId;public SignPolicyMsgEntity(Object source) {super(source);}public SignPolicyMsgEntity(Object source, String email, String phone) {super(source);this.email = email;this.phone = phone;}@Overridepublic String toString() {return "email:" + email + ",phone:" + phone;}
}
3.2. 定义(邮件)事件通知
package com.gblfy.listener;import com.gblfy.entity.SignPolicyMsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;/*** 定义(邮件)事件通知** @author gblfy* @date 2022-03-15*/
@Component
@Slf4j
public class EmailListener implements ApplicationListener<SignPolicyMsgEntity> {@Overridepublic void onApplicationEvent(SignPolicyMsgEntity event) {log.info("eamil:->{}", event.toString());}
}
3.3. 定义(短信)事件通知
package com.gblfy.listener;import com.gblfy.entity.SignPolicyMsgEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;/*** 定义(短信)事件通知** @author gblfy* @date 2022-03-15*/
@Component
@Slf4j
public class SmsListener implements ApplicationListener<SignPolicyMsgEntity> {@Overridepublic void onApplicationEvent(SignPolicyMsgEntity event) {log.info("sms:->{}", event.toString());}}
3.4. 签单同步通知入口
@Autowired
private ApplicationEventPublisher applicationEventPublisher;/*** 签单同步通知(短信、邮件等多种方式)* 使用spring事件通知** @return*/@GetMapping("/signPolicyToSyncNotify")public String signPolicyToSyncNotify() {log.info("将签单信息保存数据库处理");SignPolicyMsgEntity signPolicyMsgEntity = new SignPolicyMsgEntity(this, "1766@163.com", "1766666666");applicationEventPublisher.publishEvent(signPolicyMsgEntity);return "success";}
3.5. 测试效果
http://localhost:8080/signPolicyToSyncNotify
3.6. 开源项目
https://gitee.com/gblfy/design-pattern/tree/observer-mode/