python python-3.x kafka-python

Python - Optimal way to check for a condition with a timeout?


I wrote two functions: one uploads a gzip file to Artifactory, and the other listens to a Kafka topic. As a next step, I want to validate that the upload succeeded within 5 minutes by listening to the Kafka topic for the keyword push_status: True, and fail the script otherwise. I'm stumped on how to write the else part in a more elegant and Pythonic way.

Code snippet:

upload_status = fetch_kafka_topic("topic", "brokers")  # Returns messages from a particular topic.

if upload_status:  # True only if the returned dictionary is not empty
    if upload_status['file_sha'] == EXPECTED_SHA:  # Check that the SHA matches the one I already have.
        if upload_status['push_status'] == True:
            print("upload is successful ...")
        else:
            # Monitor and query the Kafka topic again for the keyword for 5 minutes,
            # and fail the build if it exceeds the timeout.
            pass

Solution

  • Is this what you are looking for?

    
    import sys
    import time


    TIMEOUT_SECONDS = 5 * 60   # give up after 5 minutes
    POLL_INTERVAL_SECONDS = 5  # pause between polls so the loop does not spin


    def query_upload_success():
        """Return True if the topic reports a successful push of the expected file."""
        # fetch_kafka_topic and EXPECTED_SHA come from the question's code.
        upload_status = fetch_kafka_topic("topic", "brokers")
        if not upload_status:  # empty dictionary: nothing to check yet
            return False
        return (upload_status['file_sha'] == EXPECTED_SHA
                and upload_status['push_status'] == True)


    def timed_query_upload():
        """Poll the topic until the upload is confirmed or the timeout expires."""
        # time.monotonic() is not affected by system clock adjustments.
        deadline = time.monotonic() + TIMEOUT_SECONDS
        while time.monotonic() < deadline:
            if query_upload_success():
                print("upload is successful ...")
                return True
            time.sleep(POLL_INTERVAL_SECONDS)
        print("exceeds the timeout ...")
        return False


    if __name__ == '__main__':
        if not timed_query_upload():
            sys.exit(1)  # a non-zero exit status fails the build
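
If the same "check a condition until a timeout" pattern is needed in more than one place, the loop can be pulled out into a small generic helper. The following is only a sketch: `wait_until` and its `clock` and `sleep` parameters are names made up for this example, not part of the standard library or of kafka-python. It assumes the condition is cheap enough to call repeatedly and that a fixed pause between attempts is acceptable.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout: float,
    interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call predicate until it returns a truthy value or timeout seconds pass.

    Returns True if the predicate succeeded, False if the deadline was reached.
    The predicate is always called at least once, even with timeout=0.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        remaining = deadline - clock()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        sleep(min(interval, remaining))
```

With this helper, the check from the answer could be written as `wait_until(query_upload_success, timeout=300, interval=5)`, and the script can call `sys.exit(1)` when it returns False. Passing `clock` and `sleep` as parameters is a design choice that makes the helper easy to test without real waiting.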