Tags: apache-flink, flink-streaming, amazon-kinesis, amazon-kinesis-analytics

How to add multiple consumers of the same data stream in Flink


I am struggling to implement a solution to the following problem.

There is one Kinesis stream named sushistream. Each event coming from the stream contains two fields, a broadcastId and an eventID. I want a sliding-window count for every broadcast per eventID, with a different time window per event type.

I have tried to implement a solution like this:

    DataStream<String> stream = createKinesisSource(env, parameter);
    log.info("Kinesis stream created.");

    ObjectMapper objectMapper = new ObjectMapper();

    // PromoCodeEventFilter keeps the events where eventID = "PROMO_CODE_CLICK"
    DataStream<AnnouncementEvent> promoCodeStream = stream.map(record -> {
        try {
            return objectMapper.readValue(record, AnnouncementEvent.class);
        } catch (JsonProcessingException e) {
            return null; // malformed records are dropped by the nonNull filter below
        }
    }).filter(Objects::nonNull).filter(new PromoCodeEventFilter()).uid("promo-code-stream");

    // ShareIconEventFilter keeps the events where eventID = "SHARE_ICON_CLICK"
    DataStream<AnnouncementEvent> shareIconClickStream = stream.map(record -> {
        try {
            return objectMapper.readValue(record, AnnouncementEvent.class);
        } catch (JsonProcessingException e) {
            return null;
        }
    }).filter(Objects::nonNull).filter(new ShareIconEventFilter()).uid("share-icon-stream");


    DataStream<AnnouncementEventTypeCount> resultForPromoCodeClick = promoCodeStream
            .keyBy(AnnouncementEvent::getBroadcastID)
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .aggregate(new AnnouncementEventCounter()).uid("promo-code-result-stream");

    DataStream<AnnouncementEventTypeCount> resultForShareIconClick = shareIconClickStream
            .keyBy(AnnouncementEvent::getBroadcastID)
            .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
            .aggregate(new AnnouncementEventCounter()).uid("share-icon-result-stream");

    // union() returns a new stream; it does not modify its operands, so the result must be used
    DataStream<AnnouncementEventTypeCount> results = resultForShareIconClick.union(resultForPromoCodeClick);
    results.addSink(createLamdbaSinkFromStaticConfig()).uid("result-published");

    env.execute("Kinesis Data Analytics Flink Application with Session Window and Aggregate Function");

I am using Flink version 1.11. When I deploy this code to an S3 bucket and try to run it, I repeatedly get an error like:

    Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
    s3://90adba2e0a9002371abdf5ee58da7883e350fe4d/3b3b06e55d15e42e945da7e868ce748b-135974748953-1690290221232/savepoints/61/attempt-1/savepoint-3b3b06-c15547ca2dff.
    Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program,
    because the operator is not available in the new program. If you want to allow to skip this,
    you can set the --allowNonRestoredState option on the CLI.
        at org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:210)
        at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:180)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1354)
        at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:315)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:268)
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:236)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:123)
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:297)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:284)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
        at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:391)
        ... 7 more

It would be helpful to see an example where someone has solved a similar problem.

I have deployed multiple times with various solutions from existing answers, but that hasn't helped.


Solution

  • The code looks fine. The reason the job doesn't run is that you are trying to start a new version of your application from state saved in a checkpoint or savepoint taken with an older version, when the job graph was different. There is state in that checkpoint/savepoint that cannot be restored, because an operator from the old job no longer exists in the new program.
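    The error message itself names one remedy: if the old operator's state can safely be discarded, resubmit from the savepoint with the `--allowNonRestoredState` flag (`flink run -s <savepointPath> --allowNonRestoredState ...`). To avoid the problem on future deployments, give every stateful operator, including sources and sinks, a stable `uid()`; operators without one get an auto-generated hash (like the `cbc357ccb763df2852fee8c4fc7d55f2` in the error) that changes whenever the job graph changes. A minimal sketch of this pattern, with illustrative names and config not taken from the question:

        import java.util.Properties;

        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

        public class UidExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                Properties consumerConfig = new Properties();
                consumerConfig.setProperty("aws.region", "us-east-1"); // illustrative

                // Give the source a stable uid so its state (e.g. shard offsets)
                // maps to the same operator across job versions.
                DataStream<String> stream = env
                        .addSource(new FlinkKinesisConsumer<>("sushistream",
                                new SimpleStringSchema(), consumerConfig))
                        .uid("kinesis-source");

                // Every later stage gets its own uid too; without one, Flink
                // derives a hash from the graph topology, and any topology
                // change breaks the savepoint-to-operator mapping.
                stream
                        .map(String::toUpperCase).uid("normalize") // hypothetical stage
                        .print().uid("print-sink");

                env.execute("uid example");
            }
        }

    With explicit uids everywhere, adding or removing other operators no longer changes how existing state is matched, so later savepoints restore cleanly.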