mysql, apache-flink, flink-sql, pyflink, apache-iceberg

Why does my Flink task manager crash when transferring MySQL data to Iceberg?


The source table is ~4 TB, so we tried to use Flink to migrate it as a stream. Ideally, Flink would read rows from MySQL and write them to Iceberg one at a time; once a row has been inserted into Iceberg, it should no longer take up any memory in the Flink JVM. However, the task manager crashes with org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id localhost:26211-081070 is no longer reachable. and

23/10/17 14:43:34 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.sql.SQLException: Java heap space
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:130)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessageLocal(SimplePacketReader.java:133)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:102)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:62)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:66)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
        at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
        at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1648)
        at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
        at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1661)
        at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1715)
        at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1065)
        at com.mysql.cj.NativeSession.execSQL(NativeSession.java:657)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:893)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2256/1694162626.apply(Unknown Source)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
23/10/17 14:43:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0.0 in stage 0.0 (TID 0),5,main]
java.sql.SQLException: Java heap space
        ... (same SQLException / OutOfMemoryError stack trace as above)
23/10/17 14:43:34 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (ars.momenta.ai executor driver): java.sql.SQLException: Java heap space
        ... (same stack trace as above)
23/10/17 14:43:34 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 192, in deco
    converted = convert_exception(e.java_exception)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 170, in convert_exception
    is_instance_of(gw, c, "org.apache.spark.api.python.PythonException")
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 464, in is_instance_of
    return gateway.jvm.py4j.reflection.TypeUtil.isInstanceOf(
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1722, in __getattr__
    raise Py4JError(message)
py4j.protocol.Py4JError: py4j does not exist in the JVM
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/mnt/data/martin.hou/transfer_job/spark_write.py", line 60, in <module>
    sample = df.first().asDict()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1938, in first
    return self.head()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1924, in head
    rs = self.head(1)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1926, in head
    return self.take(n)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 868, in take
    return self.limit(num).collect()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 816, in collect
    with SCCallSiteSync(self._sc):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/traceback_utils.py", line 81, in __exit__
    self._context._jsc.setCallSite(None)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1320, in __call__
    answer = self.gateway_client.send_command(command)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1036, in send_command
    connection = self._get_connection()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 284, in _get_connection
    connection = self._create_new_connection()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 291, in _create_new_connection
    connection.connect_to_java_server()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 438, in connect_to_java_server
    self.socket.connect((self.java_address, self.java_port))
ConnectionRefusedError: [Errno 111] Connection refused

Here is my code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1).enable_checkpointing(3000)
table_env = StreamTableEnvironment.create(env)

# source table
table_env.execute_sql("""
    CREATE TABLE src_result (
        id STRING,
        input_md5 STRING,
        output_md5 STRING,
        log STRING,
        metric STRING,
        create_time TIMESTAMP(6),      
        update_time TIMESTAMP(6),
        workflow_id_id STRING,
        error_details STRING,
        error_stage STRING,
        error_type STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://xxx',
        'table-name' = 'result',
        'username' = 'ars_dev',
        'password' = '01234567'
    );
""")

# target Iceberg catalog
table_env.execute_sql("CREATE CATALOG iceberg WITH ("
                    "'type'='iceberg', "
                    "'catalog-type'='hive', "
                    "'uri'='thrift://xxx',"
                    "'warehouse'='xxx')")

# start migration; execute_sql() submits the INSERT job asynchronously,
# so call t_res.wait() if the script should block until the job finishes
t_res = table_env.execute_sql("""
    INSERT INTO iceberg.ars.results
    SELECT 
        id,
        input_md5,
        output_md5,
        log,
        metric,
        CAST(create_time AS STRING),
        CAST(update_time AS STRING),
        workflow_id_id,
        error_details,
        error_stage,
        error_type  
    FROM src_result
""")

I am new to Flink. I thought that maybe INSERT INTO xx SELECT * FROM xx would load the whole table into memory at once. However, when I used a plain SELECT * FROM xx and processed the rows as a stream, the same error occurred after ~10k rows had been read. How is that possible? Do I need to clear the memory manually?


Solution

  • The Flink JDBC source connector is only available as a bounded source, meaning it is intended for batch processing. It doesn't stream results the way the Kafka or FileSystem connectors do. So when you run a query, you select all the data from the source, and it has to be buffered somewhere until it can be written to your sink. That's most likely what causes Flink to run out of memory.
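  • If you have to stay on the JDBC connector, you can at least bound how much any single read buffers. The sketch below is an illustration under stated assumptions, not a drop-in fix. The connector's 'scan.partition.*' options split the scan into many small range queries, but they require a numeric, date, or timestamp column; id here is a STRING, so a hypothetical numeric key id_num stands in. The URL also gets useCursorFetch=true, because MySQL Connector/J otherwise ignores the fetch-size hint and materializes the whole result set in one go:

# Sketch under assumptions: 'id_num' is a hypothetical numeric key; replace it
# with a real numeric/date/timestamp column and set the bounds to its MIN/MAX.
table_env.execute_sql("""
    CREATE TABLE src_result_chunked (
        id STRING,
        input_md5 STRING,
        output_md5 STRING,
        log STRING,
        metric STRING,
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        workflow_id_id STRING,
        error_details STRING,
        error_stage STRING,
        error_type STRING,
        id_num BIGINT                                -- hypothetical, used only for splitting
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://xxx?useCursorFetch=true',
        'table-name' = 'result',
        'username' = 'ars_dev',
        'password' = '01234567',
        'scan.partition.column' = 'id_num',          -- must be numeric/date/timestamp
        'scan.partition.num' = '1000',               -- 1000 bounded range queries
        'scan.partition.lower-bound' = '0',
        'scan.partition.upper-bound' = '999999999',  -- adjust to MIN/MAX of id_num
        'scan.fetch-size' = '10000'                  -- rows per driver round trip
    );
""")

  • If you want a genuinely streaming source, the Flink CDC connector for MySQL (flink-sql-connector-mysql-cdc) is worth a look: it reads the initial snapshot in bounded chunks and then follows the binlog, so the 4 TB never has to fit in memory at once. A minimal sketch, assuming the CDC connector jar is on the classpath and using the same placeholder connection values as above:

table_env.execute_sql("""
    CREATE TABLE src_result_cdc (
        id STRING,
        input_md5 STRING,
        output_md5 STRING,
        log STRING,
        metric STRING,
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        workflow_id_id STRING,
        error_details STRING,
        error_stage STRING,
        error_type STRING,
        PRIMARY KEY (id) NOT ENFORCED    -- the chunked (incremental) snapshot needs a key
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'xxx',
        'port' = '3306',
        'username' = 'ars_dev',
        'password' = '01234567',
        'database-name' = 'xxx',
        'table-name' = 'result'
    );
""")

Note that a CDC source emits a changelog rather than plain inserts, so the Iceberg sink side may need upsert support enabled for updates and deletes to apply cleanly.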