java apache-flink flink-streaming

Flink classLoader cannot be null Kryo


I'm running Flink 1.18.1 on Java 11. After around 30 minutes, one of my pipelines goes into a crash loop. In the TaskManager log I can see:

Exception in thread "Thread-21" java.lang.IllegalArgumentException: classLoader cannot be null.
        at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:553)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
        at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007fc338f944bf, pid=1, tid=388
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.23+9 (11.0.23+9) (build 11.0.23+9)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.23+9 (11.0.23+9, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
# V  [libjvm.so+0x7944bf]  Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x15f
#
# Core dump will be written. Default location: /opt/flink/core.1
#
# An error report file with more information is saved as:
# /opt/flink/hs_err_pid1.log
#
# If you would like to submit a bug report, please visit:
#   https://github.com/adoptium/adoptium-support/issues
#

I can't find a solution for this. Any ideas?

Update: the relevant part of hs_err_pid1.log:


Current thread (0x00007f476002d800):  JavaThread "Thread-21" [_thread_in_vm, id=407, stack(0x00007f46a48d4000,0x00007f46a50d4000)]

Stack: [0x00007f46a48d4000,0x00007f46a50d4000],  sp=0x00007f46a50d0f10,  free space=8179k
Native frames: (J=compiled Java code, A=aot compiled Java code, j=interpreted, Vv=VM code, C=native code)
V  [libjvm.so+0x7944bf]  Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x15f
V  [libjvm.so+0x95a8ac]  jni_Throw+0x8c
C  [librocksdbjni-linux64.so+0x222ce1]  JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long, long) const+0x121
C  [librocksdbjni-linux64.so+0x648941]  rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&, std::string*) const+0x81
C  [librocksdbjni-linux64.so+0x648de7]  rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&, std::string*, std::string*) const+0xc7
C  [librocksdbjni-linux64.so+0x2d0b74]  rocksdb::CompactionIterator::InvokeFilterIfNeeded(bool*, rocksdb::Slice*)+0x914
C  [librocksdbjni-linux64.so+0x2d24a5]  rocksdb::CompactionIterator::NextFromInput()+0x3c5
C  [librocksdbjni-linux64.so+0x2d4911]  rocksdb::CompactionIterator::SeekToFirst()+0x11
C  [librocksdbjni-linux64.so+0x2dd2be]  rocksdb::CompactionJob::ProcessKeyValueCompaction(rocksdb::CompactionJob::SubcompactionState*)+0x55e
C  [librocksdbjni-linux64.so+0x2de668]  rocksdb::CompactionJob::Run()+0x278
C  [librocksdbjni-linux64.so+0x33cec4]  rocksdb::DBImpl::BackgroundCompaction(bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0x1084
C  [librocksdbjni-linux64.so+0x3407e7]  rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority)+0xd7
C  [librocksdbjni-linux64.so+0x340dda]  rocksdb::DBImpl::BGWorkCompaction(void*)+0x3a
C  [librocksdbjni-linux64.so+0x5e5744]  rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long)+0x254
C  [librocksdbjni-linux64.so+0x5e58ed]  rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper(void*)+0x5d


siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 0x0000000000000000

Solution

  • It seems the problem is related to this Flink bug. When I replaced the TTL mechanism with simple timer-based clearing for the ListState variables (I removed all the TTLs, to be honest), the pipeline started working properly and no longer crashes. It's strange that the bug is marked as "Not a Priority" and has been present since Flink 1.8.0.
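
For anyone wanting to try the same workaround, here is a minimal sketch of timer-based clearing for a ListState without any StateTtlConfig. The class name, state names, String types, and the one-hour retention are all hypothetical and not taken from my actual pipeline. It needs the Flink streaming dependency on the classpath and uses the Flink 1.18 `open(Configuration)` signature. Note that this clears the whole list for a key after a period of inactivity, which is not the same as the per-element expiry that TTL gives you, so check whether that is acceptable for your use case.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: buffers String events per key and clears the buffer
// with a processing-time timer instead of relying on state TTL.
public class TimerClearedListFunction extends KeyedProcessFunction<String, String, String> {

    // Assumed retention period; pick whatever your TTL used to be.
    private static final long RETENTION_MS = 60 * 60 * 1000L;

    // No StateTtlConfig is enabled on either descriptor, so no TTL
    // compaction filter is involved for this state.
    private transient ListState<String> buffer;
    private transient ValueState<Long> cleanupTimer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
        cleanupTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        buffer.add(value);

        // Push the cleanup back on every write: drop the old timer, if any.
        Long previous = cleanupTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }

        long next = ctx.timerService().currentProcessingTime() + RETENTION_MS;
        ctx.timerService().registerProcessingTimeTimer(next);
        cleanupTimer.update(next);

        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The key has been idle for RETENTION_MS: clear its state explicitly.
        buffer.clear();
        cleanupTimer.clear();
    }
}
```

It would be applied on a keyed stream, for example `stream.keyBy(v -> v).process(new TimerClearedListFunction())`, where the key selector is again just a placeholder.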