I need to expose some temporary tables in Spark through the Thrift server. This is the base code I'm running on my cluster:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

# Make the Thrift server class visible on the JVM side of the py4j gateway.
java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")

# Build a small DataFrame and register it as a temp view.
l = [('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')

# Start the Thrift server inside this JVM, handing it the SQLContext that
# backs this PySpark session so the temp view is visible to it.
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
When I run this job using:
./bin/spark-submit --conf spark.executor.cores=1 --master spark://172.18.0.2:7077 /home/spark/test.py
Everything starts normally, but the job stops right away. This is the log output:
2019-07-02 15:16:56 INFO ObjectStore:289 - ObjectStore, initialize called
2019-07-02 15:16:56 INFO Query:77 - Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
2019-07-02 15:16:56 INFO MetaStoreDirectSql:139 - Using direct SQL, underlying DB is DERBY
2019-07-02 15:16:56 INFO ObjectStore:272 - Initialized ObjectStore
2019-07-02 15:16:56 INFO HiveMetaStore:746 - 0: get_databases: default
2019-07-02 15:16:56 INFO audit:371 - ugi=root ip=unknown-ip-addr cmd=get_databases: default
2019-07-02 15:16:56 INFO HiveMetaStore:746 - 0: Shutting down the object store...
2019-07-02 15:16:56 INFO audit:371 - ugi=root ip=unknown-ip-addr cmd=Shutting down the object store...
2019-07-02 15:16:56 INFO HiveMetaStore:746 - 0: Metastore shutdown complete.
2019-07-02 15:16:56 INFO audit:371 - ugi=root ip=unknown-ip-addr cmd=Metastore shutdown complete.
2019-07-02 15:16:56 INFO AbstractService:104 - Service:ThriftBinaryCLIService is started.
2019-07-02 15:16:56 INFO AbstractService:104 - Service:HiveServer2 is started.
2019-07-02 15:16:56 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@35ad1f14{/sqlserver,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@739d5136{/sqlserver/json,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3439b973{/sqlserver/session,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@55118828{/sqlserver/session/json,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO ThriftCLIService:98 - Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads
2019-07-02 15:16:56 INFO SparkContext:54 - Invoking stop() from shutdown hook
2019-07-02 15:16:56 INFO HiveServer2:97 - Shutting down HiveServer2
2019-07-02 15:16:56 INFO ThriftCLIService:188 - Thrift server has stopped
2019-07-02 15:16:56 INFO AbstractService:125 - Service:ThriftBinaryCLIService is stopped.
2019-07-02 15:16:56 INFO AbstractService:125 - Service:OperationManager is stopped.
2019-07-02 15:16:56 INFO AbstractService:125 - Service:SessionManager is stopped.
2019-07-02 15:16:56 INFO AbstractConnector:318 - Stopped Spark@525b57a{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-07-02 15:16:56 INFO SparkUI:54 - Stopped Spark web UI at http://172.18.0.2:4040
2019-07-02 15:17:06 INFO AbstractService:125 - Service:CLIService is stopped.
2019-07-02 15:17:06 INFO AbstractService:125 - Service:HiveServer2 is stopped.
2019-07-02 15:17:06 INFO StandaloneSchedulerBackend:54 - Shutting down all executors
2019-07-02 15:17:06 INFO CoarseGrainedSchedulerBackend$DriverEndpoint:54 - Asking each executor to shut down
2019-07-02 15:17:06 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-07-02 15:17:06 INFO MemoryStore:54 - MemoryStore cleared
2019-07-02 15:17:06 INFO BlockManager:54 - BlockManager stopped
2019-07-02 15:17:06 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2019-07-02 15:17:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-07-02 15:17:06 INFO SparkContext:54 - Successfully stopped SparkContext
2019-07-02 15:17:06 INFO ShutdownHookManager:54 - Shutdown hook called
2019-07-02 15:17:06 INFO ShutdownHookManager:54 - Deleting directory /opt/spark-660123c2-27bf-4178-a2bb-b0688ef0de84
2019-07-02 15:17:06 INFO ShutdownHookManager:54 - Deleting directory /opt/spark-660123c2-27bf-4178-a2bb-b0688ef0de84/pyspark-75030144-27b3-4526-bf5b-8c1c4eefe85a
2019-07-02 15:17:06 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-8504fc2b-9fd3-48b6-8e7c-844699a0080f
2019-07-02 15:17:06 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-c9477640-9a25-4d6a-9905-5c97032fdab7
I'm setting spark.sql.hive.thriftServer.singleSession to true so the Thrift server reads from the same session. I don't see any error; the job simply runs through all the logic and finishes without hanging, so I'm never able to access the temp tables from beeline or any other SQL client over JDBC.
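For reference, this is how I try to reach the view from another terminal while the server is (briefly) up; localhost and the default Thrift port 10000 are assumptions specific to my setup:

beeline -u jdbc:hive2://localhost:10000
0: jdbc:hive2://localhost:10000> SELECT * FROM peebs;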
If anyone has any pointers, that'd be great.
Your script terminates because there is nothing left to do after the Thrift server has started. Just keep it alive with a sleep:
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .config('hive.server2.thrift.port', '20001') \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

# Make the Thrift server class visible on the JVM side of the py4j gateway.
java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")

l = [('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')

# Start the Thrift server in this JVM, passing the SQLContext that backs
# this PySpark session so the temp view is visible over JDBC.
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

# Keep the driver alive; otherwise the script ends and the Thrift server
# is torn down together with the SparkContext.
while True:
    time.sleep(5)
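With the loop in place the driver stays up, and you can query the view from another shell. A quick check with beeline, using the port configured above (localhost is an assumption; substitute your driver's address):

beeline -u jdbc:hive2://localhost:20001 -e "SELECT * FROM peebs;"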