Im making a script that creates a database in AWS Athena and then creates tables for that database, today the DB creation was taking ages, so the tables being created referred to a db that doesn't exists, is there a way to check if a DB is already created in Athena using boto3?
This is the part that created the db:
client = boto3.client('athena')
client.start_query_execution(
QueryString='create database {}'.format('db_name'),
ResultConfiguration=config
)
# -*- coding: utf-8 -*-
import logging
import os
from time import sleep
import boto3
import pandas as pd
from backports.tempfile import TemporaryDirectory
logger = logging.getLogger(__name__)
class AthenaQueryFailed(Exception):
pass
class Athena(object):
S3_TEMP_BUCKET = "please-replace-with-your-bucket"
def __init__(self, bucket=S3_TEMP_BUCKET):
self.bucket = bucket
self.client = boto3.Session().client("athena")
def execute_query_in_athena(self, query, output_s3_directory, database="csv_dumps"):
""" Useful when client executes a query in Athena and want result in the given `s3_directory`
:param query: Query to be executed in Athena
:param output_s3_directory: s3 path in which client want results to be stored
:return: s3 path
"""
response = self.client.start_query_execution(
QueryString=query,
QueryExecutionContext={"Database": database},
ResultConfiguration={"OutputLocation": output_s3_directory},
)
query_execution_id = response["QueryExecutionId"]
filename = "{filename}.csv".format(filename=response["QueryExecutionId"])
s3_result_path = os.path.join(output_s3_directory, filename)
logger.info(
"Query query_execution_id <<{query_execution_id}>>, result_s3path <<{s3path}>>".format(
query_execution_id=query_execution_id, s3path=s3_result_path
)
)
self.wait_for_query_to_complete(query_execution_id)
return s3_result_path
def wait_for_query_to_complete(self, query_execution_id):
is_query_running = True
backoff_time = 10
while is_query_running:
response = self.__get_query_status_response(query_execution_id)
status = response["QueryExecution"]["Status"][
"State"
] # possible responses: QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED
if status == "SUCCEEDED":
is_query_running = False
elif status in ["CANCELED", "FAILED"]:
raise AthenaQueryFailed(status)
elif status in ["QUEUED", "RUNNING"]:
logger.info("Backing off for {} seconds.".format(backoff_time))
sleep(backoff_time)
else:
raise AthenaQueryFailed(status)
def __get_query_status_response(self, query_execution_id):
response = self.client.get_query_execution(QueryExecutionId=query_execution_id)
return response
As pointed in above answer, Athena Waiter is still not there implemented.
I use this light weighted Athena
client to do the query, it returns the s3 path of result when the query is completed.