Kafka、Kafka Streams、Drools、Redis 和分布式数据库的风控系统程序

由于实时风控系统难度较大,集成框架设计各个单位均有特点,快速建立一个通用性较强,学习、实施和使用成本较低的框架尤其重要。

提供一个简化的 Java 程序示例,演示如何将 Kafka 消息中间件、Kafka Streams 计算引擎、Drools 规则引擎、Redis 内存数据库和分布式数据库集成在一起。程序的主要功能是:

  1. 从 Kafka 中消费实时交易数据。
  2. 从 Redis 获取对应的风险标签,如果没有则从分布式数据库获取并更新到 Redis。
  3. 使用 Drools 规则引擎对交易数据和风险标签进行评估。
  4. 将评估结果发送回支付业务系统或记录下来。

示例图:

实时交易模块:接收交易数据 -> 获取风险标签(Redis)---> 调用规则引擎 —> 评估结果返回
      ↓                                                                                     ↓                                      ↑
规则引擎模块:交易数据 + 风险标签 ---> 规则执行 -----------> 输出评估结果(通过/拒绝)
 

为了简化示例,我们将:

创建一个简单的 Kafka 生产者,向 transaction-topic 发送交易数据。

2. 生产测试数据

  • 使用简单的交易数据结构和风险标签。
  • 定义基本的 Drools 规则。
  • 使用内存中的 H2 数据库模拟分布式数据库
  • 项目结构和依赖

    1. 项目结构

    risk-control-demo/
    ├── src/
    │   ├── main/
    │   │   ├── java/
    │   │   │   └── com.example.riskcontrol/
    │   │   │       ├── RiskControlApplication.java       // 主应用程序
    │   │   │       ├── Transaction.java                  // 交易数据模型
    │   │   │       ├── RiskTag.java                      // 风险标签模型
    │   │   │       ├── RiskEvaluator.java                // 风险评估类
    │   │   │       ├── RedisService.java                 // Redis 服务类
    │   │   │       ├── DatabaseService.java              // 数据库服务类
    │   │   │       └── KafkaStreamsConfig.java           // Kafka Streams 配置
    │   │   └── resources/
    │   │       ├── drools/
    │   │       │   └── rules.drl                         // Drools 规则文件
    │   │       └── application.properties                // 应用程序配置
    ├── pom.xml                                           // Maven 项目配置
  • 2. 依赖库(在 pom.xml 中)
  • <dependencies><!-- Kafka Streams --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>3.4.0</version></dependency><!-- Drools Core --><dependency><groupId>org.kie</groupId><artifactId>kie-api</artifactId><version>7.73.0.Final</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-core</artifactId><version>7.73.0.Final</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-compiler</artifactId><version>7.73.0.Final</version></dependency><!-- Redis Client (Jedis) --><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.3.1</version></dependency><!-- H2 Database --><dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><version>2.1.214</version><scope>runtime</scope></dependency><!-- JSON Processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.0</version></dependency><!-- Logging --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.36</version></dependency>
    </dependencies>
    

    详细代码

    1. Transaction.java(交易数据模型)
  • package com.example.riskcontrol;import java.io.Serializable;public class Transaction implements Serializable {private String transactionId;private String accountId;private double amount;private long timestamp;// Constructors, getters, setters, toString()public Transaction() {}public Transaction(String transactionId, String accountId, double amount, long timestamp) {this.transactionId = transactionId;this.accountId = accountId;this.amount = amount;this.timestamp = timestamp;}// Getters and Setters// toString() method
    }
    

    2. RiskTag.java(风险标签模型)

  • package com.example.riskcontrol;import java.io.Serializable;public class RiskTag implements Serializable {private String accountId;private int riskLevel; // 1-低风险, 2-中风险, 3-高风险// Constructors, getters, setters, toString()public RiskTag() {}public RiskTag(String accountId, int riskLevel) {this.accountId = accountId;this.riskLevel = riskLevel;}// Getters and Setters// toString() method
    }
    

    3. RedisService.java(Redis 服务类)

  • package com.example.riskcontrol;import redis.clients.jedis.Jedis;public class RedisService {private Jedis jedis;public RedisService(String host, int port) {jedis = new Jedis(host, port);}public RiskTag getRiskTag(String accountId) {String riskLevelStr = jedis.get("risk:" + accountId);if (riskLevelStr != null) {int riskLevel = Integer.parseInt(riskLevelStr);return new RiskTag(accountId, riskLevel);}return null;}public void setRiskTag(RiskTag riskTag) {jedis.set("risk:" + riskTag.getAccountId(), String.valueOf(riskTag.getRiskLevel()));}public void close() {jedis.close();}
    }
    

    4. DatabaseService.java(数据库服务类)

  • package com.example.riskcontrol;import java.sql.*;public class DatabaseService {private Connection connection;public DatabaseService() throws SQLException {// 连接 H2 内存数据库connection = DriverManager.getConnection("jdbc:h2:mem:testdb");initializeDatabase();}private void initializeDatabase() throws SQLException {Statement stmt = connection.createStatement();// 创建风险标签表String sql = "CREATE TABLE IF NOT EXISTS risk_tags (" +"account_id VARCHAR(255) PRIMARY KEY," +"risk_level INT" +")";stmt.executeUpdate(sql);// 插入示例数据sql = "INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)";stmt.executeUpdate(sql);stmt.close();}public RiskTag getRiskTag(String accountId) throws SQLException {String sql = "SELECT risk_level FROM risk_tags WHERE account_id = ?";PreparedStatement pstmt = connection.prepareStatement(sql);pstmt.setString(1, accountId);ResultSet rs = pstmt.executeQuery();if (rs.next()) {int riskLevel = rs.getInt("risk_level");rs.close();pstmt.close();return new RiskTag(accountId, riskLevel);} else {rs.close();pstmt.close();return null;}}public void close() throws SQLException {connection.close();}
    }
    

    5. RiskEvaluator.java(风险评估类)

  • package com.example.riskcontrol;import org.kie.api.KieServices;
    import org.kie.api.runtime.KieContainer;
    import org.kie.api.runtime.KieSession;public class RiskEvaluator {private KieSession kieSession;public RiskEvaluator() {// 初始化 DroolsKieServices kieServices = KieServices.Factory.get();KieContainer kieContainer = kieServices.newKieClasspathContainer();kieSession = kieContainer.newKieSession("ksession-rules");}public boolean evaluate(Transaction transaction, RiskTag riskTag) {kieSession.insert(transaction);kieSession.insert(riskTag);int fired = kieSession.fireAllRules();kieSession.dispose();return fired > 0;}
    }
    

    6. drools/rules.drl(Drools 规则文件)

  • package com.example.riskcontrolimport com.example.riskcontrol.Transaction;
    import com.example.riskcontrol.RiskTag;rule "High Risk Transaction"
    when$transaction : Transaction( amount > 10000 )$riskTag : RiskTag( riskLevel == 3 )
    thenSystem.out.println("High risk transaction detected: " + $transaction);
    endrule "Medium Risk Transaction"
    when$transaction : Transaction( amount > 5000 && amount <= 10000 )$riskTag : RiskTag( riskLevel >= 2 )
    thenSystem.out.println("Medium risk transaction detected: " + $transaction);
    endrule "Low Risk Transaction"
    when$transaction : Transaction()$riskTag : RiskTag( riskLevel == 1 )
    thenSystem.out.println("Transaction passed: " + $transaction);
    end
    

    7. KafkaStreamsConfig.java(Kafka Streams 配置)

  • package com.example.riskcontrol;import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;import java.util.Properties;public class KafkaStreamsConfig {public static Properties getProperties() {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "risk-control-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return props;}
    }
    

    8. RiskControlApplication.java(主应用程序)

  • package com.example.riskcontrol;import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;import java.sql.SQLException;public class RiskControlApplication {public static void main(String[] args) throws SQLException {// 初始化服务RedisService redisService = new RedisService("localhost", 6379);DatabaseService databaseService = new DatabaseService();RiskEvaluator riskEvaluator = new RiskEvaluator();// 配置 Kafka StreamsStreamsBuilder builder = new StreamsBuilder();KStream<String, String> sourceStream = builder.stream("transaction-topic");// 处理流sourceStream.foreach((key, value) -> {try {ObjectMapper objectMapper = new ObjectMapper();Transaction transaction = objectMapper.readValue(value, Transaction.class);// 从 Redis 获取风险标签RiskTag riskTag = redisService.getRiskTag(transaction.getAccountId());if (riskTag == null) {// 如果 Redis 中没有,从数据库获取并更新到 RedisriskTag = databaseService.getRiskTag(transaction.getAccountId());if (riskTag != null) {redisService.setRiskTag(riskTag);} else {// 如果数据库中也没有,设定默认风险标签riskTag = new RiskTag(transaction.getAccountId(), 1);}}// 使用 Drools 进行风险评估boolean isRisk = riskEvaluator.evaluate(transaction, riskTag);// 根据评估结果进行处理if (isRisk) {System.out.println("Transaction " + transaction.getTransactionId() + " is risky. Action: Block");// 发送阻止交易的消息或记录日志} else {System.out.println("Transaction " + transaction.getTransactionId() + " is safe. Action: Approve");// 发送通过交易的消息或记录日志}} catch (Exception e) {e.printStackTrace();}});// 启动 Kafka StreamsKafkaStreams streams = new KafkaStreams(builder.build(), KafkaStreamsConfig.getProperties());streams.start();// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {streams.close();redisService.close();try {databaseService.close();} catch (SQLException e) {e.printStackTrace();}}));}
    }
    

    运行示例

    1. 启动必要的服务

  • Redis:确保 Redis 服务在本地的 6379 端口运行。
  • Kafka:确保 Kafka 服务在本地的 9092 端口运行,并创建主题 transaction-topic
package com.example.riskcontrol;import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class TransactionProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);try {ObjectMapper objectMapper = new ObjectMapper();// 创建示例交易数据Transaction transaction = new Transaction("tx1001", "account123", 12000.0, System.currentTimeMillis());String transactionJson = objectMapper.writeValueAsString(transaction);ProducerRecord<String, String> record = new ProducerRecord<>("transaction-topic", transaction.getTransactionId(), transactionJson);producer.send(record);System.out.println("Transaction sent: " + transactionJson);} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}
}

. 运行应用程序

  • 先运行 RiskControlApplication,启动风控系统。
  • 再运行 TransactionProducer,发送交易数据。

4. 预期输出

风控系统将处理交易数据,使用 Drools 规则引擎进行评估,并根据规则打印评估结果。例如:

High risk transaction detected: Transaction{transactionId='tx1001', accountId='account123', amount=12000.0, timestamp=...}
Transaction tx1001 is risky. Action: Block

说明

  • Kafka Streams:用于实时消费交易数据,并进行数据处理。
  • Drools:规则引擎,用于评估交易的风险级别。
  • Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
  • 分布式数据库(H2 数据库模拟):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
  • 风险标签:简单地使用风险级别(1-低风险,2-中风险,3-高风险)来表示。

注意事项

  • 异常处理:在实际应用中,需要更完善的异常处理机制,防止因异常导致程序崩溃。
  • 多线程与并发:在高并发场景下,需要考虑线程安全和性能优化。
  • 资源管理:确保所有的资源(如数据库连接、Redis 连接、Kafka Streams)在程序结束时正确关闭。
  • 配置管理:将硬编码的配置(如主机地址、端口、主题名)提取到配置文件中,便于管理和修改。

5、系统整体各个模块的调度关系流程

以下是系统各模块之间的交互流程,详细说明了调度关系:

  1. 交易数据的接收与预处理

    • 支付业务系统将实时交易数据通过消息队列模块(Kafka)接口与通信模块(API/gRPC)发送到实时交易数据处理模块
    • 实时交易数据处理模块接收数据后,进行数据预处理,如格式验证和完整性检查。
  2. 风险标签的获取

    • 实时交易数据处理模块需要获取交易涉及的账户或用户的风险标签。
    • 首先从**数据存储与缓存模块(Redis)**中查询风险标签。
    • 如果缓存中没有对应的风险标签,则从分布式数据库中读取,并更新到缓存。
  3. 风险评估

    • 实时交易数据处理模块将交易数据和风险标签一起传递给规则引擎模块
    • 规则引擎模块根据预定义的业务规则,对交易进行风险评估,生成评估结果(如通过、拒绝、需人工审核)。
  4. 评估结果的返回

    • 规则引擎模块将评估结果返回给实时交易数据处理模块
    • 实时交易数据处理模块通过接口与通信模块将评估结果反馈给支付业务系统,执行相应的业务操作。
  5. 风险标签的批量更新

    • 批量风险标签处理模块定期执行,获取历史数据进行风险标签的重新计算。
    • 计算出的风险标签存储在分布式数据库中,并同步更新到Redis 缓存
  6. 系统监控与安全

    • 监控与运维模块持续监控各模块的状态和性能,收集日志信息,设置报警机制。
    • 安全与合规模块确保数据传输和存储的安全性,对各模块的访问进行权限控制,满足合规要求。
[支付业务系统]|v
1. 发送交易数据|v
[消息队列模块(Kafka)/接口与通信模块(API/gRPC)]|v
[实时交易数据处理模块]|+--> 2. 从缓存获取风险标签|         ||         v|    [数据存储与缓存模块(Redis)]|         ||     若未命中|         v|    从数据库获取并更新缓存|         ||    [分布式数据库]|+--> 3. 调用规则引擎模块|         ||         v|    [规则引擎模块]|         ||     执行风险评估|         ||    返回评估结果|+--> 4. 返回评估结果给支付业务系统|         |v         v
[接口与通信模块] <---> [支付业务系统]

总结

上述示例提供了一个基本的程序框架,演示了如何将 Kafka、Kafka Streams、Drools、Redis 和分布式数据库集成在一起,完成实时风控的基本功能。在实际项目中,需要根据具体的业务需求和技术环境,对程序进行扩展和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882968.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

小新学习K8s第一天之K8s基础概念

目录 一、Kubernetes&#xff08;K8s&#xff09;概述 1.1、什么是K8s 1.2、K8s的作用 1.3、K8s的功能 二、K8s的特性 2.1、弹性伸缩 2.2、自我修复 2.3、服务发现和负载均衡 2.4、自动发布&#xff08;默认滚动发布模式&#xff09;和回滚 2.5、集中化配置管理和密钥…

高效改进!防止DataX从HDFS导入关系型数据库丢数据

高效改进&#xff01;防止DataX从HDFS导入关系型数据库丢数据 针对DataX在从HDFS导入数据到关系型数据库过程中的数据丢失问题&#xff0c;优化了分片处理代码。改动包括将之前单一分片处理逻辑重构为循环处理所有分片&#xff0c;确保了每个分片数据都得到全面读取和传输&…

Python 实现 excel 数据过滤

一、场景分析 假设有如下一份 excel 数据 shop.xlsx, 写一段 python 程序&#xff0c;实现对于车牌的分组数据过滤。 并以车牌为文件名&#xff0c;把店名输出到 车牌.txt 文件中。 比如 闽A.txt 文件内容为&#xff1a; 小林书店福州店1 小林书店福州店2 二、依赖安装 程序依…

TBWeb正式稳定版V3.4.0+AI+MJ绘画+免授权无后门+详细安装教程

TBWeb正式稳定版V3.4.0AIMJ绘画免授权无后门详细安装教程&#xff1b; 运行环境 Nginx1.22 PHP5.7 MySQL7.4 Redis7.0 Node.js&#xff08;16.19.1&#xff09; PM2管理器5.6 TBWeb系统是基于 NineAI 二开的可商业化 TB Web 应用&#xff08;免授权&#xff0c;无后门&a…

【隐私计算】隐语HEU同态加密算法解读

HEU: 一个高性能的同态加密算法库&#xff0c;提供了多种 PHE 算法&#xff0c; 包括ZPaillier、FPaillier、IPCL、Damgard Jurik、DGK、OU、EC ElGamal 以及基于FPGA和GPU硬件加速版本的Paillier版本。 本文我们会基于GPU运行HEU Docker容器&#xff0c;编译打包GPaillier并测…

算法的学习笔记—两个链表的第一个公共结点(牛客JZ52)

&#x1f600;前言 在链表问题中&#xff0c;寻找两个链表的第一个公共结点是一个经典问题。这个问题的本质是在两个单链表中找到它们的相交点&#xff0c;或者说它们开始共享相同节点的地方。本文将详细讲解这个问题的解题思路&#xff0c;并提供一种高效的解决方法。 &#x…

蓝牙资讯|iOS 18.1 正式版下周推送,AirPods Pro 2耳机将带来助听器功能

苹果公司宣布将在下周发布 iOS 18.1 正式版&#xff0c;同时确认该更新将为 AirPods Pro 2 耳机带来新增“临床级”助听器功能。在启用功能后&#xff0c;用户首先需要使用 AirPods 和 iPhone 进行简短的听力测试&#xff0c;如果检测到听力损失&#xff0c;系统将创建一项“个…

docker run 命令解析

docker run 命令解析 docker run 命令用于从给定的镜像启动一个新的容器。这个命令可以包含许多选项&#xff0c;下面是一些常用的选项&#xff1a; -d&#xff1a;后台运行容器&#xff0c;并返回容器ID&#xff1b;-i&#xff1a;以交互模式运行容器&#xff0c;通常与 -t …

【C++】string类 (模拟实现详解 下)

我们接着上一篇【C】string类 &#xff08;模拟实现详解 上&#xff09;-CSDN博客继续对string模拟实现。从这篇内容开始&#xff0c;string相关函数的实现就要声明和定义分离了。 1.reserve、push_back和append 在string.h的string类里进行函数的声明。 void reserve(size_…

JVM(HotSpot):GC之垃圾回收器的分类

文章目录 前言一、串行二、吞吐量优先三、响应时间优先四、常见垃圾回收器使用组合 前言 上一篇&#xff0c;我们学习了分代回收机制 它的主要内容是对JVM内存的一个划分&#xff0c;以及垃圾回收器工作时&#xff0c;区域运作顺序的一个规定。 所以&#xff0c;它是一个规范。…

Spring Boot论坛网站:开发、部署与管理

3系统分析 3.1可行性分析 通过对本论坛网站实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本论坛网站采用SSM框架&#xff0c;JAVA作为开发语言&#xff0c;是…

智慧楼宇平台,构筑未来智慧城市的基石

随着城市化进程的加速&#xff0c;城市面临着前所未有的挑战。人口密度的增加、资源的紧张、环境的恶化以及对高效能源管理的需求&#xff0c;都在推动着我们寻找更加智能、可持续的城市解决方案。智慧楼宇作为智慧城市建设的重要组成部分&#xff0c;正逐渐成为推动城市可持续…

MATLAB电化学特性评估石墨和锂电

&#x1f3af;要点 模拟对比石墨电池的放电电压曲线与实验数据定性差异。对比双箔、多相多孔电极理论和锂电有限体积模型实现。通过孔隙电极理论模型了解粗粒平均质量和电荷传输以及孔隙率的表征意义。锂电中锂离子正向和逆向反应速率与驱动力的指数以及电解质和电极表面的锂浓…

Docker 部署 EMQX 一分钟极速部署

部署 EMQX ( Docker ) [Step 1] : 拉取 EMQX 镜像 docker pull emqx/emqx:latest[Step 2] : 创建目录 ➡️ 创建容器 ➡️ 拷贝文件 ➡️ 授权文件 ➡️ 删除容器 # 创建目录 mkdir -p /data/emqx/{etc,data,log}# 创建容器 docker run -d --name emqx -p 1883:1883 -p 1808…

「Qt Widget中文示例指南」如何实现半透明背景?

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 本文将为大家展示如…

《Linux从小白到高手》综合应用篇:深入理解Linux常用关键内核参数及其调优

1. 题记 有关Linux关键内核参数的调整&#xff0c;我前面的调优文章其实就有涉及到&#xff0c;只是比较零散&#xff0c;本篇集中深入介绍Linux常用关键内核参数及其调优&#xff0c;Linux调优80%以上都涉及到内核的这些参数的调整。 2. 文件系统相关参数 fs.file-max 参数…

Excel 对数据进行脱敏

身份证号脱敏&#xff1a;LEFT(A2,6)&REPT("*",6)&RIGHT(A2,6) 手机号脱敏&#xff1a;LEFT(B2,3)&REPT("*",5)&RIGHT(B2,3) 姓名脱敏&#xff1a;LEFT(C2,1)&REPT("*",1)&RIGHT(C2,1) 参考&#xff1a; excel匹配替换…

STM32F103C8T6 IO 操作

1.开启相关时钟 在 STM32 微控制器中&#xff0c;开启 GPIO 端口的时钟是确保 IO 口可以正常工作的第一步。 查找 RCC 寄存器使能时钟 在 STM32 中&#xff0c;时钟控制的寄存器通常位于 RCC (Reset and Clock Control) 模块中。不同的 STM32 系列&#xff08;如 STM32F1、STM…

【单元测试】深入解剖单元测试的思维逻辑

目录 一、前言二、准备环境三、 常用的mock语句3.1 模拟指定类的对象实例&#xff0c;用于模拟依赖对象&#xff08;类成员&#xff09;3.2 定义被测试对象3.3 模拟枚举类型/静态方法3.4 模拟依赖方法3.5 模拟构造方法3.6 验证方法调用次数3.7 验证返回值3.8 验证异常对象 四、…

第十六周:机器学习笔记

第十六周周报 摘要Abstratc一、机器学习1. Pointer Network&#xff08;指针网络&#xff09;2. 生成式对抗网络&#xff08;Generative Adversarial Networks | GAN&#xff09;——&#xff08;上&#xff09;2.1 Generator&#xff08;生成器&#xff09;2.2 Discriminator&…