I have written this function to create 5 dictionaries from a single one, and I pass it to `beam.Map` in Apache Beam to produce another PCollection.
Input: col1, Col2, Col3, Market_0_30, DealerMake_0_30, Market_31_60, DealerMake_31_60, Market_61_90, DealerMake_61_90, Market_91_120, DealerMake_91_120, Market_121, DealerMake_121
Output (one line per age bucket):
line 1: col1, col2, Col3, Market, DealerMake, Age: 0_30
line 2: col1, col2, Col3, Market, DealerMake, Age: 31_60
line 3: col1, col2, Col3, Market, DealerMake, Age: 61_90
…and likewise line 4 (Age: 91_120) and line 5 (Age: 121).
def _expand(element: Dict) -> List:
    """Split one wide row into one row per (market, dealer-make) column pair.

    Every column of ``element`` that is not listed in the module-level
    ``markets`` / ``dealermakers`` sequences is treated as common and copied
    into each output row. Each output row then also receives exactly one
    market column and its paired dealer-make column; pairs are formed
    positionally with ``zip``.

    Args:
        element: one input record mapping column name -> value.

    Returns:
        A list with one new dict per (market, dealermaker) pair. The function
        returns a list, so apply it with ``beam.FlatMap`` (not ``beam.Map``)
        to get one PCollection element per row.

    Raises:
        KeyError: if ``element`` lacks one of the listed market/dealer-make
            columns.
    """
    # NOTE(review): ``markets`` and ``dealermakers`` are module globals defined
    # outside this snippet; presumably column names such as "Market_0_30".
    # Build the lookup set once so each per-key membership test is O(1).
    expanded_columns = set(markets) | set(dealermakers)
    common_columns = {
        key: value for key, value in element.items() if key not in expanded_columns
    }
    lines = []
    for market, dealermaker in zip(markets, dealermakers):
        # Copy so every output row is an independent dict.
        line = common_columns.copy()
        line[market] = element[market]
        line[dealermaker] = element[dealermaker]
        # Previously the row was built but never stored, so the result was
        # always empty.
        lines.append(line)
    return lines
# _expand is declared to return a List of rows. beam.Map would emit that whole
# list as one element, whereas beam.FlatMap emits each returned row as its own
# element of the resulting PCollection.
# NOTE: the PRINT step maps every row to None (print's return value), so
# ``output`` is only useful for the printing side effect.
output = (
    sources_data["group_stocks_view"]
    | "EXPAND" >> beam.FlatMap(_expand)
    | "PRINT" >> beam.Map(print)
)
But I always get an empty PCollection at the end.
Any help, please?
Regards,
def __init__(self):
    """Create the transform; it holds no per-instance configuration."""
    # Run the base-class initializer instead of skipping it with ``pass``.
    # NOTE(review): the class header is not visible here; presumably the base
    # is beam.DoFn, since ExpandStocks() is passed to beam.ParDo — confirm.
    super().__init__()
def process(self, element, *args, **kwargs) -> List[Dict[Any, Any]]:
    """Convert one wide stock row into one row per age period.

    Columns whose names start with "Market" or "DealerMake" are dropped from
    the shared part of the row. For each period a new row is built from the
    remaining (shared) columns plus:

    * "Market": the value of column ``Market_<period>``
    * "DealerMake": the value of column ``DealerMake_<period>``
    * "Age": the period label itself

    Args:
        element: one input record mapping column name -> value.
        *args, **kwargs: accepted for the ``process`` call signature; unused.

    Returns:
        A list of five new dicts, one per period, ordered
        0_30, 31_60, 61_90, 91_120, 121.

    Raises:
        KeyError: if ``element`` lacks a ``Market_<period>`` or
            ``DealerMake_<period>`` column for any period.
    """
    periods = ("0_30", "31_60", "61_90", "91_120", "121")
    # The shared columns are identical for every period, so compute them once
    # rather than once per period.
    clean_dict = {
        k: v
        for k, v in element.items()
        if not k.startswith(("Market", "DealerMake"))
    }
    # The leftover debug print of the whole result was removed from this
    # per-element hot path.
    return [
        {
            **clean_dict,
            "Market": element[f"Market_{period}"],
            "DealerMake": element[f"DealerMake_{period}"],
            "Age": period,
        }
        for period in periods
    ]
# Apply the ExpandStocks transform to every element of
# sources_data["group_stocks"] under the step label "EXPAND".
output = (
    sources_data["group_stocks"]
    | "EXPAND" >> beam.ParDo(ExpandStocks())
)