I'm trying to create an integration test with the Pub/Sub emulator, based on the example from this GitHub repo, which looks like this:
/**
 * Integration tests that run against a Pub/Sub emulator container.
 *
 * <p>The emulator endpoint is registered as {@code spring.cloud.gcp.pubsub.emulator-host}, and
 * {@code test-topic} / {@code test-subscription} are created once before any test runs. After
 * each test the subscription is drained so leftover messages cannot leak into the next test.
 */
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
  private static final String PROJECT_ID = "test-project";

  @Container
  private static final PubSubEmulatorContainer pubsubEmulator =
      new PubSubEmulatorContainer(
          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));

  /** Points the application's Pub/Sub configuration at the running emulator container. */
  @DynamicPropertySource
  static void emulatorProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
  }

  /**
   * Creates the topic and subscription used by the tests directly on the emulator.
   *
   * @throws Exception if the admin clients cannot be created or closed, or if resource creation
   *     fails
   */
  @BeforeAll
  static void setup() throws Exception {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
            .usePlaintext()
            .build();
    try {
      TransportChannelProvider channelProvider =
          FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
      TopicAdminClient topicAdminClient =
          TopicAdminClient.create(
              TopicAdminSettings.newBuilder()
                  .setCredentialsProvider(NoCredentialsProvider.create())
                  .setTransportChannelProvider(channelProvider)
                  .build());
      SubscriptionAdminClient subscriptionAdminClient =
          SubscriptionAdminClient.create(
              SubscriptionAdminSettings.newBuilder()
                  .setTransportChannelProvider(channelProvider)
                  .setCredentialsProvider(NoCredentialsProvider.create())
                  .build());
      PubSubAdmin admin =
          new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);
      try {
        admin.createTopic("test-topic");
        admin.createSubscription("test-subscription", "test-topic");
      } finally {
        // Release the admin even when topic/subscription creation fails.
        admin.close();
      }
    } finally {
      // Always shut the channel down so a failed setup does not leave it open.
      channel.shutdown();
    }
  }

  // By default, autoconfiguration will initialize application default credentials.
  // For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
  @TestConfiguration
  static class PubSubEmulatorConfiguration {
    @Bean
    CredentialsProvider googleCredentials() {
      return NoCredentialsProvider.create();
    }
  }

  @Autowired PubSubSender sender;
  @Autowired PubSubSubscriberTemplate subscriberTemplate;
  @Autowired PubSubPublisherTemplate publisherTemplate;

  /** Sends a message through {@code sender} and verifies it can be pulled from the subscription. */
  @Test
  void testSend() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = sender.send("hello!");

    List<AcknowledgeablePubsubMessage> msgs =
        await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

    assertEquals(1, msgs.size());
    assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
    assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

    for (AcknowledgeablePubsubMessage msg : msgs) {
      msg.ack();
    }
  }

  /** Publishes a message and verifies that a started {@code PubSubWorker} hands it to its callback. */
  @Test
  void testWorker() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = publisherTemplate.publish("test-topic", "hi!");

    List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
    PubSubWorker worker =
        new PubSubWorker(
            "test-subscription",
            subscriberTemplate,
            (msg) -> {
              messages.add(msg);
            });
    worker.start();
    try {
      await().until(() -> messages, not(empty()));
      assertEquals(1, messages.size());
      assertEquals(future.get(), messages.get(0).getMessageId());
      assertEquals("hi!", messages.get(0).getData().toStringUtf8());
    } finally {
      // Stop the worker even when an assertion fails; otherwise it would keep consuming
      // from the shared subscription while later tests run.
      worker.stop();
    }
  }

  @AfterEach
  void teardown() {
    // Drain any messages that are still in the subscription so that they don't interfere with
    // subsequent tests.
    await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
  }
}
Everything works fine for the above example, but when I test my own implementation as follows:
@Autowired
private FunctionCatalog catalog;

/**
 * Invokes the function looked up from the catalog with an encoded empty message and verifies
 * that exactly one message arrives on {@code test-subscription}.
 */
@Test
void testSendB() throws ExecutionException, InterruptedException {
  Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
  var pubSubMessage = new PubSubMessage();
  pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));

  function.accept(pubSubMessage);

  List<AcknowledgeablePubsubMessage> msgs =
      await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

  assertEquals(1, msgs.size());
  // The function is a Consumer and returns no publish future, so there is no message id to
  // compare against here (the original referenced an undeclared `future` variable).
  // NOTE(review): "hello!" was carried over from testSend; confirm it matches the payload that
  // MyFunction actually publishes.
  assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

  for (AcknowledgeablePubsubMessage msg : msgs) {
    msg.ack();
  }
}
it throws the following error:
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).
My service implementation uses Publisher
instead of the PubSubPublisherTemplate
used in the example:
// Pub/Sub client that publish(...) calls to send each message.
private final Publisher publisher;
/**
 * Publishes the given text as a UTF-8 encoded Pub/Sub message and waits for the result.
 *
 * @param message text to publish
 * @throws RuntimeException wrapping whatever failure occurred while publishing
 */
