Flink协调器Coordinator及自定义Operator

Flink协调器Coordinator及自定义Operator

最近的项目开发过程中,使用到了Flink中的协调器以及自定义算子相关的内容,本篇文章主要介绍Flink中的协调器是什么,如何用,以及协调器与算子间的交互。

协调器Coordinator

Flink中的协调器是用来协调运行时的算子,运行在JobManager中,通过事件的方式与算子通信。例如Source和Sink算子中的协调器是用来发现和分配工作或者聚合和提交元数据。

线程模型

所有协调器方法都由作业管理器的主线程(邮箱线程)调用。这意味着这些方法在任何情况下都不得执行阻塞操作(如 I/ O 或等待锁或或Futures)。这很有可能使整个 JobManager 瘫痪。
因此,涉及更复杂操作的协调器应生成线程来处理 I/ O 工作。上 OperatorCoordinator. Context 的方法可以安全地从另一个线程调用,而不是从调用协调器方法的线程调用。

一致性

与调度程序的视图相比,协调器对任务执行的视图高度简化,但允许与在并行子任务上运行的操作员进行一致的交互。具体而言,保证严格按顺序调用以下方法:

  1. executionAttemptReady(int, int, OperatorCoordinator.SubtaskGateway):在子任务就绪的时候调用一次。SubtaskGateway是用来与子任务交互的网关。这是与子任务尝试交互的开始。
    executionAttemptFailed(int, int, Throwable):在尝试失败或取消后立即调用每个子任务。此时,应停止与子任务尝试的交互。
  2. subtaskReset(int, long) 或 resetToCheckpoint(long, byte[]):一旦调度程序确定了要还原的检查点,这些方法就会通知协调器。前一种方法在发生区域故障/ 恢复(可能影响子任务的子集)时调用,后一种方法在全局故障/ 恢复的情况下调用。此方法应用于确定要恢复的操作,因为它会告诉要回退到哪个检查点。协调器实现需要恢复自还原的检查点以来与相关任务的交互。只有在子任务的所有尝试被调用后 executionAttemptFailed(int, int, Throwable) ,才会调用它。
  3. executionAttemptReady(int, int, OperatorCoordinator. SubtaskGateway):在恢复的任务(新尝试)准备就绪后再次调用。这晚于 subtaskReset(int, long),因为在这些方法之间,会计划和部署新的尝试。

接口方法说明

实现自定义的协调器需要实现OperatorCoordinator接口方法,各方法说明如下所示:

public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {// ------------------------------------------------------------------------/*** 启动协调器,启动时调用一次当前方法在所有方法之前* 此方法抛出的异常都会导致当前作业失败*/void start() throws Exception;/*** 释放协调器时调用当前方法,此方法应当释放持有的资源* 此方法抛出的异常不会导致作业失败*/@Overridevoid close() throws Exception;// ------------------------------------------------------------------------/*** 处理来自并行算子实例的事件* 此方法抛出的异常会导致作业失败并恢复*/void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)throws Exception;// ------------------------------------------------------------------------/*** 为协调器做checkpoint,将当前协调器中的状态序列化到checkpoint中,执行成功需要调用CompletableFuture的complete方法,失败需要调用CompletableFuture的completeExceptionally方法*/void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)throws Exception;/*** We override the method here to remove the checked exception. Please check the Java docs of* {@link CheckpointListener#notifyCheckpointComplete(long)} for more detail semantic of the* method.*/@Overridevoid notifyCheckpointComplete(long checkpointId);/*** We override the method here to remove the checked exception. Please check the Java docs of* {@link CheckpointListener#notifyCheckpointAborted(long)} for more detail semantic of the* method.*/@Overridedefault void notifyCheckpointAborted(long checkpointId) {}/*** 从checkpoint重置当前的协调器*/void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception;// ------------------------------------------------------------------------/*** 子任务重置时调用此方法*/void subtaskReset(int subtask, long checkpointId);/*** 子任务失败时调用此方法 */void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);/*** 子任务就绪时调用此方法*/void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
}

算子Operator

Flink中执行计算任务的算子,像使用DataStream API时调用的map、flatmap、process传入的自定义函数最终都会封装为一个一个的算子。使用UDF已经能够满足大多数的开发场景,但涉及到与协调器打交道时需要自定义算子,自定义算子相对比较好简单,具体可以参考org.apache.flink.streaming.api.operators.KeyedProcessOperator的实现。

