Java通过calcite实时读取kafka中的数据

引入maven依赖

        <dependency>

            <groupId>org.apache.calcite</groupId>

            <artifactId>calcite-kafka</artifactId>

            <version>1.28.0</version>

        </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

    public static void main(String[] args) throws SQLException {

        String model = "inline:" +

                "{\n" +

                "  \"version\": \"1.0\",\n" +

                "  \"defaultSchema\": \"KAFKA\",\n" +

                "  \"schemas\": [\n" +

                "    {\n" +

                "    \"name\": \"KAFKA\",\n" +

                "    \"tables\": [\n" +

                "      {\n" +

                "        \"name\": \"TEST_TABLE\",\n" +

                "        \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +

                "        \"stream\": { \"stream\": true },\n" +

                "        \"operand\": {\n" +

                "          \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +

                "          \"topic.name\": \"my-cloud-events\",\n" +

                "          \"consumer.params\": {\n" +

                "            \"group.id\": \"calcite-ut-consumer\",\n" +

                "            \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +

                "            \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +

                "          }\n" +

                "        }\n" +

                "      }\n" +

                "    ]\n" +

                "    }\n" +

                "  ]\n" +

                "}";

        Properties info = new Properties();

        info.put("model", model);

        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";

        print(calciteConnection,sql7);

        connection.close();

        calciteConnection.close();

    }

    public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {

        final PreparedStatement statement = calciteConnection.prepareStatement(sql7);

        final ResultSet resultSet = statement.executeQuery();

        ResultSetMetaData metadata = resultSet.getMetaData();

        while (resultSet.next()) {

            for (int i = 1; i <= metadata.getColumnCount(); i++) {

                System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");

            }

            System.out.println();

        }

    }

}

发送测试数据

运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61206.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RDIFramework.NET CS敏捷开发框架 V6.1发布(.NET6+、Framework双引擎、全网唯一)

RDIFramework.NET C/S敏捷开发框架V6.1版本迎来重大更新与调整&#xff0c;全面重新设计业务逻辑代码&#xff0c;代码量减少一半以上&#xff0c;开发更加高效。全系统引入全新字体图标&#xff0c;整个界面焕然一新。底层引入最易上手的ORM框架SqlSugar&#xff0c;让开发更加…

TypeORM在Node.js中的高级应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 TypeORM在Node.js中的高级应用 TypeORM在Node.js中的高级应用 TypeORM在Node.js中的高级应用 引言 TypeORM 基本概念 1. 实体&am…

11.15组会汇报

概述 不经意传输协议,也叫茫然传输协议,是一种保护隐私的两方通信协议,消息发送者持有两条待发送的消息,接收者选择一条进行接收,事后发送者对接收者获取哪一条消息毫不知情,接收者对于未选择的消息也无法获取任何信息。即1-out-of-2 OT。在OT协议中,发送方拥有全部的数据权限,…

通过华为鲲鹏认证发行上市的集成平台产品推荐

华为鲲鹏认证是技术实力与品质的权威象征&#xff0c;代表着产品达到了高标准的要求。从技术层面看&#xff0c;认证确保产品与华为鲲鹏架构深度融合&#xff0c;能充分释放鲲鹏芯片的高性能、低功耗优势&#xff0c;为集成平台的高效运行提供强大动力。在安全方面&#xff0c;…

基于ARM+FPGA的电力通信管理机IEC61850规约通信机的实现

本章通过对比传统的通信管理机方案对需要支持多RS485端口的不足之处&#xff0c; 以及在进行海量数据处理时的性能瓶颈&#xff0c;本文使用Intel全新的Cyclone V SoC FPGA芯片&#xff0c;充分发挥FPGA的高速并行运算特性以及现场可配置优势&#xff0c;并且结合 ARM处理器的…

ASP.NET Core Webapi 返回数据的三种方式

ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择&#xff1a; Specific type IActionResult ActionResult<T> 1. 返回指定类型&#xff08;Specific type&#xff09; 最简单的API会返回原生的或者复杂的数据类型&#xff08;比如&#xff0c;string 或者…

ROS机器视觉入门:从基础到人脸识别与目标检测

前言 从本文开始&#xff0c;我们将开始学习ROS机器视觉处理&#xff0c;刚开始先学习一部分外围的知识&#xff0c;为后续的人脸识别、目标跟踪和YOLOV5目标检测做准备工作。我采用的笔记本是联想拯救者游戏本&#xff0c;系统采用Ubuntu20.04&#xff0c;ROS采用noetic。 颜…

主机型入侵检测系统(HIDS)——Elkeid在Centos7的保姆级安装部署教程

