javamessage-queuenats-streaming-server

NATS persistent message Java Client


Does anyone have experience using the NATS Streaming Server combined with a Java client? Specifically I can't figure out how to retrieve messages using the Java Client that are sent when a subscriber is offline.

I can see using the Go client that I can publish a message and later add a subscription to retrieve all published messages. This is in the NATS Streaming Getting Started documentation and it works as advertised.

Publish several messages. For each publication you should get a result.

$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples
go run stan-pub.go foo "msg one"
Published [foo] : 'msg one'
$ go run stan-pub.go foo "msg two"
Published [foo] : 'msg two'
$ go run stan-pub.go foo "msg three"
Published [foo] : 'msg three'

Run the subscriber client. Use the --all flag to receive all published messages.

$ go run stan-sub.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196

I'm trying to do this using the NATS Java client. I can't figure out if it's just that I'm not finding the analogous method calls or if the feature doesn't exist in the Java client.

Here's what I've tried

    import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Constants;
import io.nats.client.Message;
import io.nats.client.SyncSubscription;

import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NatsTest2 {

  private static final SecureRandom random = new SecureRandom();

  public static void main(String... args) throws Exception {
    final ConnectionFactory factory = new ConnectionFactory(Constants.DEFAULT_URL);
    try (final Connection conn = factory.createConnection()) {
      // Simple Async Subscriber
      final String expectMessage = "Yum, cookies " + System.currentTimeMillis();
      works(conn, expectMessage);
      broken(conn, expectMessage);
    }
  }

  private static void works(Connection conn, String expectMessage) throws IOException, TimeoutException {
    final String queue = Long.toString(random.nextLong());
    System.out.print(queue + "=>");
    try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
      conn.publish(queue, expectMessage.getBytes());
      subscribe(subscription);
    }
  }

  private static void broken(Connection conn, String expectMessage) throws IOException, TimeoutException {
    final String queue = Long.toString(random.nextLong());
    System.out.print(queue + "=>");
    conn.publish(queue, expectMessage.getBytes());
    try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
      subscribe(subscription);
    }
  }

  private static void subscribe(SyncSubscription subscription) throws IOException, TimeoutException {
    final Message message = subscription.nextMessage(1, TimeUnit.SECONDS);
    System.out.println(new String(message.getData()));
  }
}

This gives output

-8522002637987832314=>Yum, cookies 1473462495040
-3024385525006291780=>Exception in thread "main" java.util.concurrent.TimeoutException: Channel timed out waiting for items

Solution

  • If you're using nats-streaming-server, you need to use the java-nats-streaming client. The feature you're looking for (subscription to historical messages) only exists in that client.

    Regardless, here's why you saw what you did with the jnats client:

    nats-streaming-server currently embeds a NATS server (gnatsd), and therefore allows standard NATS functionality for regular NATS clients, which is what you're seeing.

    In your example code, works() happens to work because your subscription is already created before you publish your message (in other words, your try-with-resources block guarantees the subscription is already active before anything else happens). Therefore you're not really receiving a message that was published in the past; you're receiving a message that was published after the subscription started.

    The broken() example doesn't work because it actually is publishing the message before the subscription is created, and the message is discarded by the server because there is no interest (yet).