自定义算子需要实现AbstractStreamOperator和OneInputStreamOperator接口方法

实现定时器功能,需要实现Triggerable接口方法

实现处理协调器的事件功能,需要实现OperatorEventHandler接口方法

示例

自定义算子

这里实现一个自定义的算子,用来处理KeyedStream的数据,它能够接受来自协调器的事件,并且能够给协调器发送事件。

MyKeyedProcessOperator实现代码如下:

package com.examples.operator;import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 自定义的KeyedProcessOperator* @author shirukai*/
public class MyKeyedProcessOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT>implements OneInputStreamOperator<IN, OUT>,Triggerable<KEY, VoidNamespace>,OperatorEventHandler {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessOperator.class);private transient TimestampedCollector<OUT> collector;private transient TimerService timerService;private final OperatorEventGateway operatorEventGateway;public MyKeyedProcessOperator(ProcessingTimeService processingTimeService, OperatorEventGateway operatorEventGateway) {this.processingTimeService = processingTimeService;this.operatorEventGateway = operatorEventGateway;}@Overridepublic void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);InternalTimerService<VoidNamespace> internalTimerService =getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);timerService = new SimpleTimerService(internalTimerService);}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {LOG.info("processElement: {}", element);collector.setTimestamp(element);// 注册事件时间定时器timerService.registerEventTimeTimer(element.getTimestamp() + 10);// 注册处理时间定时器timerService.registerProcessingTimeTimer(element.getTimestamp() + 100);// 给协调器发送消息operatorEventGateway.sendEventToCoordinator(new MyEvent("hello,I'm from operator"));// 不做任何处理直接发送到下游collector.collect((OUT) element.getValue());}@Overridepublic void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {LOG.info("onEventTime: {}", timer);}@Overridepublic void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {LOG.info("onProcessingTime: {}", timer);}@Overridepublic void handleOperatorEvent(OperatorEvent evt) {LOG.info("handleOperatorEvent: {}", evt);}
}

算子工厂类MyKeyedProcessOperatorFactory:

package com.examples.operator;import com.examples.coordinator.MyKeyedProcessCoordinatorProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.*;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceAware;/*** 自定义算子工厂类* @author shirukai*/
public class MyKeyedProcessOperatorFactory<IN> extends AbstractStreamOperatorFactory<IN>implements OneInputStreamOperatorFactory<IN, IN>,CoordinatedOperatorFactory<IN>,ProcessingTimeServiceAware {@Overridepublic OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {return new MyKeyedProcessCoordinatorProvider(operatorName, operatorID);}@Overridepublic <T extends StreamOperator<IN>> T createStreamOperator(StreamOperatorParameters<IN> parameters) {final OperatorID operatorId = parameters.getStreamConfig().getOperatorID();final OperatorEventGateway gateway =parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);try {final MyKeyedProcessOperator<?, IN, IN> operator = new MyKeyedProcessOperator<>(processingTimeService, gateway);operator.setup(parameters.getContainingTask(),parameters.getStreamConfig(),parameters.getOutput());parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, operator);return (T) operator;} catch (Exception e) {throw new IllegalStateException("Cannot create operator for "+ parameters.getStreamConfig().getOperatorName(),e);}}@Overridepublic Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {return MyKeyedProcessOperator.class;}
}

自定义协调器

协调器执行器线程工厂类CoordinatorExecutorThreadFactory,当前类可以通用,用来创建协调器线程。

package com.examples.coordinator;import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FatalExitExceptionHandler;import javax.annotation.Nullable;
import java.util.concurrent.ThreadFactory;/*** A thread factory class that provides some helper methods.*/
public class CoordinatorExecutorThreadFactoryimplements ThreadFactory, Thread.UncaughtExceptionHandler {private final String coordinatorThreadName;private final ClassLoader classLoader;private final Thread.UncaughtExceptionHandler errorHandler;@Nullableprivate Thread thread;// TODO discuss if we should fail the job(JM may restart the job later) or directly kill JM// process// Currently we choose to directly kill JM processCoordinatorExecutorThreadFactory(final String coordinatorThreadName, final ClassLoader contextClassLoader) {this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);}@VisibleForTestingCoordinatorExecutorThreadFactory(final String coordinatorThreadName,final ClassLoader contextClassLoader,final Thread.UncaughtExceptionHandler errorHandler) {this.coordinatorThreadName = coordinatorThreadName;this.classLoader = contextClassLoader;this.errorHandler = errorHandler;}@Overridepublic synchronized Thread newThread(Runnable r) {thread = new Thread(r, coordinatorThreadName);thread.setContextClassLoader(classLoader);thread.setUncaughtExceptionHandler(this);return thread;}@Overridepublic synchronized void uncaughtException(Thread t, Throwable e) {errorHandler.uncaughtException(t, e);}public String getCoordinatorThreadName() {return coordinatorThreadName;}boolean isCurrentThreadCoordinatorThread() {return Thread.currentThread() == thread;}
}

