
PySpark: how to calculate the Poisson CDF using a UDF?


I have a DataFrame that looks like this:

from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import to_date

df_schema = StructType([StructField("date", StringType(), True),
                        StructField("col1", FloatType(), True),
                        StructField("col2", FloatType(), True)])
df_data = [('2020-08-01', 0.09, 0.8),
           ('2020-08-02', 0.0483, 0.8)]
df = sqlContext.createDataFrame(df_data, df_schema)
# Convert the string column to a proper date type
df = df.withColumn("date", to_date("date", 'yyyy-MM-dd'))
df.show()

+----------+------+----+
|      date|  col1|col2|
+----------+------+----+
|2020-08-01|  0.09| 0.8|
|2020-08-02|0.0483| 0.8|
+----------+------+----+

I want to calculate the Poisson CDF from col1 and col2.

This is easy on a pandas DataFrame with from scipy.stats import poisson, but I don't know how to do it in PySpark.

prob = poisson.cdf(x, mu), where x = col1 and mu = col2 in this case.

ATTEMPT 1 :

from scipy.stats import poisson
from pyspark.sql.functions import udf,col
def poisson_calc(a,b):
    return poisson.cdf(a,b,axis=1)

poisson_calc = udf(poisson_calc, FloatType())

df_new = df.select(
  poisson_calc(col('col1'),col('col2')).alias("want") )

df_new.show()

This gives me an error: TypeError: _parse_args() got an unexpected keyword argument 'axis'


Solution

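  • I see some issues with your attempt. scipy.stats.poisson.cdf does not accept an axis keyword argument, which is what raises the TypeError. The UDF is given the same name as the underlying Python function; that is not an error by itself, but a distinct name is clearer. And poisson.cdf returns a NumPy float, which should be converted explicitly to a Python float before Spark stores it in a FloatType column.

    Fixing that all up, the following should work: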

    from scipy.stats import poisson
    from pyspark.sql.functions import udf,col
    from pyspark.sql.types import FloatType
    
    def poisson_calc(a,b):
        return float(poisson.cdf(a,b))
    
    poisson_calc_udf = udf(poisson_calc, FloatType())
    
    df_new = df.select(
      poisson_calc_udf(col('col1'),col('col2')).alias("want") 
    )
    
    df_new.show()
    #+----------+
    #|      want|
    #+----------+
    #|0.44932896|
    #|0.44932896|
    #+----------+
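
Both rows come out identical because the Poisson distribution is discrete: the CDF at x sums the probability mass over k = 0, ..., floor(x), and both col1 values floor to 0, so each result is exp(-0.8). Here is a minimal pure-Python sketch for checking this by hand, without scipy or Spark. The helper name poisson_cdf is hypothetical and not part of either library.

```python
import math

def poisson_cdf(x: float, mu: float) -> float:
    """P(X <= x) for X ~ Poisson(mu): sum of the pmf over k = 0..floor(x)."""
    if x < 0:
        return 0.0
    k_max = math.floor(x)
    term = math.exp(-mu)      # pmf at k = 0
    total = term
    for k in range(1, k_max + 1):
        term *= mu / k        # pmf(k) = pmf(k - 1) * mu / k
        total += term
    return total

# (col1, col2) pairs from the example DataFrame
rows = [(0.09, 0.8), (0.0483, 0.8)]
results = [poisson_cdf(x, mu) for x, mu in rows]
print(results)
```

If col1 is meant to be an event count, you would expect integer values there; with fractional values below 1 the CDF only ever reflects the k = 0 term.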