public void publishMessage(String message) {
    var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
    var pubsubApiMessage = getPubsubApiMessage(byteStr);
    try {
        publish(pubsubApiMessage);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers can still observe the interruption
            // after it has been wrapped in a RuntimeException.
            Thread.currentThread().interrupt();
        }
        log.error("Error during event publishing: " + e.getMessage(), e);
        throw new RuntimeException(e);
    }
}
/**
 * Hands the message to the publisher and blocks until the publish call has completed.
 *
 * @param pubsubApiMessage message to send
 * @throws Exception if waiting for the publish result fails
 */
private void publish(PubsubMessage pubsubApiMessage) throws Exception {
    var pendingResult = publisher.publish(pubsubApiMessage);
    pendingResult.get();
}
/**
 * Builds a Pub/Sub message whose data is the given bytes.
 *
 * @param byteStr payload to place in the message
 * @return the built message
 */
private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
    var messageBuilder = PubsubMessage.newBuilder();
    messageBuilder.setData(byteStr);
    return messageBuilder.build();
}
This works fine when deployed to GCP, but not in this integration test using the Pub/Sub emulator.
It turned out that the Pub/Sub emulator requires its own test publisher, which can be created as a bean in a configuration class. Example:
/**
 * Spring configuration that wires Pub/Sub clients to the host configured under
 * {@code spring.cloud.gcp.pubsub.emulator-host}, using a plaintext channel and no credentials.
 */
@Configuration
public class PubSubConfig {

    private static final CredentialsProvider CREDENTIALS_PROVIDER = NoCredentialsProvider.create();

    @Value("${gcp.pubsub.topic.name}")
    private String topicName;

    // Was declared as "projected" while testPublisher referenced "projectId".
    @Value("${gcp.project.id}")
    private String projectId;

    @Value("${spring.cloud.gcp.pubsub.emulator-host}")
    private String host;

    /**
     * Creates a subscriber stub that talks over the shared channel provider.
     *
     * @throws IOException if the stub cannot be created
     */
    @Bean
    public SubscriberStub testSubscriber(
            FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
        return GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder()
                .setTransportChannelProvider(fixedTransportChannelProvider)
                .setCredentialsProvider(CREDENTIALS_PROVIDER)
                .build());
    }

    /**
     * Creates the primary publisher for the configured project and topic.
     *
     * @throws IOException if the publisher cannot be built
     */
    @Primary
    @Bean
    public Publisher testPublisher(FixedTransportChannelProvider fixedTransportChannelProvider)
            throws IOException {
        return Publisher.newBuilder(ProjectTopicName.of(projectId, topicName))
                .setChannelProvider(fixedTransportChannelProvider)
                .setCredentialsProvider(CREDENTIALS_PROVIDER)
                .build();
    }

    /**
     * Creates a topic admin client that talks over the shared channel provider.
     *
     * @throws IOException if the client cannot be created
     */
    @Bean
    public TopicAdminClient getTopicAdminClient(
            FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
        return TopicAdminClient.create(TopicAdminSettings.newBuilder()
                .setTransportChannelProvider(fixedTransportChannelProvider)
                .setCredentialsProvider(CREDENTIALS_PROVIDER)
                .build());
    }

    /**
     * Builds the plaintext channel to the configured host and wraps it in the channel provider
     * shared by the other beans in this class.
     */
    @Primary
    @Bean
    public FixedTransportChannelProvider getChannelProvider() {
        // NOTE(review): nothing in this class shuts the channel down; confirm whether that
        // matters for the lifetime of the test context.
        var channel = ManagedChannelBuilder.forTarget(host)
                .usePlaintext()
                .build();
        return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
    }

    /**
     * Creates a subscription admin client that talks over the shared channel provider.
     *
     * @throws IOException if the client cannot be created
     */
    @Bean
    public SubscriptionAdminClient createSubscriptionAdmin(
            FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
        return SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder()
                .setCredentialsProvider(CREDENTIALS_PROVIDER)
                .setTransportChannelProvider(fixedTransportChannelProvider)
                .build());
    }
}