scalaapache-sparkapache-spark-sqlapache-spark-xml

cannot resolve explode due to type mismatch in spark while parsing xml file


I have a data frame with below schema

root
 |-- DataPartition: long (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _organizationId: long (nullable = true)
 |-- _segmentId: long (nullable = true)
 |-- seg:BusinessSegments: struct (nullable = true)
 |    |-- seg:BusinessSegment: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |-- _hierarchicalCode: long (nullable = true)
 |    |    |    |-- _industryId: long (nullable = true)
 |    |    |    |-- _ranking: long (nullable = true)
 |-- seg:GeographicSegments: struct (nullable = true)
 |    |-- seg:GeographicSegment: struct (nullable = true)
 |    |    |-- _geographyId: long (nullable = true)
 |    |    |-- seg:IsSubtracted: boolean (nullable = true)
 |    |    |-- seg:Sequence: long (nullable = true)
 |-- seg:IsCorporate: boolean (nullable = true)
 |-- seg:IsElimination: boolean (nullable = true)
 |-- seg:IsOperatingSegment: boolean (nullable = true)
 |-- seg:IsOther: boolean (nullable = true)
 |-- seg:IsShariaCompliant: boolean (nullable = true)
 |-- seg:PredecessorSegments: struct (nullable = true)
 |    |-- seg:PredecessorSegment: long (nullable = true)
 |-- seg:SegmentLocalLanguageLabel: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- seg:SegmentName: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- seg:SegmentType: string (nullable = true)
 |-- seg:SegmentTypeId: long (nullable = true)
 |-- seg:ValidFromPeriodEndDate: string (nullable = true)
 |-- _action: string (nullable = true)

Now I want to get seg:BusinessSegments.seg:BusinessSegment value from the schema.

But my issue is when I do this using explode

val GeographicSegmentchildDF = parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:GeographicSegments.seg:GeographicSegment").as("GeographicSegments"), $"_action")
val GeographicSegmentchildArrayDF = GeographicSegmentchildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SegmentId"), $"GeographicSegments.*", getFFActionChild($"_action").as("FFAction|!|"))

So in first line I am exploding and in the next line I am doing * or expand on that $"GeographicSegments.*", .

I get error like This is what I am doing

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(seg:GeographicSegments.seg:GeographicSegment)' due to data type mismatch:

I know the issue because in schema I get seg:GeographicSegment as struct not as array and that is why I am getting .

So the real issue is I don't have fixed schema .

When there are two records in xml file then seg:GeographicSegment becomes as array and then my code is working fine but when I get only one record then it work as struct and my code fails .

How can I handle this in my code . Do I have to put condition while parsing schema ? Or is there anyway I

Here is one of the case which is not working

val columnTypePredecessorSegments = parentDF.select($"seg:PredecessorSegments.seg:PredecessorSegment").schema.map(_.dataType).head.toString().startsWith("LongType")
    //if column type is struct then use .* and array function to convert the struct to array else just use array
    val PredecessorSegmentschildDF = if (columnTypePredecessorSegments) {
      parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode(array($"seg:PredecessorSegments.seg:PredecessorSegment")).as("PredecessorSegments"), $"_action")
    } else {
      parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:PredecessorSegments.seg:PredecessorSegment").as("PredecessorSegments"), $"_action")
    }
    val PredecessorSegmentsDFFinalChilddDF = PredecessorSegmentschildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SuccessorSegment"), $"PredecessorSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
    PredecessorSegmentsDFFinalChilddDF.show(false)

Solution

  • When there are two records in xml file then seg:GeographicSegment becomes as array and then my code is working fine but when I get only one record then it work as struct and my code fails .

    Then you would need to check for the datatype of the column before using explode

    //checking for struct or array type in that column
    val columnType = parentDF.select($"seg:GeographicSegments.seg:GeographicSegment").schema.map(_.dataType).head.toString().startsWith("StructType")
    
    import org.apache.spark.sql.functions._
    //if column type is struct then use .* and array function to convert the struct to array else just use array
    val GeographicSegmentchildDF = if(columnType) {
      parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode(array($"seg:GeographicSegments.seg:GeographicSegment.*")).as("GeographicSegments"), $"_action")
    }
    else {
      parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:GeographicSegments.seg:GeographicSegment").as("GeographicSegments"), $"_action")
    }
    val GeographicSegmentchildArrayDF = GeographicSegmentchildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SegmentId"), $"GeographicSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
    

    I hope the answer is helpful