The source table is ~4 TB, so we tried to migrate it with a streaming Flink job.
Ideally, Flink reads rows from MySQL and writes them to Iceberg one at a time. Once a row has been inserted into Iceberg, it should no longer occupy any memory in the Flink JVM.
However, the task manager crashes with
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id localhost:26211-081070 is no longer reachable.
followed by
23/10/17 14:43:34 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.sql.SQLException: Java heap space
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:130)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at com.mysql.cj.protocol.a.SimplePacketReader.readMessageLocal(SimplePacketReader.java:133)
    at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:102)
    at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
    at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:62)
    at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
    at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:66)
    at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
    at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
    at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
    at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1648)
    at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:87)
    at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
    at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1661)
    at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1715)
    at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1065)
    at com.mysql.cj.NativeSession.execSQL(NativeSession.java:657)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:893)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:972)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:314)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2256/1694162626.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
23/10/17 14:43:34 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0.0 in stage 0.0 (TID 0),5,main]
java.sql.SQLException: Java heap space
    (same stack trace, including the Caused by: java.lang.OutOfMemoryError, as above)
23/10/17 14:43:34 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (ars.momenta.ai executor driver): java.sql.SQLException: Java heap space
    (same stack trace, including the Caused by: java.lang.OutOfMemoryError, as above)
23/10/17 14:43:34 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving

Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 192, in deco
    converted = convert_exception(e.java_exception)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/utils.py", line 170, in convert_exception
    is_instance_of(gw, c, "org.apache.spark.api.python.PythonException")
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 464, in is_instance_of
    return gateway.jvm.py4j.reflection.TypeUtil.isInstanceOf(
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1722, in __getattr__
    raise Py4JError(message)
py4j.protocol.Py4JError: py4j does not exist in the JVM

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mnt/data/martin.hou/transfer_job/spark_write.py", line 60, in <module>
    sample = df.first().asDict()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1938, in first
    return self.head()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1924, in head
    rs = self.head(1)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 1926, in head
    return self.take(n)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 868, in take
    return self.limit(num).collect()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/sql/dataframe.py", line 816, in collect
    with SCCallSiteSync(self._sc):
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/pyspark/traceback_utils.py", line 81, in __exit__
    self._context._jsc.setCallSite(None)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1320, in __call__
    answer = self.gateway_client.send_command(command)
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1036, in send_command
    connection = self._get_connection()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 284, in _get_connection
    connection = self._create_new_connection()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 291, in _create_new_connection
    connection.connect_to_java_server()
  File "/mnt/data/martin.hou/miniconda3/lib/python3.10/site-packages/py4j/clientserver.py", line 438, in connect_to_java_server
    self.socket.connect((self.java_address, self.java_port))
ConnectionRefusedError: [Errno 111] Connection refused
Here is my code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1).enable_checkpointing(3000)
table_env = StreamTableEnvironment.create(env)
# source table
table_env.execute_sql("""
    CREATE TABLE src_result (
        id STRING,
        input_md5 STRING,
        output_md5 STRING,
        log STRING,
        metric STRING,
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        workflow_id_id STRING,
        error_details STRING,
        error_stage STRING,
        error_type STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://xxx',
        'table-name' = 'result',
        'username' = 'ars_dev',
        'password' = '01234567'
    );
""")
# target iceberg
table_env.execute_sql(
    "CREATE CATALOG iceberg WITH ("
    "'type'='iceberg', "
    "'catalog-type'='hive', "
    "'uri'='thrift://xxx', "
    "'warehouse'='xxx')"
)
# start migration
t_res = table_env.execute_sql("""
    INSERT INTO iceberg.ars.results
    SELECT
        id,
        input_md5,
        output_md5,
        log,
        metric,
        CAST(create_time AS STRING),
        CAST(update_time AS STRING),
        workflow_id_id,
        error_details,
        error_stage,
        error_type
    FROM src_result
""")
I am new to Flink. I thought that maybe INSERT INTO xx SELECT * FROM xx loads the whole table into memory at once. However, when I ran just SELECT * FROM xx and processed the rows as a stream, the same error occurred after roughly 10k rows had been read. How is that possible? Do I need to free the memory manually?
The Flink JDBC source connector is only available as a bounded source, meaning it is intended for batch processing. It doesn't stream results the way the Kafka or FileSystem connectors do. So when you run a query against it, the connector selects all the data from the source, and those rows have to be buffered somewhere until they can be written to your sink. That is most likely what causes Flink to run out of memory.
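Two mitigations are worth trying; the sketch below is illustrative, not a verified fix for your exact setup. The JDBC connector's scan.partition.* options split the bounded scan into many smaller range queries, so no single result set has to be buffered whole, and scan.fetch-size limits how many rows the driver pulls per round trip. Note that MySQL Connector/J only honors the fetch size when useCursorFetch=true is set on the JDBC URL. In the sketch, id_num and its bounds are hypothetical placeholders: scan.partition.column must be a numeric, date, or timestamp column that actually exists in your table (your id is a STRING, so it won't work as-is).

CREATE TABLE src_result (
    id STRING,
    id_num BIGINT,  -- hypothetical numeric key; replace with a real numeric/date/timestamp column
    input_md5 STRING,
    output_md5 STRING,
    log STRING,
    metric STRING,
    create_time TIMESTAMP(6),
    update_time TIMESTAMP(6),
    workflow_id_id STRING,
    error_details STRING,
    error_stage STRING,
    error_type STRING
) WITH (
    'connector' = 'jdbc',
    -- useCursorFetch=true makes MySQL Connector/J honor the fetch size
    'url' = 'jdbc:mysql://xxx?useCursorFetch=true',
    'table-name' = 'result',
    'username' = 'ars_dev',
    'password' = '01234567',
    -- split the bounded scan into 100 smaller range queries (bounds are placeholders)
    'scan.partition.column' = 'id_num',
    'scan.partition.num' = '100',
    'scan.partition.lower-bound' = '1',
    'scan.partition.upper-bound' = '100000000',
    -- rows fetched per round trip instead of buffering the whole result set
    'scan.fetch-size' = '10000'
);

If the table also takes writes during the migration, another option is the MySQL CDC connector (flink-connector-mysql-cdc), which reads the initial snapshot in chunks and then streams the binlog, so no single giant SELECT is ever issued.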