I'm trying to process bz2-compressed XML files with a nested schema into normalized tables: each level of the schema is stored as rows in its own table, and child elements are stored as rows in a separate table with a foreign key back to the parent row they belong to.
These files can be rather large: 181MB compressed, with a single file expanding into a very large number of rows. If we don't supply the schema to the DataFrameReader, it crashes with out-of-memory errors just trying to parse the file and infer the schema. I provided the schema to get past this, but...
I currently have a DataFrame at the parent level of the XML that selects the nested schema out of the entire file, but when I try to write that parent DataFrame to Parquet, the single node doing the file load throws out-of-memory errors.
How can I traverse the XML and write the data into normalized Parquet files, one for each level of the schema?
The schema is:
val schema = StructType(Seq(
  StructField("_xmlns", StringType, true),
  StructField("includedCalendarYearCategory", StructType(Seq(
    StructField("calendarYear", LongType, true),
    StructField("includedPlanCategory", StructType(Seq(
      StructField("includedBilltypeCategory", ArrayType(StructType(Seq(
        StructField("billTypeCode", LongType, true),
        StructField("claimsExcluded", LongType, true),
        StructField("claimsIncluded", LongType, true)
      )), true), true),
      StructField("includedMedicalClaimCategory", ArrayType(StructType(Seq(
        StructField("enrolleeIdentifier", StringType, true),
        StructField("includedSupplementalRecordCategory", StructType(Seq(
          StructField("addDeleteVoidCode", StringType, true),
          StructField("enrolleeIdentifier", StringType, true),
          StructField("originalMedicalClaimId", StringType, true),
          StructField("reasonCode", StringType, true),
          StructField("supplementalDetailRecordId", StringType, true)
        )), true),
        StructField("medicalClaimIdentifier", StringType, true),
        StructField("raEligibleIndicator", LongType, true),
        StructField("rxcEligibleIndicator", LongType, true),
        StructField("serviceCode", LongType, true)
      )), true), true),
      StructField("includedPharmacyClaimCategory", ArrayType(StructType(Seq(
        StructField("enrolleeIdentifier", StringType, true),
        StructField("nationalDrugCode", LongType, true),
        StructField("pharmacyClaimIdentifier", StringType, true),
        StructField("policyPaidAmount", DoubleType, true),
        StructField("raEligibleIndicator", LongType, true),
        StructField("reasonCode", StringType, true)
      )), true), true),
      StructField("includedReasonCodeCategory", ArrayType(StructType(Seq(
        StructField("medicalClaimsExcluded", LongType, true),
        StructField("medicalReasonCode", StringType, true),
        StructField("pharmacyClaimsExcluded", LongType, true),
        StructField("pharmacyReasonCode", StringType, true)
      )), true), true),
      StructField("includedServiceCodeCategory", ArrayType(StructType(Seq(
        StructField("claimsExcluded", LongType, true),
        StructField("claimsIncluded", LongType, true),
        StructField("serviceCode", StringType, true)
      )), true), true),
      StructField("includedUnlinkedSupplementalCategory", ArrayType(StructType(Seq(
        StructField("addDeleteVoidCode", LongType, true),
        StructField("enrolleeIdentifier", StringType, true),
        StructField("originalMedicalClaimId", StringType, true),
        StructField("supplementalDiagnosisDetailRecordId", LongType, true)
      )), true), true),
      StructField("medicalClaimsExcluded", LongType, true),
      StructField("medicalClaimsIncluded", LongType, true),
      StructField("pharmacyClaimsExcluded", LongType, true),
      StructField("pharmacyClaimsIncluded", LongType, true),
      StructField("planIdentifier", StringType, true),
      StructField("supplementalRecordsExcluded", LongType, true),
      StructField("supplementalRecordsIncluded", LongType, true),
      StructField("totalEnrollees", LongType, true),
      StructField("totalEnrolleesWRaEligibleclaims", LongType, true),
      StructField("totalUniqueNDC", LongType, true)
    )), true),
    StructField("medicalClaimsExcluded", LongType, true),
    StructField("medicalClaimsIncluded", LongType, true),
    StructField("pharmacyClaimsExcluded", LongType, true),
    StructField("pharmacyClaimsIncluded", LongType, true),
    StructField("supplementalRecordsExcluded", LongType, true),
    StructField("supplementalRecordsIncluded", LongType, true),
    StructField("totalEnrollees", LongType, true),
    StructField("totalEnrolleesWRaEligibleclaims", LongType, true),
    StructField("totalUniqueNDC", LongType, true)
  )), true),
  StructField("includedFileHeader", StructType(Seq(
    StructField("cmsBatchIdentifier", StringType, true),
    StructField("cmsJobIdentifier", LongType, true),
    StructField("edgeServerIdentifier", LongType, true),
    StructField("edgeServerProcessIdentifier", LongType, true),
    StructField("edgeServerVersion", StringType, true),
    StructField("globalReferenceDataVersion", StringType, true),
    StructField("interfaceControlReleaseNumber", StringType, true),
    StructField("issuerIdentifier", LongType, true),
    StructField("outboundFileGenerationDateTime", TimestampType, true),
    StructField("outboundFileIdentifier", StringType, true),
    StructField("outboundFileTypeCode", StringType, true),
    StructField("snapShotFileHash", StringType, true),
    StructField("snapShotFileName", StringType, true)
  )), true)
))
We read the DataFrame like this:
sparkSession.read
  .schema(schema) // schema from the code block above
  .format("xml")
  // rowTag is set to the same element as rootTag, so the whole document is one row
  .option("rootTag", "riskAdjustmentClaimSelectionDetailReport")
  .option("rowTag", "riskAdjustmentClaimSelectionDetailReport")
  .xml(path)
  .repartition(200)
I thought repartitioning would help spread the single file across multiple nodes, but it makes sense that the XML has to be fully parsed before Spark can figure out how to chunk it. Is there something I can configure in Spark to load a massive XML file across the cluster, instead of entirely on a single node, using the embedded spark-xml library?
It turns out that Spark can't handle a large XML file read this way: it must read the entire document on a single node before it can break it up. If the uncompressed file is too large to fit in that node's memory, the job fails with out-of-memory errors.
I had to parse it linearly in plain Scala, without Spark, visiting it node by node in recursive fashion so that the entire file is never loaded into memory.
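A minimal sketch of such a linear pass, under stated assumptions: it uses the JDK's StAX reader (javax.xml.stream) rather than Scala's own XML library, and an explicit stack of open elements rather than recursion. The names Frame, StreamingFlatten, flatten and emit are made up for illustration and are not the original code. Elements that contain child elements are treated as rows; text-only elements become fields of the enclosing row. A bz2 file would first need to be wrapped in a decompressing InputStream, which is not shown here.

```scala
import java.io.InputStream
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}
import scala.collection.mutable

// One open element: its name, the id assigned to it, and the leaf values seen so far.
final class Frame(val name: String, val id: Long) {
  val fields = mutable.LinkedHashMap.empty[String, String]
  val text = new StringBuilder
  var hasChildElements = false
}

object StreamingFlatten {
  /** Streams `in` and calls emit(table, id, parentId, fields) once per
    * element that contains child elements. Only the chain of currently
    * open elements is held in memory, never the whole tree. */
  def flatten(in: InputStream)(
      emit: (String, Long, Option[Long], Map[String, String]) => Unit): Unit = {
    val reader = XMLInputFactory.newInstance().createXMLStreamReader(in)
    var stack = List.empty[Frame] // head = innermost open element
    var nextId = 0L
    try {
      while (reader.hasNext) {
        reader.next() match {
          case XMLStreamConstants.START_ELEMENT =>
            stack.headOption.foreach(_.hasChildElements = true)
            nextId += 1
            stack = new Frame(reader.getLocalName, nextId) :: stack
          case XMLStreamConstants.CHARACTERS | XMLStreamConstants.CDATA =>
            stack.headOption.foreach(_.text.append(reader.getText))
          case XMLStreamConstants.END_ELEMENT =>
            val done = stack.head
            stack = stack.tail
            if (done.hasChildElements)
              // A container element: one row, keyed back to its parent's id.
              emit(done.name, done.id, stack.headOption.map(_.id), done.fields.toMap)
            else
              // A text-only element: a column value of the enclosing row.
              stack.headOption.foreach(_.fields.update(done.name, done.text.toString.trim))
          case _ => // comments, processing instructions, etc. are ignored
        }
      }
    } finally reader.close()
  }
}
```

Note that a row is emitted when its element closes, so child rows are produced before the parent row they point to; ids are unique but not consecutive, because text-only elements consume ids too.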
I wrote a recursive method, using Scala's built-in XML traversal, that walks the whole XML tree of the massive file and appends each element's children as a row in a CSV for that particular element type, with each row referencing its parent node's ID.
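The CSV side of this can be sketched as one append-only file per element type. This is an illustration under assumptions, not the original code: the class name CsvTables, the id and parent_id column names, and the idea of passing in a fixed column list per table are all hypothetical choices made here so that optional fields still land in the right column.

```scala
import java.io.{BufferedWriter, Closeable}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.collection.mutable

/** Appends rows to one CSV file per element type; a file is created
  * (with a header line) the first time its table is written to. */
final class CsvTables(dir: Path, columns: Map[String, Seq[String]]) extends Closeable {
  private val writers = mutable.Map.empty[String, BufferedWriter]

  // Quote a value only when it contains a comma, a quote, or a line break.
  private def escape(v: String): String =
    if (v.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + v.replace("\"", "\"\"") + "\""
    else v

  private def writerFor(table: String): BufferedWriter =
    writers.getOrElseUpdate(table, {
      val w = Files.newBufferedWriter(dir.resolve(table + ".csv"), StandardCharsets.UTF_8)
      w.write(("id" +: "parent_id" +: columns(table)).map(escape).mkString(","))
      w.newLine()
      w
    })

  /** Writes one row; fields missing from `fields` become empty cells. */
  def append(table: String, id: Long, parentId: Option[Long],
             fields: Map[String, String]): Unit = {
    val w = writerFor(table)
    val cells = id.toString +: parentId.fold("")(_.toString) +:
      columns(table).map(c => fields.getOrElse(c, ""))
    w.write(cells.map(escape).mkString(","))
    w.newLine()
  }

  override def close(): Unit = writers.values.foreach(_.close())
}
```

Because rows are written as soon as they are produced, memory use stays flat no matter how many rows a table ends up with; the resulting CSVs are small enough per row group that Spark can then read them in parallel and write Parquet.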