协调器上下文CoordinatorContext,当前类可以通用。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.examples.coordinator;import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;/*** A context class for the {@link OperatorCoordinator}.** <p>The context serves a few purposes:** <ul>*   <li>Thread model enforcement - The context ensures that all the manipulations to the*       coordinator state are handled by the same thread.* </ul>*/
@Internal
public class CoordinatorContext implements AutoCloseable {private static final Logger LOG =LoggerFactory.getLogger(CoordinatorContext.class);private final ScheduledExecutorService coordinatorExecutor;private final ScheduledExecutorService workerExecutor;private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;private final OperatorCoordinator.Context operatorCoordinatorContext;private final Map<Integer, OperatorCoordinator.SubtaskGateway> subtaskGateways;public CoordinatorContext(CoordinatorExecutorThreadFactory coordinatorThreadFactory,OperatorCoordinator.Context operatorCoordinatorContext) {this(Executors.newScheduledThreadPool(1, coordinatorThreadFactory),Executors.newScheduledThreadPool(1,new ExecutorThreadFactory(coordinatorThreadFactory.getCoordinatorThreadName() + "-worker")),coordinatorThreadFactory,operatorCoordinatorContext);}public CoordinatorContext(ScheduledExecutorService coordinatorExecutor,ScheduledExecutorService workerExecutor,CoordinatorExecutorThreadFactory coordinatorThreadFactory,OperatorCoordinator.Context operatorCoordinatorContext) {this.coordinatorExecutor = coordinatorExecutor;this.workerExecutor = workerExecutor;this.coordinatorThreadFactory = coordinatorThreadFactory;this.operatorCoordinatorContext = operatorCoordinatorContext;this.subtaskGateways = new HashMap<>(operatorCoordinatorContext.currentParallelism());}@Overridepublic void close() throws InterruptedException {// Close quietly so the closing sequence will be executed completely.ComponentClosingUtils.shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));ComponentClosingUtils.shutdownExecutorForcefully(coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));}public void runInCoordinatorThread(Runnable runnable) {coordinatorExecutor.execute(runnable);}// --------- Package private methods for the DynamicCepOperatorCoordinator ------------ClassLoader getUserCodeClassloader() {return this.operatorCoordinatorContext.getUserCodeClassloader();}void subtaskReady(OperatorCoordinator.SubtaskGateway gateway) {final int subtask = gateway.getSubtask();if (subtaskGateways.get(subtask) == null) {subtaskGateways.put(subtask, gateway);} else {throw new IllegalStateException("Already have a subtask gateway for " + subtask);}}void subtaskNotReady(int subtaskIndex) {subtaskGateways.put(subtaskIndex, null);}Set<Integer> getSubtasks() {return subtaskGateways.keySet();}public void sendEventToOperator(int subtaskId, OperatorEvent event) {callInCoordinatorThread(() -> {final OperatorCoordinator.SubtaskGateway gateway =subtaskGateways.get(subtaskId);if (gateway == null) {LOG.warn(String.format("Subtask %d is not ready yet to receive events.",subtaskId));} else {gateway.sendEvent(event);}return null;},String.format("Failed to send event %s to subtask %d", event, subtaskId));}/*** Fail the job with the given cause.** @param cause the cause of the job failure.*/void failJob(Throwable cause) {operatorCoordinatorContext.failJob(cause);}// ---------------- private helper methods -----------------/*** A helper method that delegates the callable to the coordinator thread if the current thread* is not the coordinator thread, otherwise call the callable right away.** @param callable the callable to delegate.*/private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {// Ensure the split assignment is done by the coordinator executor.if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()&& !coordinatorExecutor.isShutdown()) {try {final Callable<V> guardedCallable =() -> {try {return callable.call();} catch (Throwable t) {LOG.error("Uncaught Exception in Coordinator Executor",t);ExceptionUtils.rethrowException(t);return null;}};return coordinatorExecutor.submit(guardedCallable).get();} catch (InterruptedException | ExecutionException e) {throw new FlinkRuntimeException(errorMessage, e);}}try {return callable.call();} catch (Throwable t) {LOG.error("Uncaught Exception in Source Coordinator Executor", t);throw new FlinkRuntimeException(errorMessage, t);}}
}

自定义协调器

package com.examples.coordinator;import com.examples.event.MyEvent;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;/*** 自定义协调器* 需要实现 OperatorCoordinator 接口* @author shirukai*/
public class MyKeyedProcessCoordinator implements OperatorCoordinator {private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessCoordinator.class);/*** The name of the operator this RuleDistributorCoordinator is associated with.*/private final String operatorName;private final CoordinatorContext context;private boolean started;public MyKeyedProcessCoordinator(String operatorName, CoordinatorContext context) {this.operatorName = operatorName;this.context = context;}@Overridepublic void start() throws Exception {LOG.info("Starting Coordinator for {}: {}.",this.getClass().getSimpleName(),operatorName);// we mark this as started first, so that we can later distinguish the cases where 'start()'// wasn't called and where 'start()' failed.started = true;runInEventLoop(() -> {LOG.info("Coordinator started.");},"do something for coordinator.");}@Overridepublic void close() throws Exception {}@Overridepublic void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception {LOG.info("Received event {} from operator {}.", event, subtask);}@Overridepublic void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {}@Overridepublic void notifyCheckpointComplete(long checkpointId) {}@Overridepublic void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {}@Overridepublic void subtaskReset(int subtask, long checkpointId) {LOG.info("Recovering subtask {} to checkpoint {} for operator {} to checkpoint.",subtask,checkpointId,operatorName);runInEventLoop(() -> {},"making event gateway to subtask %d available",subtask);}@Overridepublic void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {runInEventLoop(() -> {LOG.info("Removing itself after failure for subtask {} of operator {}.",subtask,operatorName);context.subtaskNotReady(subtask);},"handling subtask %d failure",subtask);}@Overridepublic void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {assert subtask == gateway.getSubtask();LOG.debug("Subtask {} of operator {} is ready.", subtask, operatorName);runInEventLoop(() -> {context.subtaskReady(gateway);sendEventToOperator(new MyEvent("hello,I'm from coordinator"));},"making event gateway to subtask %d available",subtask);}private void sendEventToOperator(OperatorEvent event) {for (Integer subtask : context.getSubtasks()) {try {context.sendEventToOperator(subtask, event);} catch (Exception e) {LOG.error("Failed to send OperatorEvent to operator {}",operatorName,e);context.failJob(e);return;}}}private void runInEventLoop(final ThrowingRunnable<Throwable> action,final String actionName,final Object... actionNameFormatParameters) {ensureStarted();context.runInCoordinatorThread(() -> {try {action.run();} catch (Throwable t) {// If we have a JVM critical error, promote it immediately, there is a good// chance the logging or job failing will not succeed any moreExceptionUtils.rethrowIfFatalErrorOrOOM(t);final String actionString =String.format(actionName, actionNameFormatParameters);LOG.error("Uncaught exception in the coordinator for {} while {}. Triggering job failover.",operatorName,actionString,t);context.failJob(t);}});}private void ensureStarted() {if (!started) {throw new IllegalStateException("The coordinator has not started yet.");}}
}

自定义协调器提供器

package com.examples.coordinator;import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;/*** 自定义协调器的提供者** @author shirukai*/
public class MyKeyedProcessCoordinatorProvider extends RecreateOnResetOperatorCoordinator.Provider {private static final long serialVersionUID = 1L;private final String operatorName;public MyKeyedProcessCoordinatorProvider(String operatorName, OperatorID operatorID) {super(operatorID);this.operatorName = operatorName;}@Overrideprotected OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception {final String coordinatorThreadName = " MyKeyedProcessCoordinator-" + operatorName;CoordinatorExecutorThreadFactory coordinatorThreadFactory =new CoordinatorExecutorThreadFactory(coordinatorThreadName, context.getUserCodeClassloader());CoordinatorContext coordinatorContext =new CoordinatorContext(coordinatorThreadFactory, context);return new MyKeyedProcessCoordinator(operatorName, coordinatorContext);}
}

执行测试

package com.examples;import com.examples.operator.MyKeyedProcessOperatorFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author shirukai*/
public class CoordinatorExamples {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<MyData> source = env.fromElements(new MyData(1, 1.0), new MyData(2, 2.0), new MyData(1, 3.0));MyKeyedProcessOperatorFactory<MyData> operatorFactory = new MyKeyedProcessOperatorFactory<>();source.keyBy((KeySelector<MyData, Integer>) MyData::getId).transform("MyKeyedProcess", TypeInformation.of(MyData.class), operatorFactory).print();env.execute();}public static class MyData {private Integer id;private Double value;public MyData(Integer id, Double value) {this.id = id;this.value = value;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public Double getValue() {return value;}public void setValue(Double value) {this.value = value;}@Overridepublic String toString() {return "MyData{" +"id=" + id +", value=" + value +'}';}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/23459.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LlamaIndex 一 简单文档查询

前言 在学习LangChain的时候&#xff0c;我接触到了LlamaIndex。它犹如我在开发vue时用到的axios&#xff0c;主要负责数据打理。别问我为什么打这个比方&#xff0c;前端老狗&#xff0c;重走AI路&#xff0c;闭关一年能否学的妥当&#xff1f; LlamaIndex 是一个用于 LLM 应…

前端项目打包、部署的基础 (vue)

详细请看B站视频 BV19n4y1d7Gr 《禹神&#xff1a;前端项目部署指南&#xff0c;前端项目打包上线》&#xff0c;本博客为自用视频笔记。 目录 项目打包vue打包打包前分析项目请求 本地服务器部署问题 & 解决问题1&#xff1a;刷新页面404问题问题2&#xff1a;ajax请求废…

【人工智能】第六部分:ChatGPT的进一步发展和研究方向

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

秀肌肉-海外短剧系统的案例展示

多语种可以选择&#xff0c;分销功能&#xff0c;多种海外支付方式&#xff0c;多种登录模式可供选择&#xff0c;总之你想到的我们都做了&#xff0c;你没想到的我们也都做了

企微自动化机器人的应用与前景

一、引言 随着信息技术的飞速发展&#xff0c;企业对于提高内部运营效率、降低人力成本的需求日益迫切。在这样的背景下&#xff0c;企微自动化机器人应运而生&#xff0c;以其高效、便捷的特点&#xff0c;迅速成为企业内部的得力助手。本文将深入探讨企微自动化机器人的应用现…

头歌易-算式运算的合法性

给定一个算式运算,算式由运算数、+、-、*、/、(、)组成,请编写程序判断该算式运算是否合法。如果合法,计算该算式的值。 输入描述: 第一行输入一个运算表达式 输出描述: 如果表达式合法则计算其值,结果保留两位小数,如果不合法则输出 表达式不合法! 输入样例: (5+3)…

Partially Spoofed Audio Detection论文介绍(ICASSP 2024)

An Efficient Temporary Deepfake Location Approach Based Embeddings for Partially Spoofed Audio Detection 论文翻译名&#xff1a;一种基于部分欺骗音频检测的基于临时深度伪造位置方法的高效嵌入 摘要&#xff1a; 部分伪造音频检测是一项具有挑战性的任务&#xff0…

NSSCTF-Web题目6

目录 [NISACTF 2022]checkin 1、题目 2、知识点 3、思路 [NISACTF 2022]babyupload 1、题目 2、知识点 3、思路 [SWPUCTF 2022 新生赛]1z_unserialize 1、题目 2、知识点 3、思路 [NISACTF 2022]checkin 1、题目 2、知识点 010编辑器的使用、url编码 3、思路 打…

基于NANO 9K 开发板加载PICORV32软核,并建立交叉编译环境

目录 0. 环境准备 1. 安装交叉编译器 2. 理解makefile工作机理 3. 熟悉示例程序的代码结构&#xff0c;理解软核代码的底层驱动原理 4. 熟悉烧录环节的工作机理&#xff0c; 建立下载环境 5. 编写例子blink&#xff0c; printf等&#xff0c; 加载运行 6. 后续任务 0.…

2024年华为OD机试真题-多段线数据压缩-C++-OD统一考试(C卷D卷)

2024年OD统一考试(D卷)完整题库:华为OD机试2024年最新题库(Python、JAVA、C++合集)​ 题目描述: 下图中,每个方块代表一个像素,每个像素用其行号和列号表示。 为简化处理,多段线的走向只能是水平、竖直、斜向45度。 上图中的多段线可以用下面的坐标串表示:(2, 8), (3…

Modbus TCP转CanOpen网关携手FANUC机器人助力新能源汽车

Modbus TCP转CanOpen网关与FANUC机器手臂的现场应用可以实现FANUC机器手臂与其他设备之间的数据交换和通信。CANopen是一种常见的网络协议&#xff0c;用于处理机器和设备之间的通信&#xff0c;并广泛应用于自动化领域。而Modbus TCP是一种基于TCP/IP协议的通信协议&#xff0…

智慧互联网医院系统开发指南:从源码到在线问诊APP

近期&#xff0c;互联网医院系统的热度非常高&#xff0c;很多人跟小编提问如何开发&#xff0c;今天小编将从零开始为大家详解互联网医院系统源码&#xff0c;以及在线问诊APP开发技术。 一、需求分析与系统设计 1.1 需求分析 用户管理 预约挂号 在线问诊 电子病历 药品…

定个小目标之每天刷LeetCode热题(11)

这是道简单题&#xff0c;只想到了暴力解法&#xff0c;就是用集合存储起来&#xff0c;然后找出其中的众数&#xff0c;看了一下题解&#xff0c;发现有多种解法&#xff0c;我觉得Boyer-Moore 投票算法是最优解&#xff0c;看了官方对这个算法的解释&#xff0c;我是这样理解…

手把手教你用Spring Boot搭建AI原生应用

作者 | 文心智能体平台 导读 本文以快速开发一个 AI 原生应用为目的&#xff0c;介绍了 Spring AI 的包括对话模型、提示词模板、Function Calling、结构化输出、图片生成、向量化、向量数据库等全部核心功能&#xff0c;并介绍了检索增强生成的技术。依赖 Spring AI 提供的功能…

Go基础编程 - 04 - 基本类型、常量和变量

目录 1. 基本类型2. 常量iota 3. 变量3.1 变量声明3.2 自定义类型、类型别名3.3 类型推导3.4 类型断言3.5 类型转换 Go 基础编程 - 03 - init、main、_ 1. 基本类型 支持八进制、十六进制&#xff0c;以及科学记数法 零值&#xff1a;只做了声明&#xff0c;但未做初始化的变量…

进阶之格式化qDebug()输出

创作灵感 刚刚在看qt帮助手册时&#xff0c;无意间在<QtGlobal>中看见了这个函数void qSetMessagePattern(const QString &pattern)&#xff0c;该函数的精华在于&#xff0c;你可以直接重定义qDebug()的输出结果格式。以往打印调试内容&#xff0c;调试内容所在的行…

大数据和数据分析来优化推荐算法

当涉及到使用大数据和数据分析来优化推荐算法时&#xff0c;通常我们会结合编程语言和特定的数据分析工具来实现。以下是一个简化的流程&#xff0c;以及在该流程中可能涉及的代码和工具内容的详细介绍。 1. 数据收集与预处理 工具&#xff1a;Python, pandas, NumPy 代码示例…

00-macOS和Linux安装和管理多个Python版本

在 Mac 上安装多个 Python 版本可通过几种不同方法实现。 1 Homebrew 1.1 安装 Homebrew 若安装过&#xff0c;跳过该步。 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" 1.2 安装 Python 如安装 Python …

统计每天某个时间范围内得 数据状态

来统计在特定条件下的记录数。具体来说&#xff0c;并且这些记录必须满足以下时间条件和存在条件&#xff1a; 时间条件&#xff1a;当前时间的小时和分钟部分在0600&#xff08;早上6点&#xff09;和0800&#xff08;早上8点&#xff09;之间。这是通过SUBSTRB和TO_CHAR函数实…

计算机毕业设计 | SSM 校园线上订餐系统 外卖购物网站(附源码)

1&#xff0c; 概述 1.1 项目背景 传统的外卖方式就是打电话预定&#xff0c;然而&#xff0c;在这种方式中&#xff0c;顾客往往通过餐厅散发的传单来获取餐厅的相关信息&#xff0c;通过电话来传达自己的订单信息&#xff0c;餐厅方面通过电话接受订单后&#xff0c;一般通…