python, json, apache-spark, pyspark, apache-spark-sql

Convert dataframe to nested json records


I have a spark dataframe as follows:


+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|   type|lctNbr|itmNbr|    lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr|       upcId|vndModId|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|prd_lct|   145|   147|2024-07-22T05:24:14|       1|                   1|        14|      126|008236686661|   35216|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+

I want to group this data frame by type, lctNbr, itmNbr, and lastUpdatedDate, and have each record come out in the following JSON format:

  "type": "prd_lct",
  "lctNbr": 145,
  "itmNbr": 147,
  "lastUpdatedDate": "2024-07-22T05:24:14",
  "locations": [
    {
      "lctSeqId": 1,
      "prdLctTypCd": 1,
      "fxtAilId": "14"
    }
  ],
  "itemDetails": [
    {
      "pmyVbuNbr": 126,
      "upcId": "008236686661",
      "vndModId": "35216"
  ]
}

I tried using the to_json, collect_list and map_from_entries functions, but I just keep getting errors when I call show and can't seem to get the output into this format.


Solution

  • You can group by the desired fields, then aggregate with F.collect_list(F.create_map(...)) to build the inner fields for locations and itemDetails.

    Sample data:

    import pandas as pd
    import pyspark.sql.functions as F

    pandasDF = pd.DataFrame({
        "type": ["prd_lct","prd_lct","test"],
        "lctNbr": [145, 145, 148],
        "itmNbr": [147, 147, 150],
        "lastUpdatedDate": ["2024-07-22T05:24:14", "2024-07-22T05:24:14", "2024-07-22T05:24:15"],
        "lctSeqId": [1,2,3],
        "T7797_PRD_LCT_TYP_CD": [1,2,3],
        "FXT_AIL_ID": ["14","15","16"],
        "pmyVbuNbr": [126, 127, 128],
        "upcId": ["008236686661","008236686662","008236686663"],
        "vndModId": ["35216","35217","35218"]
    })
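
    Assuming an active SparkSession (named spark here), the pandas frame can be converted into the Spark DataFrame (sparkDF) that the aggregation further down operates on:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # or reuse your existing session
    sparkDF = spark.createDataFrame(pandasDF)
    sparkDF.show()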
    
    +-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
    |   type|lctNbr|itmNbr|    lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr|       upcId|vndModId|
    +-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
    |prd_lct|   145|   147|2024-07-22T05:24:14|       1|                   1|        14|      126|008236686661|   35216|
    |prd_lct|   145|   147|2024-07-22T05:24:14|       2|                   2|        15|      127|008236686662|   35217|
    |   test|   148|   150|2024-07-22T05:24:15|       3|                   3|        16|      128|008236686663|   35218|
    +-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
    

    Build the resulting DataFrame, then convert it to a list of JSON-encoded strings:

    resultDF = sparkDF.groupby(
        'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
    ).agg(
        F.collect_list(
            F.create_map(
                F.lit('lctSeqId'), F.col('lctSeqId'),
                F.lit('prdLctTypCd'), F.col('T7797_PRD_LCT_TYP_CD'),
                F.lit('fxtAilId'), F.col('FXT_AIL_ID'),
            )
        ).alias('locations'),
        F.collect_list(
            F.create_map(
                F.lit('pmyVbuNbr'), F.col('pmyVbuNbr'),
                F.lit('upcId'), F.col('upcId'),
                F.lit('vndModId'), F.col('vndModId'),
            )
        ).alias('itemDetails')
    )
    
    resultJSON = resultDF.toJSON().collect()
    

    Since resultJSON will be a list of JSON-encoded strings, you can parse it into a list of dictionaries with the standard json module:

    import json
    result_dict = [json.loads(x) for x in resultJSON]
    
    [
      {
        "type": "prd_lct",
        "lctNbr": 145,
        "itmNbr": 147,
        "lastUpdatedDate": "2024-07-22T05:24:14",
        "locations": [
          {
            "lctSeqId": "1",
            "prdLctTypCd": "1",
            "fxtAilId": "14"
          },
          {
            "lctSeqId": "2",
            "prdLctTypCd": "2",
            "fxtAilId": "15"
          }
        ],
        "itemDetails": [
          {
            "pmyVbuNbr": "126",
            "upcId": "008236686661",
            "vndModId": "35216"
          },
          {
            "pmyVbuNbr": "127",
            "upcId": "008236686662",
            "vndModId": "35217"
          }
        ]
      },
      {
        "type": "test",
        "lctNbr": 148,
        "itmNbr": 150,
        "lastUpdatedDate": "2024-07-22T05:24:15",
        "locations": [
          {
            "lctSeqId": "3",
            "prdLctTypCd": "3",
            "fxtAilId": "16"
          }
        ],
        "itemDetails": [
          {
            "pmyVbuNbr": "128",
            "upcId": "008236686663",
            "vndModId": "35218"
          }
        ]
      }
    ]
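
    Note that create_map requires a single value type, which is why the numeric fields come back quoted as strings above. If you need to keep the original types, a sketch of the same aggregation with F.struct in place of F.create_map (same grouping, field names aliased to the target names) should do it:

    resultDF = sparkDF.groupby(
        'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
    ).agg(
        F.collect_list(
            F.struct(
                F.col('lctSeqId'),
                F.col('T7797_PRD_LCT_TYP_CD').alias('prdLctTypCd'),
                F.col('FXT_AIL_ID').alias('fxtAilId'),
            )
        ).alias('locations'),
        F.collect_list(
            F.struct(
                F.col('pmyVbuNbr'),
                F.col('upcId'),
                F.col('vndModId'),
            )
        ).alias('itemDetails')
    )

    resultDF.toJSON().collect() will then keep lctSeqId, prdLctTypCd and pmyVbuNbr as numbers in the serialized output.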