Java通过calcite实时读取kafka中的数据

引入maven依赖

        <dependency>

            <groupId>org.apache.calcite</groupId>

            <artifactId>calcite-kafka</artifactId>

            <version>1.28.0</version>

        </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

    public static void main(String[] args) throws SQLException {

        String model = "inline:" +

                "{\n" +

                "  \"version\": \"1.0\",\n" +

                "  \"defaultSchema\": \"KAFKA\",\n" +

                "  \"schemas\": [\n" +

                "    {\n" +

                "    \"name\": \"KAFKA\",\n" +

                "    \"tables\": [\n" +

                "      {\n" +

                "        \"name\": \"TEST_TABLE\",\n" +

                "        \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +

                "        \"stream\": { \"stream\": true },\n" +

                "        \"operand\": {\n" +

                "          \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +

                "          \"topic.name\": \"my-cloud-events\",\n" +

                "          \"consumer.params\": {\n" +

                "            \"group.id\": \"calcite-ut-consumer\",\n" +

                "            \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +

                "            \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +

                "          }\n" +

                "        }\n" +

                "      }\n" +

                "    ]\n" +

                "    }\n" +

                "  ]\n" +

                "}";

        Properties info = new Properties();

        info.put("model", model);

        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

        final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";

        print(calciteConnection,sql7);

        connection.close();

        calciteConnection.close();

    }

    public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {

        final PreparedStatement statement = calciteConnection.prepareStatement(sql7);

        final ResultSet resultSet = statement.executeQuery();

        ResultSetMetaData metadata = resultSet.getMetaData();

        while (resultSet.next()) {

            for (int i = 1; i <= metadata.getColumnCount(); i++) {

                System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");

            }

            System.out.println();

        }

    }

}

发送测试数据

运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61206.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RDIFramework.NET CS敏捷开发框架 V6.1发布(.NET6+、Framework双引擎、全网唯一)

RDIFramework.NET C/S敏捷开发框架V6.1版本迎来重大更新与调整&#xff0c;全面重新设计业务逻辑代码&#xff0c;代码量减少一半以上&#xff0c;开发更加高效。全系统引入全新字体图标&#xff0c;整个界面焕然一新。底层引入最易上手的ORM框架SqlSugar&#xff0c;让开发更加…

SQL(四) 游标实验、存储过程、函数实验

1.将视图实验中score表中的数据通过以下命令复制到新建的表score_copy中&#xff0c;然后通过带有游标的存储过程对成绩按下面的规则进行更新&#xff1a;80~100&#xff0c;更改为5&#xff1b;60~80&#xff0c;更改为3&#xff1b;低于60分更新为0。 Create table score_co…

TypeORM在Node.js中的高级应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 TypeORM在Node.js中的高级应用 TypeORM在Node.js中的高级应用 TypeORM在Node.js中的高级应用 引言 TypeORM 基本概念 1. 实体&am…

11.15组会汇报

概述 不经意传输协议,也叫茫然传输协议,是一种保护隐私的两方通信协议,消息发送者持有两条待发送的消息,接收者选择一条进行接收,事后发送者对接收者获取哪一条消息毫不知情,接收者对于未选择的消息也无法获取任何信息。即1-out-of-2 OT。在OT协议中,发送方拥有全部的数据权限,…

通过华为鲲鹏认证发行上市的集成平台产品推荐

华为鲲鹏认证是技术实力与品质的权威象征&#xff0c;代表着产品达到了高标准的要求。从技术层面看&#xff0c;认证确保产品与华为鲲鹏架构深度融合&#xff0c;能充分释放鲲鹏芯片的高性能、低功耗优势&#xff0c;为集成平台的高效运行提供强大动力。在安全方面&#xff0c;…

基于ARM+FPGA的电力通信管理机IEC61850规约通信机的实现

本章通过对比传统的通信管理机方案对需要支持多RS485端口的不足之处&#xff0c; 以及在进行海量数据处理时的性能瓶颈&#xff0c;本文使用Intel全新的Cyclone V SoC FPGA芯片&#xff0c;充分发挥FPGA的高速并行运算特性以及现场可配置优势&#xff0c;并且结合 ARM处理器的…

ASP.NET Core Webapi 返回数据的三种方式

ASP.NET Core为Web API控制器方法返回类型提供了如下几个选择&#xff1a; Specific type IActionResult ActionResult<T> 1. 返回指定类型&#xff08;Specific type&#xff09; 最简单的API会返回原生的或者复杂的数据类型&#xff08;比如&#xff0c;string 或者…

ROS机器视觉入门:从基础到人脸识别与目标检测

前言 从本文开始&#xff0c;我们将开始学习ROS机器视觉处理&#xff0c;刚开始先学习一部分外围的知识&#xff0c;为后续的人脸识别、目标跟踪和YOLOV5目标检测做准备工作。我采用的笔记本是联想拯救者游戏本&#xff0c;系统采用Ubuntu20.04&#xff0c;ROS采用noetic。 颜…

