pythonjsonapache-beamapache-beam-iondjson

How to convert data to desired format and write to a file - Python + Apache Beam


I have a .ndjson file looks like this:

{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
...

I used Apache Beam to read it and grouped the data by property_id, then I wrote the output to a json file, but the data looks like this:

('107', [PPD(property_id='107', transaction_unique_id='{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}', price=80000, date_of_transfer='2021-05-07 00:00', postcode='BL2 2GY', property_type='F', old_new='N', duration='L', PAON='14', SAON='', street='RIVER VIEW COURT', locality='', town_city='BOLTON', district='BOLTON', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('108', [PPD(property_id='108', transaction_unique_id='{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}', price=330000, date_of_transfer='2021-02-26 00:00', postcode='SK6 4AN', property_type='S', old_new='N', duration='F', PAON='18', SAON='', street='GUYWOOD LANE', locality='ROMILEY', town_city='STOCKPORT', district='STOCKPORT', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('109', [PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}', price=215000, date_of_transfer='2021-02-19 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 022', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}', price=226500, date_of_transfer='2021-02-08 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 727', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}', price=262000, date_of_transfer='2021-05-14 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 025', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
...

We can see that for property_id = '109', it grouped three records, but the output format above is really weird...Does anything know why is that and how can I convert it to a newline delimited JSON format and then write to a JSON file?

Expected format looks something like (not sure if this is valid newline delimited json format, but the idea is to include the transactions for the same property_id (for example 109) in an array):

{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transactions": [{"transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}]}
...

Can someone help please, I'm very new to beam, any help would be appreciated. Thanks.


Solution

  • I assume that PPD is a named tuple and you're taking a PCollection of PPD objects and grouping them like

    grouped = (
        ppd_pcoll
        | beam.Map(lambda ppd: (ppd.property_id, property_id)
        | beam.GroupByKey())
    

    Now grouped is a PCollection of 2-tuples, where the first is the property id string, and the second is an iterable of PPDs (with that property id).

    In order to get what you want, you'll need to map it to the desired dictionary to output it as json, e.g.

    to_write_to_json = (grouped
     | beam.MapTuple(lambda property_id, ppds: {
          'property_id': property_id,
          'transactions': [ppd_to_transaction(ppd) for ppd in ppds],
         })
    

    where ppd_to_transaction is a function that takes your PPD object and returns a dict with the desired transaction properties.