dataframe, apache-spark-sql, hiveql, spark-hive

Spark Hive: Filter rows of one DataFrame by the values of another DataFrame's column


I have the following two DataFrames:

DataFrame "dfPromotion":
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther":
date        | store
===================
2017-01-01  | 1    
2017-01-03  | 1    

Later I need to union both of the DataFrames above. But first I have to remove all rows of dfOther whose date value is also contained in dfPromotion.

The result of the following filtering step should look like this:

DataFrame "dfPromotion" (this always stays the same and must not be changed in this step!)
date        | store
===================
2017-01-01  | 1    
2017-01-02  | 1


DataFrame "dfOther" (the first row is removed, as dfPromotion contains the date 2017-01-01 in the "date" column)
date        | store
===================
2017-01-03  | 1 

Is there a way to do this in Java? I only found the DataFrame.except method for this, but it compares all columns of the DataFrames. I need to filter the second DataFrame by the date column only, as other columns could be added later that might contain different values...

Calling dfOther.filter(dfOther.col("date").isin(dfPromotion.col("date"))) throws the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) date#64 missing from date#0,store#13 in operator !Filter date#0 IN (date#64);
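The exception occurs because Column.isin expects literal values, not a column belonging to another DataFrame's query plan, so the analyzer cannot resolve dfPromotion's date attribute inside a filter on dfOther. One workaround, sketched below under the assumption of Spark 2.x (class and method names are illustrative), is to collect dfPromotion's dates to the driver and pass them to isin as literals; this is only practical when dfPromotion is small.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class CollectIsinWorkaround {

    // Simple bean used to build the sample DataFrames (illustrative only).
    public static class DateStore implements java.io.Serializable {
        private String date;
        private int store;

        public DateStore() {}

        public DateStore(String date, int store) {
            this.date = date;
            this.store = store;
        }

        public String getDate() { return date; }
        public void setDate(String date) { this.date = date; }
        public int getStore() { return store; }
        public void setStore(int store) { this.store = store; }
    }

    // Collects dfPromotion's dates to the driver, then filters dfOther with isin.
    public static Dataset<Row> removePromotionDates(Dataset<Row> dfPromotion,
                                                    Dataset<Row> dfOther) {
        Object[] dates = dfPromotion.select("date").collectAsList().stream()
                .map(r -> r.get(0))
                .toArray();
        // not(isin(...)) keeps only rows whose date is absent from dfPromotion
        return dfOther.filter(functions.not(functions.col("date").isin(dates)));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("collect-isin").getOrCreate();

        Dataset<Row> dfPromotion = spark.createDataFrame(Arrays.asList(
                new DateStore("2017-01-01", 1),
                new DateStore("2017-01-02", 1)), DateStore.class);
        Dataset<Row> dfOther = spark.createDataFrame(Arrays.asList(
                new DateStore("2017-01-01", 1),
                new DateStore("2017-01-03", 1)), DateStore.class);

        removePromotionDates(dfPromotion, dfOther).show();
        spark.stop();
    }
}
```

Because the dates are materialized on the driver, this does not scale to a large dfPromotion; a join-based approach avoids that limitation.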

Solution

  • Since you mentioned Spark Hive, you can try a Spark SQL approach like the one below:

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
    val dfpromotion = sqlContext.sql("select * from dfpromotion");
    
    dfpromotion.show
    +----------+-----+
    |        dt|store|
    +----------+-----+
    |2017-01-01|    1|
    |2017-01-02|    1|
    +----------+-----+
    
    val dfother = sqlContext.sql("select * from dfother");
    
    dfother.show
    +----------+-----+
    |        dt|store|
    +----------+-----+
    |2017-01-01|    1|
    |2017-01-03|    1|
    +----------+-----+
    
    
    val dfdiff = sqlContext.sql("select o.dt, o.store from dfpromotion p right outer join dfother o on p.dt = o.dt where p.dt is null");
    val dfunion = dfpromotion.union(dfdiff);
    
    
    scala> dfunion.show
    +----------+-----+
    |        dt|store|
    +----------+-----+
    |2017-01-01|    1|
    |2017-01-02|    1|
    |2017-01-03|    1|
    +----------+-----+
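Since the question asks for Java, the same right-outer-join-with-null-check pattern can also be expressed directly with the DataFrame API as a left anti join, which keeps exactly the rows of the left side that have no match on the right. The sketch below assumes Spark 2.x (where DataFrame is Dataset&lt;Row&gt; and the "left_anti" join type is supported); the class and bean names are illustrative.

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AntiJoinFilter {

    // Simple bean used to build the sample DataFrames (illustrative only).
    public static class DateStore implements java.io.Serializable {
        private String date;
        private int store;

        public DateStore() {}

        public DateStore(String date, int store) {
            this.date = date;
            this.store = store;
        }

        public String getDate() { return date; }
        public void setDate(String date) { this.date = date; }
        public int getStore() { return store; }
        public void setStore(int store) { this.store = store; }
    }

    // Drops every row of dfOther whose date also occurs in dfPromotion,
    // then appends the remainder to dfPromotion. dfPromotion is untouched.
    public static Dataset<Row> filterAndUnion(Dataset<Row> dfPromotion,
                                              Dataset<Row> dfOther) {
        Dataset<Row> dfDiff = dfOther.join(
                dfPromotion,
                dfOther.col("date").equalTo(dfPromotion.col("date")),
                "left_anti"); // keeps only dfOther rows without a matching date
        return dfPromotion.union(dfDiff);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("anti-join-filter").getOrCreate();

        Dataset<Row> dfPromotion = spark.createDataFrame(Arrays.asList(
                new DateStore("2017-01-01", 1),
                new DateStore("2017-01-02", 1)), DateStore.class);
        Dataset<Row> dfOther = spark.createDataFrame(Arrays.asList(
                new DateStore("2017-01-01", 1),
                new DateStore("2017-01-03", 1)), DateStore.class);

        filterAndUnion(dfPromotion, dfOther).show();
        spark.stop();
    }
}
```

The anti join keeps only dfOther's columns, so no select is needed before the union; this also stays correct if further columns are added later, because the join condition compares the date column alone.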