Tags: python, pyspark

Spark SQL for-loop error: 'bool' object has no attribute 'alias'


I am trying to create new columns in a Spark SQL DataFrame that compare two columns within the DataFrame, returning True if they are equal and False otherwise. I have to do this for a dataset with thousands of columns. To set up a sample problem, I've included all of my code here, but the important part is the second for loop at the end of the code block.

from pyspark.sql import SQLContext
from pyspark.sql.types import *    

data = sc.parallelize([[1, None, 'BND'], [2, None, 'RMP'], [3, None, 'SWP'], [4, None, "IRS"], [5, None, "SWP"], [6, None, "IRS"]])
match = sc.parallelize([[1, 2,  100], [3, 5, 101], [4, 6, 102]])
    
trade_schema_string = 'trade_id,match_id,product'
trade_fields = [StructField(field_name, StringType(), True) for field_name in trade_schema_string.split(',')]
trade_fields[0].dataType = IntegerType()
trade_fields[1].dataType = IntegerType()
trade_schema = StructType(trade_fields)
    
match_schema_string = "pri_netting_id,sec_netting_id,match_id"
match_fields = [StructField(field_name, IntegerType(), True) for field_name in match_schema_string.split(',')]
match_schema = StructType(match_fields)

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(data, trade_schema)
odf = sqlContext.createDataFrame(match, match_schema)
df.registerTempTable("trade")
odf.registerTempTable("match")

# Get match_ids so you can match up front office and back office records
# Change column names for fo and bo dataframes so that they say "bo_product" and "fo_product", etc.
fo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.pri_netting_id")
bo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.sec_netting_id")
col_names = fo.columns
for x in range(0, len(col_names)):
    col_name = col_names[x]
    fo = fo.withColumnRenamed(col_name, "fo_" + col_name)
    bo = bo.withColumnRenamed(col_name, "bo_" + col_name)

fo.registerTempTable("front_office")
bo.registerTempTable("back_office")

fobo = sqlContext.sql("SELECT f.fo_trade_id,f.fo_product,b.bo_trade_id,b.bo_product FROM front_office f INNER JOIN back_office b WHERE f.fo_match_id = b.bo_match_id")
fobo = fobo.repartition(5)
# How to create diff columns
num_cols = len(fobo.columns)
fobo_names = fobo.columns
first = fobo.first()

for x in range(0, num_cols / 2):
    new_name = "diff_" + fobo_names[x][3:]
    old_column_fo = "fobo." + fobo_names[x]
    old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
    fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)

The error I get is:

Traceback (most recent call last):
  File "<stdin>", line 8, in <module>
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/spark/python/pyspark/sql/dataframe.py", line 695, in withColumn
    return self.select('*', col.alias(colName))
AttributeError: 'bool' object has no attribute 'alias'

So, the strange thing is that if I execute the following by hand:

fobo = fobo.withColumn("diff_product", fobo.fo_product == fobo.bo_product)

and

fobo = fobo.withColumn("diff_trade_id", fobo.fo_trade_id == fobo.bo_trade_id)

the whole thing works perfectly. However, this isn't practical for my real use case, which has many columns.


Solution

  • old_column_fo = "fobo." + fobo_names[x]
    old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
    fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
    

    old_column_fo and old_column_bo will be strings that merely look like the attribute names you're trying to access, but they won't be the actual attributes. Try using getattr instead.

    old_column_fo = getattr(fobo, fobo_names[x])
    old_column_bo = getattr(fobo, fobo_names[x + (num_cols / 2)])
    fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)