60、Flink 的异步 IO 算子使用异步 Http 客户端查高德地图

1、概述

  Http 异步客户端设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态输入:同时发起4条查询输出:间隔10秒,同时返回4条数据JDBC 线程池+链接池设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态输入:同时发起4条查询输出:间隔10秒,先返回两条数据,间隔10秒,再返回两条数据

2、代码示例

package com.xu.flink.datastream.day11;import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.concurrent.Executors;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit;public class _06_HttpAsyncQueryGaoDe {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);AsyncDataStream.unorderedWait(lines, new AsyncHttpQueryFunction(), 20, TimeUnit.SECONDS, 2).print();env.execute();}
}/*** Http 异步客户端* 设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态* 输入:同时发起4条查询* 输出:间隔10秒,同时返回4条数据* <p>* JDBC 线程池+链接池* 设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态* 输入:同时发起4条查询* 输出:间隔10秒,先返回两条数据,间隔10秒,再返回两条数据*/
class AsyncHttpQueryFunction extends RichAsyncFunction<String, _06_OrderBean> {private static final String key = "***";private static CloseableHttpAsyncClient httpClient;@Overridepublic void open(Configuration parameters) throws Exception {//创建异步 HttpClient 连接池,并初始化异步的 HttpClientRequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(3000).setConnectTimeout(3000).build();httpClient = HttpAsyncClients.custom().setMaxConnTotal(2).setDefaultRequestConfig(requestConfig).build();System.out.println("open 方法被调用");System.out.println("httpClient=>" + httpClient.hashCode());httpClient.start();}@Overridepublic void asyncInvoke(String line, ResultFuture<_06_OrderBean> resultFuture) throws Exception {try {// 1、解析JSON,获取经纬度信息_06_OrderBean orderBean = JSON.parseObject(line, _06_OrderBean.class);double longitude = orderBean.getLongitude();double latitude = orderBean.getLatitude();// 创建 httpGet 请求HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location=" + longitude + "," + latitude + "&key=" + key);// 2、通过 httpClient 提交异步请求,获取 future 对象// callback 是回调函数(也可通过回调函数拿结果)// 注意:此处使用 task 线程,如果此处是异步提交,则不会阻塞;如果此处是同步提交,则会阻塞;Future<HttpResponse> future = httpClient.execute(httpGet, null);// 3、从成功的 Future 中取数据,返回 orderBean// 使用 Executors.directExecutor() 获取返回结果// 注意:此处为异步获取返回结果,会使用单独的线程池,即使用 get() 方法,也不会阻塞 task 线程CompletableFuture.supplyAsync(new Supplier<_06_OrderBean>() {@Overridepublic _06_OrderBean get() {try {HttpResponse response = future.get();String province = null;String city = null;String district = null;if (response.getStatusLine().getStatusCode() == 200) {//拿出响应的实例对象HttpEntity entity = response.getEntity();JSONObject jsonObject = JSON.parseObject(EntityUtils.toString(entity));JSONObject regeocodeObject = jsonObject.getJSONObject("regeocode");if (regeocodeObject != null && !regeocodeObject.isEmpty()) {JSONObject addObject = regeocodeObject.getJSONObject("addressComponent");district = addObject.getString("district");city = addObject.getString("city");province = addObject.getString("province");}}orderBean.setProvince(province);orderBean.setCity(city);orderBean.setDistrict(district);return orderBean;} catch (Exception e) {return null;}}}, Executors.directExecutor()).thenAccept(new Consumer<_06_OrderBean>() {@Overridepublic void accept(_06_OrderBean resultOrderBean) {resultFuture.complete(Collections.singleton(resultOrderBean));}});} catch (Exception e) {resultFuture.complete(Collections.singleton(null));}}@Overridepublic void timeout(String input, ResultFuture<_06_OrderBean> resultFuture) throws Exception {resultFuture.completeExceptionally(new RuntimeException(input + "=查询超时"));}@Overridepublic void close() throws Exception {httpClient.close();}
}
-----------------------------------------------------------------
@Data
@NoArgsConstructor
@AllArgsConstructor
public class _06_OrderBean {private String oid;private String uid;private double money;private double longitude;private double latitude;private String province;private String city;private String district;@Overridepublic String toString() {return "OrderBean{" +"oid='" + oid + '\'' +", uid='" + uid + '\'' +", money=" + money +", longitude=" + longitude +", latitude=" + latitude +", province='" + province + '\'' +", city='" + city + '\'' +", district='" + district + '\'' +'}';}
}

3、测试用例

{"oid":"o001","uid":"u001","money":99.99,"longitude":115.690417, "latitude":36.239344}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/37495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷歌SEO网站SEO优化诊断有哪些点?

在以下几种场景中&#xff0c;进行SEO审查尤为关键&#xff1a; &#xff08;1&#xff09;当你接手一个新项目或新网站时&#xff0c;了解其当前状况是至关重要的第一步 &#xff08;2&#xff09;当搜索流量出现意外下降时&#xff0c;这可能是技术问题或被惩罚的信号&…

了解SENT协议及其应用

了解SENT协议及其应用 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们来了解一种在汽车电子领域广泛应用的通信协议——SENT协议。SENT协议以其高效、可…

OpenAI禁止中国使用API,国内大模型市场何去何从

GPT-5 一年半后发布&#xff1f;对此你有何期待&#xff1f; 前言 前言&#xff1a; 近日&#xff0c;OpenAI宣布禁止中国用户使用其API&#xff0c;这一决策引起了国内大模型市场的广泛关注。面对这一挑战&#xff0c;国内大模型市场的发展路径和前景成为业界热议的焦点。本…

小时候的子弹击中了现在的我-hive进阶:案例解析(第18天)

系列文章目录 一、Hive表操作 二、数据导入和导出 三、分区表 四、官方文档&#xff08;了解&#xff09; 五、分桶表&#xff08;熟悉&#xff09; 六、复杂类型&#xff08;熟悉&#xff09; 七、Hive乱码解决&#xff08;操作。可以不做&#xff0c;不影响&#xff09; 八、…

Vue3学习笔记<->nginx部署vue项目(3)

安装nginx vue项目通常部署到nginx上&#xff0c;所以先安装一个nginx。为了方便安装的是windows版nginx&#xff0c;解压就能用。 项目参考上一篇文章《Vue3学习笔记&#xff1c;-&#xff1e;创建第一个vue项目》《Vue3学习笔记&#xff1c;-&#xff1e;创建第一个vue项目》…

uniapp启动页面鉴权页面闪烁问题

在使用uni-app开发app 打包完成后如果没有token&#xff0c;那么就在onLaunch生命周期里面判断用户是否登录并跳转至登录页。 但是在app中页面会先进入首页然后再跳转至登录页&#xff0c;十分影响体验。 处理方法&#xff1a; 使用plus.navigator.closeSplashscreen() 官网…

SpringBoot学习05-[SpringBoot的嵌入式Servlet容器]

SpringBoot的嵌入式Servlet容器 嵌入式Servlet容器servlet容器-嵌入式servlet容器配置修改通过全局配置文件修改修改添加实现了WebServerFactoryCustomizer接口的bean来进行修改 servlet容器-注册servlet三大组件 嵌入式Servlet容器 SpringBoot包含对嵌入式Tomcat、Jetty、Und…

编写和使用Linux Makefile

编写和使用Linux Makefile 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将深入探讨在Linux环境下如何编写和使用Makefile&#xff0c;这是一个用于自…

嵌入式学习——硬件(ARM体系架构)——day51

1. S3C2440基础知识——一条指令四个字节 1.1 定义 S3C2440 是三星&#xff08;Samsung&#xff09;公司设计的一款基于 ARM920T 核心的微处理器&#xff0c;广泛应用于嵌入式系统中&#xff0c;属于三星的 S3C24xx 系列。 1.2 处理器核心 ARM920T&#xff1a;基于 ARM v5T …

Spring Boot与Elasticsearch的集成应用

Spring Boot与Elasticsearch的集成应用 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨如何在Spring Boot应用中集成Elasticsearch&#xff0c;以…

某配送平台未授权访问和弱口令(附赠nuclei默认密码验证脚本)

找到一个某src的子站&#xff0c;通过信息收集插件&#xff0c;发现ZABBIX-监控系统&#xff0c;可以日一下 使用谷歌搜索历史漏洞&#xff1a;zabbix漏洞 通过目录扫描扫描到后台&#xff0c;谷歌搜索一下有没有默认弱口令 成功进去了&#xff0c;挖洞就是这么简单 搜索文章还…

探秘Java版ERP管理系统源码:基于Spring Cloud Alibaba与Spring Boot的微服务架构解析

数字化时代的智能ERP管理系统&#xff1a;引领企业高效管理与创新发展 随着数字化浪潮的席卷&#xff0c;现代企业对于高效、稳定、易于扩展的管理系统需求愈发迫切。为了满足这一需求&#xff0c;我们倾力打造了一款基于Java技术的企业级资源规划&#xff08;ERP&#xff09;…

WSL(Windows Subsystem for Linux)替代VirtualBox和Vmware运行轻量级的linux服务器

要在Windows上开启WSL&#xff08;Windows Subsystem for Linux&#xff09;&#xff0c;请按照以下步骤操作&#xff1a; 检查系统兼容性&#xff1a; 确保你的Windows版本支持WSL。WSL支持Windows 10版本1607及更高版本&#xff0c;以及Windows 11。你可以在“设置”>“系…

【ONE·Linux || 高级IO(一)】

总言 主要内容&#xff1a;介绍五种IO模型的基本概念、学习IO多路转接&#xff08;select、poll编程模型&#xff09;。       文章目录 总言1、问题引入1.1、网络通信与IO1.2、五种IO模型1.2.1、举例引入1.2.2、IO模型具体含义介绍1.2.2.1、阻塞式IO1.2.2.2、非阻塞轮询检…

「树莓派入门」树莓派基础07-系统备份与还原

本文主要介绍树莓派系统备份的重要性和两种备份方式&#xff1a;全卡备份和压缩备份。同时&#xff0c;也介绍了如何使用软件和终端命令进行备份和还原。 一、系统备份的重要性 系统备份是保护树莓派数据和设置的重要手段。它可以帮助你在系统出现问题时快速恢复到正常状态。 …

【编译原理中的语法分析】

编译原理中的语法分析 一、什么是语法分析&#xff1f;1.1 定义1.2 作用 二、常见的语法分析方法2.1 递归下降分析2.2 LL(1) 分析2.3 LR 分析 三、语法分析的实现过程3.1 词法分析3.2 语法规则定义3.3 语法分析器设计与实现 一、什么是语法分析&#xff1f; 1.1 定义 语法分析…

第三节:如何理解Spring的两个特性IOC和AOP(自学Spring boot 3.x第一天)

大家好&#xff0c;我是网创有方&#xff0c;接下来教大家如何理解Spring的两个特性IOC和AOP。本节有点难&#xff0c;大家多理解。 IOC&#xff08;控制反转&#xff09; 定义与核心思想&#xff1a; IOC&#xff0c;全称Inversion of Control&#xff0c;即控制反转。 其核…

APP项目测试 之 熟悉APP项目

1.APP应用环境与web项目环境对比 APPWeb相同点 1.APP和web使用的后端服务器是相同的 2. 前后端都使用HTTP协议进行交互&#xff08;也有部分APP用socket来互&#xff09; 不同点 1.APP是C/S结构 2.APP前后端交互的数据格式以Json 为主 1.web浏览器是B/S结构 2.web前后端交互…

把飞书云文档变成HTML邮件:问题挑战与解决历程

一、背景 云文档转HTML邮件 基于公司内部的飞书办公套件&#xff0c;早在去年6月&#xff0c;我们就建设了将飞书云文档转译成HTML邮件的能力&#xff0c;方便同学们在编写邮件文档和发送邮件时&#xff0c;都能有较好的体验和较高的效率。 当下问题 要被邮件客户端识别&am…

样式继承:CSS中的自然法则

在网页设计的世界中&#xff0c;CSS&#xff08;层叠样式表&#xff09;是我们构建视觉表现的基石。而在CSS的众多特性中&#xff0c;样式继承是一个核心概念&#xff0c;它决定了元素样式如何在页面上传递。今天&#xff0c;我们就来深入探讨一下样式继承的奥秘。 什么是样式…