javaspring-bootgoogle-cloud-pubsubtestcontainersgoogle-cloud-pubsub-emulator

Testing GCP PubSub with Testcontainer PubSub emulator


I'm trying to create an integration test with PubSub emulator based on the example from this GitHub repo which looks like

@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
  private static final String PROJECT_ID = "test-project";

  @Container
  private static final PubSubEmulatorContainer pubsubEmulator =
      new PubSubEmulatorContainer(
          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));

  @DynamicPropertySource
  static void emulatorProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
  }

  @BeforeAll
  static void setup() throws Exception {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
            .usePlaintext()
            .build();
    TransportChannelProvider channelProvider =
        FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

    TopicAdminClient topicAdminClient =
        TopicAdminClient.create(
            TopicAdminSettings.newBuilder()
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setTransportChannelProvider(channelProvider)
                .build());

    SubscriptionAdminClient subscriptionAdminClient =
        SubscriptionAdminClient.create(
            SubscriptionAdminSettings.newBuilder()
                .setTransportChannelProvider(channelProvider)
                .setCredentialsProvider(NoCredentialsProvider.create())
                .build());

    PubSubAdmin admin =
        new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);

    admin.createTopic("test-topic");
    admin.createSubscription("test-subscription", "test-topic");

    admin.close();
    channel.shutdown();
  }

  // By default, autoconfiguration will initialize application default credentials.
  // For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
  @TestConfiguration
  static class PubSubEmulatorConfiguration {
    @Bean
    CredentialsProvider googleCredentials() {
      return NoCredentialsProvider.create();
    }
  }

  @Autowired PubSubSender sender;

  @Autowired PubSubSubscriberTemplate subscriberTemplate;
  @Autowired PubSubPublisherTemplate publisherTemplate;

  @Test
  void testSend() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = sender.send("hello!");

    List<AcknowledgeablePubsubMessage> msgs =
        await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

    assertEquals(1, msgs.size());
    assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
    assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

    for (AcknowledgeablePubsubMessage msg : msgs) {
      msg.ack();
    }
  }

  @Test
  void testWorker() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = publisherTemplate.publish("test-topic", "hi!");

    List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
    PubSubWorker worker =
        new PubSubWorker(
            "test-subscription",
            subscriberTemplate,
            (msg) -> {
              messages.add(msg);
            });
    worker.start();

    await().until(() -> messages, not(empty()));
    assertEquals(1, messages.size());
    assertEquals(future.get(), messages.get(0).getMessageId());
    assertEquals("hi!", messages.get(0).getData().toStringUtf8());

    worker.stop();
  }

  @AfterEach
  void teardown() {
    // Drain any messages that are still in the subscription so that they don't interfere with
    // subsequent tests.
    await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
  }
}

all works fine for the above example but when I want to test my implementation as follows

    @Autowired
    private FunctionCatalog catalog;

    @Test
    void testSendB() throws ExecutionException, InterruptedException {
        Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
        var pubSubMessage = new PubSubMessage();
        pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
        function.accept(pubSubMessage);

        List<AcknowledgeablePubsubMessage> msgs =
                await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

        assertEquals(1, msgs.size());
        assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
        assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

        for (AcknowledgeablePubsubMessage msg : msgs) {
            msg.ack();
        }
    }

it will throw error:

java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).

where my service implementation uses Publisher instead of PubSubPublisherTemplate from the example:

    private final Publisher publisher;

    public void publishMessage(String message) {
        var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
        var pubsubApiMessage = getPubsubApiMessage(byteStr);

        try {
            publish(pubsubApiMessage);
        } catch (Exception e) {
            log.error("Error during event publishing: " + e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    private void publish(PubsubMessage pubsubApiMessage) throws Exception {
        publisher.publish(pubsubApiMessage).get();
    }

    private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
        return PubsubMessage.newBuilder()
                            .setData(byteStr)
                            .build();
    }

and works fine when deployed to GCP but not in this case of integration test using PubSub emulator.


Solution

  • It came up that the PubSub emulator requires its own test publisher which can be created as a bean in configuration. Example:

    @Configuration
    public class PubSubConfig {
       @Value("${gcp.pubsub.topic.name}")
       private String topicName;
    
       @Value("${gcp.project.id}")
       private String projected;
       
       @Value("${spring.cloud.gcp.pubsub.emulator-host}")
       private String host;
       
       private static final CredentialsProvider CREDENTIALS_PROVIDER = NoCredentialsProvider.create();
    
       @Bean
       public SubscriberStub testSubscriber(
             FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
          return GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder()
                                                                 .setTransportChannelProvider(fixedTransportChannelProvider)
                                                                 .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                                                 .build());
       }
    
       @Primary
       @Bean
       public Publisher testPublisher(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
          return Publisher.newBuilder(ProjectTopicName.of(projectId, topicName))
                          .setChannelProvider(fixedTransportChannelProvider)
                          .setCredentialsProvider(NoCredentialsProvider.create())
                          .build();
       }
    
       @Bean
       public TopicAdminClient getTopicAdminClient(
             FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
          return TopicAdminClient.create(TopicAdminSettings.newBuilder()
                                                           .setTransportChannelProvider(fixedTransportChannelProvider)
                                                           .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                                           .build());
       }
    
       @Primary
       @Bean
       public FixedTransportChannelProvider getChannelProvider() {
          var channel = ManagedChannelBuilder.forTarget(host)
                                             .usePlaintext()
                                             .build();
          return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
       }
    
       @Bean
       public SubscriptionAdminClient createSubscriptionAdmin(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
          return SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder()
                                                                         .setCredentialsProvider(
                                                                               NoCredentialsProvider.create())
                                                                         .setTransportChannelProvider(
                                                                               fixedTransportChannelProvider)
                                                                         .build());
       }
    
    }