深入理解 Flink(三)Flink 内核基础设施源码级原理详解

Hadoop 生态各大常见组件的 RPC 技术实现

在这里插入图片描述

Flink RPC 网络通信框架 Akka 详解

在这里插入图片描述
1、ActorSystem 是管理 Actor 生命周期的组件,Actor 是负责进行通信的组件。
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用阻塞方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor(谁生的谁养)。
5、每一个 ActorSystem 和 Actor 都在启动的时候会给定一个 name,如果要从 ActorSystem 中,获取一个 Actor,则通过以下的方式来进行 Actor 的获取:

akka.tcp://actorsystem_name@bigdata02:9527/user/actor_name 

6、如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。

actorRef = actorSystem.actorOf("akka.tcp://actorsystem_name@bigdata02:9527/user/actor_name")
// 获取和对方 actor 进行通信的一个 actorRef 对象,类似于一个本地调用,但事实上,actorRef 和 对方actor 的通信细节被封装了。
actorRef = actorSystem.actorOf("schema://actorsystem_name@hostname:port/user/actor_name")
actorRef.getNow()

7、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回调返回处理结果。

深入理解 Flink RPC 网络通信框架

Flink RPC 采用了和 Akka 一样的一种抽象,底层是基于 Akka 来实现。Flink RPC 其实是封装了 Akka 但是上层抽象其实和 Akka 的工作机制是一样的。Flink 的 RPC 网络通信框架的底层依然使用 Akka Actor Model 模型设计实现,大致实现和 Spark RPC 差不多。
Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个:
在这里插入图片描述

简单概况

1、RpcGateway 路由,RPC 的老祖宗。各种其他 RPC 服务组件,都是 RpcGateWay 的子类,类似于 Hadoop 中的通信协议 Protocol。
2、RpcEndpoint 业务逻辑载体,对应的 Actor 的封装。
3、RpcService 对应 ActorSystem 的封装,类似于 Spark 中的 RpcEnv。
4、RpcServer 为 RpcService(ActorSystem)和 RpcEndpoint(Actor)之间的粘合层

继承关系图

在这里插入图片描述
四个比较重要的子类:

  • TaskExecutor 集群中从节点中最重要的角色,负责资源管理。
  • Dispatcher 主节点中的一个工作角色,负责 job 调度执行。
  • JobMaster 应用程序中的主控程序,类似于 Spark 中的 Driver 的作用,或者 MapReduce 中的 MRAppMaster。
  • ResourceManager 集群中的主节点 JobManager 中的负责资源管理的角色,和 TaskExecutor 一起构成资源管理的主从架构。

这四个组件的任何一个组件的实例对象创建成功之后,都会要调用 start() 去启动这个 RpcEndpoint,然后就会去执行他的 RpcEndpoint 的 onStart() 方法。一般来说,对应的 RpcEndpoint 组件都会重写,在这些 RpcEndpoint 组件启动的时候,一些重要的逻辑,都有可能被放在这个 onStart() 生命周期方法里。

关于 Flink Standalone 集群
逻辑概念:JobManager + TaskManager
物理概念:ClusterEntryPoint(ResourceManager) + TaskManagerRunner(TaskExecutor)
主节点 ClusterEntryPoint 的内部其实有四种重要的组件:

  • ResourceManager
  • Dispatcher
  • RestServer
  • JobMaster

例如,在 TaskExecutor 的内部,持有 ResourceManager 的一个 Gateway 对象,当 TaskExecutor 需要给 ResourceManager 的时候,就通过 ResourceManagerGateWay 给 ResourceManager 发送消息。

Flink RpcEndpoint

JobManager 的 ResourceManager

ResourceManager 的职责就是帮助 主节点 JobManager 完成从节点 TaskManager 的管理和资源的管理和分配等工作。
在这里插入图片描述

TaskManager 的 TaskExecutor

Flink Standalone 集群是一个主从架构,主节点叫做 JobManager,从节点叫做 TaskManager。
在这里插入图片描述
这个 TaskExecutor 是存在于 TaskManager 的内部,真正完成资源提供和分配,接收任务和执行等相关工作。这个角色的意义更等同于 Spark 中的 Worker, YARN 集群中的 NodeManager。

Flink 核心工作组件整体架构抽象

在这里插入图片描述

Flink on YARN 的三种运行模式

在这里插入图片描述
Flink 通过 YARN 的接口实现了自己的 ApplicationMaster。当在 YARN 中部署了Flink,YARN 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 ApplicationMaster)和 TaskManager。

