python amazon-web-services boto3 amazon-athena

boto3 check if Athena database exists


I'm making a script that creates a database in AWS Athena and then creates tables for that database. Today the DB creation was taking ages, so the tables being created referred to a DB that didn't exist yet. Is there a way to check if a DB is already created in Athena using boto3?

This is the part that created the db:

# Submit the CREATE DATABASE statement to Athena.
# NOTE(review): `config` is not defined in this snippet; presumably it is the
# ResultConfiguration dict built elsewhere in the script.
client = boto3.client('athena')
create_database_query = 'create database {}'.format('db_name')
client.start_query_execution(QueryString=create_database_query, ResultConfiguration=config)

Solution

  • # -*- coding: utf-8 -*-
    import logging
    import os
    from time import sleep
    
    import boto3
    import pandas as pd
    from backports.tempfile import TemporaryDirectory
    
    logger = logging.getLogger(__name__)
    
    
    class AthenaQueryFailed(Exception):
        """Raised when an Athena query does not finish in the SUCCEEDED state."""
    
    
    class Athena(object):
        """Small Athena client that runs a query and blocks until it finishes.

        Athena itself writes the result file to the S3 location passed to
        ``execute_query_in_athena``; this class only submits the query and
        polls its status.
        """

        S3_TEMP_BUCKET = "please-replace-with-your-bucket"

        # Terminal states in which the query produced no usable result.
        FAILURE_STATES = ("CANCELLED", "FAILED")
        # States in which the query is still in progress, so polling continues.
        PENDING_STATES = ("QUEUED", "RUNNING")

        def __init__(self, bucket=S3_TEMP_BUCKET):
            self.bucket = bucket
            self.client = boto3.Session().client("athena")

        def execute_query_in_athena(self, query, output_s3_directory, database="csv_dumps"):
            """Run ``query`` in Athena and wait until it has finished.

            :param query: Query to be executed in Athena
            :param output_s3_directory: s3 path in which the results should be stored
            :param database: Athena database used as the query execution context
            :return: s3 path of the ``<QueryExecutionId>.csv`` result file
            :raises AthenaQueryFailed: if the query does not end in ``SUCCEEDED``
            """
            # S3 URIs always use "/" as separator, whatever the local OS is, so
            # join with posixpath instead of os.path (which uses "\" on Windows).
            import posixpath

            response = self.client.start_query_execution(
                QueryString=query,
                QueryExecutionContext={"Database": database},
                ResultConfiguration={"OutputLocation": output_s3_directory},
            )
            query_execution_id = response["QueryExecutionId"]
            filename = "{filename}.csv".format(filename=query_execution_id)
            s3_result_path = posixpath.join(output_s3_directory, filename)
            logger.info(
                "Query query_execution_id <<%s>>, result_s3path <<%s>>",
                query_execution_id,
                s3_result_path,
            )
            self.wait_for_query_to_complete(query_execution_id)
            return s3_result_path

        def wait_for_query_to_complete(self, query_execution_id, backoff_time=10):
            """Poll Athena until the query reaches a terminal state.

            :param query_execution_id: id returned by ``start_query_execution``
            :param backoff_time: seconds to sleep between two status checks
            :return: None once the query state is ``SUCCEEDED``
            :raises AthenaQueryFailed: if the query is cancelled, fails, or
                reports a state this client does not recognise
            """
            while True:
                response = self.__get_query_status_response(query_execution_id)
                status_info = response["QueryExecution"]["Status"]
                # possible states: QUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLED
                status = status_info["State"]

                if status == "SUCCEEDED":
                    return

                if status in self.PENDING_STATES:
                    logger.info("Backing off for {} seconds.".format(backoff_time))
                    sleep(backoff_time)
                    continue

                # Anything else is terminal: either a known failure state or a
                # state this client does not know how to wait for.
                if status in self.FAILURE_STATES:
                    logger.error(
                        "Query <<%s>> ended in state %s: %s",
                        query_execution_id,
                        status,
                        status_info.get("StateChangeReason"),
                    )
                else:
                    logger.error(
                        "Query <<%s>> reported unexpected state %s",
                        query_execution_id,
                        status,
                    )
                raise AthenaQueryFailed(status)

        def __get_query_status_response(self, query_execution_id):
            """Return the raw ``get_query_execution`` response for the given id."""
            return self.client.get_query_execution(QueryExecutionId=query_execution_id)
    
    

    As pointed out in the answer above, boto3 still does not implement a waiter for Athena.

    I use this lightweight Athena client to run the query; it returns the S3 path of the result once the query has completed.