java消息顺序执行_Apache Flink:如何并行执行但保持消息顺序?

请在下面找到使用侧输出和插槽组进行本地扩展的示例 .

package org.example

/*

* Licensed to the Apache Software Foundation (ASF) under one

* or more contributor license agreements. See the NOTICE file

* distributed with this work for additional information

* regarding copyright ownership. The ASF licenses this file

* to you under the Apache License, Version 2.0 (the

* "License"); you may not use this file except in compliance

* with the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/

import org.apache.flink.streaming.api.functions.ProcessFunction

import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

/**

* This example shows an implementation of WordCount with data from a text socket.

* To run the example make sure that the service providing the text data is already up and running.

*

* To start an example socket text stream on your local machine run netcat from a command line,

* where the parameter specifies the port number:

*

* {{{

* nc -lk 9999

* }}}

*

* Usage:

* {{{

* SocketTextStreamWordCount

* }}}

*

* This example shows how to:

*

* - use StreamExecutionEnvironment.socketTextStream

* - write a simple Flink Streaming program in scala.

* - write and use user-defined functions.

*/

object SocketTextStreamWordCount {

def main(args: Array[String]) {

if (args.length != 2) {

System.err.println("USAGE:\nSocketTextStreamWordCount ")

return

}

val hostName = args(0)

val port = args(1).toInt

val outputTag1 = OutputTag[String]("side-1")

val outputTag2 = OutputTag[String]("side-2")

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getConfig.enableObjectReuse()

//Create streams for names and ages by mapping the inputs to the corresponding objects

val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")

val counts = text.flatMap {

_.toLowerCase.split("\\W+") filter {

_.nonEmpty

}

}

.process(new ProcessFunction[String, String] {

override def processElement(

value: String,

ctx: ProcessFunction[String, String]#Context,

out: Collector[String]): Unit = {

if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))

else ctx.output(outputTag2, String.valueOf(value))

}

})

val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)

val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)

val output1 = sideOutputStream1.map {

(_, 1)

}.slotSharingGroup("map1")

.keyBy(0)

.sum(1)

val output2 = sideOutputStream2.map {

(_, 1)

}.slotSharingGroup("map2")

.keyBy(0)

.sum(1)

output1.print()

output2.print()

env.execute("Scala SocketTextStreamWordCount Example")

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/260264.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python的字符串定界符可以使用_使用Template格式化Python字符串的方法

对Python字符串&#xff0c;除了比较老旧的%&#xff0c;以及用来替换掉%的format&#xff0c;及在python 3.6中加入的f这三种格式化方法以外&#xff0c;还有可以使用Template对象来进行格式化。from string import Template&#xff0c;可以导入Template类。实例化Template类…

【ES实战】ES6.7的tar包离线安装帮助手册

Elasticsearch6.7部署帮助手册 校验时间&#xff1a;2023年12月19日 文章目录 Elasticsearch6.7部署帮助手册安装前准备安装包安装要求锁定内存,修改最大文件描述符,最大线程数内核参数 部署规划端口规划用户规划目录规划 安装步骤每个服务器配置JDK配置文件master角色node角色…

jenkins 部署文档

Jenkins是一个非常出色的持续集成服务器&#xff0c;本文主要介绍在CentOS系统中Jenkins的基本安装配置方法&#xff0c;供参考。一. 软件包&#xff1a;1. 下载apache-maven-2.2.1-bin.tarhttp://www.apache.org/dyn/closer.cgi/maven/binaries/apache-maven-2.2.1-bin.tar.gz…

牛人,多看看他们写的东西

计算机大师 Donald E. Knuth&#xff08;高德纳&#xff09; 算法大师&#xff0c;我最崇拜的计算机科学家&#xff0c;没有之一&#xff01;不认识高爷爷的人别说自己是学计算机的。《The Art of Computer Programming》绝对是计算机科学的圣经。对高爷爷的崇敬&#xff0c;对…

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1)

System.Math.Min(System.Threading.Interlocked.Increment(i), i - 1) 在vb里面 等价于ii-1 在C#里面 等价于i-- 是有C#自动转VB时转换的转载于:https://www.cnblogs.com/YaDi/archive/2012/11/08/2759802.html

java快速查找中位数_用QuickSort快速查找中位数(median)

中位数(median)是一个排好序的元素中中间位置的元素&#xff0c;如果元素个数为偶数&#xff0c;则是中间两个元素的平均值。例如(3,1,5)的中位数是3&#xff0c;而(2,1,3,5)的中位数是2.5。查找中位数属于SelectionAlgorithms的一种。用快速排序可以做到每次divide之后&#x…

python安装mysql数据库_windows10安装mysql-8.0.13(zip安装)~Python安装mysql

windows10安装mysql-8.0.13(zip安装)安装环境说明系统版本&#xff1a;windows10mysql版本&#xff1a;mysql-8.0.13-winx64.zip下载地址&#xff1a;http://mirrors.163.com/mysql/Downloads/MySQL-8.0/mysql-8.0.13-winx64.zip解压安装包解压路径&#xff1a;D:\develop\soft…

