Flink导入StarRocks

1、pom依赖

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Apache Flink 的依赖, 这些依赖项,生产环境可以不打包到JAR文件中. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- flink-connector-starrocks --><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>1.2.5_flink-1.13_2.12</version></dependency></dependencies>

2、代码编写

public class LoadJsonRecords {public static void main(String[] args) throws Exception {// To run the example, you should prepare in the following steps// 1. create a primary key table in your StarRocks cluster. The DDL is//  CREATE DATABASE `test`;//    CREATE TABLE `test`.`score_board`//    (//        `id` int(11) NOT NULL COMMENT "",//        `name` varchar(65533) NULL DEFAULT "" COMMENT "",//        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""//    )//    ENGINE=OLAP//    PRIMARY KEY(`id`)//    COMMENT "OLAP"//    DISTRIBUTED BY HASH(`id`)//    PROPERTIES(//        "replication_num" = "1"//    );//// 2. replace the connector options "jdbc-url" and "load-url" with your cluster configurationsMultipleParameterTool params = MultipleParameterTool.fromArgs(args);String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");//String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");//String loadUrl = params.get("loadUrl", "be-ip:8040;be-ip:8040;be-ip:8040");//String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://fe-ip:9030");//String loadUrl = params.get("loadUrl", "be-ip:8040,be-ip:8040,be-ip:8040");final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Generate json-format records. Each record has three fields correspond to// the columns `id`, `name`, and `score` in StarRocks table.String[] records = new String[]{"{\"id\":1111, \"name\":\"starrocks-json\", \"score\":100}","{\"id\":2222, \"name\":\"flink-json\", \"score\":100}",};DataStream<String> source = env.fromElements(records);// Configure the connector with the required properties, and you also need to add properties// "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector the// input records are json-format.StarRocksSinkOptions options = StarRocksSinkOptions.builder().withProperty("jdbc-url", jdbcUrl).withProperty("load-url", loadUrl).withProperty("database-name", "tmp").withProperty("table-name", "score_board").withProperty("username", "").withProperty("password", "").withProperty("sink.properties.format", "json").withProperty("sink.properties.strip_outer_array", "true").withProperty("sink.parallelism","1")//.withProperty("sink.version","V1").build();// Create the sink with the optionsSinkFunction<String> starRockSink = StarRocksSink.sink(options);source.addSink(starRockSink);env.execute("LoadJsonRecords");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/580063.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2014年第三届数学建模国际赛小美赛A题吹口哨解题全过程文档及程序

2014年第三届数学建模国际赛小美赛 A题 吹口哨 原题再现&#xff1a; 哨子是一种小装置&#xff0c;当空气被迫通过开口时会发出声音。哨声的巨大而引人注目&#xff0c;使其对警察和体育裁判来说至关重要。当救生员、迷路的露营者或犯罪受害者使用它们时&#xff0c;它们可以…

网站显示不安全警告怎么办?消除网站不安全警告超全指南

网站显示不安全警告怎么办&#xff1f;当用户访问你的网站&#xff0c;而您的网站没有部署SSL证书实现HTTPS加密时&#xff0c;网站就会显示不安全警告&#xff0c;这种警告&#xff0c;不仅有可能阻止用户继续浏览网站&#xff0c;影响网站声誉&#xff0c;还有可能影响网站在…

阿赵UE学习笔记——5、创建关卡元素

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。   之前介绍了从空白模板创建关卡&#xff0c;接下来尝试着在这个空白的世界里面&#xff0c;创建一些内容。 一、创建地面 1、创建面片作为地面 创建——形状——平面&#xff0c;可以创建一个面片 在细节面板设置合适的…

C# 事件(Event)

C# 事件&#xff08;Event&#xff09; C# 事件&#xff08;Event&#xff09;通过事件使用委托声明事件&#xff08;Event&#xff09;实例 C# 事件&#xff08;Event&#xff09; 事件&#xff08;Event&#xff09; 基本上说是一个用户操作&#xff0c;如按键、点击、鼠标移…

分布式锁竟然这么简单?(荣耀典藏版)

大家好&#xff0c;我是小月夜枫&#xff0c;作为一个后台开发&#xff0c;不管是工作还是面试中&#xff0c;分布式一直是一个让人又爱又恨的话题。它如同一座神秘的迷宫&#xff0c;时而让你迷失方向&#xff0c;时而又为你揭示出令人惊叹的宝藏。 今天&#xff0c;让我们来…

LeetCode第2题 - 两数相加

题目 给出两个 非空 的链表用来表示两个非负的整数。其中&#xff0c;它们各自的位数是按照 逆序 的方式存储的&#xff0c;并且它们的每个节点只能存储 一位 数字。 如果&#xff0c;我们将这两个数相加起来&#xff0c;则会返回一个新的链表来表示它们的和。 您可以假设除了数…

项目零散记录

Ts托管 仅本项目禁用本地vscode内置的ts服务 提交代码前的检查 husky(哈士奇)工具&#xff08;是一个git hooks工具&#xff09; 1、安装 pnpm dlx husky-init && pnpm install安装的时候&#xff0c;出现如下报错 解决方案&#xff0c;需要先执行git init初始化…

Android10.0 人脸解锁流程分析

人脸解锁概述 人脸解锁即用户通过注视设备的正面方便地解锁手机或平板。Android 10 为支持人脸解锁的设备在人脸认证期间添加了一个新的可以安全处理相机帧、保持隐私与安全的人脸认证栈的支持&#xff0c;也为安全合规地启用集成交易的应用&#xff08;网上银行或其他服务&am…

Java Web基础详解

回顾 之前的两篇的文章已经大概的带我们了解了tomcat的一些基本的操作&#xff0c;比如从零搭建我们自己的调试环境以及官方文档构建的方式&#xff0c;接下来的话&#xff0c;我将带大家来了解一下tomcat的一些基础知识&#xff0c;这些基础知识将以问题的方式抛出&#xff0…

【SpringCloud笔记】(11)消息驱动之Stream

Stream 技术背景 底层不同模块可能使用不同的消息中间件&#xff0c;这就导致技术的切换&#xff0c;微服务的维护及开发变得麻烦起来 概述 官网&#xff1a; https://spring.io/projects/spring-cloud-stream#overview https://cloud.spring.io/spring-cloud-static/spring…

最小覆盖子串(LeetCode 76)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路参考文献 1.问题描述 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 “” 。 注意&#xff1a; 对于 t 中重复字符&#xff…

如何在Vue3中实现无缝热重载:提升你的开发效率

Vue3中的热重载&#xff08;Hot Module Replacement&#xff0c;简称HMR&#xff09;是一种开发时的功能&#xff0c;它允许开发者在不刷新整个页面的情况下&#xff0c;实时替换、添加或删除模块。这意味着当你对Vue组件进行修改并保存时&#xff0c;这些更改会立即反映在浏览…

鸿蒙 - arkTs:属性动画,显式动画,组件转场动画

属性动画&#xff1a; 属性动画是通过设置组件的animation属性来给组件添加动画&#xff1b; 代码示例&#xff1a; Entry Component struct Index {State widthSize: number 250State heightSize: number 100State flag: boolean truebuild() {Column() {Button(开始动画…

AutoSAR(基础入门篇)2.2-AutoSAR架构中的Ports类型与Runnables可运行实体

Ports的类型 一、接口的类型 1、S/R接口 2、C/S接口 Runnables可运行实体

git 常用基本命令, reset 回退撤销commit,解决gitignore无效,忽略记录或未记录远程仓库的文件,删除远程仓库文件

git 基本命令 reset 撤销commit https://blog.csdn.net/a704397849/article/details/135220091 idea 中 rest 撤销commit过程如下&#xff1a; Git -> Rest Head… 在To Commit中的HEAD后面加上^&#xff0c;点击Reset即可撤回最近一次的尚未push的commit Reset Type 有三…

Flink Has Become the De-facto Standard of Streaming Compute

摘要&#xff1a;本文整理自 Apache Flink 中文社区发起人、阿里巴巴开源大数据平台负责人王峰&#xff08;莫问&#xff09;&#xff0c;在 Flink Forward Asia 2023 主会场的分享。Flink 从 2014 年诞生之后&#xff0c;已经发展了将近 10 年&#xff0c;尤其是最近这些年得到…

线下拓展运营常用的ChatGPT通用提示词模板

线下拓展策略&#xff1a;如何制定有效的线下拓展策略&#xff1f; 选址策略&#xff1a;如何选择合适的店铺位置&#xff1f; 店铺装修&#xff1a;如何设计店铺的装修风格&#xff1f; 店面陈列&#xff1a;如何规划店面的产品陈列&#xff1f; 营业时间&#xff1a;如何…

爬虫系列----Python解析Json网页并保存到本地csv

Python解析JSON 1 知识小课堂1.1 爬虫1.2 JSON1.3 Python1.4 前言技术1.4.1 range1.4.2 random1.4.3 time.sleep1.4.4 with open() as f: 2 解析过程2.1 简介2.2 打开调试工具2.3 分析网址2.3.1 网址的规律2.3.2 网址的参数 2.4 爬取第一页内容2.5 存入字典并获取2.6 循环主体数…

git 项目带分支迁移到另一个 git 仓库

1. 指定迁移 git 仓库地址 git remote add target URL 上面URL是需要迁移的git地址 2. 全部代码推送 git push target --all

7-2 设计一元二次方程求解类(高教社,《Python编程基础及应用》习题9-4)——python

设计一个类Root来计算ax2bxc0的根。该类包括&#xff1a;a、b、c共3个属性表示方程的3个系数&#xff0c;getDiscriminant()方法返回b2-4ac, getRoot1()和getRoot2()返回方程的两个根。 其中&#xff0c;getRoot1()返回的根对应&#xff1a; getRoot2()返回的根对应&#xff1a…