pom依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    <version>1.2.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version>
</dependency>
yml配置
pubsub-secret-key.json 为谷歌云服务账号密钥,密钥生成看谷歌云文档
spring:
  cloud:
    gcp:
      pubsub:
        enabled: true
      project-id: project-ID
      credentials:
        location: classpath:pubsub-secret-key.json
java代码
import com.allsaints.reco.service.PubSubService;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.threeten.bp.Duration;@Service
@Slf4j
public class PubSubServiceImpl implements PubSubService {String projectId = "project-ID";String subscriptionId = "订阅id";@PostConstructpublic void startSubscriber() throws IOException {ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);MessageReceiver receiver = (message, consumer) -> {// 处理接收到的消息log.info("Received message------------" + message.getData().toStringUtf8());// 确认消息已被处理consumer.ack();};GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();subscriber.startAsync().awaitRunning();subscriber.stopAsync();}@Overridepublic void publishMessage(String message) {Publisher publisher = null;try {GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));ProjectTopicName topicName = ProjectTopicName.of(projectId, "topid-ID");FixedCredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials);Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 msdouble retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 secondsDuration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 secondsdouble rpcTimeoutMultiplier = 1.0; // default: 1.0Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 secondsDuration totalTimeout = Duration.ofSeconds(600); // default: 600 secondsRetrySettings retrySettings =RetrySettings.newBuilder().setInitialRetryDelay(initialRetryDelay).setRetryDelayMultiplier(retryDelayMultiplier).setMaxRetryDelay(maxRetryDelay).setInitialRpcTimeout(initialRpcTimeout).setRpcTimeoutMultiplier(rpcTimeoutMultiplier).setMaxRpcTimeout(maxRpcTimeout).setTotalTimeout(totalTimeout).build();publisher = 
Publisher.newBuilder(topicName).setCredentialsProvider(credentialsProvider).setRetrySettings(retrySettings).build();ByteString data = ByteString.copyFromUtf8(message);PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();ApiFuture<String> res = publisher.publish(pubsubMessage);log.info("pubsub 发布结果 = {}",res);log.info("pubsub 发布结果 = {}",res.get());
// publisher.shutdown();
// publisher.awaitTermination(30, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();} finally {if (publisher != null) {// When finished with the publisher, shutdown to free up resources.publisher.shutdown();try {publisher.awaitTermination(1, TimeUnit.MINUTES);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}@Overridepublic void pullMessages() {// 创建订阅名称ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);PullRequest pullRequest = PullRequest.newBuilder().setSubscription(subscriptionName.toString()).setMaxMessages(1).build();try {GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder().setCredentialsProvider(() -> credentials).build();GrpcSubscriberStub subscriberStub = GrpcSubscriberStub.create(subscriberStubSettings);// 发送拉取请求并处理接收到的消息List<ReceivedMessage> list = subscriberStub.pullCallable().call(pullRequest).getReceivedMessagesList();list.forEach(message -> {// 处理消息逻辑log.info("Received message: " + message.getMessage().getData().toStringUtf8());});subscriberStub.shutdown();} catch (IOException e) {log.error("拉取pubsub消息异常 = {}",e);}
// String projectId = "asofone-composer2-project";
// String subscriptionId = "java-test-sub";
// Integer numOfMessages = 10;
// try {
// subscribeSyncExample(projectId, subscriptionId, numOfMessages);
// }catch (Exception e) {
// log.error("{}",e);
// }}/***************************************************/
// public static void main(String... args) throws Exception {
// String projectId = "asofone-composer2-project";
// String subscriptionId = "java-test-sub";
// Integer numOfMessages = 10;
//
// subscribeSyncExample(projectId, subscriptionId, numOfMessages);
// }public static void subscribeSyncExample(String projectId, String subscriptionId, Integer numOfMessages)throws IOException {GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("src/main/resources/pubsub-secret-key.json"));// 创建 SubscriberStubSettings.Builder 对象SubscriberStubSettings.Builder subscriberSettingsBuilder = SubscriberStubSettings.newBuilder();// 将 GoogleCredentials 对象设置到 SubscriberStubSettings.Builder 中subscriberSettingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));// 构建 SubscriberStubSettings 对象
// SubscriberStubSettings subscriberSettings = subscriberSettingsBuilder.build();SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder().setTransportChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder().setMaxInboundMessageSize(20 * 1024 * 1024).build()) // 20MB (maximum message size)).build();try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);PullRequest pullRequest = PullRequest.newBuilder().setMaxMessages(numOfMessages).setSubscription(subscriptionName).build();// Use pullCallable().futureCall to asynchronously perform this operation.PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);// Stop the program if the pull response is empty to avoid acknowledging// an empty list of ack IDs.if (pullResponse.getReceivedMessagesList().isEmpty()) {log.info("消息列表 = {}",pullResponse.getReceivedMessagesList());return;}List<String> ackIds = new ArrayList<>();for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {// Handle received message// ...ackIds.add(message.getAckId());}// Acknowledge received messages.AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().setSubscription(subscriptionName).addAllAckIds(ackIds).build();// Use acknowledgeCallable().futureCall to asynchronously perform this// operation.subscriber.acknowledgeCallable().call(acknowledgeRequest);System.out.println(pullResponse.getReceivedMessagesList());}}
}