apache-flink, flink-streaming, pyflink

java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row


I use Apache PyFlink 1.18.1. The input data coming from the Apache Flink Kafka source looks like this:

2023-11-01, 2.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2023-12-01, 2.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2024-01-01, 3.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2024-02-01, 3.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA
2024-03-01, 3.3, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA

The Python source code is:

    class CustomCsvMapFunction(MapFunction):
        def map(self, value):
            str_list = value.split(',')
            return Types.ROW(str_list)  # this line causes the error (the return value is a RowTypeInfo, not a Flink Row)
    
    
    source = KafkaSource.builder() \
                .set_bootstrap_servers(kafka_brokerlist) \
                .set_topics(kafka_topic) \
                .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
                .set_value_only_deserializer(SimpleStringSchema()) \
                .build()
    
    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    csv_ds = ds.filter(lambda str: not(str.startswith('date')))\
                .map(CustomCsvMapFunction())
            
    csv_ds.print()
    
    type_info = Types.TUPLE([Types.SQL_TIME(), Types.FLOAT(), Types.STRING(),
                Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
            
    jdbcConnOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()\
                        .with_url(mysql_host_url)\
                        .with_driver_name('com.mysql.cj.jdbc.Driver')\
                        .with_user_name(mysql_user)\
                        .with_password(mysql_password)\
                        .build()
    
    jdbcExeOptions = JdbcExecutionOptions.builder()\
                        .with_batch_interval_ms(1000)\
                        .with_batch_size(200)\
                        .with_max_retries(5)\
                        .build()
    
    csv_ds.add_sink(
                JdbcSink.sink(
                    'insert into ' + table_name + ' values (?, ?, ?, ?, ?, ?, ?, ?)',
                    type_info, jdbcConnOptions, jdbcExeOptions))

The console output is:

RowTypeInfo(2023-11-01, 2.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2023-12-01, 2.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2024-01-01, 3.7, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2024-02-01, 3.6, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)
RowTypeInfo(2024-03-01, 3.3, Wyoming, WYURN, Unemployment Rate in Wyoming, M, %, NSA)

However, RowTypeInfo is not the object type the JdbcSink expects, and it raises this error:

java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row ([B is in module java.base of loader 'bootstrap'; org.apache.flink.types.Row is in unnamed module of loader 'app')

I think the stream element type has to be the PyFlink Row type, not RowTypeInfo. So how can I convert the comma-separated string input from Kafka into Apache Flink Row output, so that I can save the records into MySQL through the JDBC sink?
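
For reference, my understanding of the difference is that Types.ROW / Types.ROW_NAMED only construct type information describing a row, while an actual data record is created with the Row class. A minimal illustration (not my actual job code, values taken from the sample data above):

    from pyflink.common import Row, Types

    # Type information: describes the element type of a stream.
    row_type = Types.ROW_NAMED(['date', 'value'], [Types.STRING(), Types.FLOAT()])

    # Data record: an actual element that flows through the stream.
    record = Row('2023-11-01', 2.7)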

== Updated Parts

I found that some input lines contain an empty value (''). So I modified the code as below:

class CustomCsvMapFunction(MapFunction):
    def map(self, value):
        str_list = value.split(',')
        if str_list[0] != 'date' and str_list[1] != '' and str_list[1] != 'null':
            return Row(str_list[0], float(str_list[1]), str_list[2], str_list[3],
                       str_list[4], str_list[5], str_list[6], str_list[7])
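        # note: when the check above fails, map() falls through and implicitly returns None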

But this time a different error occurs on the JdbcSink:

py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
        at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
        at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
        at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)        
        at org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)        
        at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
        at org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)
        at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)
        at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:57)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
        at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
        ... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:89)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:39)
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:82)
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:33)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.emitResult(AbstractExternalOneInputPythonFunctionOperator.java:133)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:100)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
        at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:111)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 22 more
Caused by: java.lang.NullPointerException: Cannot read the array length because "from" is null
        at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.copy(BytePrimitiveArraySerializer.java:52)
        at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.copy(BytePrimitiveArraySerializer.java:31)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)

I hope these updated parts are helpful in solving the issue.

== Updated Parts #2

How about this code:

class CustomCsvMapFunction(MapFunction):
    def __init__(self):
        # Define the output type for the map function
        type_name = ['date','value','state','id','title','frequency_short','units_short','seasonal_adjustment_short']
        type_schema = [Types.STRING(), Types.FLOAT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]
        self.output_type = Types.ROW_NAMED(type_name, type_schema)
            
    def map(self, value):
        str_list = value.split(',')
        if str_list[0] != 'date' and str_list[1] != '' and str_list[1] != 'null':
            return Row(str_list[0], float(str_list[1]), str_list[2], str_list[3], str_list[4], str_list[5], str_list[6], str_list[7])  
        
kafka_brokerlist = self._config['KAFKA_CONFIG']['kafka.brokerlist']
        
source = KafkaSource.builder() \
    .set_bootstrap_servers(kafka_brokerlist) \
    .set_topics(kafka_topic) \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()
 
ds = self._env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
        
csv_ds = ds.map(CustomCsvMapFunction())
        
csv_ds.print()
                    
type_name = ['date','value','state','id','title','frequency_short','units_short','seasonal_adjustment_short']
type_schema = [Types.STRING(), Types.FLOAT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]
type_info = RowTypeInfo(type_schema, type_name)
        
mysql_user = self._config['MYSQL_CONFIG']['mysql.user']
mysql_password = self._config['MYSQL_CONFIG']['mysql.password']
mysql_host_url = self._config['MYSQL_CONFIG']['mysql.host.url']
        
jdbcConnOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()\
    .with_url(mysql_host_url)\
    .with_driver_name('com.mysql.cj.jdbc.Driver')\
    .with_user_name(mysql_user)\
    .with_password(mysql_password)\
    .build()
 
jdbcExeOptions = JdbcExecutionOptions.builder()\
    .with_batch_interval_ms(1000)\
    .with_batch_size(200)\
    .with_max_retries(5)\
    .build()
 
ds.add_sink(
    JdbcSink.sink(
        'INSERT INTO ' + table_name + ' VALUES(?, ?, ?, ?, ?, ?, ?, ?)',
        type_info, jdbcConnOptions, jdbcExeOptions))

But the execution fails and the code throws the following exception:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
        at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
        at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.emitResult(AbstractExternalOneInputPythonFunctionOperator.java:133)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:100)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
        at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.processElement(AbstractExternalOneInputPythonFunctionOperator.java:146)
        at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.processElement(ExternalPythonProcessOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        ... 25 more
Caused by: java.lang.NullPointerException: Cannot read the array length because "from" is null
        at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.copy(BytePrimitiveArraySerializer.java:52)
        at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.copy(BytePrimitiveArraySerializer.java:31)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
        ... 38 more

Any ideas?


Solution

  • I also encountered the error described in the title, and I've solved it. I found that the type hint on the final operator is significant. There is a small example below.

    from pyflink.common.typeinfo import RowTypeInfo
    from pyflink.common import Types, Row
    from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink

    ds \
        .map(lambda x: Row(name=x[1].f1, id=int(x[1].f0)),
             output_type=Types.ROW_NAMED(['name', 'id'], [Types.STRING(), Types.LONG()])) \
        .add_sink(
            JdbcSink.sink(
                'update user set name = ? where id = ?',
                type_info=RowTypeInfo([Types.STRING(), Types.LONG()], ['name', 'id']),
                jdbc_connection_options=JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .with_url('jdbc:mysql://192.168.1.1:3306/pyflink?autocommit=true')
                    .with_driver_name('com.mysql.cj.jdbc.Driver')
                    .with_user_name('<user_name>')
                    .with_password('<password>')
                    .build(),
                jdbc_execution_options=JdbcExecutionOptions.builder()
                    .with_batch_size(100)
                    .with_batch_interval_ms(1000)
                    .with_max_retries(3)
                    .build()
            )
        )
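
  If it helps, here is how the same idea might map onto the CSV pipeline from the question. This is only a rough sketch under a few assumptions: the broker, topic, table name, and MySQL settings are placeholders, and a filter drops the header line and rows with an empty value so the map never returns None (which is most likely what triggers the BytePrimitiveArraySerializer NullPointerException in the updates).

    from pyflink.common import Row, Types, WatermarkStrategy
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.typeinfo import RowTypeInfo
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink
    from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

    env = StreamExecutionEnvironment.get_execution_environment()

    # Column names and types of the CSV records from the question.
    field_names = ['date', 'value', 'state', 'id', 'title',
                   'frequency_short', 'units_short', 'seasonal_adjustment_short']
    field_types = [Types.STRING(), Types.FLOAT(), Types.STRING(), Types.STRING(),
                   Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]

    # Placeholder Kafka settings.
    source = KafkaSource.builder() \
        .set_bootstrap_servers('<broker:9092>') \
        .set_topics('<topic>') \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    def is_data_line(line):
        # Drop the header line and any line whose 'value' field is empty or 'null',
        # so that the following map never returns None.
        p = [f.strip() for f in line.split(',')]
        return p[0] != 'date' and p[1] not in ('', 'null')

    def to_row(line):
        # Build an actual Row record from one CSV line.
        p = [f.strip() for f in line.split(',')]
        return Row(p[0], float(p[1]), p[2], p[3], p[4], p[5], p[6], p[7])

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    ds.filter(is_data_line) \
      .map(to_row, output_type=Types.ROW_NAMED(field_names, field_types)) \
      .add_sink(
          JdbcSink.sink(
              'INSERT INTO <table_name> VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
              type_info=RowTypeInfo(field_types, field_names),
              jdbc_connection_options=JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                  .with_url('jdbc:mysql://<host>:3306/<database>')
                  .with_driver_name('com.mysql.cj.jdbc.Driver')
                  .with_user_name('<user>')
                  .with_password('<password>')
                  .build(),
              jdbc_execution_options=JdbcExecutionOptions.builder()
                  .with_batch_interval_ms(1000)
                  .with_batch_size(200)
                  .with_max_retries(5)
                  .build()))

    env.execute('csv_to_mysql')

  The key points are the output_type on map(), so the stream carries Flink Row elements rather than pickled bytes, and a matching RowTypeInfo passed to JdbcSink.sink().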