Hazelcast 分布式缓存 在Seatunnel中的使用

1、背景

最近在调研seatunnel的时候,发现新版的seatunnel提供了一个web服务,可以用于图形化的创建数据同步任务,然后管理任务。这里面有个日志模块,可以查看任务的执行状态。其中有个取读数据条数和同步数据条数。很好奇这个数据是怎么来的。跟踪源码发现Hazelcast。所以对Hazelcast进行了研究。

2、Hazelcast是什么

Hazelcast是一个开源的分布式内存数据网格(In-Memory Data Grid,简称IMDG)解决方案,主要用于分布式计算和缓存

  • 分布式数据结构:Hazelcast提供了一系列分布式数据结构,如Map、List、Set、Queue等,可以在集群中进行分布式存储和访问。
  • 缓存:Hazelcast提供了分布式缓存功能,可以将数据存储在内存中,以提供快速的访问速度。它支持多种缓存策略,如LRU(Least Recently Used)、LFU(Least Frequently Used)和TTL(Time to Live)等。
  • 分布式计算:Hazelcast支持将计算任务分布到集群中的多个节点上进行并行处理,提高应用程序的处理能力。
  • 高可靠性:Hazelcast使用分布式复制和故障转移机制,确保数据的可靠性和高可用性。它具有自动故障检测和恢复机制,可以在节点故障时自动迁移数据和任务。
  • 扩展性:Hazelcast可以方便地进行水平扩展,通过添加更多的节点来增加集群的处理能力。它支持动态添加和移除节点,而无需停止应用程序。
  • 集成性:Hazelcast提供了与各种应用程序和框架的集成,如Spring、Hibernate、JCache等。它还支持与其他分布式系统的集成,如Apache Kafka、Apache Ignite等。
  • 多语言支持:Hazelcast提供了对多种编程语言的支持,包括Java、C#、C++、Python和Node.js等

3、应用场景

  • 缓存:Hazelcast可以作为高性能的分布式缓存解决方案,用于缓存应用程序中的热点数据。
  • 分布式计算:Hazelcast提供了分布式计算框架,可以将计算任务分布到集群中的多个节点上进行并行处理,适用于金融、电信、电子商务等行业。
  • 实时数据处理:Hazelcast可以处理实时数据流,支持数据的实时处理和分析,适用于构建实时应用,如实时监控系统、实时推荐系统等。
  • 分布式会话管理:Hazelcast可以用于管理分布式会话,实现会话的共享和负载均衡。
  • 分布式数据存储:Hazelcast可以作为分布式数据存储解决方案,用于在多个节点间共享数据。

4、与Redis对比

可以看到Hazelcast可以理解为一个NoSQL,那就不得不说我们用的最多的Redis了。两者都提供了丰富的数据接口,比如map、list等等。那为什么不直接用Redis呢。我理解有下边几个方面的原因:

  1. 使用Redis需要额外的环境搭建,而Hazelcast如果使用内嵌的方式,则不需要额外的组件引入,做到了开箱即用。
  2. Hazelcast用的是应用服务器自身的内存,扩展性强,不需要外部内存(有点类似Caffeine)。
  3. Hazelcast对过期时间的支持没有Redis那么灵活。
  4. Hazelcast可以进行分布式计算。我们将数据存入到多个节点,通过分布式计算的api,从多个节点上读取数据,然后计算并返回。这也算是相较Redis的一个优势。
  5. Redis可以供多个应用使用共享数据,与应用解耦。Hazelcast一般使用需要嵌入应用。

如果不考虑分布式计算等场景,完全可以看那个方便。如果公司没有基础架构,并且是自己业务线的产品。那完全可以使用Hazelcast。免去了Redis的搭建、运维、管理等环境。否则还是老老实实的用Redis吧。

但是如果存在实时流式处理,那么使用Hazelcast的分布式特性是个不错的选择。比如咱们做一个监控系统,需要处理很多业务系统的数据,总不能单纯在Redis或者Mysql或者单机内存中处理吧。可以考虑试试Hazelcast。

5、怎么用

上边说了一堆的理论,说到底怎么用呢,这里以SpringBoot嵌入式为例。

  1. maven中添加依赖
    <dependency>  <groupId>com.hazelcast</groupId>  <artifactId>hazelcast</artifactId>  <version>你的Hazelcast版本号</version>  
    </dependency>  <!-- Hazelcast Spring Boot 集成(如果需要) -->  
    <dependency>  <groupId>com.hazelcast</groupId>  <artifactId>hazelcast-spring-boot</artifactId>  <version>你的Hazelcast Spring Boot集成版本号</version>  
    </dependency> 
  2. 代码
    import com.hazelcast.core.HazelcastInstance;  
    import com.hazelcast.map.IMap;  
    import org.springframework.beans.factory.annotation.Autowired;  
    import org.springframework.stereotype.Component;  @Component  
    public class HazelcastService {  @Autowired  private HazelcastInstance hazelcastInstance;  public void putData() {  IMap<String, String> map = hazelcastInstance.getMap("my-map");  map.put("key1", "value1");  }  public String getData(String key) {  IMap<String, String> map = hazelcastInstance.getMap("my-map");  return map.get(key);  }  
    }
  3. 启动成功
    分别启动两个服务,可以看到有两个Hazelcast节点组成的集群

