My two questions:
Foo class not serializable?
I am using embedded Hazelcast (5.4.0) within Java.
The problematic pipeline:
Pipeline pipeline = Pipeline.create();
StreamStage<Bar> prepared = pipeline.readFrom(KafkaSources. ...)
        .withTimestamps(...)
        .map(Map.Entry::getValue);
ServiceFactory<?, Foo> service = ServiceFactories.sharedService(ctx -> new Foo(url))
        .toNonCooperative();
prepared.mapUsingService(service, (s, bar) -> new EnhancedBar(bar, s.getDetails(bar.getId())))
        .writeTo(Sinks.logger());
The error:
java.lang.IllegalArgumentException: "createContextFn" must be serializable
at com.hazelcast.internal.serialization.impl.SerializationUtil.checkSerializable(SerializationUtil.java:83)
The problematic class:
@Getter
@Setter
public class Foo implements Serializable {
    private String url;
    private transient OkHttpClient client;

    public Foo() {
        this.url = "unknown";
        client = new OkHttpClient();
    }

    public Foo(String url) {
        this.url = url;
        client = new OkHttpClient();
    }

    public String getDetails() {
        Request request = new Request.Builder().url(url).build();
        Call call = client.newCall(request);
        try {
            Response response = call.execute();
            return response.body().string();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Serial
    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.writeObject(url);
    }

    @Serial
    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        String url = (String) ois.readObject();
        this.setUrl(url);
        this.setClient(new OkHttpClient());
    }
}
The problem stems from OkHttpClient not implementing Serializable (I guess).
My tries:
"constant detail", the job was successfully submitted.
@Serial methods and used CompactSerializer instead, I got the same error.
In the last case, the read/write methods of my CompactSerializer were:
@NotNull @Override
public Foo read(@NotNull CompactReader reader) {
    String url = reader.readString("url");
    return new Foo(url);
}

@Override
public void write(@NotNull CompactWriter writer, @NotNull Foo object) {
    writer.writeString("url", object.getUrl());
}
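The error message names "createContextFn", which is the lambda handed to ServiceFactories.sharedService, so it may be worth checking what that lambda captures rather than only how Foo is serialized. The following is a plain-Java sketch of one possible cause, not a confirmed diagnosis: the question does not show where url is declared. SerializableFn and PipelineBuilder are hypothetical names used only for illustration; SerializableFn stands in for a serializable function type such as the ones Hazelcast accepts.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

public class LambdaCaptureDemo {

    /** Hypothetical stand-in for a serializable function interface. */
    interface SerializableFn<T, R> extends Function<T, R>, Serializable { }

    /** Hypothetical class that builds the pipeline; deliberately NOT Serializable. */
    static class PipelineBuilder {
        private final String url;

        PipelineBuilder(String url) {
            this.url = url;
        }

        // Reads the field this.url, so the lambda captures the whole PipelineBuilder.
        SerializableFn<Object, String> capturingThis() {
            return ctx -> url;
        }

        // Copies the field into a local first, so the lambda captures only a String.
        SerializableFn<Object, String> capturingLocal() {
            String localUrl = url;
            return ctx -> localUrl;
        }
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PipelineBuilder builder = new PipelineBuilder("http://example.invalid");
        System.out.println(isJavaSerializable(builder.capturingThis()));  // false
        System.out.println(isJavaSerializable(builder.capturingLocal())); // true
    }
}
```

If url in the pipeline code is a field of a non-serializable class, copying it into a local variable before building the lambda would be the analogous change. If url is already a local String, this sketch does not apply.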
If you are using OkHttpClient, then you also need to upload the OkHttp jar with the job.
Note: In my opinion, OkHttpClient creates an unnecessary burden. Just use the HttpClient that comes with the JDK; the job configuration will be simpler.
Here is an example:
// This example creates a Jet job that downloads the given URL.
@Slf4j
class OkHttpClientJob {
    public static void main(String[] args) throws InterruptedException {
        log.info("Starting Test");
        // Start client
        HazelcastInstance hazelcastClient = getHazelcastClient();
        submitJob(hazelcastClient);
        TimeUnit.SECONDS.sleep(10);
        hazelcastClient.shutdown();
        log.info("Test completed");
    }

    private static HazelcastInstance getHazelcastClient() {
        ClientConfig clientConfig = new ClientConfig();
        return HazelcastClient.newHazelcastClient(clientConfig);
    }
    private static void submitJob(HazelcastInstance hazelcastInstance) {
        // Create a service that will be used by the Jet job
        ServiceFactory<?, HttpClientService> service =
                ServiceFactories.sharedService(ctx -> new HttpClientService())
                        .toNonCooperative();
        Pipeline pipeline = Pipeline.create();
        List<String> input = List.of("https://www.google.com");
        BatchStage<String> prepared = pipeline.readFrom(TestSources.items(input));
        prepared.mapUsingService(service, HttpClientService::get)
                .writeTo(Sinks.logger());
        // Path to okhttp jar file
        File file = new File("src/main/resources/okhttp-5.0.0-alpha.14.jar");
        JobConfig jobConfig = new JobConfig()
                // Add the jar to the Jet job (addJar takes a single jar;
                // addJarsInZip expects a zip archive that contains jars)
                .addJar(file)
                // Add parent class that contains the service class
                .addClass(OkHttpClientJob.class);
        JetService jetService = hazelcastInstance.getJet();
        Job job = jetService.newJob(pipeline, jobConfig);
        job.join();
    }
    private static class HttpClientService implements Serializable {
        private transient OkHttpClient client;

        public HttpClientService() {
            client = new OkHttpClient();
        }

        public String get(String url) {
            Request request = new Request.Builder()
                    .url(url)
                    .build();
            String result;
            try {
                Call call = client.newCall(request);
                try (Response response = call.execute()) {
                    try (ResponseBody responseBody = response.body()) {
                        result = responseBody.string();
                    }
                }
            } catch (IOException exception) {
                result = exception.getMessage();
            }
            return result;
        }
    }
}
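For the note above about the HttpClient that comes with the JDK, here is a minimal sketch of what an equivalent service class might look like. JdkHttpClientService is a hypothetical name, and the error handling simply mirrors the OkHttp version; treat it as an illustration under those assumptions rather than a tested Jet job.

```java
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical JDK-only counterpart of HttpClientService (requires Java 11+).
public class JdkHttpClientService implements Serializable {
    private static final long serialVersionUID = 1L;

    // HttpClient is not Serializable, so keep it transient and create it on first use.
    private transient HttpClient client;

    // Synchronized because a shared service may be called from several threads.
    private synchronized HttpClient client() {
        if (client == null) {
            client = HttpClient.newHttpClient();
        }
        return client;
    }

    // Downloads the given URL and returns the body, or the error message on failure.
    public String get(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        try {
            HttpResponse<String> response =
                    client().send(request, HttpResponse.BodyHandlers.ofString());
            return response.body();
        } catch (IOException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can react to it.
            Thread.currentThread().interrupt();
            return e.getMessage();
        }
    }
}
```

With a class like this, the service factory would presumably become ServiceFactories.sharedService(ctx -> new JdkHttpClientService()), and the JobConfig would no longer need the OkHttp jar, only the class itself.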