
Apache Beam: multiple outputs from a single dictionary


I have written the function below to create 5 dictionaries from a single one, and I pass it to beam.Map in Apache Beam to produce another PCollection.

Input: col1, col2, col3, Market_0_30, DealerMake_0_30, Market_31_60, DealerMake_31_60, Market_61_90, DealerMake_61_90, Market_91_120, DealerMake_91_120, Market_121, DealerMake_121

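Output (one line per age bucket):

line 1: col1, col2, col3, Market, DealerMake, Age: 0_30

line 2: col1, col2, col3, Market, DealerMake, Age: 31_60

line 3: col1, col2, col3, Market, DealerMake, Age: 61_90

line 4: col1, col2, col3, Market, DealerMake, Age: 91_120

line 5: col1, col2, col3, Market, DealerMake, Age: 121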
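The reshape described above can be expressed in plain Python, independent of Beam. This is a minimal sketch, assuming every paired column follows the Market_<period> / DealerMake_<period> naming shown in the input. expand_record and the sample values are hypothetical, and the periods are discovered from the column names instead of being hard-coded:

```python
import re
from typing import Any, Dict, Iterator, List

# Assumed naming convention for the paired columns: Market_<period> / DealerMake_<period>.
_PAIRED = re.compile(r"^(Market|DealerMake)_(.+)$")


def expand_record(record: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one long-format dict per age period found in a wide record."""
    common: Dict[str, Any] = {}
    periods: List[str] = []  # keeps the periods in input column order
    for key, value in record.items():
        match = _PAIRED.match(key)
        if match is None:
            common[key] = value  # column shared by every output line
        elif match.group(2) not in periods:
            periods.append(match.group(2))
    for period in periods:
        yield {
            **common,
            "Market": record.get(f"Market_{period}"),
            "DealerMake": record.get(f"DealerMake_{period}"),
            "Age": period,
        }


if __name__ == "__main__":
    sample = {
        "col1": 1, "col2": "b", "col3": "c",
        "Market_0_30": 10, "DealerMake_0_30": 4,
        "Market_31_60": 7, "DealerMake_31_60": 2,
    }
    for line in expand_record(sample):
        print(line)
```

Using .get() means a missing column pair yields None instead of raising KeyError; switch to indexing if missing columns should fail loudly.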

    from typing import Dict, List

    import apache_beam as beam

    # `markets` and `dealermakers` are lists of column names defined elsewhere.

    def _expand(element: Dict) -> List:
        common_columns = {}
        for key in element.keys():
            if key not in markets and key not in dealermakers:
                common_columns[key] = element[key]

        lines = {}
        for i, (market, dealermaker) in enumerate(zip(markets, dealermakers)):
            line = {}
            line = common_columns.copy()
            line[market] = element[market]
            line[dealermaker] = element[dealermaker]
        return lines

    output = sources_data["group_stocks_view"] | "EXPAND" >> beam.Map(_expand) | "PRINT" >> beam.Map(print)

But I always get an empty PCollection at the end.

Any help, please?

Regards,


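Solution

The original _expand never adds anything to lines, so every input element is mapped to an empty dict. Fixing that alone is not enough: beam.Map emits exactly one output per input, so returning a list from it would produce one list element rather than five dicts. Returning the dicts from a DoFn's process method and applying it with beam.ParDo emits each dict as its own element: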

    from typing import Any, Dict, List

    import apache_beam as beam


    class ExpandStocks(beam.DoFn):
        """Split one wide stock record into one record per age period."""

        def process(self, element, *args, **kwargs) -> List[Dict[Any, Any]]:
            """Convert one element into several elements.

            Args:
                element: input dict with Market_<period> / DealerMake_<period> columns
            Returns:
                one dict per period; ParDo emits each dict as a separate element
            """
            periods = ["0_30", "31_60", "61_90", "91_120", "121"]
            # Columns shared by every output line (everything except the paired columns).
            clean_dict = {
                k: v
                for (k, v) in element.items()
                if not (k.startswith("Market") or k.startswith("DealerMake"))
            }
            dicts_to_ret = []
            for period in periods:
                new_dict = {
                    "Market": element[f"Market_{period}"],
                    "DealerMake": element[f"DealerMake_{period}"],
                    "Age": period,
                }
                dicts_to_ret.append({**clean_dict, **new_dict})
            return dicts_to_ret


    output = sources_data["group_stocks"] | "EXPAND" >> beam.ParDo(ExpandStocks())
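The difference between Map and ParDo/FlatMap can be seen without running Beam. The sketch below uses two hypothetical helpers, map_like and flat_map_like, that only mimic the element-count behavior of beam.Map and beam.FlatMap (a ParDo whose process returns a list behaves like the latter). They are not Beam APIs, and split_periods is a hypothetical two-period version of the reshape:

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def map_like(fn: Callable[[T], U], elements: Iterable[T]) -> List[U]:
    """Mimics beam.Map: exactly one output element per input element."""
    return [fn(x) for x in elements]


def flat_map_like(fn: Callable[[T], Iterable[U]], elements: Iterable[T]) -> List[U]:
    """Mimics beam.FlatMap / ParDo: every item of fn(x) becomes its own element."""
    return [y for x in elements for y in fn(x)]


def split_periods(row: dict) -> List[dict]:
    # Hypothetical two-period reshape, for illustration only.
    return [{"Age": p, "Market": row[f"Market_{p}"]} for p in ("0_30", "31_60")]


if __name__ == "__main__":
    rows = [{"Market_0_30": 10, "Market_31_60": 7}]
    print(len(map_like(split_periods, rows)))       # 1 element: a list holding 2 dicts
    print(len(flat_map_like(split_periods, rows)))  # 2 elements: one dict each
```

In a pipeline, the equivalent of the second call would be applying beam.FlatMap(split_periods), which is a lighter alternative to a DoFn when no setup or per-worker state is needed.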