java 连接google cloud pubsub做消息发布和消费

pom依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-gcp-starter-pubsub</artifactId><version>1.2.7.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version></dependency>

yml配置

pubsub-secret-key.json 为谷歌云服务账号密钥,密钥生成看谷歌云文档

spring:cloud:gcp:pubsub:enabled: trueproject-id: project-IDcredentials:location: classpath:pubsub-secret-key.json

java代码

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Service;
import com.allsaints.reco.service.PubSubService;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;@Service
@Slf4j
public class PubSubServiceImpl implements PubSubService {String projectId = "project-ID";String subscriptionId = "订阅id";@PostConstructpublic void startSubscriber() throws IOException {ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);MessageReceiver receiver = (message, consumer) -> {// 处理接收到的消息log.info("Received message------------" + message.getData().toStringUtf8());// 确认消息已被处理consumer.ack();};GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();subscriber.startAsync().awaitRunning();subscriber.stopAsync();}@Overridepublic void publishMessage(String message) {Publisher publisher = null;try {GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));ProjectTopicName topicName = ProjectTopicName.of(projectId, "topid-ID");FixedCredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials);Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 msdouble retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 secondsDuration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 secondsdouble rpcTimeoutMultiplier = 1.0; // default: 1.0Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 secondsDuration totalTimeout = Duration.ofSeconds(600); // default: 600 secondsRetrySettings retrySettings =RetrySettings.newBuilder().setInitialRetryDelay(initialRetryDelay).setRetryDelayMultiplier(retryDelayMultiplier).setMaxRetryDelay(maxRetryDelay).setInitialRpcTimeout(initialRpcTimeout).setRpcTimeoutMultiplier(rpcTimeoutMultiplier).setMaxRpcTimeout(maxRpcTimeout).setTotalTimeout(totalTimeout).build();publisher = Publisher.newBuilder(topicName).setCredentialsProvider(credentialsProvider).setRetrySettings(retrySettings).build();ByteString data = ByteString.copyFromUtf8(message);PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();ApiFuture<String> res = publisher.publish(pubsubMessage);log.info("pubsub 发布结果 = {}",res);log.info("pubsub 发布结果 = {}",res.get());
//			publisher.shutdown();
//			publisher.awaitTermination(30, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();} finally {if (publisher != null) {// When finished with the publisher, shutdown to free up resources.publisher.shutdown();try {publisher.awaitTermination(1, TimeUnit.MINUTES);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}@Overridepublic void pullMessages() {// 创建订阅名称ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);PullRequest pullRequest = PullRequest.newBuilder().setSubscription(subscriptionName.toString()).setMaxMessages(1).build();try {GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder().setCredentialsProvider(() -> credentials).build();GrpcSubscriberStub subscriberStub = GrpcSubscriberStub.create(subscriberStubSettings);// 发送拉取请求并处理接收到的消息List<ReceivedMessage> list = subscriberStub.pullCallable().call(pullRequest).getReceivedMessagesList();list.forEach(message -> {// 处理消息逻辑log.info("Received message: " + message.getMessage().getData().toStringUtf8());});subscriberStub.shutdown();} catch (IOException e) {log.error("拉取pubsub消息异常 = {}",e);}
//		String projectId = "asofone-composer2-project";
//		String subscriptionId = "java-test-sub";
//		Integer numOfMessages = 10;
//		try {
//			subscribeSyncExample(projectId, subscriptionId, numOfMessages);
//		}catch (Exception e) {
//			log.error("{}",e);
//		}}/***************************************************/
//	public static void main(String... args) throws Exception {
//		String projectId = "asofone-composer2-project";
//		String subscriptionId = "java-test-sub";
//		Integer numOfMessages = 10;
//
//		subscribeSyncExample(projectId, subscriptionId, numOfMessages);
//	}public static void subscribeSyncExample(String projectId, String subscriptionId, Integer numOfMessages)throws IOException {GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));// 创建 SubscriberStubSettings.Builder 对象SubscriberStubSettings.Builder subscriberSettingsBuilder = SubscriberStubSettings.newBuilder();// 将 GoogleCredentials 对象设置到 SubscriberStubSettings.Builder 中subscriberSettingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));// 构建 SubscriberStubSettings 对象
//		SubscriberStubSettings subscriberSettings = subscriberSettingsBuilder.build();SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder().setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(20 * 1024 * 1024).build()) // 20MB (maximum message size)).build();try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(numOfMessages).setSubscription(subscriptionName).build();// Use pullCallable().futureCall to asynchronously perform this operation.PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);// Stop the program if the pull response is empty to avoid acknowledging// an empty list of ack IDs.if (pullResponse.getReceivedMessagesList().isEmpty()) {log.info("消息列表 = {}",pullResponse.getReceivedMessagesList());return;}List<String> ackIds = new ArrayList<>();for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {// Handle received message// ...ackIds.add(message.getAckId());}// Acknowledge received messages.AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(subscriptionName).addAllAckIds(ackIds).build();// Use acknowledgeCallable().futureCall to asynchronously perform this// operation.subscriber.acknowledgeCallable().call(acknowledgeRequest);System.out.println(pullResponse.getReceivedMessagesList());}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28628.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MFC】08.MFC消息,自定义消息,常用控件(MFC菜单创建大总结),工具栏,状态栏-笔记

本专栏上几篇文章讲解了MFC几大机制&#xff0c;今天带领大家学习MFC自定义消息以及常用控件&#xff0c;最常用的控件请查看本专栏第一二篇文章&#xff0c;今天这篇文章介绍工具栏&#xff0c;菜单和状态栏&#xff0c;以及菜单创建大总结。 文章目录 MFC消息分类&#xff1…

【Sa-Token】9、Sa-Token实现在线用户管理功能

尽管框架将大部分操作提供了简易的封装&#xff0c;但在一些特殊场景下&#xff0c;我们仍需要绕过框架&#xff0c;直达数据底层进行一些操作。 1、官方文档 会话查询 https://sa-token.cc/doc.html#/up/search-sessionSa-Token提供以下API助你直接操作会话列表&#xff1a…

【EI复现】售电市场环境下电力用户选择售电公司行为研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

如何在MongoDB中添加新用户

如何在MongoDB中添加新用户&#xff1f; MongoDB是一款流行的NoSQL数据库&#xff0c;它的可扩展性强&#xff0c;可进行分布式部署&#xff0c;且具有高可用性。其许多优势使得越来越多的企业和组织选择MongoDB作为其数据库系统。本文将介绍如何在MongoDB中添加新用户。 第一步…

网页版Java五子棋项目(一)websocket【服务器给用户端发信息】

网页版Java五子棋项目&#xff08;一&#xff09;websocket【服务器给用户端发信息】 一、为什么要用websocket二、websocket介绍原理解析 三、代码演示1. 创建后端api&#xff08;TestAPI&#xff09;新增知识点&#xff1a;extends TextWebSocketHandler重写各种方法 2. 建立…

回顾 OWASP 机器学习十大风险

日复一日&#xff0c;越来越多的机器学习 (ML) 模型正在开发中。机器学习模型用于查找训练数据中的模式&#xff0c;可以产生令人印象深刻的检测和分类能力。机器学习已经为人工智能的许多领域提供了动力&#xff0c;包括情感分析、图像分类、面部检测、威胁情报等。 数十亿美…

生活小妙招之UE custom Decal

因为这几年大部分时间都在搞美术&#xff0c;所以博客相关的可能会鸽的比较多&#xff0c;阿巴阿巴 https://twitter.com/Tuatara_Games/status/1674034744084905986 之前正好看到一个贴花相关的小技巧&#xff0c;正好做一个记录&#xff0c;也在这个的基础上做一些小的拓展…

微信小程序前后页面传值

微信小程序前后页面传值 从前一个页面跳转到下一个页面&#xff0c;如何传递参数&#xff1f;从后一个页面返回前一个页面&#xff0c;如何回调参数&#xff1f; 向后传值 从前一个页面跳转到下一个页面并传值。 前页面&#xff1a;在跳转链接中添加参数并传递 wx.navigat…

MySQL体系结构和存储引擎【InnoDB特性】【4种隔离级别】【聚集索引】

1.概念 1.1 数据库 文件的集合 1.2 数据库实例 程序 1.3 数据库 & 数据库实例的关系 应用程序通过数据库实例和数据库打交道 2.InnoDB存储引擎 2.1 特性 2.1.1 支持事务 2.1.2 具有行锁设计 默认的读取操作不会产生锁 2.1.3 支持外键 2.1.4 通过多版本并发&am…

compile_and_runtime_not_namespaced_r_class_jar\debug\R.jar: 另一个程序正在使用

问题情况&#xff1a; run App的时候&#xff0c;提示该文件被占用 想要clean Project&#xff0c;还是提示该文件被占用&#xff0c;这个文件和连带的文件夹都无法被删除。 方法1&#xff1a; AndroidStudio下方的terminal&#xff08;没有这个窗口的话&#xff0c;从上面的…

Docker基本使用

查看本地镜像 查看本地&#xff1a;docker imagesPull镜像&#xff1a;docker pull nginx:latest登录镜像&#xff1a;docker login hub.docker.com -u **** -p ****制作镜像&#xff1a;docker build -t xxxx:v1push&#xff1a;docker push xxx:v1删除镜像:docker rmi #imag…

ELK 企业级日志分析系统(二)

目录 ELK Kiabana 部署&#xff08;在 Node1 节点上操作&#xff09; 1&#xff0e;安装 Kiabana 2&#xff0e;设置 Kibana 的主配置文件 3&#xff0e;启动 Kibana 服务 4&#xff0e;验证 Kibana 5&#xff0e;将 Apache 服务器的日志&#xff08;访问的、错误的&#x…

DC电源模块关于多路输出的问题

BOSHIDA DC电源模块关于多路输出的问题 DC电源模块通常具备多路输出功能&#xff0c;这使得它在实际应用中具有极高的灵活性和可扩展性。当需要为多个不同的负载提供电源时&#xff0c;多路输出的设计可以降低整个系统的成本和复杂度&#xff0c;同时也可以减少系统空间的占用。…

react钩子副作用理解

useEffect(() > { fetch(‘https://api.example.com/data’) .then(response > response.json()) .then(data > setData(data)); }, []); 怎么理解这个[] 在 React 中&#xff0c;useEffect 钩子用于处理副作用&#xff0c;比如数据获取、订阅、手动 DOM 操作等。useE…

基于Spring Boot的影视点播网站设计与实现(Java+spring boot+MySQL)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于Spring Boot的影视点播网站设计与实现&#xff08;Javaspring bootMySQL&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;Java springboot…

Centos 从0搭建grafana和Prometheus 服务以及问题解决

下载 虚拟机下载 https://customerconnect.vmware.com/en/downloads/info/slug/desktop_end_user_computing/vmware_workstation_player/17_0 cenos 镜像下载 https://www.centos.org/download/ grafana 服务下载 https://grafana.com/grafana/download/7.4.0?platformlinux …

C语言易错知识点总结1

生命周期&作用域 第 1 题&#xff08;单选题&#xff09; 题目名称&#xff1a; 局部变量的作用域是&#xff1a; 题目内容&#xff1a; A .main函数内部 B .整个程序 C .main函数之前 D .局部变量所在的局部范围 答案解析&#xff1a; 在C语言中&#xff0c;变量分为局部…

视频声音怎么转换成文字?这四种转换方法很简单

将视频声音转换成文字的好处不仅仅限于方便记录、保存和查阅视频内容。它还可以大大提高视频内容的可访问性和可搜索性&#xff0c;使得非母语人士、听力障碍者等人群更容易理解视频内容&#xff0c;并且可以更快速地找到相关信息。此外&#xff0c;将视频声音转换成文字还可以…

led台灯哪些牌子性价比高?推荐几款性价比高的护眼台灯

作为学龄期儿童的家长&#xff0c;最担心的就是孩子长时间学习影响视力健康。无论是上网课、写作业、玩桌游还是陪伴孩子读绘本&#xff0c;都需要一个足够明亮的照明环境&#xff0c;因此选购一款为孩子视力发展保驾护航的台灯非常重要。为大家推荐几款性价比高的护眼台灯。 …

gitee分支合并

合并dev分支到master&#xff08;合并到主分支&#xff09; git checkout master git merge dev //这里的dev表示你的分支名称 git push //推送到远程仓库 效果如下图 不报错就表示推送成功了&#xff0c;希望能帮助各位小伙伴