Flink On YARN 有三种模式:
在这里插入图片描述

  • Session 模式:在 YARN 中初始化一个 Flink 集群,开辟指定的资源,之后我们提交的 Flink Job 都在这个 Flink yarn-session 中,也就是说不管提交多少个 job,这些 job 都会共用开始时在 YARN 中申请的资源。这个 Flink 集群会常驻在 YARN 集群中,除非手动停止。
  • Per-Job 模式:在 YARN 中,每次提交 job 都会创建一个新的 Flink 集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。 所以每个 Job 执行完毕,Flink 集群关闭,释放资源。
  • Application 模式:Flink-1.11 引入,Client 需要做的事情(main 方法的执行)转移到 JobManager中,多个 env.execute() 视为同一个 Application,相比 Per-Job 模式不用启动多个 Cluster。

在这里插入图片描述

./bin/flink run --target yarn-session # Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-per-job # Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run-application --target yarn-application # Submission spinning up Flink YARN cluster in Application Mode

具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html

Flink On YARN 不同的模式,其实入口是不一样的,总的来说是:ClusterEntryPoint,ClusterEntryPoint 是 Flink 集群模式的入口基类,它的实现类结构如下:
在这里插入图片描述
注意:1.14 版本之后,Mesos 的支持已经被移除。(Mesos 背后的商业化公司 Mesosphere 于 2023 年破产倒闭)
YARN 模式 和 Standalone 模式最大的区别就是:

  • Standalone 模式,已经提前把 ClusterEntrypoint 和 TaskManagerRunner 启动好了。集群的资源总量是固定的。
  • Flink On YARN 模式中,在 YARN 集群中,申请到一个 Container 用来启动一个 SessionClusterEntrypoint,然后动态申请足够数量的 Container 来启动 TaskManagerRunner 来运行 Task。

Flink 高可用服务 HighAvailabilityServices

在这里插入图片描述
在 Flink 的内部,需要保证高可用服务的有:ResourceManager,Dispatcher,JobManager,WebMonitorEndpoint 四大组件。

ZooKeeperHaServices 内部最重要的两个方法:

public class ZooKeeperHaServices extends AbstractHaServices {@Overrideprotected LeaderElectionService createLeaderElectionService(String leaderName) {// 创建选举服务return ZooKeeperUtils.createLeaderElectionService(client, configuration, leaderName);}@Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String leaderName) {// 创建监听服务return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, leaderName);}
}

其次重要的三个方法:

public class ZooKeeperHaServices extends AbstractHaServices {@Overridepublic CheckpointRecoveryFactory createCheckpointRecoveryFactory() {// TODO_MA 注释:return new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor);}@Overridepublic JobGraphStore createJobGraphStore() throws Exception {// TODO_MA 注释:JobGraphStore + ExecutionGraphStorereturn ZooKeeperUtils.createJobGraphs(client, configuration);}@Overridepublic RunningJobsRegistry createRunningJobsRegistry() {// TODO_MA 注释:return new ZooKeeperRunningJobsRegistry(client, configuration);}
}

Flink 选举服务 LeaderElectionService 和监听 LeaderRetrievalService 机制

