apache-beam

Collecting output from Apache Beam pipeline and displaying it to console


I have been working with Apache Beam for a couple of days. I want to iterate quickly on the application I am working on and make sure the pipeline I am building is error free. In Spark we can use sc.parallelize, and when we apply an action we get a value back that we can inspect.

Similarly, when I was reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.FlatMap(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  # Python 3 no longer allows tuple parameters such as lambda (w, o)
                  | "Count" >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))
# leaving the with block runs the pipeline, so no explicit pipeline.run() is needed

I actually want to print the result to the console, but I couldn't find any documentation about it.

Is there a way to print the result to console instead of saving it to a file each time?


Solution

  • After exploring further and learning how to write test cases for my application, I figured out a way to print the result to the console. Please note that I am currently running everything on a single-node machine while I try to understand the functionality Apache Beam provides and how to adopt it without compromising industry best practices.

    So, here is my solution. At the very last stage of the pipeline we can add a map function that either prints each result to the console or accumulates the results in a variable, which we can print later to see the values.

    import apache_beam as beam
    
    # sample input: a list of strings
    data = ["this is sample data", "this is yet another sample data"]
    
    # create a pipeline
    pipeline = beam.Pipeline()
    counts = (pipeline | "create" >> beam.Create(data)
        | "split" >> beam.FlatMap(lambda row: row.split(" "))
        | "pair" >> beam.Map(lambda w: (w, 1))
        | "group" >> beam.CombinePerKey(sum))
    
    # collect the results into the output list with a map transformation
    output = []
    def collect(row):
        output.append(row)
        return True
    
    counts | "print" >> beam.Map(collect)
    
    # Run the pipeline
    result = pipeline.run()
    
    # wait until the result is available
    result.wait_until_finish()
    
    # print the output
    print(output)