一、HIDS简介 主机型入侵检测系统(Host-based Intrusion Detection System 简称:HIDS);HIDS作为主机的监视器和分析器,主要是专注于主机系统内部(监视系统全部或部分的动态的行为以及整个系统的状态)。 HIDS使用传统的C/S架构,只需要在监测端安装agent即可,且使用用户…

Django启用国际化支持(2)—实现界面内切换语言:activate()

文章目录 ⭐注意⭐1. 配置项目全局设置&#xff1a;启用国际化2. 编写视图函数3. 配置路由4. 界面演示5、扩展自动识别并切换到当前语言设置语言并保存到Session设置语言并保存到 Cookie ⭐注意⭐ 以下操作依赖于 Django 项目的国际化支持。如果你不清楚如何启用国际化功能&am…

Springboot之登录模块探索(含Token,验证码,网络安全等知识)

简介 登录模块很简单&#xff0c;前端发送账号密码的表单&#xff0c;后端接收验证后即可~ 淦&#xff01;可是我想多了&#xff0c;于是有了以下几个问题&#xff08;里面还包含网络安全问题&#xff09;&#xff1a; 1.登录时的验证码 2.自动登录的实现 3.怎么维护前后端…

Vue3 虚拟列表组件库 virtual-list-vue3 的使用

Vue3 虚拟列表组件库 virtual-list-vue3 的基本使用 分享个人写的一个基于 Vue3 的虚拟列表组件库&#xff0c;欢迎各位来进行使用与给予一些更好的建议&#x1f60a; 概述&#xff1a;该组件组件库用于提供虚拟化列表能力的组件&#xff0c;用于解决展示大量数据渲染时首屏渲…

利用云计算实现高效的数据备份与恢复策略

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 利用云计算实现高效的数据备份与恢复策略 利用云计算实现高效的数据备份与恢复策略 利用云计算实现高效的数据备份与恢复策略 引…

基于 PyTorch 从零手搓一个GPT Transformer 对话大模型

一、从零手实现 GPT Transformer 模型架构 近年来&#xff0c;大模型的发展势头迅猛&#xff0c;成为了人工智能领域的研究热点。大模型以其强大的语言理解和生成能力&#xff0c;在自然语言处理、机器翻译、文本生成等多个领域取得了显著的成果。但这些都离不开其背后的核心架…

SpringCloud多机部署,负载均衡-LoadBalance

一.负载均衡 1.1问题描述 //根据应用名称获取服务列表 List<ServiceInstance> instancesdiscoveryClient.getInstances("product-service"); //一个微服务可能有多个实例&#xff0c;获取第一个 EurekaServiceInstance instance(EurekaServiceInstance)insta…

聊聊Flink:Flink中的时间语义和Watermark详解

该篇主要讲Flink中的时间语义、Flink 水印机制以及Flink对乱序数据的三重保障。 一、Flink的三种时间语义 1.1 Event Time Event Time指的是数据流中每个元素或者每个事件自带的时间属性&#xff0c;一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节&…

CSS基础选择器与div布局

基础选择器一 全局选择器 可以与任何元素匹配&#xff0c;优先级最低&#xff0c;不推荐使用 *{margin: 0;padding: 0;}元素选择器 HTML文档中的元素&#xff0c;p、b、div、a、img、body等。 标签选择器&#xff0c;选择的是页面上所有这种类型的标签&#xff0c;所以经常…

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…

ODC 如何精确呈现SQL耗时 | OceanBase 开发者工具解析

前言 在程序员或DBA的日常工作中&#xff0c;编写并执行SQL语句如同日常饮食中的一餐一饭&#xff0c;再寻常不过。然而&#xff0c;在使用命令行或黑屏客户端处理SQL时&#xff0c;常会遇到编写难、错误排查缓慢以及查询结果可读性不佳等难题&#xff0c;因此&#xff0c;图形…

华为USG5500防火墙配置NAT

实验要求&#xff1a; 1.按照拓扑图部署网络环境&#xff0c;使用USG5500防火墙&#xff0c;将防火墙接口加入相应的区域&#xff0c;添加区域访问规则使内网trust区域可以访问DMZ区域的web服务器和untrust区域的web服务器。 2.在防火墙上配置easy-ip&#xff0c;使trust区域…

三角波生成函数

% 设置时间范围和采样频率 t 0:0.01:2; % 时间从0到2秒&#xff0c;步长为0.01秒% 定义频率 f 和角频率 theta f 5; % 频率为5Hz theta 2 * pi * f * t;% 初始化输出向量 y zeros(size(t));% 根据给定的公式计算 y for k 1:fy y (-1)^(k-1)*(2 /(k * pi)) * sin(k * the…