Dataframe df1 contains columns : a, b, c, d, e (Empty dataframe)
Dataframe df2 contains columns : b, c, d, e, _c4 (Contains Data)
I want to do a union on these two dataframes. I tried using
df1.union(df2);
This fills in the data by position, but I want the data filled in by column name.
Then I tried with
df1.unionByName(df2, allowMissingColumns= true);
But it throws an error at `allowMissingColumns = true`. I learned that this is because of the version: I use Spark 2.4.4.
df1:
|a|b|c|d|e|
+---------+
| | | | | |
+---------+
df2:
|b|c|d|e|_c4|
+-----------+
|2|3|5|6| |
+-----------+
Expected Output:
|a|b|c|d|e|
+---------+
| |2|3|5|6|
+---------+
My question: is there any other way to fill the empty dataframe (df1) from the populated dataframe (df2) by column name? Or do I need to change the version in the pom.xml file? Any suggestions are welcome.
Pom file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rule</groupId>
<artifactId>qwerty</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>qwerty</name>
<description>code</description>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<archive>
<manifest>
<mainClass>qwerty.qwerty</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration> <source>1.8</source> <target>1.8</target> </configuration>
</plugin>
</plugins>
</build>
</project>
`unionByName` exists since Spark 2.3, but the `allowMissingColumns` parameter only appeared in Spark 3.1, hence the error you get in 2.4.
In Spark 2.4, you could try to implement the same behavior yourself, that is, transform `df2` so that it contains all the columns from `df1`. If a column is not in `df2`, we can set it to null. In Scala, you could do it this way:
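import org.apache.spark.sql.functions.{col, lit}

// Select df1's columns from df2, in df1's order.
// Columns that df2 lacks become null; columns only in df2 (such as _c4) are dropped.
val df2_as1 = df2.select(
  df1.columns.map(c => if (df2.columns.contains(c)) col(c) else lit(null).as(c)): _*
)
// Here, union would work just as well.
val result = df1.unionByName(df2_as1)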
In Java, the same thing is more verbose:
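import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

List<String> df2_cols = Arrays.asList(df2.columns());
// cols is the list of columns contained in df1, but all columns
// that are not in df2 are set to null.
List<Column> cols = new ArrayList<>();
for (String c : df1.columns()) {
    if (df2_cols.contains(c))
        cols.add(functions.col(c));
    else
        cols.add(functions.lit(null).alias(c));
}
// We modify df2 so that its schema matches df1's.
// select(Column...) accepts a Java array, so no Scala Seq conversion is needed.
Dataset<Row> df2_as1 = df2.select(cols.toArray(new Column[0]));
// Here, union would work just as well.
Dataset<Row> result = df1.unionByName(df2_as1);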
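The column-matching step can also be written without any Spark types, which makes it easy to check on its own. The sketch below builds SQL expression strings instead of `Column` objects. `AlignColumns` and `alignExprs` are hypothetical names chosen for this illustration, not part of Spark. Like the code above, it compares names case-sensitively, and it assumes the column names contain no backticks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AlignColumns {
    // Hypothetical helper (not a Spark API): for each target column, emit either
    // the quoted column name (if the source has it) or "NULL AS <name>".
    public static String[] alignExprs(String[] targetCols, String[] sourceCols) {
        Set<String> available = new HashSet<>(Arrays.asList(sourceCols));
        List<String> exprs = new ArrayList<>();
        for (String c : targetCols) {
            // Backticks keep unusual names (e.g. with dots) as a single identifier.
            String quoted = "`" + c + "`";
            exprs.add(available.contains(c) ? quoted : "NULL AS " + quoted);
        }
        return exprs.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] exprs = alignExprs(
            new String[] {"a", "b", "c", "d", "e"},      // df1.columns()
            new String[] {"b", "c", "d", "e", "_c4"});   // df2.columns()
        // prints: NULL AS `a`, `b`, `c`, `d`, `e`
        System.out.println(String.join(", ", exprs));
    }
}
```

With Spark on the classpath, the result could then be passed to `selectExpr`, for example `df2.selectExpr(AlignColumns.alignExprs(df1.columns(), df2.columns()))`, followed by the same `unionByName` call as above.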