I have the following recursive function that determines the Outlier using the InterQuartileRange method:
def interQuartileRangeFiltering(df: DataFrame): DataFrame = {
@scala.annotation.tailrec
def inner(cols: List[String], acc: DataFrame): DataFrame = cols match {
case Nil => acc
case column :: xs =>
val quantiles = acc.stat.approxQuantile(column, Array(0.25, 0.75), 0.0) // TODO: values should come from config
println(s"$column ${quantiles.size}")
val q1 = quantiles(0)
val q3 = quantiles(1)
val iqr = q1 - q3
val lowerRange = q1 - 1.5 * iqr
val upperRange = q3 + 1.5 * iqr
val filtered = acc.filter(s"$column < $lowerRange or $column > $upperRange")
inner(xs, filtered)
}
inner(df.columns.toList, df)
}
val outlierDF = interQuartileRangeFiltering(incomingDF)
So basically what I'm doing is that I'm recursively iterating over the columns and eliminating the outliers. Strangely it results in an ArrayIndexOutOfBounds Exception and prints the following:
housing_median_age 2
inland 2
island 2
population 2
total_bedrooms 2
near_bay 2
near_ocean 2
median_house_value 0
java.lang.ArrayIndexOutOfBoundsException: 0
at inner$1(<console>:75)
at interQuartileRangeFiltering(<console>:83)
... 54 elided
What is wrong with my approach?
def checkOutliersNum(columnName:String, df:DataFrame): DataFrame ={
val total = df.count()
val quantiles = df.stat.approxQuantile(columnName,Array(0.25,0.75),0.0)
val q1 = quantiles(0)
val q3 = quantiles(1)
val iqr = q3 - q1
val dfOutliers = df.select(columnName).filter(col(columnName) < (q1-1.5*iqr) || col(columnName) > (q3+1.5*iqr))
val regsCount = dfOutliers.count().toInt
val percentage = (regsCount.toFloat/total.toFloat)*100
//Removing outliers if percentage between 1-0%
if(percentage < 1 && percentage > 0){
df.select(columnName).filter(col(columnName) >= (q1-1.5*iqr) && col(columnName) <= (q3+1.5*iqr))
}else{
df
}
}
//Each column must follow a normal distribution.
val columnsInNormalDistr = Array("column_A","column_B","column_N")
for(columnName <- columnsInNormalDistr) {
//var df = ...
df = checkOutliersNum(columnName,df)
}