I cannot make access to Java POJO in Visual Studio Code with Maven.
OS : Windows 11
JDK : open-jdk 17.0.2
maven : apache-maven 3.9.6
apache flink : 1.18.1 (installed on WSL 2 ubuntu)
visual studio code : x64 1.87.2
Below are the simple Flink MongoDB sink codes.
== Car.java (POJO class)
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Car{
private String brand;
private int price;
}
== Flink MongoDB Sink codes
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Car> cars = new ArrayList<>();
cars.add(new Car("BMW", 500));
cars.add(new Car("Kia", 300));
cars.add(new Car("Ford", 600));
DataStream<Car> stream = env.fromCollection(cars);
MongoSink<Car> sink = MongoSink.<Car>builder()
.setUri("mongodb://127.0.0.1:27017")
.setDatabase("class_db")
.setCollection("class_coll")
.setBatchSize(1000)
.setBatchIntervalMs(1000)
.setMaxRetries(3)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setSerializationSchema(
(input, context)
-> {
Document doc = new Document(input.getBrand(), input.getPrice());
System.out.println(doc);
return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
})
.build();
stream.sinkTo(sink);
env.execute("MongoDB POJO Test");
env.close();
The above codes contain no bugs except this exception,
Exception in thread "main" java.io.UncheckedIOException: java.io.IOException: Serializing the source elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:162)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySetOutputType(StreamingFunctionUtils.java:84)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.setOutputType(StreamingFunctionUtils.java:60)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setOutputType(AbstractUdfStreamOperator.java:146)
at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.setOutputType(SimpleOperatorFactory.java:118)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:434)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:402)
at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:356)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:860)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:881)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:839)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:590)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:328)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2289)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2280)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2266)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2093)
at com.aaa.test.FlinkMongoClassTest.main(FlinkMongoClassTest.java:49)
Caused by: java.io.IOException: Serializing the source elements failed: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:139)
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.setOutputType(FromElementsFunction.java:160)
... 22 more
Caused by: java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:69)
at org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar.registerSerializers(FlinkChillPackageRegistrar.java:67)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:513)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:522)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:348)
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.serializeElements(FromElementsFunction.java:136)
... 23 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a32de6c
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
So I change the class of input stream to Tuple2 from Car class like below
List<Tuple2<String, Integer>> cars = new ArrayList<>();
cars.add(new Tuple2<>("BMW", 500));
cars.add(new Tuple2<>("Kia", 300));
cars.add(new Tuple2<>("Ford", 600));
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(cars);
Then everything goes right without any errors. It seems that the main java codes can not make access to Car POJO class.
You're relying on Kryo to serialize your Car
object. From the error message, I think maybe your Car
class has a final field (array of objects), which Kryo can't set when deserializing an object.