
SQL Error mismatched input 'sql_query' expecting {EOF} when using Create Table in Pyspark


I'm reading SQL code from a zip file of text files, saving each query as a variable sql_query, and passing the variable into a Spark DataFrame call like spark.sql('sql_query'). The query text I parse out of the text file runs fine as regular SQL in Databricks, and I'm able to create a table with it. But if I run the same code through spark.sql, I either receive the mismatched input error, or 'An error occurred while calling o327.sql: java.lang.NullPointerException' if I remove the quotes around sql_query.

#Create sql statement based on text within sql file
def create_sql_statement(file_name):
  #read as df
  df = spark.read.text(file_name)
  
  #Convert to pandas
  spark.conf.set("spark.sql.execution.arrow.enabled", "true")
  pandasDF = df.toPandas()
  
  #Combine all rows into one string
  one_string = ''.join(pandasDF['value'].tolist())

  #Remove extra spaces from string
  no_space = re.sub(' +', ' ', one_string)

  # initializing split word
  spl_word = 'SELECT'
  spl_word2 = 'FROM'
  spl_word3 = '*/'

  # Get String after substring occurrence
  res = no_space.split(spl_word, 1)
  sql_statement = res[1].split(spl_word2,1)[0]

  #select table name, remove extra space and 'AS' clause
  input_db = res[0].split('.')[0].split(' ')[2]
  tbl_name = res[0].split('.')[2].split(' ')[0]

  #best match
  str = f'{input_db}.{tbl_name}'
  best_tbl_match = get_close_matches(str, db_tbl_patterns, n=1)
  best_tbl_match = best_tbl_match[0] #Remove str from list
  
  #return sql statement
  sql_query = f"CREATE TABLE {tbl_name} AS SELECT{sql_statement}FROM {best_tbl_match}" #OR REPLACE TEMPORARY VIEW
  print(sql_query)

file_list = [list of multiple files with the sql statements]

for file in file_list:
  
  try:
    sql_query = create_sql_statement(file)
    df = spark.sql(sql_query) \
    .withColumn('type', F.lit('animal_type')) \
    .withColumn('timestamp', F.current_timestamp())
    df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(f'{database}.{table.lower()}')

  
  except Exception as e:
    print(e)

I looked through a few other Stack Overflow posts about similar errors, but I don't see any mistakes in my syntax. As far as I can tell, the syntax is pretty standard, and the spacing and capitalization are normal as well. Here is a sample:

CREATE TABLE TABLE_NAME AS SELECT FIELD1, FIELD2, FIELD3, FIELD4 FROM DATABASE.TABLENAME

Exact Error Message:

mismatched input 'sql_query' expecting {'(', 'APPLY', 'CONVERT', 'COPY', 'OPTIMIZE', 'RESTORE', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'SYNC', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
sql_query
^^^

With the fix John Gordon suggested, the query successfully creates the table, and I see the PySpark df schema below, but I don't see the new columns implemented in my table.

num_affected_rows:long
num_inserted_rows:long
type:string
timestamp:timestamp
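The schema above hints at what happened: spark.sql() on a CREATE TABLE ... AS SELECT statement returns a small status DataFrame (the num_affected_rows/num_inserted_rows columns shown on Databricks), not the rows of the new table, so the withColumn calls decorated that status frame rather than the table itself. An untested sketch of one way around this (tbl_name, database, and table are assumed to be in scope, as in the loop above) is to run the DDL first, then read the created table back and add the columns to its actual rows:

```python
# Untested sketch: run the DDL, then re-read the table it created,
# so the new columns are applied to the table's data, not the status frame.
spark.sql(sql_query)                                   # DDL only; result is a status DataFrame
df = (spark.table(tbl_name)                            # the table the DDL just created
        .withColumn('type', F.lit('animal_type'))
        .withColumn('timestamp', F.current_timestamp()))
df.write.format("delta").option("overwriteSchema", "true") \
  .mode("overwrite").saveAsTable(f'{database}.{table.lower()}')
```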

Thank you in advance for your help.


Solution

  • df = spark.sql('sql_query')
    

    Well, of course that doesn't work: the literal string "sql_query" is not valid SQL, so Spark's parser fails on it immediately, which is exactly what the mismatched input error is telling you.

    Without the quotes, you're getting a null pointer exception because of this line:

    sql_query = create_sql_statement(file)
    

    The function doesn't actually return the SQL query that it builds. The query is printed, but that's not the same thing. Since the function has no return statement, it returns None by default, and passing None to spark.sql() causes the null pointer exception.
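    The print-versus-return distinction is easy to see without Spark at all:

```python
def make_query():
    query = "CREATE TABLE t AS SELECT 1"
    print(query)          # the text shows up on the console...

result = make_query()     # ...but the function still returns None
print(result is None)     # True -- and spark.sql(None) blows up with the NPE
```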

    To fix this, the create_sql_statement() function should return the SQL statement that it creates, not just print it.
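    A minimal, Spark-free sketch of the same parsing logic with the return added at the end (the sample input and the pattern list are made up for illustration; the real function also reads the file and fuzzy-matches against db_tbl_patterns):

```python
import re
from difflib import get_close_matches

def create_sql_statement(raw_sql, db_tbl_patterns):
    # Collapse runs of spaces, as in the original function
    no_space = re.sub(' +', ' ', raw_sql)

    # Split around the first SELECT, then take everything up to the first FROM
    head, body = no_space.split('SELECT', 1)
    sql_statement = body.split('FROM', 1)[0]

    # Pull out the database and table name the same way the original does
    input_db = head.split('.')[0].split(' ')[2]
    tbl_name = head.split('.')[2].split(' ')[0]

    # Fuzzy-match the source table against the known patterns
    best_tbl_match = get_close_matches(f'{input_db}.{tbl_name}',
                                       db_tbl_patterns, n=1)[0]

    # The crucial fix: return the statement instead of only printing it
    return f"CREATE TABLE {tbl_name} AS SELECT{sql_statement}FROM {best_tbl_match}"

# Hypothetical input mimicking one parsed text file
raw = "CREATE TABLE STAGING.DBO.ANIMALS AS SELECT FIELD1, FIELD2 FROM OLD.SOURCE"
print(create_sql_statement(raw, ["STAGING.ANIMALS", "PROD.PLANTS"]))
# -> CREATE TABLE ANIMALS AS SELECT FIELD1, FIELD2 FROM STAGING.ANIMALS
```

    With the return in place, sql_query = create_sql_statement(file) receives an actual string, and spark.sql(sql_query) (no quotes) parses it normally.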