20221124 kafka实时数据写入Redis

一、上线结论

  • 实现了将用户线上实时浏览的沉浸式视频信息,保存在Redis中这样一个功能。
  • 为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景,后续也能扩展到其他所有场景。
  • 用于两个场景:(1)根据用户近期观看物料匹配相似物料(2)过滤用户近期观看物料

二、实现效果展示

用户在线上刷一个视频,redis就会将用户的视频信息保存在用户历史浏览的队列中。

队列大小为100。具体保存的信息如下所示:

一、Redis存储KEY:kafka:user_short_video_streaming:_5c91e0cf0cf2f3d119f92774
二、Redis存储value:[{"duration":4,"resourceId":"28808","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":9,"resourceId":"24262","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":5,"resourceId":"25330","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}]

三、实现策略

  • 采用Java语言实现,
  • 先监听kafka,然后解析kafka消息,进行解码,再解析,从解析后的结果中提取user_id, resource_id和resource_type字段。
  • 连接Redis,构造用户队列,队列长度设置为100(用户刷的视频个数),将数据写入Redis
  • 队列大小为100,超过100顺序pop

代码Git:http://gitlab.dzj.com/applied_algorithm/data_analysis/kafka_streaming_immersive.git

四、项目后续规划

  • 扩展到Feed流,搜索召回等全部的场景
  • jar包后台运行方式改为CICD部署

五、附录

5.1 BUG分享

在实现的过程中,遇到一个序列化问题,就是写入的key和value乱码,导致用Python查询的定义好的KEY的时候查询不到,解决方案如下:

自定义RedisTemplete进行重写,  用jackson进行序列化,将这个类注册到Spring Boot中

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

package com.dzj.kafka_streaming.Config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;

import com.fasterxml.jackson.annotation.PropertyAccessor;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import org.springframework.data.redis.serializer.StringRedisSerializer;

/**

 * @Author : wangyongpeng

 * @Date : 2022/12/16 14:34

 * @Description : 重写RedisTemplate, 进行序列化

 */

@Configuration

public class RedisConfig {

    @Bean

    @SuppressWarnings("all")

    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> template = new RedisTemplate();

        template.setConnectionFactory(redisConnectionFactory);

        // JSON序列化配置

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();

        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        // String 的序列化

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key 采用String的序列化方式

        template.setKeySerializer(stringRedisSerializer);

        // hash的key也采用String的序列化方式

        template.setHashKeySerializer(stringRedisSerializer);

        // valuex序列化方式采用jackson

        template.setValueSerializer(jackson2JsonRedisSerializer);

        // hash的序列化也用jackson

        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;

    }

}

5.2 项目核心代码

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

package com.dzj.kafka_streaming.listener;

import com.dzj.kafka_streaming.dto.TagNameTypeInfo;

import com.dzj.kafka_streaming.service.ContentTagRelationService;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import javax.annotation.Resource;

import java.util.ArrayList;

import java.util.Base64;

import java.util.List;

/**

 * "immersive_streaming_" + userId; 这是旧的key,需要清除

 */

@Component

public class MessageListener {

    @Autowired

    private ContentTagRelationService relationService;

    @Resource

    private RedisTemplate<String, Object> redisTemplate;

    private final String TOPIC_NAME = "event-trace-log";

    // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")

    @KafkaListener(topics = {TOPIC_NAME})

    public void listener(ConsumerRecord<String,String> record)  {

        //获取消息

        String message = record.value();

        //消息偏移量

        long offset = record.offset();

        String redisKeyPrefix = "kafka:user_short_video_streaming:_";

        JSONObject dataJson = parseJson(message);

        String eventCode = dataJson.getString("eventCode");

        if ("145001".equals(eventCode)){

            // 测试环境------------------------------------------------------------------------------------------

            // 目前只关注沉浸式中得数据

            String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId");

            String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType");

            Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration");

            String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode");

            String userId = dataJson.getJSONObject("eventBody").getString("userId");

            String appType = dataJson.getJSONObject("eventBody").getString("appType");

            // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));

            /**

             * 写入Redis

             * redis存储结构: key = List(5),是一个定长为5,右进左出的队列

             * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个

             */

            String key = redisKeyPrefix + userId;

    //        String key = "immersive_streaming_wyp0001";

            // 定义Redis队列写入的结构

            JSONObject redisListItem = new JSONObject();

            redisListItem.put("resourceId",resourceId);

            redisListItem.put("resourceType",resourceType);

            redisListItem.put("duration",duration);

            redisListItem.put("actionCode",actionCode);

            redisListItem.put("appType",appType);

            String redisListItemString = redisListItem.toJSONString();

            if (redisTemplate.opsForList().size(key) >= 100){

                Object leftPop = redisTemplate.opsForList().leftPop(key);

                redisTemplate.opsForList().rightPush(key, redisListItemString);

                System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

            }else {

                if (!resourceId.isEmpty() && !resourceType.isEmpty()){

                    redisTemplate.opsForList().rightPush(key, redisListItemString);

                    Long size = redisTemplate.opsForList().size(key);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one:  "+ size + redisListItemString);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

                }

            }

        }

    }

     

    /**

     * 解析json,解码功能

     */

    public JSONObject parseJson(String message) {

        JSONObject messageJson = JSONObject.parseObject(message);

        String dataString = messageJson.getString("data");

        // --------------------base64解码字符串--------------------

        String data_string = "";

        final Base64.Decoder decoder = Base64.getDecoder();

        try{

            data_string = new String(decoder.decode(dataString), "UTF-8");

        }catch (Exception e){

            System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e);

        }

        // string转换为json,只取eventCode = '145001'沉浸式的

        JSONObject dataJson = JSONObject.parseObject(data_string);

        return dataJson;

    }

    /**

     * 从数据库查询

     * @param resourceId

     * @param resourceType

     * @return

     */

    public List<TagNameTypeInfo>  queryByIdAndType(String resourceId, String resourceType ){

        List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>();

        try {

            tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);

        catch (Exception e){

            System.out.println("【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到.......");

        }

        return tagNameTypeInfos;

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/775998.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年天津财经大学珠江学院退役大学生士兵专升本专业课报名须知

天津财经大学珠江学院2024年高职升本科&#xff08;面向退役大学生士兵&#xff09;职业技能综合考查报考须知 一、报名条件 报考天津财经大学珠江学院2024年高职升本科职业技能综合考查的退役大学生士兵应符合天津市及我院规定的报考资格。考生须完成天津市高职升本科文化考…

数据结构/C++:位图 布隆过滤器

数据结构/C&#xff1a;位图 & 布隆过滤器 位图实现应用 布隆过滤器实现应用 哈希表通过映射关系&#xff0c;实现了O(1)的复杂度来查找数据。相比于其它数据结构&#xff0c;哈希在实践中是一个非常重要的思想&#xff0c;本博客将介绍哈希思想的两大应用&#xff0c;位图…

HTTPS:原理、使用方法及安全威胁

文章目录 一、HTTPS技术原理1.1 主要技术原理1.2 HTTPS的工作过程1.2.1 握手阶段1.2.2 数据传输阶段 1.3 CA证书的签发流程1.4 HTTPS的安全性 二、HTTPS使用方法三、HTTPS安全威胁四、总结 HTTPS&#xff08;全称&#xff1a;Hyper Text Transfer Protocol over Secure Socket …

git提交-分支开发合并-控制台操作

git提交-分支开发合并-控制台操作 git的基本概念工作区、暂存区和版本库工作区&#xff1a;就是你在电脑里能看到的目录&#xff08;隐藏目录 .git不算工作区&#xff09;。暂存区&#xff1a;英文叫 stage 或 index。一般存放在本地的.git目录下的index 文件&#xff08;.git/…

036—pandas 按行将列名根据值由大到小排序

前言 数据处理中&#xff0c;按行排列的列名可以提供更直观的数据探索和分析方式。 你可以逐行查看列名&#xff0c;了解每列的含义和特征&#xff0c;有助于更好地理解数据集的结构和内容。 需求&#xff1a; 需要增加一列「分布方式」&#xff0c;每行的值是本行基金名称对…

双指针算法:三数之和

文章目录 一、[题目链接&#xff1a;三数之和](https://leetcode.cn/problems/3sum/submissions/515727749/)二、思路讲解三、代码演示 先赞后看&#xff0c;养成习惯&#xff01;&#xff01;&#xff01;^ _ ^<3 ❤️ ❤️ ❤️ 码字不易&#xff0c;大家的支持就是我坚持…

对BSV区块链网络访问规则NAR通俗易懂的解释

​​发表时间&#xff1a;2024年2月21日 我们可以把BSV区块链网络想象成在公园里举办的一场大型公共足球比赛。虽然这是一场友谊赛&#xff0c;但在比赛前&#xff0c;每个人都要理解并同意基本规则。举例来说&#xff0c;除了守门员之外&#xff0c;任何球员不得用手触球。 在…

JetBrains全家桶激活,分享 WebStorm 2024 激活的方案

大家好&#xff0c;欢迎来到金榜探云手&#xff01; WebStorm公司简介 JetBrains 是一家专注于开发工具的软件公司&#xff0c;总部位于捷克。他们以提供强大的集成开发环境&#xff08;IDE&#xff09;而闻名&#xff0c;如 IntelliJ IDEA、PyCharm、和 WebStorm等。这些工具…

【动手学深度学习】深入浅出深度学习之线性神经网络

目录 &#x1f31e;一、实验目的 &#x1f31e;二、实验准备 &#x1f31e;三、实验内容 &#x1f33c;1. 线性回归 &#x1f33b;1.1 矢量化加速 &#x1f33b;1.2 正态分布与平方损失 &#x1f33c;2. 线性回归的从零开始实现 &#x1f33b;2.1. 生成数据集 &#x…

Linux:文件增删 文件压缩指令

Linux&#xff1a;文件增删 & 文件压缩指令 文件增删touch指令mkdir指令cp指令rm指令rmdir指令 文件压缩zip & unzip 指令tar指令 文件增删 touch指令 功能&#xff1a;touch命令参数可更改文档或目录的日期时间&#xff0c;包括存取时间和更改时间&#xff0c;或者新…

离线数仓(八)【DWD 层开发】

前言 1、DWD 层开发 DWD层设计要点&#xff1a; &#xff08;1&#xff09;DWD层的设计依据是维度建模理论&#xff08;主体是事务型事实表&#xff08;选择业务过程 -> 声明粒度 -> 确定维度 -> 确定事实&#xff09;&#xff0c;另外两种周期型快照事实表和累积型…

第19篇:基本RS锁存器

Q&#xff1a;本期开始我们来设计实现时序逻辑电路&#xff0c;首先来设计由与非门构成的基本RS锁存器。 A&#xff1a;基本RS锁存器工作原理&#xff1a;锁存器的2个输入端均为低电平有效&#xff0c;一般情况下&#xff0c;2个输入端均为1时输出状态维持不变&#xff0c;只有…

JWT认证原理

简介&#xff1a; JSON Web Token (JWT) is an open standard (RFC 7519) that defines a compact and self-contained way for securely transmitting information between parties as a JSON object. This information can be verified and trusted because it is digitally …

【信号处理】基于DGGAN的单通道脑电信号增强和情绪检测(tensorflow)

关于 情绪检测&#xff0c;是脑科学研究中的一个常见和热门的方向。在进行情绪检测的分类中&#xff0c;真实数据不足&#xff0c;经常导致情绪检测模型的性能不佳。因此&#xff0c;对数据进行增强&#xff0c;成为了一个提升下游任务的重要的手段。本项目通过DCGAN模型实现脑…

基于STC12C5A60S2系列1T 8051单片机的按键单击长按实现互不干扰增加减少数值应用

基于STC12C5A60S2系列1T 8051单片机的按键单击长按实现互不干扰增加减少数值应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式及配置STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式介绍基于STC12C5A60S2系列1T 8051单片机的…

iscsi网络协议(连接硬件设备)

iscsi概念 iscsi是一种互联网协议&#xff0c;用于将存储设备&#xff08;如硬盘驱动器或磁带驱动器&#xff09;通过网络连接到计算机。它是一种存储区域网络&#xff08;SAN&#xff09;技术&#xff0c;允许服务器通过网络连接到存储设备&#xff0c;就像它们是本地设备一样…

区块链技术与大数据结合的商业模式探索

hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验&#xff01;希望我的分享能帮助到您&#xff01;如需帮助可以评论关注私信我们一起探讨&#xff01;致敬感谢感恩&#xff01; 随着区块链技术和大数据技术的不断发展&#xff0c;两者的结合为企业带来了新的商业模式…

科东软件联手英特尔,用工业AI智能机器人赋能工业升级

AI浪潮已经冲击到各行各业中&#xff0c;它能够帮助人们提高思考和生产效率。在创作中&#xff0c;AI能够帮助人们释放创意&#xff0c;那在工业中&#xff0c;AI能够为产业带来什么呢&#xff1f; 科东软件是国内专注于操作系统开发的企业。当前&#xff0c;科东开发的Intewe…

机器学习——贝叶斯分类器(基础理论+编程)

目录 一、理论 1、初步引入 2、做简化 3、拉普拉斯修正 二、实战 1、计算P(c) 2、计算P(x|c) 3、实战结果 1、数据集展示 2、相关信息打印 一、理论 1、初步引入 在所有相关概率都已知的理想情形下&#xff0c;贝叶斯决策论考虑如何基于这些概率和误判损失来选择最…