Window下启动支持
下载或复制老版本的放在bin目录下即可;
flink.bat
@echo off
setlocalSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\pluginsSET JVM_ARGS=-Xmx512mSET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*endlocal
start-cluster.bat
@echo off
setlocal EnableDelayedExpansionSET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\logSET JVM_ARGS=-Xms1024m -Xmx1024mSET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties":: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do (
SET /A y = %%x+1
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!" 2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!" 2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0" 2> nul
RENAME "%logname_tm%" "%logname_tm%.0" 2> nul
RENAME "%outname_jm%" "%outname_jm%.0" 2> nul
RENAME "%outname_tm%" "%outname_tm%.0" 2> nul
DEL "%logname_jm%.6" 2> nul
DEL "%logname_tm%.6" 2> nul
DEL "%outname_jm%.6" 2> nul
DEL "%outname_tm%.6" 2> nulfor %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (echo java.exe was not found in PATH variablegoto :eof
)echo Starting a local cluster with one JobManager process and one TaskManager process.echo You can terminate the processes via CTRL-C in the spawned shell windows.echo Web interface by default on http://localhost:8081/.start java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1endlocal
Flink实战(Java/MongoDB/Mysql)
<properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><flink-version>1.14.6</flink-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>${flink-version}</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><optional>true</optional></dependency><!-- mongodb --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.1.0-1.18</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.50</version></dependency><!-- Spring Boot Starter MyBatis --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.6</version></dependency><!-- MySQL Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency></dependencies>
Flink mongodb
Flink CDC在MongoDB的副本集(replica set)上使用了$changeStream操作,而该操作仅支持副本集。如果你的MongoDB是单个实例(standalone),则不支持$changeStream操作。
MongoDB开启副本集(Replica Set),否则代码运行会报错。
Flink报错记录
[20210910 10:16:40.107] [DEBUG] [main] [StreamGraph.java:391][addOperator] Vertex: 2
java.lang.IllegalStateException: No ExecutorFactory found to execute the application.at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1947)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
添加
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink-version}</version></dependency>
[20210910 10:20:02.172] [INFO] [main] [RestServerEndpoint.java:139][start] Starting rest endpoint.
[20210910 10:20:02.203] [DEBUG] [main] [DispatcherRestEndpoint.java:124][initializeWebSubmissionHandlers] Failed to load web based job submission extension.
org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:197)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeWebSubmissionHandlers(DispatcherRestEndpoint.java:112)at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.initializeHandlers(WebMonitorEndpoint.java:268)at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:89)at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:144)at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:463)at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:422)at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:366)at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
[20210910 10:20:02.535] [DEBUG] [main] [InternalLoggerFactory.java:45][newDefaultFactory] Using SLF4J as the default logging framework
[20210910 10:20:02.536] [DEBUG] [main] [InternalThreadLocalMap.java:56][<clinit>] -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
添加
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink-version}</version>
</dependency>