6、源码

源码我想从两个方面去看

1、seatunnel-web提供的查看监控

  • 找到查看日志接口
@RequestMapping("/seatunnel/api/v1/task")
@RestController
public class TaskInstanceController {@Autowired ITaskInstanceService<SeaTunnelJobInstanceDto> taskInstanceService;@GetMapping("/jobMetrics")@ApiOperation(value = "get the jobMetrics list ", httpMethod = "GET")public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(@RequestAttribute(name = "userId") Integer userId,@RequestParam(name = "jobDefineName", required = false) String jobDefineName,@RequestParam(name = "executorName", required = false) String executorName,@RequestParam(name = "stateType", required = false) String stateType,@RequestParam(name = "startDate", required = false) String startTime,@RequestParam(name = "endDate", required = false) String endTime,@RequestParam("syncTaskType") String syncTaskType,@RequestParam("pageNo") Integer pageNo,@RequestParam("pageSize") Integer pageSize) {return taskInstanceService.getSyncTaskInstancePaging(userId,jobDefineName,executorName,stateType,startTime,endTime,syncTaskType,pageNo,pageSize);}
}
  • 进入getSyncTaskInstancePaging方法
public Result<PageInfo<SeaTunnelJobInstanceDto>> getSyncTaskInstancePaging(Integer userId,String jobDefineName,String executorName,String stateType,String startTime,String endTime,String syncTaskType,Integer pageNo,Integer pageSize) {JobDefinition jobDefinition = null;IPage<SeaTunnelJobInstanceDto> jobInstanceIPage;if (jobDefineName != null) {jobDefinition = jobDefinitionDao.getJobByName(jobDefineName);}Result<PageInfo<SeaTunnelJobInstanceDto>> result = new Result<>();PageInfo<SeaTunnelJobInstanceDto> pageInfo = new PageInfo<>(pageNo, pageSize);result.setData(pageInfo);baseService.putMsg(result, Status.SUCCESS);Date startDate = dateConverter(startTime);Date endDate = dateConverter(endTime);if (jobDefinition != null) {jobInstanceIPage =jobInstanceDao.queryJobInstanceListPaging(new Page<>(pageNo, pageSize),startDate,endDate,jobDefinition.getId(),syncTaskType);} else {jobInstanceIPage =jobInstanceDao.queryJobInstanceListPaging(new Page<>(pageNo, pageSize), startDate, endDate, null, syncTaskType);}List<SeaTunnelJobInstanceDto> records = jobInstanceIPage.getRecords();if (CollectionUtils.isEmpty(records)) {return result;}addJobDefineNameToResult(records);addRunningTimeToResult(records);// 关键代码,上边都是从本地数据库中获取的,这里会去Hazelcast中获取数据,并更新本地数据jobPipelineSummaryMetrics(records, syncTaskType, userId);pageInfo.setTotal((int) jobInstanceIPage.getTotal());pageInfo.setTotalList(records);result.setData(pageInfo);return result;}
  • 进入代码jobPipelineSummaryMetrics(records, syncTaskType, userId);
     
private void jobPipelineSummaryMetrics(List<SeaTunnelJobInstanceDto> records, String syncTaskType, Integer userId) {try {ArrayList<Long> jobInstanceIdList = new ArrayList<>();HashMap<Long, Long> jobInstanceIdAndJobEngineIdMap = new HashMap<>();for (SeaTunnelJobInstanceDto jobInstance : records) {if (jobInstance.getId() != null && jobInstance.getJobEngineId() != null) {jobInstanceIdList.add(jobInstance.getId());jobInstanceIdAndJobEngineIdMap.put(jobInstance.getId(), Long.valueOf(jobInstance.getJobEngineId()));}}Map<Long, JobSummaryMetricsRes> jobSummaryMetrics =// 获取每条日志数据的监控数据jobMetricsService.getALLJobSummaryMetrics(userId,jobInstanceIdAndJobEngineIdMap,jobInstanceIdList,syncTaskType);for (SeaTunnelJobInstanceDto taskInstance : records) {if (jobSummaryMetrics.get(taskInstance.getId()) != null) {taskInstance.setWriteRowCount(jobSummaryMetrics.get(taskInstance.getId()).getWriteRowCount());taskInstance.setReadRowCount(jobSummaryMetrics.get(taskInstance.getId()).getReadRowCount());}}} catch (Exception e) {for (SeaTunnelJobInstanceDto taskInstance : records) {log.error("instance {} {} set instance and engine id error", taskInstance.getId(), e);}}}
  • 进入jobMetricsService.getALLJobSummaryMetrics( userId,jobInstanceIdAndJobEngineIdMap, jobInstanceIdList, syncTaskType);
     
@Overridepublic Map<Long, JobSummaryMetricsRes> getALLJobSummaryMetrics(@NonNull Integer userId,@NonNull Map<Long, Long> jobInstanceIdAndJobEngineIdMap,@NonNull List<Long> jobInstanceIdList,@NonNull String syncTaskType) {log.info("jobInstanceIdAndJobEngineIdMap={}", jobInstanceIdAndJobEngineIdMap);funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_METRICS_SUMMARY, userId);List<JobInstance> allJobInstance = jobInstanceDao.getAllJobInstance(jobInstanceIdList);if (allJobInstance.isEmpty()) {log.warn("getALLJobSummaryMetrics : allJobInstance is empty, task id list is {}",jobInstanceIdList);return new HashMap<>();}Map<Long, JobSummaryMetricsRes> result = null;Map<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsFromEngine =// 从Hazelcast集群节点中获取监控数据getAllRunningJobMetricsFromEngine(allJobInstance.get(0).getEngineName(),allJobInstance.get(0).getEngineVersion());// 通过不同的方式获取数据if (syncTaskType.equals("BATCH")) {result =getMatricsListIfTaskTypeIsBatch(allJobInstance,userId,allRunningJobMetricsFromEngine,jobInstanceIdAndJobEngineIdMap);} else if (syncTaskType.equals("STREAMING")) {result =getMatricsListIfTaskTypeIsStreaming(allJobInstance,userId,allRunningJobMetricsFromEngine,jobInstanceIdAndJobEngineIdMap);}log.info("result is {}", result == null ? "null" : result.toString());return result;}
  • 进入方法getAllRunningJobMetricsFromEngine(allJobInstance.get(0).getEngineName(),allJobInstance.get(0).getEngineVersion());
     
private Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetricsFromEngine(String engineName, String engineVersion) {Engine engine = new Engine(engineName, engineVersion);IEngineMetricsExtractor engineMetricsExtractor =(new EngineMetricsExtractorFactory(engine)).getEngineMetricsExtractor();// 看名字就知道这个是获取任务的监控数据的return engineMetricsExtractor.getAllRunningJobMetrics();}
  • 进入engineMetricsExtractor.getAllRunningJobMetrics();
     
@Overridepublic Map<Long, HashMap<Integer, JobMetrics>> getAllRunningJobMetrics() {HashMap<Long, HashMap<Integer, JobMetrics>> allRunningJobMetricsHashMap = new HashMap<>();try {
// 是不是很熟悉。seatunnelproxy,一看就是从这里开始真正和Hazelcast交互,获取数据了String allJobMetricsContent = seaTunnelEngineProxy.getAllRunningJobMetricsContent();if (StringUtils.isEmpty(allJobMetricsContent)) {return new HashMap<>();}JsonNode jsonNode = JsonUtils.stringToJsonNode(allJobMetricsContent);Iterator<JsonNode> iterator = jsonNode.iterator();while (iterator.hasNext()) {LinkedHashMap<Integer, JobMetrics> metricsMap = new LinkedHashMap();JsonNode next = iterator.next();JsonNode sourceReceivedCount = next.get("metrics").get("SourceReceivedCount");Long jobEngineId = 0L;if (sourceReceivedCount != null && sourceReceivedCount.isArray()) {for (JsonNode node : sourceReceivedCount) {jobEngineId = node.get("tags").get("jobId").asLong();Integer pipelineId = node.get("tags").get("pipelineId").asInt();JobMetrics currPipelineMetrics =getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);currPipelineMetrics.setReadRowCount(currPipelineMetrics.getReadRowCount() + node.get("value").asLong());}}JsonNode sinkWriteCount = next.get("metrics").get("SinkWriteCount");if (sinkWriteCount != null && sinkWriteCount.isArray()) {for (JsonNode node : sinkWriteCount) {jobEngineId = node.get("tags").get("jobId").asLong();Integer pipelineId = node.get("tags").get("pipelineId").asInt();JobMetrics currPipelineMetrics =getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);currPipelineMetrics.setWriteRowCount(currPipelineMetrics.getWriteRowCount()+ node.get("value").asLong());}}JsonNode sinkWriteQPS = next.get("metrics").get("SinkWriteQPS");if (sinkWriteQPS != null && sinkWriteQPS.isArray()) {for (JsonNode node : sinkWriteQPS) {Integer pipelineId = node.get("tags").get("pipelineId").asInt();JobMetrics currPipelineMetrics =getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);currPipelineMetrics.setWriteQps(currPipelineMetrics.getWriteQps()+ (new Double(node.get("value").asDouble())).longValue());}}JsonNode sourceReceivedQPS = next.get("metrics").get("SourceReceivedQPS");if (sourceReceivedQPS != null && sourceReceivedQPS.isArray()) {for (JsonNode node : sourceReceivedQPS) {Integer pipelineId = node.get("tags").get("pipelineId").asInt();JobMetrics currPipelineMetrics =getOrCreatePipelineMetricsMapStatusRunning(metricsMap, pipelineId);currPipelineMetrics.setReadQps(currPipelineMetrics.getReadQps()+ (new Double(node.get("value").asDouble())).longValue());}}JsonNode cdcRecordEmitDelay = next.get("metrics").get("CDCRecordEmitDelay");if (cdcRecordEmitDelay != null && cdcRecordEmitDelay.isArray()) {Map<Integer, List<Long>> dataMap = new HashMap<>();for (JsonNode node : cdcRecordEmitDelay) {Integer pipelineId = node.get("tags").get("pipelineId").asInt();long value = node.get("value").asLong();dataMap.computeIfAbsent(pipelineId, n -> new ArrayList<>()).add(value);}dataMap.forEach((key, value) -> {JobMetrics currPipelineMetrics =getOrCreatePipelineMetricsMapStatusRunning(metricsMap, key);OptionalDouble average =value.stream().mapToDouble(a -> a).average();currPipelineMetrics.setRecordDelay(Double.valueOf(average.isPresent()? average.getAsDouble(): 0).longValue());});}log.info("jobEngineId={},metricsMap={}", jobEngineId, metricsMap);allRunningJobMetricsHashMap.put(jobEngineId, metricsMap);}} catch (Exception e) {e.printStackTrace();}return allRunningJobMetricsHashMap;}
  • 到这里如果有实际操作过seatunnel-web界面的同学们肯定知道,这个基本就已经触及监控数据的来源了。
  • 进入seaTunnelEngineProxy.getAllRunningJobMetricsContent();
     
public String getAllRunningJobMetricsContent() {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);try {return seaTunnelClient.getJobClient().getRunningJobMetrics();} finally {seaTunnelClient.close();}}
  • 代码很简单,没啥说的继续跟踪
     
public String getRunningJobMetrics() {return (String)this.hazelcastClient.requestOnMasterAndDecodeResponse(SeaTunnelGetRunningJobMetricsCodec.encodeRequest(), SeaTunnelGetRunningJobMetricsCodec::decodeResponse);}
  • hazelcastClient,是不是眼熟。是的,seatunnel对hazelcast的调用,封装了很深。马上就胜利了,继续跟代码
     
public <S> S requestOnMasterAndDecodeResponse(@NonNull ClientMessage request, @NonNull Function<ClientMessage, Object> decoder) {if (request == null) {throw new NullPointerException("request is marked non-null but is null");} else if (decoder == null) {throw new NullPointerException("decoder is marked non-null but is null");} else {UUID masterUuid = this.hazelcastClient.getClientClusterService().getMasterMember().getUuid();return this.requestAndDecodeResponse(masterUuid, request, decoder);}}
  • 获取到我们要从那个hazelcast节点获取数据的信息,然后去调用
     
public <S> S requestAndDecodeResponse(@NonNull UUID uuid, @NonNull ClientMessage request, @NonNull Function<ClientMessage, Object> decoder) {if (uuid == null) {throw new NullPointerException("uuid is marked non-null but is null");} else if (request == null) {throw new NullPointerException("request is marked non-null but is null");} else if (decoder == null) {throw new NullPointerException("decoder is marked non-null but is null");} else {ClientInvocation invocation = new ClientInvocation(this.hazelcastClient, request, (Object)null, uuid);try {ClientMessage response = (ClientMessage)invocation.invoke().get();return this.serializationService.toObject(decoder.apply(response));} catch (InterruptedException var6) {Thread.currentThread().interrupt();return null;} catch (Throwable var7) {throw ExceptionUtil.rethrow(var7);}}}
  • 着重记忆一下ClientInvocation和ClientMessage。因为在跟踪hazelcase-api的代码的时候,就是用的这里。
  • 在下边就是调用hazelcast的客户端,发送请求,然后get阻塞,直到数据返回。

2、Hazelcast-api

  • hazelcast的api调用,我们以下面这段代码为入口开始看源码。
import com.hazelcast.core.HazelcastInstance;  
import com.hazelcast.map.IMap;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  @Component  
public class HazelcastService {  @Autowired  private HazelcastInstance hazelcastInstance;  public void putData() {  IMap<String, String> map = hazelcastInstance.getMap("my-map");  map.put("key1", "value1");  }  public String getData(String key) {  IMap<String, String> map = hazelcastInstance.getMap("my-map");  return map.get(key);  }  
}
  • 可以看到hazelcast的使用基本和java的数据结构使用一样。所以如果我们要使用hazelcast还是很方便入手的。
  • 进入hazelcast封装的map的put方法
     
@Overridepublic V get(@Nonnull Object key) {checkNotNull(key, NULL_KEY_IS_NOT_ALLOWED);return toObject(getInternal(key));}
  • 进入getInternal方法
     
protected Object getInternal(Object key) {// TODO: action for read-backup true is not well testedData keyData = toDataWithStrategy(key);if (mapConfig.isReadBackupData()) {Object fromBackup = readBackupDataOrNull(keyData);if (fromBackup != null) {return fromBackup;}}MapOperation operation = operationProvider.createGetOperation(name, keyData);operation.setThreadId(getThreadId());return invokeOperation(keyData, operation);}
  • 将参数封装为了hazelcast的map数据结构,并调用操作方法
     
private Object invokeOperation(Data key, MapOperation operation) {int partitionId = partitionService.getPartitionId(key);operation.setThreadId(getThreadId());try {Object result;if (statisticsEnabled) {long startTimeNanos = Timer.nanos();Future future = operationService.createInvocationBuilder(SERVICE_NAME, operation, partitionId).setResultDeserialized(false).invoke();result = future.get();incrementOperationStats(operation, localMapStats, startTimeNanos);} else {Future future = operationService.createInvocationBuilder(SERVICE_NAME, operation, partitionId).setResultDeserialized(false).invoke();result = future.get();}return result;} catch (Throwable t) {throw rethrow(t);}}
  • 执行方法,并返回了一个InvocationFuture,这个InvocationFuture对象是集成了CompletableFuture的一个future,所以如果需要,也可以使用多线程编排,执行复杂查询的。
     
@Overridepublic InvocationFuture invoke() {op.setServiceName(serviceName);Invocation invocation;if (target == null) {op.setPartitionId(partitionId).setReplicaIndex(replicaIndex);invocation = new PartitionInvocation(context, op, doneCallback, tryCount, tryPauseMillis, callTimeout, resultDeserialized,failOnIndeterminateOperationState, connectionManager);} else {invocation = new TargetInvocation(context, op, target, doneCallback, tryCount, tryPauseMillis,callTimeout, resultDeserialized, connectionManager);}return async? invocation.invokeAsync(): invocation.invoke();}
  • 可以看到真正去执行的是不同类型的Invocation。并且可以根据是同步还是异步,调用不同的执行方法,我们直接看invoke方法。
     
private void invoke0(boolean isAsync) {if (invokeCount > 0) {throw new IllegalStateException("This invocation is already in progress");} else if (isActive()) {throw new IllegalStateException("Attempt to reuse the same operation in multiple invocations. Operation is " + op);}try {setCallTimeout(op, callTimeoutMillis);setCallerAddress(op, context.thisAddress);op.setNodeEngine(context.nodeEngine);boolean isAllowed = context.operationExecutor.isInvocationAllowed(op, isAsync);if (!isAllowed && !isMigrationOperation(op)) {throw new IllegalThreadStateException(Thread.currentThread() + " cannot make remote call: " + op);}doInvoke(isAsync);} catch (Exception e) {handleInvocationException(e);}}
  • 继续进入doInvoke方法
     
private void doInvoke(boolean isAsync) {if (!engineActive()) {return;}invokeCount++;setInvocationTime(op, context.clusterClock.getClusterTime());// We'll initialize the invocation before registering it. Invocation monitor iterates over// registered invocations and it must observe completely initialized invocations.Exception initializationFailure = null;try {initInvocationTarget();} catch (Exception e) {// We'll keep initialization failure and notify invocation with this failure// after invocation is registered to the invocation registry.initializationFailure = e;}if (!context.invocationRegistry.register(this)) {return;}if (initializationFailure != null) {notifyError(initializationFailure);return;}if (isLocal()) {doInvokeLocal(isAsync);} else {doInvokeRemote();}}
  • 如果是本地调用,进入doInvokeLocal。如果是远程调用进入doInvokeRemote。如果是springboot直接引入的情况下,进入本地调用
  • 调用远程的hazelcast集群的。进入doInvokeRemote方法。
  • 例子中是本地调用,所以进入doInvokeLocal,这里的代码本文就不继续跟进去,如果感兴趣可以debug进去看看,大概的逻辑是调用execute方法,然后将MapOperation(Operation对象)放到一个队列中,线程池异步执行,我们着重看下MapOperation。
     
public abstract class MapOperation extends AbstractNamedOperationimplements IdentifiedDataSerializable, ServiceNamespaceAware {private static final boolean ASSERTION_ENABLED = MapOperation.class.desiredAssertionStatus();protected transient MapService mapService;protected transient RecordStore<Record> recordStore;protected transient MapContainer mapContainer;protected transient MapServiceContext mapServiceContext;protected transient MapEventPublisher mapEventPublisher;protected transient boolean createRecordStoreOnDemand = true;protected transient boolean disposeDeferredBlocks = true;private transient boolean canPublishWanEvent;public MapOperation() {}public MapOperation(String name) {this.name = name;}@Overridepublic final void beforeRun() throws Exception {super.beforeRun();mapService = getService();mapServiceContext = mapService.getMapServiceContext();mapEventPublisher = mapServiceContext.getMapEventPublisher();try {recordStore = getRecordStoreOrNull();if (recordStore == null) {mapContainer = mapServiceContext.getMapContainer(name);} else {mapContainer = recordStore.getMapContainer();}} catch (Throwable t) {disposeDeferredBlocks();throw rethrow(t, Exception.class);}canPublishWanEvent = canPublishWanEvent(mapContainer);assertNativeMapOnPartitionThread();innerBeforeRun();}protected void innerBeforeRun() throws Exception {if (recordStore != null) {recordStore.beforeOperation();}// Concrete classes can override this method.}@Overridepublic final void run() {try {runInternal();} catch (NativeOutOfMemoryError e) {rerunWithForcedEviction();}}protected void runInternal() {// Intentionally empty method body.// Concrete classes can override this method.}private void rerunWithForcedEviction() {try {runWithForcedEvictionStrategies(this);} catch (NativeOutOfMemoryError e) {disposeDeferredBlocks();throw e;}}@Overridepublic final void afterRun() throws Exception {afterRunInternal();disposeDeferredBlocks();super.afterRun();}protected void afterRunInternal() {// Intentionally empty method body.// Concrete classes can override this method.}@Overridepublic void afterRunFinal() {if (recordStore != null) {recordStore.afterOperation();}}protected void assertNativeMapOnPartitionThread() {if (!ASSERTION_ENABLED) {return;}assert mapContainer.getMapConfig().getInMemoryFormat() != NATIVE|| getPartitionId() != GENERIC_PARTITION_ID: "Native memory backed map operations are not allowed to run on GENERIC_PARTITION_ID";}ILogger logger() {return getLogger();}protected final CallerProvenance getCallerProvenance() {return disableWanReplicationEvent() ? CallerProvenance.WAN : CallerProvenance.NOT_WAN;}private RecordStore getRecordStoreOrNull() {int partitionId = getPartitionId();if (partitionId == -1) {return null;}PartitionContainer partitionContainer = mapServiceContext.getPartitionContainer(partitionId);if (createRecordStoreOnDemand) {return partitionContainer.getRecordStore(name);} else {return partitionContainer.getExistingRecordStore(name);}}@Overridepublic void onExecutionFailure(Throwable e) {disposeDeferredBlocks();super.onExecutionFailure(e);}@Overridepublic void logError(Throwable e) {ILogger logger = getLogger();if (e instanceof NativeOutOfMemoryError) {Level level = this instanceof BackupOperation ? Level.FINEST : Level.WARNING;logger.log(level, "Cannot complete operation! -> " + e.getMessage());} else {// we need to introduce a proper method to handle operation failures (at the moment// this is the only place where we can dispose native memory allocations on failure)disposeDeferredBlocks();super.logError(e);}}void disposeDeferredBlocks() {if (!disposeDeferredBlocks|| recordStore == null|| recordStore.getInMemoryFormat() != NATIVE) {return;}recordStore.disposeDeferredBlocks();}private boolean canPublishWanEvent(MapContainer mapContainer) {boolean canPublishWanEvent = mapContainer.isWanReplicationEnabled()&& !disableWanReplicationEvent();if (canPublishWanEvent) {mapContainer.getWanReplicationDelegate().doPrepublicationChecks();}return canPublishWanEvent;}@Overridepublic String getServiceName() {return MapService.SERVICE_NAME;}public boolean isPostProcessing(RecordStore recordStore) {MapDataStore mapDataStore = recordStore.getMapDataStore();return mapDataStore.isPostProcessingMapStore()|| !mapContainer.getInterceptorRegistry().getInterceptors().isEmpty();}public void setThreadId(long threadId) {throw new UnsupportedOperationException();}public long getThreadId() {throw new UnsupportedOperationException();}protected final void invalidateNearCache(List<Data> keys) {if (!mapContainer.hasInvalidationListener() || isEmpty(keys)) {return;}Invalidator invalidator = getNearCacheInvalidator();for (Data key : keys) {invalidator.invalidateKey(key, name, getCallerUuid());}}// TODO: improve here it's possible that client cannot manage to attach listenerpublic final void invalidateNearCache(Data key) {if (!mapContainer.hasInvalidationListener() || key == null) {return;}Invalidator invalidator = getNearCacheInvalidator();invalidator.invalidateKey(key, name, getCallerUuid());}/*** This method helps to add clearing Near Cache event only from* one-partition which matches partitionId of the map name.*/protected final void invalidateAllKeysInNearCaches() {if (mapContainer.hasInvalidationListener()) {int partitionId = getPartitionId();Invalidator invalidator = getNearCacheInvalidator();if (partitionId == getNodeEngine().getPartitionService().getPartitionId(name)) {invalidator.invalidateAllKeys(name, getCallerUuid());} else {invalidator.forceIncrementSequence(name, getPartitionId());}}}private Invalidator getNearCacheInvalidator() {MapNearCacheManager mapNearCacheManager = mapServiceContext.getMapNearCacheManager();return mapNearCacheManager.getInvalidator();}protected final void evict(Data justAddedKey) {if (mapContainer.getEvictor() == Evictor.NULL_EVICTOR) {return;}recordStore.evictEntries(justAddedKey);disposeDeferredBlocks();}@Overridepublic int getFactoryId() {return MapDataSerializerHook.F_ID;}@Overridepublic ObjectNamespace getServiceNamespace() {MapContainer container = mapContainer;if (container == null) {MapService service = getService();container = service.getMapServiceContext().getMapContainer(name);}return container.getObjectNamespace();}// for testing onlypublic void setMapService(MapService mapService) {this.mapService = mapService;}// for testing onlypublic void setMapContainer(MapContainer mapContainer) {this.mapContainer = mapContainer;}protected final void publishWanUpdate(Data dataKey, Object value) {publishWanUpdateInternal(dataKey, value, false);}private void publishWanUpdateInternal(Data dataKey, Object value, boolean hasLoadProvenance) {if (!canPublishWanEvent) {return;}Record<Object> record = recordStore.getRecord(dataKey);if (record == null) {return;}Data dataValue = toHeapData(mapServiceContext.toData(value));ExpiryMetadata expiryMetadata = recordStore.getExpirySystem().getExpiryMetadata(dataKey);WanMapEntryView<Object, Object> entryView = createWanEntryView(toHeapData(dataKey), dataValue, record, expiryMetadata,getNodeEngine().getSerializationService());mapEventPublisher.publishWanUpdate(name, entryView, hasLoadProvenance);}protected final void publishLoadAsWanUpdate(Data dataKey, Object value) {publishWanUpdateInternal(dataKey, value, true);}protected final void publishWanRemove(@Nonnull Data dataKey) {if (!canPublishWanEvent) {return;}mapEventPublisher.publishWanRemove(name, toHeapData(dataKey));}protected boolean disableWanReplicationEvent() {return false;}protected final TxnReservedCapacityCounter wbqCapacityCounter() {return recordStore.getMapDataStore().getTxnReservedCapacityCounter();}protected final Data getValueOrPostProcessedValue(Record record, Data dataValue) {if (!isPostProcessing(recordStore)) {return dataValue;}return mapServiceContext.toData(record.getValue());}@Overridepublic TenantControl getTenantControl() {return getNodeEngine().getTenantControlService().getTenantControl(MapService.SERVICE_NAME, name);}@Overridepublic boolean requiresTenantContext() {return true;}
}
  • 既然要线程异步去执行,所以它肯定要实现run方法,所以找到run方法,进入runInternal。实现方法很多,找到map包相关的类。
     
@Overrideprotected void runInternal() {Object currentValue = recordStore.get(dataKey, false, getCallerAddress());if (noCopyReadAllowed(currentValue)) {// in case of a 'remote' call (e.g a client call) we prevent making// an on-heap copy of the off-heap dataresult = (Data) currentValue;} else {// in case of a local call, we do make a copy, so we can safely share// it with e.g. near cache invalidationresult = mapService.getMapServiceContext().toData(currentValue);}}
  • 这里基本就是获取到hazelcast管理的内存中数据的地方,不再一一debug,一路向下找到代码
     
public V get(Object key) {int hash = hashOf(key);return segmentFor(hash).get(key, hash);}
  • 怎么样,熟悉吧。java的map调用是不是也是这样,先hash找到位置,在获取数据。其实这里的hash和map的hash有一些区别。这是由于hazelcast的架构决定的,如果对原理架构感兴趣可以百度搜一搜,很多。这里大概提一嘴,有一个分片的概念,put的时候会hash到不同的分区(分片)。这也是hazelcast分布式的原理。

7、结语

本文只是介绍了hazelcast的最基本用法,如果按照案例中的使用,完全可以用redis或者本地缓存。但是如果有了更高级(实际中的使用),那么hazelcast的分布式计算特性还是很好用的。源码也只是分析了本地的调用。如果感兴趣其实可以debug跟进去看下远程调用的方式。其实想想本质还是一样,远程调用就需要1、发现节点;2、注册节点;3、网络调用其他节点。而seatunnel的调用就相对来说更高级一些,它进行了一系列的封装。最后也还是网络调用其他节点。然后返回future阻塞等待返回结果,由于是内存级别的,处理特别快。

对了差点忘记一点,一直在说分布式特性。本文只说了单纯作为缓存使用get、put方法。这里大概介绍下分布式api的使用

IExecutorService executorService = hazelcastInstance.getExecutorService("myExecutor");  
Runnable task = () -> {  // 这里是任务的逻辑  System.out.println("Executing task on " + hazelcastInstance.getCluster().getLocalMember().getAddress());  
};  
Future<Void> future = executorService.submit(task);  
future.get(); // 等待任务完成

这样就可以查询分布式节点上的数据,然后聚合返回。是不是有点像MapReduce。确实,hazelcast也可以使用MapReduce进行复杂运算,想了解的,也可以去搜一搜看看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/29094.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

羊城杯 2020 a_piece_of_java

考点:JDBC反序列化打CC链动态代理类触发readobject 一眼看过去 好像只有一个mysql-connector-java 可以利用jdbc 可能的攻击路径就有1) Mysql服务器任意文件读取 2) JDBC反序列化打依赖链 出现了一个不常见的依赖库 serialkiller 做了反序列化的过滤器 可以尝试查看其源码 htt…

2000-2022年上市公司员工、工资数据

2000-2022年上市公司员工、工资数据 1、时间&#xff1a;2000-2022年 2、来源&#xff1a;上市公司年报 3、指标&#xff1a;年份、股票代码、股票简称、行业名称、行业代码、省份、城市、区县、行政区划代码、城市代码、区县代码、首次上市年份、上市状态、员工人数_人、应…

Windows 与 Java 环境下的 Redis 利用分析

1 前言 在最近的一次攻防演练中&#xff0c;遇到了两个未授权访问的 Redis 实例。起初以为可以直接利用&#xff0c;但后来发现竟然是Windows Java (Tomcat)。因为网上没有看到相关的利用文章&#xff0c;所以在经过摸索&#xff0c;成功解决之后决定简单写一写。 本文介绍了…

【工程2区】毕业神刊 —— 1-2个月录用!非黑!非预警!

【欧亚科睿学术】 电力能源类SCIE ✅ 进展超顺 ✅ 录用率高 ✅ 领域相关均可 【期刊简介】IF&#xff1a;1.0-2.0&#xff0c;JCR2区&#xff0c;中科院4区 【版面类型】正刊&#xff0c;仅少量版面 【终审周期】走期刊部系统&#xff0c;预计3个月左右录用 【检索情况…

使用SpringBoot对接Kafka

Kafka是什么&#xff0c;以及如何使用SpringBoot对接Kafka 一、Kafka与流处理 我们先来看看比较正式的介绍&#xff1a;Kafka是一种流处理平台&#xff0c;由LinkedIn公司创建&#xff0c;现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高…

组件二次封装,通过属性事件透传,插槽使用,组件实例方法的绑定,深入理解 Vue.js 组件扩展与插槽

透传&#xff0c;插槽&#xff0c;组件实例方法的绑定&#xff0c;深入理解 Vue.js 组件扩展与插槽 前言 Vue.js 提供了强大的组件化系统&#xff0c;允许开发者构建可复用、可组合的UI组件。在实际项目中&#xff0c;直接使用第三方库提供的基础组件&#xff08;如Element UI…

Internet Download Manager(IDM6.41)软件下载-详细安装教程视频

Internet Download Manager有一个智能下载逻辑加速器&#xff0c;具有智能动态文件分割和安全的多部分下载技术&#xff0c;可以加速下载。与其他下载加速器和管理器不同&#xff0c;Internet下载管理器在下载开始之前对文件进行分段&#xff0c;而Internet下载管理器在下载过程…

用TensorRT-LLM进行LLama的推理和部署

Deploy an AI Coding Assistant with NVIDIA TensorRT-LLM and NVIDIA Triton | NVIDIA Technical BlogQuick Start Guide — tensorrt_llm documentation (nvidia.github.io) 使用TensorRT-LLM的源码&#xff0c;来下载docker并在docker里编译TensorRT-LLM&#xff1b; 模型…

Android Calculator2源码分析与修改

private CalculatorDisplay mDisplay; private Symbols mSymbols new Symbols(); -41,6 44,7 class Logic { private int mLineLength 0; private static final String INFINITY_UNICODE “\u221e”; private static final String ZMS_NUMBER “55555”; public stat…

Linux构建本地时间同步ntp

环境介绍&#xff1a; 主机名 IP地址 系统发行版 环境 Node01 192.168.100.102 Centos 7.4 可联网、已关闭防火墙selinux Node02 192.168.100.103 Centos 7.4 已关闭防火墙selinux 1.主节点同步阿里云标准时间 在保证连接外网的情况下&#xff0c;同步阿里服务器的…

Spring的SmartLifecycle可以没用过,但没听过就不好了! - 第517篇

历史文章&#xff08;文章累计500&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…

three.js开发3D地图记录(一)

关键代码部分&#xff1a; <template><div class"center-map-box" id"contant"></div> </template><script> import * as THREE from "three"; import { OrbitControls } from "three/examples/jsm/control…

springboot小型超市商品展销系统-计算机毕业设计源码01635

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作…

EIQ-ABC 分析法在配送中心储位分配中的应用

配送中心运作效率的高低主要取决于仓储业务流程的作业效率&#xff0c;在配送作业流程中&#xff0c;储位分配的是否合理性成为影响配送运作效率的重要因素。为实现储位的合理分配&#xff0c;提出通过对订单信息的分析&#xff0c;并应用 EIQ-ABC 分析法&#xff0c;以此实现缩…

白酒:茅台镇白酒的品牌合作与跨界营销案例

云仓酒庄豪迈白酒&#xff0c;作为茅台镇的知名品牌&#xff0c;在品牌合作与跨界营销方面也有着杰出的表现。通过与不同领域品牌的合作&#xff0c;豪迈白酒进一步拓宽了市场渠道&#xff0c;提升了品牌曝光度和影响力。 首先&#xff0c;云仓酒庄豪迈白酒与品质餐产品牌的合作…

量子革命 “不负众望“!即将见证首个商业量子应用案例?

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 文丨沛贤/浪味仙 排版丨沛贤 深度好文&#xff1a;1000字丨5分钟阅读 摘要&#xff1a;雀巢、联合利华和德国能源巨头 E.ON 表示&#xff0c;距离首个商用量子案例出现可能只需三年时间。 近…

tracetcp下载 安装 使用 网络工具 windows trace工具 tcp协议

省流 Tracetcp是一个类似于Tracert的工具&#xff0c;使用如下&#xff1a; 1. 安装winpcap &#xff0c; 下载链接&#xff1a;WinPcap Download 2.下载tracetcp软件&#xff0c;下载链接&#xff1a; https://github.com/0xcafed00d/tracetcp/releases 命令&#xff1a;…

期末考试老师怎样发成绩

期末成绩的公布&#xff0c;总是让老师感到焦虑。成绩&#xff0c;这一张张的数字&#xff0c;承载着学生一学期的努力&#xff0c;也牵动着家长们的心。 传统的成绩公布方式&#xff0c;写成绩条让学生带回家&#xff0c;或是通过私发家长的方式&#xff0c;都存在一定的弊端。…

六、Nginx-正向代理和反向代理

目录 一、正向代理 1、参数详解 2、常用变量详解 3、配置示例 二、反向代理 三、 Nginx的安全控制 1、如何使用SSL对流量进行加密 2、nginx添加SSL的支持 3、 Nginx的SSL相关指令 &#xff08;1&#xff09;ssl &#xff08;2&#xff09;ssl_certificate &#xff0…

【Kaggle量化比赛】Top讨论

问: 惊人的单模型得分,请问您使用了多少个特征来获得如此高的得分?我也在使用LGB模型。 答 235个特征(180个基本特征+滚动特征) 问: 您是在使用Polars进行特征工程还是仅依赖于Pandas+Numba/多进程?即使进行了Numba优化,我也发现当滚动特征过多时,推理速度会非常慢。在…