I am trying to read 500 million records from a table using Spark JDBC and then perform a join on those tables. When I execute the SQL from SQL Developer it takes 25 minutes, but when I load the table through Spark JDBC it takes forever; the last run went for 18 hours before I cancelled it. I am using AWS Glue for this.
This is how I read the table with Spark JDBC:
df = (glueContext.read.format("jdbc")
    # the thin driver URL needs '@' before the '//host:port/service' part
    .option("url", "jdbc:oracle:thin:@//abcd:1521/abcd.com")
    .option("user", "USER_PROD")
    .option("password", "ffg#Prod")
    .option("numPartitions", 15)
    .option("partitionColumn", "OUTSTANDING_ACTIONS")
    .option("lowerBound", 0)
    .option("upperBound", 1000)
    .option("dbtable", "FSP.CUSTOMER_CASE")
    .option("driver", "oracle.jdbc.OracleDriver")
    .load())
# createOrReplaceTempView returns None, so there is nothing to assign
df.createOrReplaceTempView("customer_caseOnpremView")
I have used OUTSTANDING_ACTIONS as the partitionColumn, and here is the data distribution (the first column is the partitionColumn value, the second is its row count):
1 8988894
0 4227894
5 2264259
9 2263534
8 2262628
2 2261704
3 2260580
4 2260335
7 2259747
6 2257970
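Note that with lowerBound 0, upperBound 1000, and numPartitions 15, Spark splits the range into strides of about 66 and generates per-partition predicates like OUTSTANDING_ACTIONS < 66 OR OUTSTANDING_ACTIONS IS NULL for the first partition, 66 <= OUTSTANDING_ACTIONS < 132 for the second, and so on. Since every actual value is between 0 and 9, all rows fall into the first partition and a single task does the entire read. If that is the issue, a minimal sketch with bounds matched to the real value range (connection options copied from the read above) would be:

df = (glueContext.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//abcd:1521/abcd.com")
    .option("user", "USER_PROD")
    .option("password", "ffg#Prod")
    # 10 partitions over [0, 10) gives a stride of 1,
    # i.e. one distinct OUTSTANDING_ACTIONS value per partition
    .option("numPartitions", 10)
    .option("partitionColumn", "OUTSTANDING_ACTIONS")
    .option("lowerBound", 0)
    .option("upperBound", 10)
    .option("dbtable", "FSP.CUSTOMER_CASE")
    .option("driver", "oracle.jdbc.OracleDriver")
    .load())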
This is my join, where loading the customer_caseOnpremView table takes more than 18 hours while the other two tables take about 1 minute:
ThirdQueryResuletOnprem = spark.sql("""
    SELECT CP.CLIENT_ID, COUNT(1) NoofCases
    FROM customer_caseOnpremView CC
    JOIN groupViewOnpremView FG
      ON FG.ID = CC.OWNER_ID
    JOIN client_platformViewOnpremView CP
      ON CP.CLIENT_ID = SUBSTR(FG.PATH, 2, INSTR(FG.PATH, '/') + INSTR(SUBSTR(FG.PATH, 1 + INSTR(FG.PATH, '/')), '/') - 2)
    WHERE FG.STATUS = 'ACTIVE' AND FG.TYPE = 'CLIENT'
    GROUP BY CP.CLIENT_ID
""")
Please suggest how to make this faster. I have tried 10 to 40 workers and executor types from Standard up to G.2X, the biggest one, but it had no impact on the job.
Since your query has a lot of filters, you don't even need to bring in the whole dataset and then filter it in Spark. You can push the query down to the database engine instead, which will filter the data itself and return only the result to the Glue job.
This is explained in https://stackoverflow.com/a/54375010/4326922, and below is an example for MySQL that can be applied to Oracle too with a few changes.
query= "(select ab.id,ab.name,ab.date1,bb.tStartDate from test.test12 ab join test.test34 bb on ab.id=bb.id where ab.date1>'" + args['start_date'] + "') as testresult"
datasource0 = spark.read.format("jdbc").option("url", "jdbc:mysql://host.test.us-east-2.rds.amazonaws.com:3306/test").option("driver", "com.mysql.jdbc.Driver").option("dbtable", query).option("user", "test").option("password", "Password1234").load()
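For the Oracle case in the question it might look like the sketch below. The Oracle-side source tables behind groupViewOnpremView and client_platformViewOnpremView are not shown in the question, so FSP.GROUP_TAB and FSP.CLIENT_PLATFORM are placeholder names; the join condition is copied verbatim from the question's query, and Oracle takes the subquery alias without AS.

query = """(SELECT CP.CLIENT_ID, COUNT(1) NoofCases
            FROM FSP.CUSTOMER_CASE CC
            JOIN FSP.GROUP_TAB FG ON FG.ID = CC.OWNER_ID
            JOIN FSP.CLIENT_PLATFORM CP
              ON CP.CLIENT_ID = SUBSTR(FG.PATH, 2, INSTR(FG.PATH, '/') + INSTR(SUBSTR(FG.PATH, 1 + INSTR(FG.PATH, '/')), '/') - 2)
            WHERE FG.STATUS = 'ACTIVE' AND FG.TYPE = 'CLIENT'
            GROUP BY CP.CLIENT_ID) pushdown_result"""

# Oracle runs the whole join and aggregation; Spark only receives
# one row per CLIENT_ID instead of 500 million case rows.
ThirdQueryResuletOnprem = (spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//abcd:1521/abcd.com")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("dbtable", query)
    .option("user", "USER_PROD")
    .option("password", "ffg#Prod")
    .load())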