目录
整合第三方技术
缓存
SpringBoot内置缓存解决方案
SpringBoot整合Ehcache缓存
SpringBoot整合Redis缓存
SpringBoot整合Memcached缓存
SpringBoot整合jetcache缓存
SpringBoot整合j2cache缓存
任务
Quartz
Task
邮件
消息
Java处理消息的标准规范
购物订单发送手机短信案例
SpringBoot整合ActiveMQ
SpringBoot整合RabbitMQ
SpringBoot整合RocketMQ
SpringBoot整合Kafka
整合第三方技术
缓存
SpringBoot内置缓存解决方案
缓存使用
步骤①:导入springboot提供的缓存技术对应的starter
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId>
</dependency>
步骤②:启用缓存,在引导类上方标注注解@EnableCaching配置springboot程序中可以使用缓存
@SpringBootApplication
//开启缓存功能
@EnableCaching
public class Springboot19CacheApplication {public static void main(String[] args) {SpringApplication.run(Springboot19CacheApplication.class, args);}
}
步骤③:设置操作的数据是否使用缓存
@Service
public class BookServiceImpl implements BookService {@Autowiredprivate BookDao bookDao;@Cacheable(value="cacheSpace",key="#id")public Book getById(Integer id) {return bookDao.selectById(id);}
}
SpringBoot提供的缓存技术除了提供默认的缓存方案,还可以对其他缓存技术进行整合,统一接口,方便缓存技术的开发与管理
Generic
JCache
Ehcache
Hazelcast
Infinispan
Couchbase
Redis
Caffeine
Simple(默认)
SpringBoot整合Ehcache缓存
下面就开始springboot整合各种各样的缓存技术,第一个整合Ehcache技术。Ehcache是一种缓存技术,使用springboot整合Ehcache其实就是变更一下缓存技术的实现方式
步骤①:导入Ehcache的坐标
<dependency><groupId>net.sf.ehcache</groupId><artifactId>ehcache</artifactId>
</dependency>
步骤②:配置缓存技术实现使用Ehcache
spring:cache:type: ehcacheehcache:config: ehcache.xml
提供ehcache配置文件ehcache.xml
<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd"updateCheck="false"><diskStore path="D:\ehcache" /><!--默认缓存策略 --><!-- external:是否永久存在,设置为true则不会被清除,此时与timeout冲突,通常设置为false--><!-- diskPersistent:是否启用磁盘持久化--><!-- maxElementsInMemory:最大缓存数量--><!-- overflowToDisk:超过最大缓存数量是否持久化到磁盘--><!-- timeToIdleSeconds:最大不活动间隔,设置过长缓存容易溢出,设置过短无效果,可用于记录时效性数据,例如验证码--><!-- timeToLiveSeconds:最大存活时间--><!-- memoryStoreEvictionPolicy:缓存清除策略--><defaultCacheeternal="false"diskPersistent="false"maxElementsInMemory="1000"overflowToDisk="false"timeToIdleSeconds="60"timeToLiveSeconds="60"memoryStoreEvictionPolicy="LRU" /><cachename="smsCode"eternal="false"diskPersistent="false"maxElementsInMemory="1000"overflowToDisk="false"timeToIdleSeconds="10"timeToLiveSeconds="10"memoryStoreEvictionPolicy="LRU" />
</ehcache>
注意前面的案例中,设置了数据保存的位置是smsCode
@CachePut(value = "smsCode", key = "#tele")
public String sendCodeToSMS(String tele) {String code = codeUtils.generator(tele);return code;
}
这个设定需要保障ehcache中有一个缓存空间名称叫做smsCode的配置,前后要统一
SpringBoot整合Redis缓存
步骤①:导入redis的坐标
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
步骤②:配置缓存技术实现使用redis
spring:redis:host: localhostport: 6379cache:type: redis
如果需要对redis作为缓存进行配置,注意不是对原始的redis进行配置,而是配置redis作为缓存使用相关的配置,隶属于spring.cache.redis节点下,注意不要写错位置了。
spring:redis:host: localhostport: 6379cache:type: redisredis:use-key-prefix: falsekey-prefix: sms_cache-null-values: falsetime-to-live: 10s
SpringBoot整合Memcached缓存
安装
windows版安装包下载地址:Memcached下载
下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件
可执行文件只有一个memcached.exe,使用该文件可以将memcached作为系统服务启动,执行此文件时会出现报错信息,如下:
此处出现问题的原因是注册系统服务时需要使用管理员权限,当前账号权限不足导致安装服务失败,切换管理员账号权限启动命令行
然后再次执行安装服务的命令即可,如下:
memcached.exe -d install
服务安装完毕后可以使用命令启动和停止服务,如下:
memcached.exe -d start # 启动服务
memcached.exe -d stop # 停止服务
变更缓存为Memcached
步骤①:导入xmemcached的坐标
<dependency><groupId>com.googlecode.xmemcached</groupId><artifactId>xmemcached</artifactId><version>2.4.7</version>
</dependency>
步骤②:配置memcached,制作memcached的配置类
@Configuration
public class XMemcachedConfig {@Beanpublic MemcachedClient getMemcachedClient() throws IOException {MemcachedClientBuilder memcachedClientBuilder = new XMemcachedClientBuilder("localhost:11211");MemcachedClient memcachedClient = memcachedClientBuilder.build();return memcachedClient;}
}
memcached默认对外服务端口11211
步骤③:使用xmemcached客户端操作缓存,注入MemcachedClient对象
@Service
public class SMSCodeServiceImpl implements SMSCodeService {@Autowiredprivate CodeUtils codeUtils;@Autowiredprivate MemcachedClient memcachedClient;public String sendCodeToSMS(String tele) {String code = codeUtils.generator(tele);try {memcachedClient.set(tele,10,code);} catch (Exception e) {e.printStackTrace();}return code;}public boolean checkCode(SMSCode smsCode) {String code = null;try {code = memcachedClient.get(smsCode.getTele()).toString();} catch (Exception e) {e.printStackTrace();}return smsCode.getCode().equals(code);}
}
设置值到缓存中使用set操作,取值使用get操作
上述代码对于服务器的配置使用硬编码写死到了代码中,将此数据提取出来,做成独立的配置属性
定义配置类,加载必要的配置属性,读取配置文件中memcached节点信息
@Component
@ConfigurationProperties(prefix = "memcached")
@Data
public class XMemcachedProperties {private String servers;private int poolSize;private long opTimeout;
}
定义memcached节点信息
memcached:servers: localhost:11211poolSize: 10opTimeout: 3000
-
在memcached配置类中加载信息
@Configuration
public class XMemcachedConfig {@Autowiredprivate XMemcachedProperties props;@Beanpublic MemcachedClient getMemcachedClient() throws IOException {MemcachedClientBuilder memcachedClientBuilder = new XMemcachedClientBuilder(props.getServers());memcachedClientBuilder.setConnectionPoolSize(props.getPoolSize());memcachedClientBuilder.setOpTimeout(props.getOpTimeout());MemcachedClient memcachedClient = memcachedClientBuilder.build();return memcachedClient;}
}
SpringBoot整合jetcache缓存
jetcache严格意义上来说,并不是一个缓存解决方案,只能说他算是一个缓存框架,然后把别的缓存放到jetcache中管理,这样就可以支持AB缓存一起用了
目前jetcache支持的缓存方案本地缓存支持两种,远程缓存支持两种,分别如下:
-
本地缓存(Local)
-
LinkedHashMap
-
Caffeine
-
-
远程缓存(Remote)
-
Redis
-
Tair
-
纯远程方案
步骤①:导入springboot整合jetcache对应的坐标starter,当前坐标默认使用的远程方案是redis
<dependency><groupId>com.alicp.jetcache</groupId><artifactId>jetcache-starter-redis</artifactId><version>2.6.2</version>
</dependency>
步骤②:远程方案基本配置
jetcache:remote:default:type: redishost: localhostport: 6379poolConfig:maxTotal: 50
其中poolConfig是必配项,否则会报错
步骤③:启用缓存,在引导类上方标注注解@EnableCreateCacheAnnotation配置springboot程序中可以使用注解的形式创建缓存
@SpringBootApplication
//jetcache启用缓存的主开关
@EnableCreateCacheAnnotation
public class Springboot20JetCacheApplication {public static void main(String[] args) {SpringApplication.run(Springboot20JetCacheApplication.class, args);}
}
步骤④:创建缓存对象Cache,并使用注解@CreateCache标记当前缓存的信息,然后使用Cache对象的API操作缓存,put写缓存,get读缓存。
@Service
public class SMSCodeServiceImpl implements SMSCodeService {@Autowiredprivate CodeUtils codeUtils;@CreateCache(name="jetCache_",expire = 10,timeUnit = TimeUnit.SECONDS)private Cache<String ,String> jetCache;public String sendCodeToSMS(String tele) {String code = codeUtils.generator(tele);jetCache.put(tele,code);return code;}public boolean checkCode(SMSCode smsCode) {String code = jetCache.get(smsCode.getTele());return smsCode.getCode().equals(code);}
}
上述方案中使用的是配置中定义的default缓存,其实这个default是个名字,可以随便写,也可以随便加。例如再添加一种缓存解决方案,参照如下配置进行:
jetcache:remote:default:type: redishost: localhostport: 6379poolConfig:maxTotal: 50sms:type: redishost: localhostport: 6379poolConfig:maxTotal: 50
如果想使用名称是sms的缓存,需要再创建缓存时指定参数area,声明使用对应缓存即可
@Service
public class SMSCodeServiceImpl implements SMSCodeService {@Autowiredprivate CodeUtils codeUtils;@CreateCache(area="sms",name="jetCache_",expire = 10,timeUnit = TimeUnit.SECONDS)private Cache<String ,String> jetCache;public String sendCodeToSMS(String tele) {String code = codeUtils.generator(tele);jetCache.put(tele,code);return code;}public boolean checkCode(SMSCode smsCode) {String code = jetCache.get(smsCode.getTele());return smsCode.getCode().equals(code);}
}
纯本地方案
远程方案中,配置中使用remote表示远程,换成local就是本地,只不过类型不一样而已。
步骤①:导入springboot整合jetcache对应的坐标starter
<dependency><groupId>com.alicp.jetcache</groupId><artifactId>jetcache-starter-redis</artifactId><version>2.6.2</version>
</dependency>
步骤②:本地缓存基本配置
jetcache:local:default:type: linkedhashmapkeyConvertor: fastjson
步骤③:启用缓存
@SpringBootApplication
//jetcache启用缓存的主开关
@EnableCreateCacheAnnotation
public class Springboot20JetCacheApplication {public static void main(String[] args) {SpringApplication.run(Springboot20JetCacheApplication.class, args);}
}
步骤④:创建缓存对象Cache时,标注当前使用本地缓存
@Service
public class SMSCodeServiceImpl implements SMSCodeService {@CreateCache(name="jetCache_",expire = 1000,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.LOCAL)private Cache<String ,String> jetCache;public String sendCodeToSMS(String tele) {String code = codeUtils.generator(tele);jetCache.put(tele,code);return code;}public boolean checkCode(SMSCode smsCode) {String code = jetCache.get(smsCode.getTele());return smsCode.getCode().equals(code);}
}
cacheType控制当前缓存使用本地缓存还是远程缓存,配置cacheType=CacheType.LOCAL即使用本地缓存
本地+远程方案
本地和远程方法都有了,两种方案一起使用如何配置呢?其实就是将两种配置合并到一起就可以了
jetcache:local:default:type: linkedhashmapkeyConvertor: fastjsonremote:default:type: redishost: localhostport: 6379poolConfig:maxTotal: 50sms:type: redishost: localhostport: 6379poolConfig:maxTotal: 50
在创建缓存的时候,配置cacheType为BOTH即则本地缓存与远程缓存同时使用
@Service
public class SMSCodeServiceImpl implements SMSCodeService {@CreateCache(name="jetCache_",expire = 1000,timeUnit = TimeUnit.SECONDS,cacheType = CacheType.BOTH)private Cache<String ,String> jetCache;
}
cacheType如果不进行配置,默认值是REMOTE,即仅使用远程缓存方案。关于jetcache的配置,参考以下信息
属性 | 默认值 | 说明 |
---|---|---|
jetcache.statIntervalMinutes | 0 | 统计间隔,0表示不统计 |
jetcache.hiddenPackages | 无 | 自动生成name时,隐藏指定的包名前缀 |
jetcache.[local|remote].${area}.type | 无 | 缓存类型,本地支持linkedhashmap、caffeine,远程支持redis、tair |
jetcache.[local|remote].${area}.keyConvertor | 无 | key转换器,当前仅支持fastjson |
jetcache.[local|remote].${area}.valueEncoder | java | 仅remote类型的缓存需要指定,可选java和kryo |
jetcache.[local|remote].${area}.valueDecoder | java | 仅remote类型的缓存需要指定,可选java和kryo |
jetcache.[local|remote].${area}.limit | 100 | 仅local类型的缓存需要指定,缓存实例最大元素数 |
jetcache.[local|remote].${area}.expireAfterWriteInMillis | 无穷大 | 默认过期时间,毫秒单位 |
jetcache.local.${area}.expireAfterAccessInMillis | 0 | 仅local类型的缓存有效,毫秒单位,最大不活动间隔 |
以上方案仅支持手工控制缓存,但是springcache方案中的方法缓存特别好用,给一个方法添加一个注解,方法就会自动使用缓存。jetcache也提供了对应的功能,即方法缓存。
方法缓存
jetcache提供了方法缓存方案,在对应的操作接口上方使用注解@Cached即可
步骤①:导入springboot整合jetcache对应的坐标starter
步骤②:配置缓存
jetcache:local:default:type: linkedhashmapkeyConvertor: fastjsonremote:default:type: redishost: localhostport: 6379keyConvertor: fastjsonvalueEncode: javavalueDecode: javapoolConfig:maxTotal: 50sms:type: redishost: localhostport: 6379poolConfig:maxTotal: 50
注意,为了实现Object类型的值进出redis,需要保障进出redis的Object类型的数据必须实现序列化接口
@Data
public class Book implements Serializable {private Integer id;private String type;private String name;private String description;
}
步骤③:启用缓存时开启方法缓存功能,并配置basePackages,说明在哪些包中开启方法缓存
@SpringBootApplication
//jetcache启用缓存的主开关
@EnableCreateCacheAnnotation
//开启方法注解缓存
@EnableMethodCache(basePackages = "com.green")
public class Springboot20JetCacheApplication {public static void main(String[] args) {SpringApplication.run(Springboot20JetCacheApplication.class, args);}
}
步骤④:使用注解@Cached标注当前方法使用缓存
@Service
public class BookServiceImpl implements BookService {@Autowiredprivate BookDao bookDao;@Override@Cached(name="book_",key="#id",expire = 3600,cacheType = CacheType.REMOTE)public Book getById(Integer id) {return bookDao.selectById(id);}
}
远程方案的数据同步
由于远程方案中redis保存的数据可以被多个客户端共享,这就存在了数据同步问题。jetcache提供了3个注解解决此问题,分别在更新、删除操作时同步缓存数据,和读取缓存时定时刷新数据
更新缓存
@CacheUpdate(name="book_",key="#book.id",value="#book")
public boolean update(Book book) {return bookDao.updateById(book) > 0;
}
删除缓存
@CacheInvalidate(name="book_",key = "#id")
public boolean delete(Integer id) {return bookDao.deleteById(id) > 0;
}
定时刷新缓存
@Cached(name="book_",key="#id",expire = 3600,cacheType = CacheType.REMOTE)
@CacheRefresh(refresh = 5)
public Book getById(Integer id) {return bookDao.selectById(id);
}
数据报表
jetcache还提供有简单的数据报表功能,帮助开发者快速查看缓存命中信息,只需要添加一个配置即可
jetcache:statIntervalMinutes: 1
设置后,每1分钟在控制台输出缓存数据命中信息
[DefaultExecutor] c.alicp.jetcache.support.StatInfoLogger : jetcache stat from 2022-02-28 09:32:15,892 to 2022-02-28 09:33:00,003 cache | qps| rate| get| hit| fail| expire| avgLoadTime| maxLoadTime ---------+-------+-------+------+-------+-------+---------+--------------+-------------- book_ | 0.66| 75.86%| 29| 22| 0| 0| 28.0| 188 ---------+-------+-------+------+-------+-------+---------+--------------+--------------
总结
1. jetcache是一个类似于springcache的缓存解决方案,自身不具有缓存功能,它提供有本地缓存与远程缓存多级共同使用的缓存解决方案
2. jetcache提供的缓存解决方案受限于目前支持的方案,本地缓存支持两种,远程缓存支持两种
3. 注意数据进入远程缓存时的类型转换问题
4. jetcache提供方法缓存,并提供了对应的缓存更新与刷新功能
5. jetcache提供有简单的缓存信息命中报表方便开发者即时监控缓存数据命中情况
SpringBoot整合j2cache缓存
jetcache可以在限定范围内构建多级缓存,但是灵活性不足,不能随意搭配缓存,介绍一种可以随意搭配缓存解决方案的缓存整合框架,j2cache,以Ehcache与redis整合为例:
步骤①:导入j2cache、redis、ehcache坐标
<dependency><groupId>net.oschina.j2cache</groupId><artifactId>j2cache-core</artifactId><version>2.8.4-release</version>
</dependency>
<dependency><groupId>net.oschina.j2cache</groupId><artifactId>j2cache-spring-boot2-starter</artifactId><version>2.8.0-release</version>
</dependency>
<dependency><groupId>net.sf.ehcache</groupId><artifactId>ehcache</artifactId>
</dependency>
步骤②:配置一级与二级缓存,并配置一二级缓存间数据传递方式,配置书写在名称为j2cache.properties的文件中。如果使用ehcache还需要单独添加ehcache的配置文件
# 1级缓存
j2cache.L1.provider_class = ehcache
ehcache.configXml = ehcache.xml# 2级缓存
j2cache.L2.provider_class = net.oschina.j2cache.cache.support.redis.SpringRedisProvider
j2cache.L2.config_section = redis
redis.hosts = localhost:6379# 1级缓存中的数据如何到达二级缓存
j2cache.broadcast = net.oschina.j2cache.cache.support.redis.SpringRedisPubSubPolicy
步骤③:使用缓存
@Service
public class SMSCodeServiceImpl implements SMSCodeService {@Autowiredprivate CodeUtils codeUtils;@Autowiredprivate CacheChannel cacheChannel;public String sendCodeToSMS(String tele) {String code = codeUtils.generator(tele);cacheChannel.set("sms",tele,code);return code;}public boolean checkCode(SMSCode smsCode) {String code = cacheChannel.get("sms",smsCode.getTele()).asString();return smsCode.getCode().equals(code);}
}
总结
-
j2cache是一个缓存框架,自身不具有缓存功能,它提供多种缓存整合在一起使用的方案
-
j2cache需要通过复杂的配置设置各级缓存,以及缓存之间数据交换的方式
-
j2cache操作接口通过CacheChannel实现
任务
Quartz
springboot整合Quartz前先普及几个Quartz的概念
-
工作(Job):用于定义具体执行的工作
-
工作明细(JobDetail):用于描述定时工作相关的信息
-
触发器(Trigger):描述了工作明细与调度器的对应关系
-
调度器(Scheduler):用于描述触发工作的执行规则,通常使用cron表达式定义规则
步骤①:导入springboot整合Quartz的starter
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
步骤②:定义任务Bean,按照Quartz的开发规范制作,继承QuartzJobBean
public class MyQuartz extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {System.out.println("quartz task run...");}
}
步骤③:创建Quartz配置类,定义工作明细(JobDetail)与触发器的(Trigger)bean
@Configuration
public class QuartzConfig {@Beanpublic JobDetail printJobDetail(){//绑定具体的工作return JobBuilder.newJob(MyQuartz.class).storeDurably().build();}@Beanpublic Trigger printJobTrigger(){ScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule("0/5 * * * * ?");//绑定对应的工作明细return TriggerBuilder.newTrigger().forJob(printJobDetail()).withSchedule(schedBuilder).build();}
}
Task
步骤①:开启定时任务功能,在引导类上开启定时任务功能的开关,使用注解@EnableScheduling
@SpringBootApplication
//开启定时任务功能
@EnableScheduling
public class Springboot22TaskApplication {public static void main(String[] args) {SpringApplication.run(Springboot22TaskApplication.class, args);}
}
步骤②:定义Bean,在对应要定时执行的操作上方,使用注解@Scheduled定义执行的时间,执行时间的描述方式还是cron表达式
@Component
public class MyBean {@Scheduled(cron = "0/1 * * * * ?")public void print(){System.out.println(Thread.currentThread().getName()+" :spring task run...");}
}
如何想对定时任务进行相关配置,可以通过配置文件进行
spring:task:scheduling:pool:size: 1 # 任务调度线程池大小 默认 1thread-name-prefix: ssm_ # 调度线程名称前缀 默认 scheduling- shutdown:await-termination: false # 线程池关闭时等待所有任务完成await-termination-period: 10s # 调度线程关闭前最大等待时间,确保最后一定关闭
邮件
学习邮件发送之前先了解3个概念,这些概念规范了邮件操作过程中的标准。
-
SMTP(Simple Mail Transfer Protocol):简单邮件传输协议,用于发送电子邮件的传输协议
-
POP3(Post Office Protocol - Version 3):用于接收电子邮件的标准协议
-
IMAP(Internet Mail Access Protocol):互联网消息协议,是POP3的替代协议
发送简单邮件
步骤①:导入springboot整合javamail的starter
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId>
</dependency>
步骤②:配置邮箱的登录信息
spring:mail:host: smtp.126.comusername: test@126.compassword: test
host配置的是提供邮件服务的主机协议,当前程序仅用于发送邮件,因此配置的是smtp的协议
password并不是邮箱账号的登录密码,是邮件供应商提供的一个加密后的密码,也是为了保障系统安全性
有关该密码的获取每个邮件供应商提供的方式都不一样,此处略过。可以到邮件供应商的设置页面找POP3或IMAP这些关键词找到对应的获取位置。下例仅供参考:
步骤③:使用JavaMailSender接口发送邮件
@Service
public class SendMailServiceImpl implements SendMailService {@Autowiredprivate JavaMailSender javaMailSender;//发送人private String from = "test@qq.com";//接收人private String to = "test@126.com";//标题private String subject = "测试邮件";//正文private String context = "测试邮件正文内容";@Overridepublic void sendMail() {SimpleMailMessage message = new SimpleMailMessage();message.setFrom(from+"(小甜甜)");message.setTo(to);message.setSubject(subject);message.setText(context);javaMailSender.send(message);}
}
发送多组件邮件(附件、复杂正文)
如果想发送复杂的邮件,需要更换邮件对象。使用MimeMessage可以发送特殊的邮件
发送网页正文邮件
@Service
public class SendMailServiceImpl2 implements SendMailService {@Autowiredprivate JavaMailSender javaMailSender;//发送人private String from = "test@qq.com";//接收人private String to = "test@126.com";//标题private String subject = "测试邮件";//正文private String context = "<img src='ABC.JPG'/><a href='https://www.baidu.com'>点开有惊喜</a>";public void sendMail() {try {MimeMessage message = javaMailSender.createMimeMessage();MimeMessageHelper helper = new MimeMessageHelper(message);helper.setFrom(to+"(小甜甜)");helper.setTo(from);helper.setSubject(subject);helper.setText(context,true); //此处设置正文支持html解析javaMailSender.send(message);} catch (Exception e) {e.printStackTrace();}}
}
发送带有附件的邮件
@Service
public class SendMailServiceImpl2 implements SendMailService {@Autowiredprivate JavaMailSender javaMailSender;//发送人private String from = "test@qq.com";//接收人private String to = "test@126.com";//标题private String subject = "测试邮件";//正文private String context = "测试邮件正文";public void sendMail() {try {MimeMessage message = javaMailSender.createMimeMessage();MimeMessageHelper helper = new MimeMessageHelper(message,true); //此处设置支持附件helper.setFrom(to+"(小甜甜)");helper.setTo(from);helper.setSubject(subject);helper.setText(context);//添加附件File f1 = new File("springboot_23_mail-0.0.1-SNAPSHOT.jar");File f2 = new File("resources\\logo.png");helper.addAttachment(f1.getName(),f1);helper.addAttachment("测试.png",f2);javaMailSender.send(message);} catch (Exception e) {e.printStackTrace();}}
}
消息
Java处理消息的标准规范
目前企业级开发中广泛使用的消息处理技术共三大类,具体如下:
-
JMS
-
AMQP
-
MQTT
购物订单发送手机短信案例
手机验证码案例需求如下:
-
执行下单业务时(模拟此过程),调用消息服务,将要发送短信的订单id传递给消息中间件
-
消息处理服务接收到要发送的订单id后输出订单id(模拟发短信)
由于不涉及数据读写,仅开发业务层与表现层,其中短信处理的业务代码独立开发,代码如下:
订单业务
业务层接口
public interface OrderService {void order(String id);
}
模拟传入订单id,执行下订单业务,参数为虚拟设定,实际应为订单对应的实体类
业务层实现
@Service
public class OrderServiceImpl implements OrderService {@Autowiredprivate MessageService messageService;@Overridepublic void order(String id) {//一系列操作,包含各种服务调用,处理各种业务System.out.println("订单处理开始");//短信消息处理messageService.sendMessage(id);System.out.println("订单处理结束");System.out.println();}
}
业务层转调短信处理的服务MessageService
表现层服务
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowiredprivate OrderService orderService;
@PostMapping("{id}")public void order(@PathVariable String id){orderService.order(id);}
}
表现层对外开发接口,传入订单id即可(模拟)
短信处理业务
业务层接口
public interface MessageService {void sendMessage(String id);String doMessage();
}
短信处理业务层接口提供两个操作,发送要处理的订单id到消息中间件,另一个操作目前暂且设计成处理消息,实际消息的处理过程不应该是手动执行,应该是自动执行,到具体实现时再进行设计
业务层实现
@Service
public class MessageServiceImpl implements MessageService {private ArrayList<String> msgList = new ArrayList<String>();
@Overridepublic void sendMessage(String id) {System.out.println("待发送短信的订单已纳入处理队列,id:"+id);msgList.add(id);}
@Overridepublic String doMessage() {String id = msgList.remove(0);System.out.println("已完成短信发送业务,id:"+id);return id;}
}
短信处理业务层实现中使用集合先模拟消息队列,观察效果
表现层服务
@RestController
@RequestMapping("/msgs")
public class MessageController {
@Autowiredprivate MessageService messageService;
@GetMappingpublic String doMessage(){String id = messageService.doMessage();return id;}
}
短信处理表现层接口暂且开发出一个处理消息的入口,但是此业务是对应业务层中设计的模拟接口,实际业务不需要设计此接口。
下面开启springboot整合各种各样的消息中间件,从严格满足JMS规范的ActiveMQ开始
SpringBoot整合ActiveMQ
安装
windows版安装包下载地址:ActiveMQ
下载的安装包是解压缩就能使用的zip文件,解压缩完毕后会得到如下文件
启动服务器
activemq.bat
运行bin目录下的win32或win64目录下的activemq.bat命令即可,根据自己的操作系统选择即可,默认对外服务端口61616。
访问web管理服务
ActiveMQ启动后会启动一个Web控制台服务,可以通过该服务管理ActiveMQ。
http://127.0.0.1:8161/
web管理服务默认端口8161,访问后可以打开ActiveMQ的管理界面,如下:
首先输入访问用户名和密码,初始化用户名和密码相同,均为:admin,成功登录后进入管理后台界面,如下:
看到上述界面视为启动ActiveMQ服务成功
整合
步骤①:导入springboot整合ActiveMQ的starter
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
步骤②:配置ActiveMQ的服务器地址
spring:activemq:broker-url: tcp://localhost:61616
步骤③:使用JmsMessagingTemplate操作ActiveMQ
@Service
public class MessageServiceActivemqImpl implements MessageService {@Autowiredprivate JmsMessagingTemplate messagingTemplate;
@Overridepublic void sendMessage(String id) {System.out.println("待发送短信的订单已纳入处理队列,id:"+id);messagingTemplate.convertAndSend("order.queue.id",id);}
@Overridepublic String doMessage() {String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class);System.out.println("已完成短信发送业务,id:"+id);return id;}
}
发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使用id作为消息内容。
接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。
步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component
public class MessageListener {@JmsListener(destination = "order.queue.id")@SendTo("order.other.queue.id")public String receive(String id){System.out.println("已完成短信发送业务,id:"+id);return "new:"+id;}
}
使用注解@JmsListener定义当前方法监听ActiveMQ中指定名称的消息队列。
如果当前消息队列处理完还需要继续向下传递当前消息到另一个队列中使用注解@SendTo即可,这样即可构造连续执行的顺序消息队列。
步骤⑤:切换消息模型由点对点模型到发布订阅模型,修改jms配置即可
spring:activemq:broker-url: tcp://localhost:61616jms:pub-sub-domain: true
pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。
总结
-
springboot整合ActiveMQ提供了JmsMessagingTemplate对象作为客户端操作消息队列
-
操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口61616
-
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@JmsListener
-
配置jms的pub-sub-domain属性可以在点对点模型和发布订阅模型间切换消息模型
SpringBoot整合RabbitMQ
RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。
Erlang安装
windows版安装包下载地址:Erlang
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕需要重启,需要重启,需要重启。
Erlang安装后需要配置环境变量,否则RabbitMQ将无法找到安装的Erlang。需要配置项如下,作用等同JDK配置环境变量的作用。
-
ERLANG_HOME
-
PATH
安装
windows版安装包下载地址:RabbitMQ
下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕后会得到如下文件
启动服务器
rabbitmq-service.bat start # 启动服务
rabbitmq-service.bat stop # 停止服务
rabbitmqctl status # 查看服务状态
整合(direct模型)
RabbitMQ满足AMQP协议,因此不同的消息模型对应的制作不同,先使用最简单的direct模型开发。
步骤①:导入springboot整合amqp的starter,amqp协议默认实现为rabbitmq方案
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步骤②:配置RabbitMQ的服务器地址
spring:rabbitmq:host: localhostport: 5672
步骤③:初始化直连模式系统设置
由于RabbitMQ不同模型要使用不同的交换机,因此需要先初始化RabbitMQ相关的对象,例如队列,交换机等
@Configuration
public class RabbitConfigDirect {@Beanpublic Queue directQueue(){return new Queue("direct_queue");}@Beanpublic Queue directQueue2(){return new Queue("direct_queue2");}@Beanpublic DirectExchange directExchange(){return new DirectExchange("directExchange");}@Beanpublic Binding bindingDirect(){return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");}@Beanpublic Binding bindingDirect2(){return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");}
}
队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。
步骤④:使用AmqpTemplate操作RabbitMQ
@Service
public class MessageServiceRabbitmqDirectImpl implements MessageService {@Autowiredprivate AmqpTemplate amqpTemplate;
@Overridepublic void sendMessage(String id) {System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id:"+id);amqpTemplate.convertAndSend("directExchange","direct",id);}
}
amqp协议中的操作API接口名称看上去和jms规范的操作API接口很相似,但是传递参数差异很大。
步骤⑤:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component
public class MessageListener {@RabbitListener(queues = "direct_queue")public void receive(String id){System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);}
}
使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
整合(topic模型)
步骤①:同上
步骤②:同上
步骤③:初始化主题模式系统设置
@Configuration
public class RabbitConfigTopic {@Beanpublic Queue topicQueue(){return new Queue("topic_queue");}@Beanpublic Queue topicQueue2(){return new Queue("topic_queue2");}@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topicExchange");}@Beanpublic Binding bindingTopic(){return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id");}@Beanpublic Binding bindingTopic2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*");}
}
主题模式支持routingKey匹配模式,*表示匹配一个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中,详细内容请参看RabbitMQ系列课程。
匹配键 | topic.*.* | topic.# |
---|---|---|
topic.order.id | true | true |
order.topic.id | false | false |
topic.sm.order.id | false | true |
topic.sm.id | false | true |
topic.id.order | true | true |
topic.id | false | true |
topic.order | false | true |
步骤④:使用AmqpTemplate操作RabbitMQ
@Service
public class MessageServiceRabbitmqTopicImpl implements MessageService {@Autowiredprivate AmqpTemplate amqpTemplate;
@Overridepublic void sendMessage(String id) {System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id:"+id);amqpTemplate.convertAndSend("topicExchange","topic.orders.id",id);}
}
发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进行匹配,规则匹配成功消息才会进入到对应的队列中。
步骤⑤:使用消息监听器在服务器启动后,监听指定队列
@Component
public class MessageListener {@RabbitListener(queues = "topic_queue")public void receive(String id){System.out.println("已完成短信发送业务(rabbitmq topic 1),id:"+id);}@RabbitListener(queues = "topic_queue2")public void receive2(String id){System.out.println("已完成短信发送业务(rabbitmq topic 22222222),id:"+id);}
}
使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。
总结
-
springboot整合RabbitMQ提供了AmqpTemplate对象作为客户端操作消息队列
-
操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口5672
-
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RabbitListener
-
RabbitMQ有5种消息模型,使用的队列相同,但是交换机不同。交换机不同,对应的消息进入的策略也不同
SpringBoot整合RocketMQ
RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议
RocketMQ工作模式
在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器进行通信
启动服务器
mqnamesrv # 启动命名服务器
mqbroker # 启动broker
运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876
测试服务器启动状态
RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用
tools org.apache.rocketmq.example.quickstart.Producer # 生产消息
tools org.apache.rocketmq.example.quickstart.Consumer # 消费消息
整合(异步消息)
步骤①:导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version>
</dependency>
步骤②:配置RocketMQ的服务器地址
rocketmq:name-server: localhost:9876producer:group: group_rocketmq
设置默认的生产者消费者所属组group。
步骤③:使用RocketMQTemplate操作RocketMQ
@Service
public class MessageServiceRocketmqImpl implements MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;
@Overridepublic void sendMessage(String id) {System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id:"+id);SendCallback callback = new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送成功");}@Overridepublic void onException(Throwable e) {System.out.println("消息发送失败!!!!!");}};rocketMQTemplate.asyncSend("order_id",id,callback);}
}
使用asyncSend方法发送异步消息。
步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String id) {System.out.println("已完成短信发送业务(rocketmq),id:"+id);}
}
RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。
使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。
总结
-
springboot整合RocketMQ使用RocketMQTemplate对象作为客户端操作消息队列
-
操作RocketMQ需要配置RocketMQ服务器地址,默认端口9876
-
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RocketMQMessageListener
SpringBoot整合Kafka
安装
windows版安装包下载地址:Kafka
下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件
windows 系统下3.0.0版本存在BUG,建议使用2.X版本
创建主题
和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic
# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima
# 查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
# 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima
测试服务器启动状态
Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。
kafka-console-producer.bat --broker-list localhost:9092 --topic itheima # 测试生产消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima --from-beginning # 测试消息消费
整合
步骤①:导入springboot整合Kafka的starter,此坐标由springboot维护版本
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
步骤②:配置Kafka的服务器地址
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: order
设置默认的生产者消费者所属组id。
步骤③:使用KafkaTemplate操作Kafka
@Service
public class MessageServiceKafkaImpl implements MessageService {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;
@Overridepublic void sendMessage(String id) {System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);kafkaTemplate.send("itheima2022",id);}
}
使用send方法发送消息,需要传入topic名称。
步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息
@Component
public class MessageListener {@KafkaListener(topics = "itheima2022")public void onMessage(ConsumerRecord<String,String> record){System.out.println("已完成短信发送业务(kafka),id:"+record.value());}
}
使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。
总结
-
springboot整合Kafka使用KafkaTemplate对象作为客户端操作消息队列
-
操作Kafka需要配置Kafka服务器地址,默认端口9092
-
企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@KafkaListener。接收消息保存在形参ConsumerRecord对象中
来源:黑马程序员SpringBoot2全套视频教程,springboot零基础到项目实战(spring boot2完整版)