pythonapache-sparkpysparkapache-spark-sql

Concatenate two PySpark dataframes


I'm trying to concatenate two PySpark dataframes with some columns that are only on one of them:

from pyspark.sql.functions import randn, rand

df_1 = sqlContext.range(0, 10)

+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

df_2 = sqlContext.range(11, 20)

+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))

and now I want to generate a third dataframe. I would like something like pandas concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

#do some concatenation here, how?

df_concat.show()

| id|             uniform|              normal| normal_2   |
+---+--------------------+--------------------+------------+
|  0|  0.8122802274304282|  1.2423430583597714| None       |
|  1|  0.8642043127063618|  0.3900018344856156| None       |
|  2|  0.8292577771850476|  1.8077401259195247| None       |
|  3|   0.198558705368724| -0.4270585782850261| None       |
|  4|0.012661361966674889|   0.702634599720141| None       |
|  5|  0.8535692890157796|-0.42355804115129153| None       |
|  6|  0.3723296190171911|  1.3789648582622995| None       |
|  7|  0.9529794127670571| 0.16238718777444605| None       |
|  8|  0.9746632635918108| 0.02448061333761742| None       |
|  9|   0.513622008243935|  0.7626741803250845| None       |
| 11|  0.3221262660507942|  None              | 0.123      |
| 12|  0.4030672316912547|  None              |0.12323     |
| 13|  0.9690555459609131|  None              |0.123       |
| 14|0.011913836266515876|  None              |0.18923     |
| 15|  0.9359607054250594|  None              |0.99123     |
| 16| 0.45680471157575453|  None              |0.123       |
| 17|  0.6411908952297819|  None              |1.123       |
| 18|  0.5669232696934479|  None              |0.10023     |
| 19|   0.513622008243935|  None              |0.916332123 |
+---+--------------------+--------------------+------------+

Is that possible?


Solution

  • Maybe you can try creating the unexisting columns and calling union (unionAll for Spark 1.6 or lower):

    from pyspark.sql.functions import lit
    
    cols = ['id', 'uniform', 'normal', 'normal_2']    
    
    df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
    df_2_new = df_2.withColumn("normal", lit(None)).select(cols)
    
    result = df_1_new.union(df_2_new)
    
    # To remove the duplicates:
    
    result = result.dropDuplicates()