Bufferevent and evbuffer

每个bufferevent都有一个输入缓冲区和一个输出缓冲区&#xff0c;它们的类型都是“struct evbuffer”。有数据要写入到bufferevent时&#xff0c;添加数据到输出缓冲区&#xff1b;bufferevent中有数据供读取的时候&#xff0c;从输入缓冲区抽取&#xff08;drain&#xff09;数…

DQN代码详解

代码链接见文末 1. 任务描述 任务目标: 在 MountainCar-v0 环境中,智能体的目标是尽可能快地将一辆小车从山谷的一端(起始位置)驾驶到山谷的另一端(目标位置),通过学习如何利用环境中的动力学来实现目标。智能体只能施加力量(加速)来推动小车,且受到物理规律的限制…

pip/conda install bugs汇总

DNSResolutionError 一直不行&#xff0c;惯防火墙还是不行&#xff0c;可能导致漏洞了&#xff1b; 解决方案&#xff1a; reboot下次try可以刷新DNS缓存: resolvectl flush-cachespip._vendor.urllib3.exceptions.ReadTimeoutError: HTTPSConnectionPool(host‘mirrors.a…

主机型入侵检测系统(HIDS)——Elkeid在Centos7的保姆级安装部署教程

一、HIDS简介 主机型入侵检测系统(Host-based Intrusion Detection System 简称:HIDS);HIDS作为主机的监视器和分析器,主要是专注于主机系统内部(监视系统全部或部分的动态的行为以及整个系统的状态)。 HIDS使用传统的C/S架构,只需要在监测端安装agent即可,且使用用户…

Django启用国际化支持(2)—实现界面内切换语言:activate()

文章目录 ⭐注意⭐1. 配置项目全局设置&#xff1a;启用国际化2. 编写视图函数3. 配置路由4. 界面演示5、扩展自动识别并切换到当前语言设置语言并保存到Session设置语言并保存到 Cookie ⭐注意⭐ 以下操作依赖于 Django 项目的国际化支持。如果你不清楚如何启用国际化功能&am…

软件工程-需求分析与设计-更新中-1.0版

前言: 感觉书本上和线上课程, 讲的太抽象, 不好理解, 但软件开发不就是为了开发应用程序吗?! 干嘛搞这么抽象,对吧, 下面是个人对于软件开发的看法, 结合我的一些看法, 主打简单易懂, 当然,我一IT界小菜鸟, 对软件开发的认识也很浅显, 这个思维导图也仅仅是现阶段我的看…

Springboot之登录模块探索(含Token,验证码,网络安全等知识)

简介 登录模块很简单&#xff0c;前端发送账号密码的表单&#xff0c;后端接收验证后即可~ 淦&#xff01;可是我想多了&#xff0c;于是有了以下几个问题&#xff08;里面还包含网络安全问题&#xff09;&#xff1a; 1.登录时的验证码 2.自动登录的实现 3.怎么维护前后端…

Vue3 虚拟列表组件库 virtual-list-vue3 的使用

Vue3 虚拟列表组件库 virtual-list-vue3 的基本使用 分享个人写的一个基于 Vue3 的虚拟列表组件库&#xff0c;欢迎各位来进行使用与给予一些更好的建议&#x1f60a; 概述&#xff1a;该组件组件库用于提供虚拟化列表能力的组件&#xff0c;用于解决展示大量数据渲染时首屏渲…

http常⻅请求头和响应头详细讲解(笔记)

http常⻅请求头状态码 简介&#xff1a;讲解http常⻅见的请求⽅方法和使⽤用 http1.0定义了了三种&#xff1a;GET: 向服务器器获取资源&#xff0c;⽐比如常⻅见的查询请求POST: 向服务器器提交数据⽽而发送的请求Head: 和get类似&#xff0c;返回的响应中没有具体的内容&am…

(33)iptables设置防火墙策略常用命令(docker环境、非docker环境)

#普通环境&#xff08;非docker&#xff09; # 拒绝所有对端口 31001 的访问 iptables -A INPUT -p tcp --dport 31001 -j DROP # 允许 IP 地址 20.59.30.77 访问端口 31001 (此处用的是虚拟机 所以要使用nat地址的网关) iptables -I INPUT 1 -p tcp -s 20.59.30.77 --dpor…

【嵌入式】关于push老仓库到新仓库的方法

1. 背景 公司项目经常会有需要从开源项目中镜像代码过来的活,所以常常会在自己的服务器上创建一个对应的仓库,然后使用命令将期push过去。为方便日后抄命令,这里记录一下使用的命令。 2. 操作步骤 2.1. 已下载的代码push 特别提醒: 使用此脚本前请确保你修改的代码已保存…

利用云计算实现高效的数据备份与恢复策略

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 利用云计算实现高效的数据备份与恢复策略 利用云计算实现高效的数据备份与恢复策略 利用云计算实现高效的数据备份与恢复策略 引…