centos 下使用sublime

CentOS 之 Sublime text3 安装及配置&#xff08;不支持中文输入&#xff09; sublime text 的界面友好&#xff0c;自动补全功能也不错。 &#xff08;本来用vimphp_function.txt的形式进行补全的&#xff0c;但是配置后的补全不太满意&#xff0c;放弃了。 具体参见&#xff…

20121108团队博客(苏若)

PS&#xff1a;这本是属于昨晚的帖子&#xff0c;对不住忠仔。现在补上。 忠仔&#xff0c;终于交给了我一个实实在在的任务&#xff0c;很是欣喜&#xff0c;也很是忐忑&#xff0c;生怕自己不能及时完成任务。 好了&#xff0c;废话不多说&#xff0c;步入正题。 接下任务【画…

python 倒排索引 性能_python 实现倒排索引的方法

代码如下&#xff1a;#encoding:utf-8fin open(1.txt, r)建立正向索引:“文档1”的ID > 单词1&#xff1a;出现位置列表&#xff1b;单词2&#xff1a;出现位置列表&#xff1b;…………“文档2”的ID > 此文档出现的关键词列表。forward_index {}for line in fin:line…

pythonnet下载_Python for .NET

Python for .NET 是一个可以让 Python 程序员近乎无缝的集成 .NET 通用语言环境 CLR 和以及为 .NET 开发者提供一个强大的应用脚本工具。通过这个项目你可在 .NET 中完全使用 Python 来编写整个应用&#xff0c;使用 .NET 服务和组件。这个包并没有用 CLR 语言实现一个 Python&…

webService详解

什么是webService WebService&#xff0c;顾名思义就是基于Web的服务。它使用Web(HTTP)方式&#xff0c;接收和响应外部系统的某种请求。从而实现远程调用. 1:从WebService的工作模式上理解的话&#xff0c;它跟普通的Web程序&#xff08;比如ASP、JSP等&#xff09;并没有本…

读《有人负责,才有质量:写给在集市中迷失的一代》总结与感想

在大伙都在吹捧“市集”开发软件的方式的大浪潮下&#xff0c;作者看到了其中的不当之处&#xff0c;发现其中有许多的问题&#xff0c;因此写下这篇文章给予吹捧“市集”的人一个提醒&#xff0c;甚至警告。 在该文章里&#xff0c;作者认为“市集”里的“农民”不可能建造出和…

php 判断是否文件,利用PHP判断文件是否为图片的方法总结

前言在网页设计中&#xff0c;如果需要图片&#xff0c;我们通常拿到的是一个图片的文件名。仅仅通过文件名是无法判断该文件是否是一个图片文件的。或许有的人以为通过后缀名就可以判断&#xff0c;别忘了文件的后缀名是可以随便改动的。更何况&#xff0c;在 Linux 系统下是不…

textedit怎么插入数据_还在手动插入Excel交叉空白行?这个小技巧10秒搞定

导读&#xff1a;前几天有同学在后台提问&#xff0c;怎么快速在Excel中隔行插入一行或者多行空白行&#xff0c;其实在早期我们分享的小视频中有利用过类似的小技巧来制作工资条&#xff0c;今天我们用它来插入空白行。文/ 芒种学院指北针Hello&#xff0c;大家好&#xff0c;…

python制作安装包(setup.py)

1.制作setup.py from distutils.core import setupsetup(nameMyblog,version1.0,descriptionMy Blog Distribution Utilities,authorlujianxing,author_emaillujianxinglujianxing.com,urlhttp://blog.lujianxing.com,py_modules[foo] ) py_modules 定义 需要打包的模块名 2.创…

[Ruby]$: 是什么意思?

ruby comes with a set of predefined variables$: default search path (array of paths)其他Ruby特殊变量&#xff1a; $! 最近一次的错误信息 $ 错误产生的位置 $_ gets最近读的字符串 $. 解释器最近读的行数(line number) $& 最近一次与正则表达式匹配的字符串 $~ 作为…

rocketmq 启动_016【windows版Rocketmq】小白学习Rocketmq单机部署

以前都是听说MQ&#xff0c;或者在别人搭建好的基础上去使用&#xff0c;没有自己动手搭建过&#xff0c;就没有更深入去理解。现在机会来啦.啦啦.啦啦啦......引用自己的CSDN文章href"https://blog.csdn.net/chenzhong2010/article/details/106699590或点击左下角“阅读原…

WPF WebBrowser 加载 html ,出现安全警告, 运行 脚本和 activeX 控件,

对于你的问题&#xff0c;只需要在你的HTML首行添加如下代码即可隐藏安全提示条&#xff1a; <!-- saved from url(0014)about:internet --> 还有一个可选方案是使用Winform的WebBrowser控件&#xff0c;不需要更改HTML代码&#xff0c;也不会出现安全提示&#xff0c;需…

资料下载资源网站

脚本之家&#xff1a;www.jb51.net 转载于:https://www.cnblogs.com/dreammyle/p/3850250.html