I have a basic pipeline below. In one of the steps I want to transform an object by calling a method from a service as below. But Jet throws an error saying that this mapFn
is not serializable. What to do here? It works perfectly fine for static methods.
p.readFrom(source)
.map(r -> dataTransformer.transformRecord(r))// dataTransformer is a service
.writeTo(Sinks.filesBuilder(userHome).build());
Use mapUsingService
and create the service using the ServiceFactory
:
p.readFrom(source)
.mapUsingService(
ServiceFactories.sharedService(pctx -> new DataTransformer()),
(dataTransformer, r) -> dataTransformer.transformRecord(r))
...
Alternatively, if your service is serializable and stateless, you can copy it to a local variable:
DataTransformer dataTransformerLocal = dataTransformer;
p.readFrom(source)
.map(r -> dataTransformerLocal.transformRecord(r))
.writeTo(Sinks.filesBuilder(userHome).build());