python, postgresql, apache-spark, apache-kafka, spark-streaming-kafka

Trying to write a streaming DataFrame from Spark to PostgreSQL with Kafka and PySpark


I have searched for this issue all over this site and have not found any solution. I have written a Java class that creates a Kafka producer and sends some files, and it works fine. Then I want to write a Python script that reads these files and puts them into a PostgreSQL database.

Each file (a dataset with many columns) becomes a Kafka topic, and each row of the file becomes a message in the corresponding topic.
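
For reference, I create the streaming source by subscribing to the Kafka topics, roughly like the sketch below (the broker address and the topic pattern are placeholders, and parsing each message value into the individual columns is omitted):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-postgres").getOrCreate()

# Subscribe to every topic matching a pattern (placeholder broker address and pattern).
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribePattern", ".*")
       .load())

# Kafka delivers the payload as bytes; cast the value to a string before splitting it
# into the columns selected below.
messages = raw.selectExpr("topic", "CAST(value AS STRING) AS value")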

This is the Spark DataFrame that I create in Python from the streaming data:

list = df.select("fileName", "Satellite_PRN_number", "date", "time", "Crs", "Delta_n",
                 "m0", "Cuc", "e_Eccentricity", "Cus", "sqrt_A", "Toe_Time_of_Ephemeris",
                 "Cic", "OMEGA_maiusc", "cis", "i0", "Crc", "omega", "omega_dot", "idot")

Here is my Python function that should insert each row into my PostgreSQL table. I used psycopg2 to create a connection between Python and PostgreSQL, and I use "self.cursor.execute" to run the queries.

def process_row(self, row):
    self.cursor.execute(
        'INSERT INTO satellite (fileName, Satellite_PRN_number, date, time, Crs, Delta_n, m0, Cuc, '
        'e_Eccentricity, Cus, sqrt_A, Toe_Time_of_Ephemeris, Cic, OMEGA_maiusc, cis, i0, Crc, omega, '
        'omega_dot, idot) '
        'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
        (row.fileName, row.Satellite_PRN_number, row.date, row.time, row.Crs, row.Delta_n, row.m0,
         row.Cuc, row.e_Eccentricity, row.Cus, row.sqrt_A, row.Toe_Time_of_Ephemeris, row.Cic,
         row.OMEGA_maiusc, row.cis, row.i0, row.Crc, row.omega, row.omega_dot, row.idot))
    self.connection.commit()

Finally, I use the method above to populate my PostgreSQL table with the following command:

query = list.writeStream.outputMode("append").foreachBatch(process_row)\
        .option("checkpointLocation", "C:\\Users\\Admin\\AppData\\Local\\Temp").start()

I got the following error: AttributeError: 'DataFrame' object has no attribute 'cursor'.

I think the issue is in row.fileName, etc., or in the "process_row" method. I don't exactly understand how to write the "process_row" method so that each row of the streaming DataFrame is passed to it and used to populate the PostgreSQL table.

Can anyone help me? Thanks.


Solution

  • Your signature of foreachBatch is not correct. It should look like this:

    def foreach_batch_function(df, epoch_id):
        # Transform and write batchDF
        pass
      
    streamingDF.writeStream.foreachBatch(foreach_batch_function).start() 
    

    As you can see, the first argument of the foreachBatch function is a DataFrame, not the instance of your psycopg2 class that you expect. foreachBatch receives a DataFrame that contains all the rows of the current micro-batch, not just one row.

    So you can either declare the instance of your PostgreSQL connection inside that function and use it there, or you can try the table-based approach described further below.
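
    For the connection-inside-the-function option, a minimal sketch could look like this (the connection parameters are placeholders, and collect() assumes each micro-batch is small enough to fit on the driver):

    import psycopg2

    def foreach_batch_function(df, epoch_id):
        # Placeholder connection parameters; adjust them to your PostgreSQL setup.
        connection = psycopg2.connect(host="localhost", dbname="mydb",
                                      user="username", password="password")
        cursor = connection.cursor()
        # Build the INSERT statement from the columns of the micro-batch DataFrame.
        columns = df.columns
        insert_sql = "INSERT INTO satellite ({}) VALUES ({})".format(
            ", ".join(columns), ", ".join(["%s"] * len(columns)))
        # collect() brings the whole micro-batch to the driver; fine for small batches.
        for row in df.collect():
            cursor.execute(insert_sql, tuple(row))
        connection.commit()
        cursor.close()
        connection.close()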

    For the table-based approach, I would create a Hive JDBC-source-based table over your PostgreSQL database like this:

    CREATE TABLE jdbcTable
    USING org.apache.spark.sql.jdbc
    OPTIONS (
      url "jdbc:postgresql:dbserver",
      dbtable "schema.tablename",
      user 'username',
      password 'password'
    )
    

    which will enable you to use your foreachBatch function like this:

    def foreach_batch_function(df, epoch_id):
        # Transform and write batchDF
        df.write.insertInto("jdbcTable")
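
    With either version of foreach_batch_function, it can be wired into your original streaming query (keeping your checkpoint location, and with list being your streaming DataFrame from the question) roughly like this:

    query = list.writeStream.outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .option("checkpointLocation", "C:\\Users\\Admin\\AppData\\Local\\Temp") \
        .start()
    query.awaitTermination()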
    

    Hope that was helpful.