I am developing a FastAPI application that is supposed to run some calculations based on a JSON request, send the response, and store it in several Databricks catalog tables.
In the API, I convert the response and also create the tables.
What I am struggling with is which Databricks API endpoint I should connect to.
As you can see from the code below, I defined:
url = f"{self.databricks_host}/api/2.0/sql/createTable"
but it is not working.
def send_to_dtb_catalog(self, df, table_name):
    # doing some stuff here ....
    # Prepare data payload for Databricks API
    data = {
        "tableName": f"my_database.my_schema.{table_name}",
        "data": df_json
    }
    # Make HTTP request to Databricks REST API
    # suppose databricks_host and databricks_token are pre-defined
    url = f"{self.databricks_host}/api/2.0/sql/createTable"
    headers = {
        "Authorization": f"Bearer {self.databricks_token}",
        "Content-Type": "application/json"
    }
    response = requests.post(url, headers=headers, json=data)
Then I will use send_to_dtb_catalog to send the created tables to the Databricks catalog, something like this:
self.send_to_dtb_catalog(table1_df, "table1_databricks")
self.send_to_dtb_catalog(table2_df, "table2_databricks")
I appreciate any help as I am new to both Databricks and API development.
You can use the SQL Statement Execution API to run SQL statements such as CREATE TABLE and INSERT.
Change your function as shown below.
Code:
import requests

def send_to_dtb_catalog(df, table_name):
    # databricks_host and databricks_token are assumed to be defined elsewhere
    url = f"{databricks_host}/api/2.0/sql/statements/"
    headers = {
        "Authorization": f"Bearer {databricks_token}",
        "Content-Type": "application/json"
    }
    # Create the target table if it does not exist yet
    sql_q = f'''
    CREATE TABLE IF NOT EXISTS {table_name} (
        id INT,
        name STRING
    )
    '''
    body = {
        "warehouse_id": "a415c87c62c279a5",
        "statement": sql_q,
        "wait_timeout": "30s",
        "on_wait_timeout": "CANCEL"
    }
    response = requests.post(url, headers=headers, json=body)
    if response.json()['status']['state'] != 'SUCCEEDED':
        # Table creation did not succeed; return the response for inspection
        return response
    print("Inserting values....")
    # df is a Spark DataFrame; collect its rows as plain tuples
    t = df.rdd.map(lambda row: tuple(row)).collect()
    insert_query = f'''
    INSERT INTO {table_name}
    VALUES
    {','.join(map(str, t))}
    '''
    # Reuse the same request body with the INSERT statement
    body['statement'] = insert_query
    res2 = requests.post(url, headers=headers, json=body)
    return res2
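The INSERT above pastes stringified tuples into the SQL text, which can break on values such as None or strings containing quotes. The Statement Execution API also accepts a "parameters" list with named markers (:name) in the statement, so here is a rough sketch of building such a body for one row. The helper names build_insert_body and _param are made up for illustration, and the two-column (id, name) layout is taken from the CREATE TABLE above.

```python
def _param(name, value, sql_type):
    """Build one entry of the API's "parameters" list (hypothetical helper)."""
    param = {"name": name, "type": sql_type}
    if value is not None:
        # Values are sent as strings; omitting "value" is treated as NULL
        param["value"] = str(value)
    return param


def build_insert_body(table_name, row, warehouse_id):
    """Build a request body that inserts a single (id, name) row."""
    row_id, name = row
    return {
        "warehouse_id": warehouse_id,
        # The table name is still interpolated: markers only stand in for values
        "statement": f"INSERT INTO {table_name} VALUES (:id, :name)",
        "parameters": [
            _param("id", row_id, "INT"),
            _param("name", name, "STRING"),
        ],
        "wait_timeout": "30s",
        "on_wait_timeout": "CANCEL",
    }
```

You would then post it the same way as before, for example requests.post(url, headers=headers, json=build_insert_body(table_name, row, warehouse_id)) for each collected row. This sends one request per row, so it is probably only reasonable for small DataFrames.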
Next, call your function.
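When you call it, it may help to look at the JSON that comes back instead of indexing ['status']['state'] directly, because a request-level failure (for example a bad token) returns a body with no "status" key at all. A small sketch of that check follows; the function name and the "REQUEST_ERROR" label are my own placeholders, not part of the API.

```python
def summarize_statement_response(payload):
    """Return (state, error_message) from a statement response JSON body."""
    status = payload.get("status")
    if status is None:
        # No "status" key: the request itself was rejected.
        # "REQUEST_ERROR" is a made-up label, not an API state.
        return "REQUEST_ERROR", payload.get("message")
    # Failed statements carry details under status.error
    error = status.get("error") or {}
    return status.get("state"), error.get("message")
```

For example, state, message = summarize_statement_response(res.json()) gives you something to log or to return from the FastAPI endpoint when the state is not 'SUCCEEDED'.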
Another option is to connect to Databricks through a driver.
Refer to the Databricks documentation on how to connect to the server and execute queries.
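As a rough illustration of the driver route, here is a sketch using the Databricks SQL Connector for Python (the databricks-sql-connector package). I am assuming that is the kind of driver you would pick from Python. The function names are hypothetical, and server_hostname, http_path and access_token come from your SQL warehouse's connection details.

```python
def create_table_sql(table_name):
    """Return the same CREATE TABLE statement used in the REST example."""
    return f"CREATE TABLE IF NOT EXISTS {table_name} (id INT, name STRING)"


def create_table_via_connector(server_hostname, http_path, access_token, table_name):
    """Open a connection to a SQL warehouse and create the table (sketch)."""
    # Imported here so the module still loads when the package is not installed
    from databricks import sql

    with sql.connect(
        server_hostname=server_hostname,
        http_path=http_path,
        access_token=access_token,
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(create_table_sql(table_name))
```

With a driver you get a regular cursor, so there is no status JSON to parse; errors surface as Python exceptions instead.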