python, apache-spark, jdbc, pyspark, spark-thriftserver

Unable to keep Spark Thrift Server running


I need to expose some temporary tables in Spark over Thrift. This is the base code I'm running on my cluster:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
# java_import(sc._gateway.jvm,"")

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

java_import(sc._gateway.jvm, "")


from pyspark.sql import Row
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
peebs = people.createOrReplaceTempView('peebs')

# Start the Thrift Server from the JVM, passing the JVM-side SparkSession that backs this PySpark session.
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

When I run this job using:

./bin/spark-submit --conf spark.executor.cores=1  --master spark://172.18.0.2:7077 /home/spark/test.py

Everything starts normally but stops right away.

This is my log output:

2019-07-02 15:16:56 INFO  ObjectStore:289 - ObjectStore, initialize called
2019-07-02 15:16:56 INFO  Query:77 - Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
2019-07-02 15:16:56 INFO  MetaStoreDirectSql:139 - Using direct SQL, underlying DB is DERBY
2019-07-02 15:16:56 INFO  ObjectStore:272 - Initialized ObjectStore
2019-07-02 15:16:56 INFO  HiveMetaStore:746 - 0: get_databases: default
2019-07-02 15:16:56 INFO  audit:371 - ugi=root  ip=unknown-ip-addr  cmd=get_databases: default  
2019-07-02 15:16:56 INFO  HiveMetaStore:746 - 0: Shutting down the object store...
2019-07-02 15:16:56 INFO  audit:371 - ugi=root  ip=unknown-ip-addr  cmd=Shutting down the object store...   
2019-07-02 15:16:56 INFO  HiveMetaStore:746 - 0: Metastore shutdown complete.
2019-07-02 15:16:56 INFO  audit:371 - ugi=root  ip=unknown-ip-addr  cmd=Metastore shutdown complete.    
2019-07-02 15:16:56 INFO  AbstractService:104 - Service:ThriftBinaryCLIService is started.
2019-07-02 15:16:56 INFO  AbstractService:104 - Service:HiveServer2 is started.
2019-07-02 15:16:56 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@35ad1f14{/sqlserver,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@739d5136{/sqlserver/json,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3439b973{/sqlserver/session,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@55118828{/sqlserver/session/json,null,AVAILABLE,@Spark}
2019-07-02 15:16:56 INFO  ThriftCLIService:98 - Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads
2019-07-02 15:16:56 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2019-07-02 15:16:56 INFO  HiveServer2:97 - Shutting down HiveServer2
2019-07-02 15:16:56 INFO  ThriftCLIService:188 - Thrift server has stopped
2019-07-02 15:16:56 INFO  AbstractService:125 - Service:ThriftBinaryCLIService is stopped.
2019-07-02 15:16:56 INFO  AbstractService:125 - Service:OperationManager is stopped.
2019-07-02 15:16:56 INFO  AbstractService:125 - Service:SessionManager is stopped.
2019-07-02 15:16:56 INFO  AbstractConnector:318 - Stopped Spark@525b57a{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-07-02 15:16:56 INFO  SparkUI:54 - Stopped Spark web UI at http://172.18.0.2:4040
2019-07-02 15:17:06 INFO  AbstractService:125 - Service:CLIService is stopped.
2019-07-02 15:17:06 INFO  AbstractService:125 - Service:HiveServer2 is stopped.
2019-07-02 15:17:06 INFO  StandaloneSchedulerBackend:54 - Shutting down all executors
2019-07-02 15:17:06 INFO  CoarseGrainedSchedulerBackend$DriverEndpoint:54 - Asking each executor to shut down
2019-07-02 15:17:06 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-07-02 15:17:06 INFO  MemoryStore:54 - MemoryStore cleared
2019-07-02 15:17:06 INFO  BlockManager:54 - BlockManager stopped
2019-07-02 15:17:06 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-07-02 15:17:06 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-07-02 15:17:06 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-07-02 15:17:06 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-07-02 15:17:06 INFO  ShutdownHookManager:54 - Deleting directory /opt/spark-660123c2-27bf-4178-a2bb-b0688ef0de84
2019-07-02 15:17:06 INFO  ShutdownHookManager:54 - Deleting directory /opt/spark-660123c2-27bf-4178-a2bb-b0688ef0de84/pyspark-75030144-27b3-4526-bf5b-8c1c4eefe85a
2019-07-02 15:17:06 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-8504fc2b-9fd3-48b6-8e7c-844699a0080f
2019-07-02 15:17:06 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-c9477640-9a25-4d6a-9905-5c97032fdab7

I'm setting spark.sql.hive.thriftServer.singleSession to True to ensure the Thrift Server reads from the same session. I don't see any errors: the job simply starts, runs through all the logic, and finishes without staying up, so I'm not able to access the temp tables from beeline or any other SQL client over JDBC.
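
For reference, this is roughly how I expect to reach the view over Thrift once the server stays up. A minimal sketch, assuming PyHive is installed and the server is listening on the default port 10000 on the driver host (the auth argument may need adjusting to match hive.server2.authentication):

from pyhive import hive  # assumption: PyHive is installed (pip install pyhive)

# Connect to the Thrift Server started by the job above.
# Host and port are taken from my setup; depending on the
# hive.server2.authentication setting, auth='NOSASL' may be required.
conn = hive.connect(host='172.18.0.2', port=10000)
cursor = conn.cursor()
cursor.execute('SELECT name, age FROM peebs')
print(cursor.fetchall())
cursor.close()
conn.close()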

If anyone has any suggestions, that'd be great.


Solution

  • Your script terminates because there is nothing left to do after the Thrift Server has started. Just keep it alive with a sleep:

    import time
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    from py4j.java_gateway import java_import
    # java_import(sc._gateway.jvm,"")
    
    spark = SparkSession \
        .builder \
        .appName('the_test') \
        .enableHiveSupport()\
        .config('spark.sql.hive.thriftServer.singleSession', True)\
        .config("hive.server2.thrift.port", "20001") \
        .getOrCreate()
    
    sc=spark.sparkContext
    sc.setLogLevel('INFO')
    
    java_import(sc._gateway.jvm, "")
    
    
    from pyspark.sql import Row
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    people = people.toDF().cache()
    people.createOrReplaceTempView('peebs')  # register the DataFrame as the temp view 'peebs' (returns None)
    
    # Start the Thrift Server from the JVM, passing the JVM-side SparkSession that backs this PySpark session.
    sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
    
    # Keep the driver alive so the Thrift Server keeps running
    while True:
      time.sleep(5)
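
    As an alternative to the polling loop, you can park the driver thread on an event that is never set; the effect is the same, but the process blocks instead of waking up every few seconds. A small sketch:

    import threading

    # Block the driver forever so the Thrift Server stays up; stop the
    # application (or kill the spark-submit process) to shut it down.
    threading.Event().wait()

    Note that with hive.server2.thrift.port set to 20001 in the builder above, clients should connect to that port instead of the default 10000.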