python apache-spark pyspark thrift spark-thriftserver

How to view pyspark temporary tables on Thrift server?


I'm trying to make a temporary table I create in pyspark available via Thrift. My final goal is to be able to access it from a database client like DBeaver using JDBC.

I'm testing first using beeline.

This is what I'm doing.

  1. Started a cluster with one worker on my own machine using Docker and added spark.sql.hive.thriftServer.singleSession true to spark-defaults.conf (see the config sketch after this list)
  2. Started the pyspark shell (for testing's sake) and ran the following code:

    from pyspark.sql import Row
    l = [('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]
    rdd = sc.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    people = people.toDF().cache()
    people.createOrReplaceTempView('peebs')
    result = sqlContext.sql('select * from peebs')

    So far so good, everything works fine.

  3. In a different terminal I start the Spark Thrift server:

    ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --conf spark.executor.cores=1 --master spark://172.18.0.2:7077

    The server appears to start normally and I'm able to see both pyspark and thrift server jobs running on my spark cluster master UI.

  4. I then connect to the Thrift server using beeline

    ./bin/beeline
    beeline> !connect jdbc:hive2://172.18.0.2:10001

    This is what I got

    Connecting to jdbc:hive2://172.18.0.2:10001
    Enter username for jdbc:hive2://172.18.0.2:10001:
    Enter password for jdbc:hive2://172.18.0.2:10001:
    2019-06-29 20:14:25 INFO Utils:310 - Supplied authorities: 172.18.0.2:10001
    2019-06-29 20:14:25 INFO Utils:397 - Resolved authority: 172.18.0.2:10001
    2019-06-29 20:14:25 INFO HiveConnection:203 - Will try to open client transport with JDBC Uri: jdbc:hive2://172.18.0.2:10001
    Connected to: Spark SQL (version 2.3.3)
    Driver: Hive JDBC (version 1.2.1.spark2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ

    Seems to be ok.

  5. When I run show tables; I can't see any tables.
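
For reference, the single-session setting from step 1 sits in spark-defaults.conf roughly like this (assuming the standard $SPARK_HOME/conf/spark-defaults.conf location inside the container):

    spark.sql.hive.thriftServer.singleSession true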

Two interesting things I'd like to highlight are:

  1. When I start pyspark I get these warnings:

    WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0

    WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException

    WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException

  2. When I start the thrift server I get these:

    rsync from spark://172.18.0.2:7077
    ssh: Could not resolve hostname spark: Name or service not known
    rsync: connection unexpectedly closed (0 bytes received so far) [Receiver]
    rsync error: unexplained error (code 255) at io.c(235) [Receiver=3.1.2]
    starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to ...

I've been through several posts and discussions. I see people saying we can't have temporary tables exposed via Thrift unless you start the server from within the same code. If that's true, how can I do that in Python (pyspark)?

Thanks


Solution

  • After doing several tests I was able to come up with a simple (no authentication) piece of code that works for me.

    It's important to note that if you want to make temporary tables available via JDBC, you need to start the Thrift server in the same JVM (same Spark job) and ensure the code hangs so the application keeps running in the cluster.

    Following is a working sample code I created for reference:

    import time
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
    from py4j.java_gateway import java_import
    
    # single-session mode so JDBC clients share this Spark session and can see its temp views;
    # the Thrift port matches the one beeline/DBeaver will connect to
    spark = SparkSession \
        .builder \
        .appName('the_test') \
        .enableHiveSupport()\
        .config('spark.sql.hive.thriftServer.singleSession', True)\
        .config('hive.server2.thrift.port', '10001') \
        .getOrCreate()
    
    sc=spark.sparkContext
    sc.setLogLevel('INFO')
    
    # make the Thrift server class available on the Py4J gateway
    java_import(sc._gateway.jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")
    
    
    # build a small DataFrame and register it as the temporary view 'peebs'
    from pyspark.sql import Row
    l = [('John', 20), ('Heather', 34), ('Sam', 23), ('Danny', 36)]
    rdd = sc.parallelize(l)
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    people = people.toDF().cache()
    people.createOrReplaceTempView('peebs')
    
    # start the Thrift server inside this JVM, wrapping this session's SQLContext
    # so the temp view registered above is visible over JDBC
    sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)
    
    # keep the driver alive so the Thrift server stays available
    while True:
        time.sleep(10)
    

    I simply used the .py above with spark-submit, and then I was able to connect via JDBC through beeline and from DBeaver using the Hive JDBC driver.
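
    For reference, the submit-and-verify steps look roughly like this (thrift_example.py is just a placeholder name for the script above; the master URL and Thrift port are the ones from my setup):

    # submit the script to the standalone master; the final while-loop keeps the
    # driver (and the embedded Thrift server) alive
    ./bin/spark-submit --master spark://172.18.0.2:7077 thrift_example.py

    # from another terminal, query the temp view over JDBC with beeline
    ./bin/beeline -u jdbc:hive2://172.18.0.2:10001 -e 'select * from peebs'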