java, hazelcast, hazelcast-jet

Hazelcast: service in mapUsingService not serializable


My two questions:

  1. How do I make an HTTP request within a Hazelcast pipeline? (My attempts are below.)
  2. Why is my Foo class not serializable?

I am using embedded Hazelcast (5.4.0) with Java.


The problematic pipeline:

Pipeline pipeline = Pipeline.create();
StreamStage<Bar> prepared = pipeline.readFrom(KafkaSources. ...)
                                    .withTimestamps(...)
                                    .map(Map.Entry::getValue);
ServiceFactory<?, Foo> service = ServiceFactories.sharedService(ctx -> new Foo(url))
                                                 .toNonCooperative();
prepared.mapUsingService(service, (s, bar) -> new EnhancedBar(bar, s.getDetails(bar.getId())))
        .writeTo(Sinks.logger());

The error:

java.lang.IllegalArgumentException: "createContextFn" must be serializable
        at com.hazelcast.internal.serialization.impl.SerializationUtil.checkSerializable(SerializationUtil.java:83)

The problematic class:

@Getter
@Setter
public class Foo implements Serializable {

    private String url;
    private transient OkHttpClient client;

    public Foo() {
        this.url = "unknown";
        client = new OkHttpClient();
    }

    public Foo(String url) {
        this.url = url;
        client = new OkHttpClient();
    }

    public String getDetails() {
        Request request = new Request.Builder().url(url).build();

        Call call = client.newCall(request);
        try {
            Response response = call.execute();
            return response.body().string();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Serial
    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.writeObject(url);
    }

    @Serial
    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        String url = (String) ois.readObject();
        this.setUrl(url);
        this.setClient(new OkHttpClient());
    }
}

The problem stems from OkHttpClient not implementing Serializable (I guess).
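One way to test that guess without Hazelcast is to run the pieces through plain JDK serialization. The exception names "createContextFn", which is the lambda passed to `sharedService`, not `Foo` itself. The sketch below is only an illustration under an assumption the question does not confirm: that `url` is an instance field of an enclosing class that is not `Serializable`. The names `LambdaCaptureCheck`, `JobBuilder` and `SerializableSupplier` are hypothetical; `SerializableSupplier` stands in for Hazelcast's serializable function interfaces such as `FunctionEx`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

class LambdaCaptureCheck {

    // Hypothetical stand-in for a serializable functional interface.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

    // Hypothetical enclosing class: NOT Serializable, keeps the URL in a field.
    static class JobBuilder {
        private final String url;

        JobBuilder(String url) {
            this.url = url;
        }

        // Reading the field inside the lambda makes it capture `this`.
        SerializableSupplier<String> capturingThis() {
            return () -> url;
        }

        // Copying the field to a local first makes the lambda capture only the String.
        SerializableSupplier<String> capturingLocal() {
            String localUrl = url;
            return () -> localUrl;
        }
    }

    // Returns true if the object survives Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        JobBuilder builder = new JobBuilder("http://example.invalid");
        System.out.println(isSerializable(builder.capturingThis()));  // false
        System.out.println(isSerializable(builder.capturingLocal())); // true
    }
}
```

If the same pattern applies to the pipeline code, copying `url` into a local variable before building the `ServiceFactory` would be worth trying. Whether it applies depends on code the question does not show.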

What I tried:

  1. When I removed the client and simply returned "constant detail", the job was successfully submitted.
  2. When I deleted both @Serial methods and used CompactSerializer instead, I got the same error.

In the last case, the read/write methods of my CompactSerializer were:

@NotNull @Override
public Foo read(@NotNull CompactReader reader) {
    String url = reader.readString("url");
    return new Foo(url);
}

@Override
public void write(@NotNull CompactWriter writer, @NotNull Foo object) {
    writer.writeString("url", object.getUrl());
}

Solution

  • If you are using OkHttpClient, you also need to upload the following with the job:

    1. The OkHttp jar file, so that it is available on the Hazelcast (HZ) server.
    2. The service's enclosing class, if the service is a nested class. If the service is not a nested class, this step is not necessary.

    Note: In my opinion, OkHttpClient creates an unnecessary burden here. Just use the HttpClient that ships with the JDK; the job configuration will be simpler.

    Here is an example:

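    import com.hazelcast.client.HazelcastClient;
    import com.hazelcast.client.config.ClientConfig;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.jet.JetService;
    import com.hazelcast.jet.Job;
    import com.hazelcast.jet.config.JobConfig;
    import com.hazelcast.jet.pipeline.BatchStage;
    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.ServiceFactories;
    import com.hazelcast.jet.pipeline.ServiceFactory;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.test.TestSources;
    import lombok.extern.slf4j.Slf4j;
    import okhttp3.Call;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;
    import okhttp3.ResponseBody;
    import java.io.File;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    // This example creates a Jet job that downloads the given URL.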
    @Slf4j
    class OkHttpClientJob {
    
      public static void main(String[] args) throws InterruptedException {
        log.info("Starting Test");
        // Start client
        HazelcastInstance hazelcastClient = getHazelcastClient();
        submitJob(hazelcastClient);
    
        TimeUnit.SECONDS.sleep(10);
    
        hazelcastClient.shutdown();
    
        log.info("Test completed");
      }
    
      private static HazelcastInstance getHazelcastClient() {
        ClientConfig clientConfig = new ClientConfig();
        return HazelcastClient.newHazelcastClient(clientConfig);
      }
    
      private static void submitJob(HazelcastInstance hazelcastInstance) {
        // Create a service that will be used by Jet Job
        ServiceFactory<?, HttpClientService> service = 
          ServiceFactories.sharedService(ctx -> new HttpClientService())
            .toNonCooperative();
    
        Pipeline pipeline = Pipeline.create();
        List<String> input = List.of("https://www.google.com");
        BatchStage<String> prepared = pipeline.readFrom(TestSources.items(input));
    
        prepared.mapUsingService(service, HttpClientService::get)
            .writeTo(Sinks.logger());
    
        // Path to the OkHttp jar file
        File file = new File("src/main/resources/okhttp-5.0.0-alpha.14.jar");
        JobConfig jobConfig = new JobConfig()
          // Add the jar to the Jet job (addJarsInZip is meant for a ZIP archive that contains jars)
          .addJar(file)
          // Add the enclosing class; its nested service class is added along with it
          .addClass(OkHttpClientJob.class);
             
        JetService jetService = hazelcastInstance.getJet();
        Job job = jetService.newJob(pipeline, jobConfig);
        job.join();
      }
    
    
      private static class HttpClientService implements Serializable {
        private transient OkHttpClient client;
    
        public HttpClientService() {
          client = new OkHttpClient();
        }
    
        public String get(String url) {
          Request request = new Request.Builder()
            .url(url)
            .build();
    
          String result;
          try {
            Call call = client.newCall(request);
            try (Response response = call.execute()) {
              try (ResponseBody responseBody = response.body()) {
                result = responseBody.string();
              }
            }
          } catch (IOException exception) {
            result = exception.getMessage();
          }
          return result;
        }
      }
    }
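
The note above recommends the JDK's built-in `HttpClient` (`java.net.http`, available since Java 11). As a rough sketch of what such a service could look like, the class below mirrors the `get(String url)` method of the OkHttp example. The names `JdkHttpClientService` and `JdkHttpClientDemo` are hypothetical, and the `main` method only exercises the service against a throwaway local server so that it runs without external network access.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical service that uses only JDK classes, so no extra jar has to be attached.
class JdkHttpClientService {
    private final HttpClient client = HttpClient.newHttpClient();

    // Downloads the given URL and returns the body, or the error message on failure.
    String get(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        try {
            HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.body();
        } catch (IOException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can react to it.
            Thread.currentThread().interrupt();
            return e.getMessage();
        }
    }
}

class JdkHttpClientDemo {
    public static void main(String[] args) throws IOException {
        // Local test server on a free port that answers every request with "hello".
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        try {
            String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/";
            System.out.println(new JdkHttpClientService().get(url));
        } finally {
            server.stop(0);
        }
    }
}
```

In the pipeline, this service would presumably be wired in the same way as in the example above: `ServiceFactories.sharedService(ctx -> new JdkHttpClientService()).toNonCooperative()` followed by `mapUsingService(service, JdkHttpClientService::get)`. The OkHttp jar would then no longer be needed, although the service class itself still has to be available to the members, for instance through `JobConfig.addClass`.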