  • LeaderElectionService : 用来做选举的服务,基于 ZK 实现,真正实现类的名字叫做: DefaultLeaderElectioinService。它的内部通过一个 选举驱动器 LeaderElectionDriver 来完成。LeaderElectionDriver 的内部其实通过 curator 框架提供的一个选举组件:LeaderLatch 来负责进行选举。
  • Flink 的选举和监听机制,都是依托于 Curator 框架的 API 进行封装提供了的实现,具体涉及到的实现类包括:LeaderContender(用于竞选)和 LeaderElectionService(选举服务)和 LeaderRetrievalService(监听服务)。
  • Curator 对应的监听 API

在这里插入图片描述

LeaderElectionService 接口定义

public interface LeaderElectionService {// 启动选举,启动方法将竞争者作为参数void start(LeaderContender contender) throws Exception;// 停止void stop() throws Exception;// 确认void confirmLeadership(UUID leaderSessionID, String leaderAddress);// 判断是否拥有指定 session 下的 leadershipboolean hasLeadership(@Nonnull UUID leaderSessionId);
}

LeaderContender 是 LeaderElectionService 中的参与选举的竞选者。它有四种实现(1.14):
在这里插入图片描述
也即前文中提到的需要保证高可用服务:ResourceManager,Dispatcher,JobManager,WebMonitorEndpoint 四大组件。这四个组件中组合了 LeaderElectionService,同时 LeaderElectionService 也组合了 LeaderContender(你中有我,我中有你)。

LeaderElectionService 选举实现

LeaderElectionService 实现了 LeaderElectionEventHandler 接口的两个方法:onGrantLeadership、onRevokeLeadership。
LeaderElectionService 调用 leaderElectionService.start(this) 开始执行选举,最终通过 ZooKeeperLeaderElectionDriver 实现选举;
ZooKeeperLeaderElectionDriver 实现了 LeaderLatchListener 接口的 isLeader、notLeader 方法,监听到 zookeeper 的对应事件后触发。
isLeader、notLeader 方法的内部,其实是调用 LeaderElectionService 实现的 LeaderElectionEventHandler 接口的两个方法:onGrantLeadership、onRevokeLeadership;
而上述两个方法最终会调用 LeaderContender 的 grantLeadership、revokeLeadership 方法。

// LeaderLatch 基于分布式锁实现的一个选举类
// NodeCache 监听类
public ZooKeeperLeaderElectionDriver(....){// 当 当前组件 选举成功,则回调 this 的 isLeader() 方法// 当 当前组件 没有选举成功,则回到 this 的 notLeader() 方法leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));leaderLatch.addListener(this);leaderLatch.start();// 当监听响应,则会回调 this 的 nodeChanged() 方法cache = new NodeCache(client, leaderPath);cache.getListenable().addListener(this);cache.start();
}

LeaderRetrievalService 接口定义

public interface LeaderRetrievalService {// 开启监听void start(LeaderRetrievalListener listener) throws Exception;// 结束监听void stop() throws Exception;
}
public interface LeaderRetrievalListener {// 监听回调void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);void handleError(Exception exception);
}

LeaderRetrievalService 实现了 LeaderRetrievalEventHandler 接口的 notifyLeaderAddress 方法。
LeaderRetrievalService 通过 start(LeaderRetrievalListener) 方法开启监听,最终通过 ZookeeperLeaderRetrievalDriver 实现监听响应。
当发生事件响应的时候,会执行 ZookeeperLeaderRetrievalDriver 的 handleStateChange 方法;最终会在 LeaderRetrievalService 的 notifyLeaderAddress 方法中调用 LeaderRetrievalListener 的同名方法 notifyLeaderAddress。
LeaderRetrievalListener 的实现类:
在这里插入图片描述

小结

上述过程应用到了:监听者模式(观察者模式) + 模板方法模式

Flink 文件/大对象服务 BlobService

在这里插入图片描述
在 Flink 框架中,Flink 提供了一个 BlobService 专门用来提供大文件、对象服务。通俗的说,就是一个文件服务器。存储方式在逻辑上,就是一个 Map,作用是为了集中分发;key 就是 BlobKey,value 就是一个文件。BlobService 接口的定义:

public interface BlobService extends Closeable {PermanentBlobService getPermanentBlobService();TransientBlobService getTransientBlobService();int getPort();
}

BlobService 有两个实现类:
在这里插入图片描述
其中 BlobServer 最为重要,BlobServer 的实现如下:

public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {// TODO_MA 注释: BlobServer 的内部,启动了一个 BIO 的服务端。用来给 BlobClient 提供服务private final ServerSocket serverSocket;// TODO_MA 注释: 提供存储服务private final BlobStore blobStore;// TODO_MA 注释: Active BlobServerConnection 链接集合private final Set<BlobServerConnection> activeConnections = new HashSet<>();// TODO_MA 注释: 最大链接数,默认 50,可以通过 blob.fetch.num-concurrent 参数进行修改或者配置private final int maxConnections;// TODO_MA 注释: 定时清理任务相关private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes = new ConcurrentHashMap<>();private final long cleanupInterval;private final Timer cleanupTimer;// TODO_MA 注释: 构造方法,详细见代码注释public BlobServer(Configuration config, BlobStore blobStore) throws IOException {// 见代码注释,主要是初始化一些成员变量和一些参数,然后启动一个定时任务,启动一个BIO服务端}// TODO_MA 注释: BlobServer 本身是一个线程public void run() {while(!this.shutdownRequested.get()) {// BlobServer 每接收到一个客户端的链接,就使用一个 线程来专门提供服务BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);// BlobServerConnection 是一个线程,线程启动conn.start();}}
}

Flink 提供了 BlobServer 用来提供文件服务,当然也提供了一个 BlobClient 用来提交请求。这是一个典型的 C/S 架构。BlobClient 的内部,封装了一个 BIO 客户端。在 BlobServer 中,由一个 BlobServerConnection 专门给一个 BlobClient 提供服务。
BlobServerConnection 的结构:

class BlobServerConnection extends Thread {private final Socket clientSocket;private final BlobServer blobServer;public void run() {final InputStream inputStream = this.clientSocket.getInputStream();final OutputStream outputStream = this.clientSocket.getOutputStream();switch(operation) {// 存 文件case PUT_OPERATION:put(inputStream, outputStream, new byte[BUFFER_SIZE]);break;// 取 文件case GET_OPERATION:get(inputStream, outputStream, new byte[BUFFER_SIZE]);break;default:throw new IOException("Unknown operation " + operation);}}// 存文件实现,具体工作机制,看源代码注释private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {}// 取文件实心,具体工作机制,看源代码注释private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {}
}

BlobClient 的结构:

// 客户端
public final class BlobClient implements Closeable {// Socket 客户端private final Socket socket;// 构造方法public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) throws IOException {Socket socket = new Socket();socket.connect();}// 文件上传服务public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {}public static List<PermanentBlobKey> uploadFiles(InetSocketAddress serverAddress, Configuration clientConfig, JobIDjobId,List<Path> files) throws IOException {}// 文件下载服务static void downloadFromBlobServer(@Nullable JobID jobId, BlobKey blobKey, File localJarFile, InetSocketAddressserverAddress, Configuration blobClientConfig, int numFetchRetries) throws IOException {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/612105.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Tomcat-快速使用

关于Tomcat的概念及来由在Tomcat基本知识中进行了介绍&#xff0c;下面我们直接上手快速使用起来。 一、快速使用 &#xff08;1&#xff09;tomcat下载 &#xff08;2&#xff09;解压缩 &#xff08;3&#xff09;启动程序 &#xff08;4&#xff09;访问tomcat&#xff1a…

Serverless 开拓无服务器时代:云计算的新趋势(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

二级C语言备考2

一、单选 共40题 &#xff08;共计40分&#xff09; 第1题 &#xff08;1.0分&#xff09; 题号:6951 难度:较易 第1章 以下叙述中错误的是 A:算法正确的程序可以有零个输入 B:算法正确的程序最终一定会结束 C:算法正确的程序可以有零个输出 D:算…

精华整理几十个Python数据科学、机器学习、深度学习、神经网络、人工智能方面的核心库以及详细使用实战案例,轻松几行代码训练自己的专有人工智能模型

精华整理几十个Python数据科学、机器学习、深度学习、神经网络、人工智能方面的核心库以及详细使用实战案例,轻松几行代码训练自己的专有人工智能模型。 机器学习 人工智能的核心,是使计算机具有智能的根本途径。机器学习专注于算法,允许机器学习而不需要编程,并在暴露于新…

OpenHarmony之编译构建使用指导

目录结构 /build # 编译构建主目录├── __pycache__ ├── build_scripts/ # 编译相关的python脚本 ├── common/ ├── config/ # 编译相关的配置项 ├─…

FlinkAPI开发之窗口(Window)

案例用到的测试数据请参考文章&#xff1a; Flink自定义Source模拟数据流 原文链接&#xff1a;https://blog.csdn.net/m0_52606060/article/details/135436048 窗口的概念 Flink是一种流式计算引擎&#xff0c;主要是来处理无界数据流的&#xff0c;数据源源不断、无穷无尽。…

10个提高 Python Web 开发效率的VS Code插件

VS Code具有灵活、便捷和丰富的可用插件库&#xff0c;是Web开发人员中非常受欢迎的代码编辑器。 本文介绍10个VS Code插件&#xff0c;它们可以提高你作为Web开发人员的工作效率。 1. Live Preview Live Preview插件支持在VS Code的小型浏览器中查看网站。因此&#xff0c;无…

数据库基础5

基本连接语句 教程 https://www.bilibili.com/video/BV1pw41147jm 导jar包 https://blog.csdn.net/qq_40893824/article/details/129118784 import java.sql.*;public class test {public static void main(String[] args) throws SQLException {//用户信息和urlString url …

【LeetCode2696】删除子串后的字符串最小长度

1、题目描述 【题目链接】 标签&#xff1a;栈 、字符串、模拟 难度&#xff1a;简单 给你一个仅由 大写 英文字符组成的字符串 s 。 你可以对此字符串执行一些操作&#xff0c;在每一步操作中&#xff0c;你可以从 s 中删除 任一个 “AB” 或 “CD” 子字符串。 通过执行操作…

pytorch导出onnx时遇到不支持的算子怎么解决

在使用pytorch模型训练完成之后&#xff0c;我们现在使用的比较多的一种方法是将pytorch模型转成onnx格式的模型中间文件&#xff0c;然后再根据使用的硬件来生成具体硬件使用的深度学习模型&#xff0c;比如TensorRT。 在从pytorch模型转为onnx时&#xff0c;我们可能会遇到部…

Vue面试之组件通信的方式总结(上篇)

Vue面试之组件通信的方式总结 props父子组件传参props父组件向子组件传参数组形式对象的简单形式对象的复杂形式 props子组件向父组件传参通过传递方法的形式通过传递事件的形式 插槽默认插槽具名插槽作用域插槽 最近在整理一些前端面试中经常被问到的问题&#xff0c;分为vue相…

go 语言常见问题(1)

1. 使用值为 nil 的 slice、map会发生啥 允许对值为 nil 的 slice 添加元素&#xff0c;但对值为 nil 的 map 添加元素&#xff0c;则会造成运行时 panic。 func main() {// slice 正确示例var s []ints append(s, 1)// map 错误示例var m map[string]intm["one"]…

ChatGPT3.5、GPT4.0、DALL·E 3和Midjourney对话与绘画智能体验

MidTool&#xff08;https://www.aimidtool.com/&#xff09;是一个集成了多种先进人工智能技术的助手&#xff0c;它融合了ChatGPT3.5、GPT4.0、DALLE 3和Midjourney等不同的智能服务&#xff0c;提供了一个多功能的体验。下面是这些技术的简要介绍&#xff1a; ChatGPT3.5&am…

办公场景日益多样化 企业如何保持安全?

当前&#xff0c;企业的办公场景日益多样化。远程办公、移动办公、云办公、分支机构等&#xff0c;这些新的办公场景也带来了新的网络安全挑战。以下将介绍一些办公场景带来的安全威胁。 1、远程办公&#xff1a;员工可以在任何地方工作&#xff0c;但同时也带来了网络安全的隐…

C++游戏引擎中的坐标系

一.Direct3D四大变换 <1.世界矩阵变换: 为了模拟3D物体的旋转,缩放,平移等功能,Direct3D将静态模型的顶点坐标x,y,z经过旋转平移矩阵变换以得到新的顶点坐标x1,y1,z1 D3DXMATRIX mTrans ; D3DXMatrixTranslation (&mTrans , 5 , - 3 , 0 ); g_pd3dDevice->SetTr…

Pytorch将标签转为One-Hot编码

一、标签映射与One-Hot编码过程 先进行标签映射&#xff0c;要为每个分类建立一个整数索引&#xff0c;对于每个样本的标签&#xff0c;使用整数索引创建一个长度为类别总数的二进制向量。这个向量的所有元素都是0&#xff0c;除了与整数索引相对应的位置&#xff0c;该位置的…

引领行业赛道!聚铭网络入选安全419年度策划“2023年教育行业优秀解决方案”

近日&#xff0c;由网络安全产业资讯媒体安全419主办的《年度策划》2023年度优秀解决方案评选结果正式出炉&#xff0c;聚铭网络「高校大日志留存分析及实名审计解决方案」从众多参选方案中脱颖而出&#xff0c;被评为“教育行业优秀解决方案”&#xff0c;以硬核实力引领行业赛…

java基础 -02java集合之 List,AbstractList,ArrayList介绍

补充上篇 AbstractCollection < E > 在正式List之前&#xff0c;我们先了解我们补充上篇Collection接口的拓展实现&#xff0c;也就是说当我我们需要实现一个不可修改的Collection的时候&#xff0c;我们只需要拓展某个类&#xff0c;也就是AbstractCollection这个类&a…

ChatGPT4+Python近红外光谱数据分析及机器学习与深度学习建模

2022年11月30日&#xff0c;可能将成为一个改变人类历史的日子——美国人工智能开发机构OpenAI推出了聊天机器人ChatGPT3.5&#xff0c;将人工智能的发展推向了一个新的高度。2023年4月&#xff0c;更强版本的ChatGPT4.0上线&#xff0c;文本、语音、图像等多模态交互方式使其在…

安全漏洞周报(2024.01.01-2023.01.08)

漏洞速览 ■ 用友CRM系统存在逻辑漏洞 漏洞详情 1. 用友CRM系统存在逻辑漏洞 漏洞介绍&#xff1a; 某友CRM系统是一款综合性的客户关系管理软件&#xff0c;旨在帮助企业建立和维护与客户之间的良好关系。它提供了全面的功能&#xff0c;包括销售管理、市场营销、客户服…