今天我们来做一个典型的消费力度能达到万级别的并发场景,老师点名-学生签到
正常情况
正常情况来说是不同班级下的老师发布不同的点名--然后不同班级下的很多学生同一时间进行签到,签到成功就去修改数据库,签到失败就返回,但是这样的话 签到的学生一多,数据库修改每一行的内容,都会加上行锁,那么改的多了,数据库很可能出现卡顿的情况,导致学生明明在规定时间内签到了,但是却出现签到结束的情况,或者说出现其他的冗余签到的情况,这样显然是不希望我们看到的,也不希望学生看到
并发级处理
怎么解决前面的那种签到错误的场景呢?
那么当然就是传统级别的 面对并发情况下的重拳三连了哈哈哈
mysql-redis-rabbitMq
首先 我们这个业务需要怎么写?
redis的key怎么选择,学生的key怎么选都是一个问题,下面我们来一一的进行分析
MySQL表的业务数据关联
因为我们是测试demo,所以我们只做出了关键的表结构关联,像老师表我们是没有做的
看上图,首先我们最顶部有一个课程表,写的有一个课程id和名称,还有还有学生表,学生表和课程表之间有一个中间的表关联,叫学生课程表(student-courses),然后我们老师点名的时候是属于课堂活动表,里面记录的课堂的活动,比如点名和提问,这个表(class_activities)与课程表关联,最后的是每一个学生在该课程下的做出的课堂活动,也就是学生活动表(student-activities),她关联了学生表,课堂活动表和课程表。
主要流程
老师发布点名,然后课堂互动表记录一条会过期的课堂活动,状态是进行中,然后学生签到,签到之后,找到该课程下的该签到过的学生,像学生活动表中添加一条签到过的数据
Redis业务
在redis方面,我们主要做的就是对学生签到数据的存储,对老师发布的签到数据的存储
我们知道 redis的string的数据类型是比较占用空间的,所以对于我们单个的老师发布的签到数据,我们可以用string类型,对于不同班级下的多个学生的签到情况,我们可以用hash结构 ,因为对于ihash结构,我们的数据一般是使用ziplist压缩,更省空间
RabbitMQ业务
我们mq主要做的就是读取redis中的签到过的学生数据,然后把学生数据做一个异步写入mysql,这样减缓签到高峰时段mysql的压力
我们mq首先从redis中查到签到过的学生数据,然后跟该课程下的学生数据做对比,如果该课程下学生有数据,redis中学生签到无数据,那么该学生就是未签到
如果签到,就把签到数据存入数据库
总体代码
老师点名-学生签到
package com.example.tabledemo.controller;import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.CourseService;
import com.example.tabledemo.pojo.Result;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.CourseEntity;
import com.example.tabledemo.pojo.request.ClassActivitiesRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;import static cn.hutool.core.date.DateTime.now;/*** @Author: wyz* @Date: 2025-04-08-16:17* @Description:课堂活动*/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/class/activities")
public class ClassActivitiesController {private final ClassActivitiesService classActivitiesService;private final CourseService courseService;private final StringRedisTemplate redisTemplate;private final RabbitTemplate rabbitTemplate;/*** 老师点名*/@PostMapping("/teacher/rollCall")public Result teacherRollCall(@RequestBody ClassActivitiesRequest.TeacherRollCall teacherRollCall) {//判断是否有课程CourseEntity course = courseService.getById(teacherRollCall.getCourseId());if (Objects.isNull(course)) {return Result.fail("没有该课程");}//查看该课程下是否有点名活动LambdaQueryWrapper<ClassActivitiesEntity> eq = Wrappers.lambdaQuery(ClassActivitiesEntity.class).eq(ClassActivitiesEntity::getCourseId, teacherRollCall.getCourseId()).eq(ClassActivitiesEntity::getActiveType, 1).eq(ClassActivitiesEntity::getActiveStatus, 0);ClassActivitiesEntity one = classActivitiesService.getOne(eq);if(!Objects.isNull(one)){return Result.fail("该课程已存在点名,请勿重复点名");}//生成签到码//// String signCode = RandomUtil.randomNumbers(4);String signCode = "1234";ClassActivitiesEntity classActivitiesEntity = new ClassActivitiesEntity();classActivitiesEntity.setCourseId(teacherRollCall.getCourseId());// 获取当前时间DateTime now = now();classActivitiesEntity.setStartTime(now);// 使用Calendar计算未来时间Calendar calendar = Calendar.getInstance();calendar.setTime(now);calendar.add(Calendar.SECOND, teacherRollCall.getSignSeconds());Date endTime = calendar.getTime();classActivitiesEntity.setEndTime(endTime);classActivitiesEntity.setActiveType(1);classActivitiesEntity.setActiveStatus(0);//课堂活动存入数据库boolean save = classActivitiesService.save(classActivitiesEntity);//redis中生成签到码的keyString signCodeKey = "sign_" + teacherRollCall.getCourseId() + "_" + signCode;redisTemplate.opsForValue().set(signCodeKey, signCode);//发给rabbitmq 延迟队列 让延迟队列处理 最终的签到情况//1. 学生查看课堂的活动的信息 应该在 课堂活动表中查看//2. 延迟队列处理 签到结束后的情况HashMap<Object, Object> map = new HashMap<>();map.put("course_id", teacherRollCall.getCourseId());map.put("class_activities_id", classActivitiesEntity.getId());map.put("sign_code", signCode);rabbitTemplate.convertAndSend(RabbitConfig.ROLL_CALL_DEAD_EXCHANGE, RabbitConfig.ROLL_CALL_DEAD_ROUTING_KEY, map, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(teacherRollCall.getSignSeconds()*1000);return message;}});return Result.success("发布签到成功",signCode);}/*** 学生签到*/@PostMapping("/student/sign")public Result studentSign(@RequestBody ClassActivitiesRequest.StudentSign studentSign) {//判断该学生是否在班级当中//这里我们不判断 知道就行String signCodeKey = "sign_" + studentSign.getCourseId() + "_" + studentSign.getSignCode();//不为空 证明有该签到String signCode = redisTemplate.opsForValue().get(signCodeKey);if (!Objects.isNull(signCode)) {if (!signCode.equals(studentSign.getSignCode())) {return Result.fail("签到码错误,签到失败");}//学生签到keyString studentSignKey="student_sign_"+studentSign.getStudentId();if(redisTemplate.opsForHash().hasKey("h"+signCodeKey,studentSignKey)){return Result.fail("您已经签到成功,请勿重复签到");}//value正常应该是 签到时间 我们换成签到码redisTemplate.opsForHash().put("h"+signCodeKey,studentSignKey,signCode);return Result.success("签到成功");} else {return Result.fail("签到已过期或已被删除");}}
}
mq配置
package com.example.tabledemo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: wyz* @Date: 2025-04-08-17:19* @Description:*/
@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter() {// 定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();return jackson2JsonMessageConverter;}// //点名延迟交换机
// public static final String ROLL_CALL_EXCHANGE = "roll_call_exchange";
// //点名延迟队列
// public static final String ROLL_CALL_QUEUE = "roll_call_queue";//点名死信交换机public static final String ROLL_CALL_DEAD_EXCHANGE = "roll_call_dead_exchange";//点名死信队列public static final String ROLL_CALL_DEAD_QUEUE = "roll_call_dead_queue";public static final String ROLL_CALL_DEAD_ROUTING_KEY = "roll_call";/*** 绑定 点名消息队列 -> 点名私信交换机->点名私信队列** @return*/
// @Bean
// public Queue bindMsgDeadQueue() {
// return QueueBuilder.durable(ROLL_CALL_QUEUE)
// .deadLetterExchange(ROLL_CALL_DEAD_EXCHANGE)
// .deadLetterRoutingKey(ROLL_CALL_DEAD_ROUTING_KEY)
//
// .build();
// }
//
//
//
//
// /**
// * 声明点名交换机
// */
// @Bean
// Exchange rollCallExchange() {
// return ExchangeBuilder.directExchange(ROLL_CALL_EXCHANGE)
// .durable(true)
// .build();
// }
//
// /**
// * 绑定 点名 交换机队列
// */
// @Bean
// Binding bingingRollCallExchangeQueue() {
// return BindingBuilder.bind(bindMsgDeadQueue())
// .to(rollCallExchange())
// .with(ROLL_CALL_DEAD_ROUTING_KEY).noargs();
// }/*** 声明点名死信队列*/@BeanQueue rollCallDeadQueue() {return QueueBuilder.durable(ROLL_CALL_DEAD_QUEUE).build();}/*** 声明点名 死信交换机*/@BeanExchange rollCallDeadExchange() {return ExchangeBuilder.directExchange(ROLL_CALL_DEAD_EXCHANGE).delayed().durable(true).build();}/*** 绑定点名 私信交换机队列*/@BeanBinding bindingRollCallExchangeQueue() {return BindingBuilder.bind(rollCallDeadQueue()).to(rollCallDeadExchange()).with(ROLL_CALL_DEAD_ROUTING_KEY).noargs();}}
消费者配置
package com.example.tabledemo.consumer;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.StudentActivitiesService;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.StudentActivitiesEntity;
import com.example.tabledemo.student.StudentCoursesEntity;
import com.example.tabledemo.student.service.StudentCoursesService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;import static java.time.LocalTime.now;/*** @Author: wyz* @Date: 2025-04-08-20:40* @Description:处理学生签到的消费者*/
@Component
@Slf4j
public class SignConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate StudentCoursesService studentCoursesService;@Autowiredprivate ClassActivitiesService classActivitiesService;@Autowiredprivate StudentActivitiesService studentActivitiesService;@RabbitListener(queues = RabbitConfig.ROLL_CALL_DEAD_QUEUE)@RabbitHandler// 直接引用队列名public void studentSignConsumer(HashMap<Object, Object> map, Channel channel, Message message) throws IOException {try {log.info(now() + "----------老师点名延迟消息处理开始----------");//解析消息Integer courseId = (Integer) map.get("course_id");Integer classActivitiesId = (Integer) map.get("class_activities_id");String signCode = (String) map.get("sign_code");//业务幂等性判断ClassActivitiesEntity byId = classActivitiesService.getById(classActivitiesId);//证明已经消费过了 本来是额外存的这里 只用状态判断if(byId.getActiveStatus()==1){return;}//拿到redis中的学生签到数据String signCodeKey = "sign_" + courseId + "_" + signCode;Map<Object, Object> studentSignMap = redisTemplate.opsForHash().entries("h" + signCodeKey);//课堂活动状态改为已经结束LambdaUpdateWrapper<ClassActivitiesEntity> eq1 = Wrappers.lambdaUpdate(ClassActivitiesEntity.class).set(ClassActivitiesEntity::getActiveStatus, 1).eq(ClassActivitiesEntity::getId, classActivitiesId);classActivitiesService.update(eq1);//学生签到key//String studentSignKey="student_sign_"+studentSign.getStudentId();List<Integer> studentSignIdList = studentSignMap.entrySet().stream().map(i -> {String studentSignKey = (String) i.getKey();log.info("学生信息为{}", studentSignKey);Integer studentId = Integer.valueOf(studentSignKey.split("_")[2]);log.info("学生id为{}", studentId);return studentId;}).collect(Collectors.toList());//查出该课程下 的所有学生idLambdaQueryWrapper<StudentCoursesEntity> eq = Wrappers.lambdaQuery(StudentCoursesEntity.class).eq(StudentCoursesEntity::getCourseId, courseId);List<StudentCoursesEntity> list = studentCoursesService.list(eq);List<Integer> studentIds = list.stream().map(i -> i.getStudentId()).collect(Collectors.toList());//正常是 会有课程状态 课程结课什么的 ,这里我们模拟 不做处理ArrayList<StudentActivitiesEntity> studentActivitiesEntities = new ArrayList<>();studentIds.stream().forEach(studentId -> {StudentActivitiesEntity studentActivitiesEntity = new StudentActivitiesEntity();studentActivitiesEntity.setStudentId(studentId);studentActivitiesEntity.setClassActivitiesId(classActivitiesId);studentActivitiesEntity.setCourseId(courseId);studentActivitiesEntity.setStudentActivitiesStatus(0);if (studentSignIdList.contains(studentId)) {log.info("有学生签到了");studentActivitiesEntity.setStudentActivitiesStatus(1);}studentActivitiesEntities.add(studentActivitiesEntity);});//构建学生活动表的数据studentActivitiesService.saveBatch(studentActivitiesEntities);//删除redis数据redisTemplate.delete(signCodeKey);redisTemplate.delete("h" + signCodeKey);//true 和false 代表着 是否 确认该条消息之前的 true 是确认 false 不确认// 假设队列中有消息 deliveryTag=5,6,7 现在是6// 结果:仅消息6被确认删除,消息5和7仍在队列中channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info(now() + "----------老师点名延迟消息处理结束----------");} catch (Exception e) {Boolean redelivered = message.getMessageProperties().getRedelivered();if (redelivered) {log.info(now() + "----------老师点名延迟消息处理异常,已被重新投递,丢弃消息----------");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.info(now() + "----------老师点名延迟消息处理异常,消息重新投递----------");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}throw e;}}
}
测试流程
接口测试
jmeter 压测
数据库数据查看
可见 已经测试成功了