
How to dynamically apply array column typing in Spark


I have a PySpark DataFrame with a string column that contains JSON data structured as arrays of objects. However, the schema of these JSON objects can vary from row to row.

Here’s an example of two rows in the DataFrame:

+---------------------------------------------------------------------------------------------------+
| column                                                                                       |
+---------------------------------------------------------------------------------------------------+
| [{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]                 |
| [{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]    |
+---------------------------------------------------------------------------------------------------+

I need to convert the column from a string type to an array type, with the schema dynamically inferred to accommodate all variations in the JSON structure. Currently, my approach only works for the first row and doesn't account for schema variations.

Here’s what I’ve tried so far:

from pyspark.sql.functions import schema_of_json, from_json, col

json_sample = df.select("column").head()[0]  # Sample JSON from the first row
inferred_schema = schema_of_json(json_sample)    # Infer schema from the sample

# Convert the column to array type using the inferred schema
df = df.withColumn("column", from_json(col("column"), inferred_schema))

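Issue: The inferred schema only matches the first row's JSON structure. As a result, rows with a different structure (e.g., the second row) are not parsed correctly. Fields that are not in the schema, such as extra_field and other_details, are dropped, and schema fields that the row lacks (value, details) come back as null.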
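To see why sampling only the first row is not enough, here is a minimal plain-Python sketch (no Spark involved). It collects the top-level keys of the objects in each sample row from the table above and compares the first row's keys with the union over all rows. The helper name `top_level_keys` is hypothetical, and the sketch only looks at top-level keys, whereas Spark's inference also recurses into nested objects.

```python
import json

# The two sample rows from the table above, as raw JSON strings.
rows = [
    '[{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]',
    '[{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]',
]


def top_level_keys(raw):
    """Return the set of keys used by the objects in one JSON array string (hypothetical helper)."""
    keys = set()
    for obj in json.loads(raw):
        keys.update(obj.keys())
    return keys


first_row_keys = top_level_keys(rows[0])
all_keys = set().union(*(top_level_keys(r) for r in rows))

# Fields that a schema inferred from the first row alone would not contain.
missing = all_keys - first_row_keys
print(sorted(missing))  # ['extra_field', 'other_details']
```

Any key that appears only in later rows is absent from a first-row schema, which is exactly the data that gets lost.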

I’m looking for a way to:

  • Dynamically infer the schema of the column so that it covers all variations of the JSON objects across rows.
  • Apply the inferred schema to convert the column into an array type without losing any data.

Real data example, one row:

[{'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '29b2c17e-e980-4e3f-bd6a-a311f414bd25', 'Operation': 'Add', 'Deductible': {'_id': 'f794288a-6c38-4dd5-bc3a-f3d81e2e824a', 'Value': '0.00', 'Percent': '0'}, 'Subscription': {'RiskRate': '0.000232200', 'LoadingFee': '0', 'DerivedLmi': '150000.00', 'PricingLmi': '150000.00', 'PureRate': '0.000232200', 'PurePremium': '34.83'}, 'Product': {'NetPremiumWithoutDiscount': '85.64', 'DaFix': '7.99', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '125.94117647058823529411764706', 'DiscountValue': '50.38', 'CommercialPremium': '75.56', 'CommissionPercentage': '0.320000000', 'CommissionValue': '40.301176470588235294117647059', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '75.564705882352941176470588236', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.000503733'}, 'Financial': {'Iof': '5.58', 'IofPercentage': '0.0738', 'GrossPremium': '81.14', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '40.301176470588235294117647059', 'ProLaboreWithoutDiscount': '0', 'Commission': '24.180705882352941176470588236', 'ProLabore': '0', 'Profit': '-3.43', 'NetPremium': '51.38', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '19.98'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '150000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': 'ca1cc323-0f42-4661-951e-dfad7027d9b6', 'Operation': 'Add', 'Deductible': {'_id': 'ad9c664c-06b4-408a-8e2f-2c664c45a137', 'Value': '0.00', 'Percent': '0'}, 
'Subscription': {'RiskRate': '0.000494500', 'LoadingFee': '0', 'DerivedLmi': '15000.00', 'PricingLmi': '15000.00', 'PureRate': '0.000494500', 'PurePremium': '7.42'}, 'Product': {'NetPremiumWithoutDiscount': '30.8', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '45.294117647058823529411764706', 'DiscountValue': '18.12', 'CommercialPremium': '27.18', 'CommissionPercentage': '0.320000000', 'CommissionValue': '14.494117647058823529411764706', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '27.176470588235294117647058824', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.001812'}, 'Financial': {'Iof': '2.01', 'IofPercentage': '0.0738', 'GrossPremium': '29.19', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '14.494117647058823529411764706', 'ProLaboreWithoutDiscount': '0', 'Commission': '8.696470588235294117647058824', 'ProLabore': '0', 'Profit': '-1.23', 'NetPremium': '18.48', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '12.29'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': '5f809eaf-6a80-4eb3-a36a-e71014d7e888', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '15000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '00e41c0f-eb68-4e24-8898-66fd54e560dd', 'Operation': 'Add', 'Deductible': {'_id': 'b4e2babc-7ea4-49e5-9c73-5b3265b07ccb', 'Value': '0.00', 'Percent': '0'}, 'Subscription': {'RiskRate': '0.010913400', 'LoadingFee': '0', 'DerivedLmi': '3000.00', 'PricingLmi': '3000.00', 'PureRate': '0.010913400', 'PurePremium': '32.74'}, 'Product': {'NetPremiumWithoutDiscount': '81.44', 'DaFix': 
'7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '119.76470588235294117647058824', 'DiscountValue': '47.91', 'CommercialPremium': '71.86', 'CommissionPercentage': '0.320000000', 'CommissionValue': '38.324705882352941176470588237', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '71.858823529411764705882352944', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.023953333'}, 'Financial': {'Iof': '5.30', 'IofPercentage': '0.0738', 'GrossPremium': '77.16', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '38.324705882352941176470588237', 'ProLaboreWithoutDiscount': '0', 'Commission': '22.994823529411764705882352942', 'ProLabore': '0', 'Profit': '-3.26', 'NetPremium': '48.86', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '19.38'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '3000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': 'd044866e-f5f5-4a42-b68f-23aa2c55b498', 'Operation': 'Add', 'Deductible': {'_id': '0f9b9edb-2a35-4300-b1e5-d61e90636c66', 'Value': '1000', 'Percent': '0.15'}, 'Subscription': {'RiskRate': '0.001363100', 'LoadingFee': '0', 'DerivedLmi': '30000.00', 'PricingLmi': '30000.00', 'PureRate': '0.001363100', 'PurePremium': '40.89'}, 'Product': {'NetPremiumWithoutDiscount': '97.74', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '143.73529411764705882352941176', 'DiscountValue': '57.49', 'CommercialPremium': '86.24', 
'CommissionPercentage': '0.320000000', 'CommissionValue': '45.995294117647058823529411763', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '86.24117647058823529411764706', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.002874667'}, 'Financial': {'Iof': '6.35', 'IofPercentage': '0.0738', 'GrossPremium': '92.59', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '45.995294117647058823529411763', 'ProLaboreWithoutDiscount': '0', 'Commission': '27.597176470588235294117647059', 'ProLabore': '0', 'Profit': '-3.91', 'NetPremium': '58.64', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '21.66'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '30000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': 'b2df01da-0b28-4425-8e68-1728395804f8', 'Operation': 'Add', 'Deductible': {'_id': '23e3f5e1-1a25-4920-a8f9-2a68bb7505d3', 'Value': '150.00', 'Percent': '0.1'}, 'Subscription': {'RiskRate': '0.005820050', 'LoadingFee': '0', 'DerivedLmi': '2000.00', 'PricingLmi': '2000.00', 'PureRate': '0.005820050', 'PurePremium': '11.64'}, 'Product': {'NetPremiumWithoutDiscount': '39.24', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '57.705882352941176470588235294', 'DiscountValue': '23.08', 'CommercialPremium': '34.62', 'CommissionPercentage': '0.320000000', 'CommissionValue': '18.465882352941176470588235294', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 
'CommercialPremiumWithDiscountAndFee': '34.623529411764705882352941176', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.01731'}, 'Financial': {'Iof': '2.56', 'IofPercentage': '0.0738', 'GrossPremium': '37.18', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '18.465882352941176470588235294', 'ProLaboreWithoutDiscount': '0', 'Commission': '11.079529411764705882352941176', 'ProLabore': '0', 'Profit': '-1.57', 'NetPremium': '23.54', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '13.47'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '2000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '7fa48879-43e5-493b-bb90-ff7998232847', 'Operation': 'Add', 'Deductible': {'_id': '6c8d2e6f-5390-4a3b-b14e-79a44d5bec64', 'Value': '500.00', 'Percent': '0.1'}, 'Subscription': {'RiskRate': '0.006439250', 'LoadingFee': '0', 'DerivedLmi': '3000.00', 'PricingLmi': '3000.00', 'PureRate': '0.006439250', 'PurePremium': '19.32'}, 'Product': {'NetPremiumWithoutDiscount': '54.6', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '80.29411764705882352941176471', 'DiscountValue': '32.12', 'CommercialPremium': '48.18', 'CommissionPercentage': '0.320000000', 'CommissionValue': '25.694117647058823529411764707', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '48.176470588235294117647058826', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.01606'}, 'Financial': {'Iof': '3.56', 'IofPercentage': '0.0738', 'GrossPremium': '51.74', 
'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '25.694117647058823529411764707', 'ProLaboreWithoutDiscount': '0', 'Commission': '15.416470588235294117647058824', 'ProLabore': '0', 'Profit': '-2.18', 'NetPremium': '32.76', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '15.62'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '3000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '795c1e26-2eb7-4b55-b23a-765850699aeb', 'Operation': 'Add', 'Deductible': {'_id': 'e8996734-39d5-4983-8b41-8d7c23c73a12', 'Value': '0.00', 'Percent': '0'}, 'Subscription': {'RiskRate': '0.000273050', 'LoadingFee': '0', 'DerivedLmi': '3000.00', 'PricingLmi': '3000.00', 'PureRate': '0.000273050', 'PurePremium': '0.82'}, 'Product': {'NetPremiumWithoutDiscount': '17.6', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '25.882352941176470588235294118', 'DiscountValue': '10.35', 'CommercialPremium': '15.53', 'CommissionPercentage': '0.320000000', 'CommissionValue': '8.282352941176470588235294118', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '15.529411764705882352941176471', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.005176667'}, 'Financial': {'Iof': '1.15', 'IofPercentage': '0.0738', 'GrossPremium': '16.68', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '8.282352941176470588235294118', 'ProLaboreWithoutDiscount': '0', 
'Commission': '4.9694117647058823529411764707', 'ProLabore': '0', 'Profit': '-0.70', 'NetPremium': '10.56', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '10.44'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '3000.00'}]}]

Solution

  • You can create an RDD from the column you are interested in and load that RDD into a DataFrame with spark.read.json(). Spark will infer the schema of every JSON string and merge the results into a single schema automatically:

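    from pyspark.sql.types import ArrayType
    from pyspark.sql.functions import from_json, col
    
    # Pull the raw JSON strings out of the column (skipping nulls) as an RDD of str
    rdd = (df.select('column').rdd
           .map(lambda x: x['column'])
           .filter(lambda s: s is not None))
    
    # spark.read.json() reads every element of each top-level JSON array as a record
    # and merges the fields of all records into one struct schema, so wrap it in ArrayType
    inferred_schema = ArrayType(spark.read.json(rdd).schema)
    
    # Parse the string column with the merged array<struct<...>> schema
    df = df.withColumn("column", from_json(col("column"), inferred_schema))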
    

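    You can set samplingRatio (default 1.0) in spark.read.json() to infer the schema from only a fraction of the rows. This is faster on large data, but fields that appear only in rows that were not sampled will be missing from the schema.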
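Conceptually, the merge step takes the union of the fields seen in every record, recursing into nested objects and arrays. The sketch below models that in plain Python on the two sample rows. It is a simplification, not Spark's actual implementation: every leaf value is just called "string" (which happens to fit the sample data, where all values are quoted), and real Spark inference also resolves numeric types and type conflicts. The names `infer` and `merge` are hypothetical.

```python
import json

rows = [
    '[{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]',
    '[{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]',
]


def infer(value):
    """Return a simplified 'schema' for one JSON value (hypothetical model, not Spark code)."""
    if isinstance(value, dict):
        return {key: infer(child) for key, child in value.items()}
    if isinstance(value, list):
        element = {}
        for item in value:
            element = merge(element, infer(item))
        return [element]
    return "string"  # every leaf (string, number, bool or null) is simplified to "string"


def merge(left, right):
    """Union two simplified schemas, recursing into nested objects and arrays."""
    if isinstance(left, dict) and isinstance(right, dict):
        merged = dict(left)
        for key, schema in right.items():
            merged[key] = merge(merged[key], schema) if key in merged else schema
        return merged
    if isinstance(left, list) and isinstance(right, list):
        return [merge(left[0], right[0])]
    return left if left == right else "string"  # crude conflict rule


# Each row is a JSON array; like spark.read.json, treat each element as a record.
schema = {}
for raw in rows:
    for record in json.loads(raw):
        schema = merge(schema, infer(record))

print(sorted(schema))
# ['_t', 'details', 'extra_field', 'id', 'other_details', 'value']
print(schema["other_details"])  # {'key3': 'string', 'key4': 'string'}
```

Wrapping the merged record schema in a list corresponds to the ArrayType(...) wrapper in the answer. The same model also shows the samplingRatio trade-off: a field that only occurs in records left out of the loop never enters the merged schema.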