
Extracting a field value (mostly constant) from Apache Flink DataStream<GenericRecord>


I have a DataStream containing fields such as event_id and timestamp whose values remain constant across many records in the pipeline. I want to use those values in the file name when writing the stream back out in Parquet format with StreamingFileSink. If the values were constants, we could simply set a part-file prefix and suffix. What I need help with is extracting the value from the records themselves so that it can be used to generate the file name.

Filename Pattern _ <Event_id>--.parquet

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();
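For context, here is a sketch of how the two OutputFileConfig values end up in a file name. As far as I know, the StreamingFileSink versions that introduced OutputFileConfig (around Flink 1.10) name each part file roughly `<partPrefix>-<subtaskIndex>-<partCounter><partSuffix>` inside its bucket directory; newer sinks may use a different scheme. The plain-Java model below is not Flink code, and `PartFileNaming` and `partFileName` are hypothetical names. It only illustrates that the prefix is chosen once for the whole sink, so it cannot carry a per-record value such as event_id.

```java
// Plain-Java model (not Flink code) of how a fixed prefix/suffix pair becomes a part file name.
// Assumed naming scheme: <partPrefix>-<subtaskIndex>-<partCounter><partSuffix>.
final class PartFileNaming {

    /** Hypothetical helper: name of the partCounter-th file written by one parallel subtask. */
    static String partFileName(String partPrefix, String partSuffix, int subtaskIndex, long partCounter) {
        return partPrefix + "-" + subtaskIndex + "-" + partCounter + partSuffix;
    }

    public static void main(String[] args) {
        // Prefix and suffix are chosen once, when the sink is built, like OutputFileConfig.
        String prefix = "prefix";
        String suffix = ".ext";
        // Every file gets the same prefix, regardless of which records it contains.
        System.out.println(partFileName(prefix, suffix, 0, 0)); // prefix-0-0.ext
        System.out.println(partFileName(prefix, suffix, 3, 7)); // prefix-3-7.ext
    }
}
```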

I plan to use this, but I need help extracting the "prefix" from the records themselves.

Any thoughts on this would really be helpful. Thanks in advance :)


Solution

  • The current implementation of OutputFileConfig only supports a fixed prefix and suffix; it offers no hook for user-defined logic, so the file name cannot be derived from the records being written:

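    import java.io.Serializable;
    import org.apache.flink.util.Preconditions;

    // Excerpt of Flink's OutputFileConfig: both values are fixed at construction time.
    public class OutputFileConfig implements Serializable {

        private final String partPrefix;

        private final String partSuffix;

        /**
         * Initiates the {@code OutputFileConfig} with values passed as parameters.
         *
         * @param partPrefix - the beginning of part file name
         * @param partSuffix - the ending of part file name
         */
        public OutputFileConfig(final String partPrefix, final String partSuffix) {
            this.partPrefix = Preconditions.checkNotNull(partPrefix);
            this.partSuffix = Preconditions.checkNotNull(partSuffix);
        }

        public String getPartPrefix() {
            return partPrefix;
        }

        public String getPartSuffix() {
            return partSuffix;
        }

        // builder() and the nested builder class are omitted from this excerpt.
    }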
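Since the file name itself can't vary per record, a common workaround is to put the per-record value into the bucket (directory) path instead. In Flink that is done with a custom `BucketAssigner<GenericRecord, String>` whose `getBucketId(element, context)` returns something derived from `element.get("event_id")`, registered via `StreamingFileSink.forBulkFormat(...).withBucketAssigner(...)`. Note that this is a different technique from customizing the file name: the event_id ends up in the directory, not in the part file name. The sketch below models only the routing logic in plain Java so it runs without Flink. A `Map<String, Object>` stands in for the Avro `GenericRecord`, and `EventIdBucketing`, `bucketIdFor`, `pathFor`, `sampleRecord` and the `event_id=<value>` directory format are hypothetical choices.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of a bucket assigner that routes each record
// to a directory named after its event_id. Map<String, Object> stands in for GenericRecord.
final class EventIdBucketing {

    /** Hypothetical helper mirroring BucketAssigner#getBucketId: derive the bucket from the record. */
    static String bucketIdFor(Map<String, Object> record) {
        Object eventId = record.get("event_id");
        // Route records without an event_id to a fallback bucket instead of failing.
        return "event_id=" + (eventId == null ? "unknown" : eventId.toString());
    }

    /** Hypothetical helper: full path of a part file inside the record's bucket. */
    static String pathFor(String basePath, Map<String, Object> record, String partFileName) {
        return basePath + "/" + bucketIdFor(record) + "/" + partFileName;
    }

    /** Groups records by bucket, i.e. by the directory the sink would write them to. */
    static Map<String, List<Map<String, Object>>> groupByBucket(List<Map<String, Object>> records) {
        Map<String, List<Map<String, Object>>> buckets = new TreeMap<>();
        for (Map<String, Object> record : records) {
            buckets.computeIfAbsent(bucketIdFor(record), k -> new ArrayList<>()).add(record);
        }
        return buckets;
    }

    /** Builds a sample record with the two fields mentioned in the question. */
    static Map<String, Object> sampleRecord(String eventId, long timestamp) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("event_id", eventId);
        r.put("timestamp", timestamp);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> records = List.of(
                sampleRecord("e1", 1L), sampleRecord("e1", 2L), sampleRecord("e2", 3L));
        groupByBucket(records).forEach((bucket, rs) ->
                System.out.println(bucket + " -> " + rs.size() + " record(s)"));
        // event_id=e1 -> 2 record(s)
        // event_id=e2 -> 1 record(s)
        System.out.println(pathFor("/data/out", records.get(2), "prefix-0-0.parquet"));
        // /data/out/event_id=e2/prefix-0-0.parquet
    }
}
```

A real Flink assigner must also implement `getSerializer()`; for String bucket IDs, `SimpleVersionedStringSerializer.INSTANCE` is the usual choice.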