Tags: azure-databricks

How to schedule a second Databricks task to run at a dynamic, specific time?


I have a Workflow that will do two things:

  1. Run a notebook to find the sunset datetime for the current day.
  2. Send myself an SMS notification when that datetime arrives.
import requests
from datetime import datetime

# Fetch current weather (including sunset time) from OpenWeatherMap
url = 'https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={appid}'
response = requests.get(url)
weather = response.json()

# "sunset" is a Unix epoch timestamp (UTC)
sunset_utc = weather["sys"]["sunset"]
sunset_utc_formatted = datetime.fromtimestamp(sunset_utc).strftime('%Y-%m-%d %H:%M:%S')

# Make the value available to downstream tasks in the job
dbutils.jobs.taskValues.set(key="Time", value=sunset_utc_formatted)
sunset_utc_formatted

This code gives me a datetime of '2024-08-12 01:17:51'. I tested the second notebook separately, and it successfully sent me an SMS.

I was going to start with this as my workflow, but realized it wouldn't work: the 'Notification' task would run immediately after 'GetTime' and fail. How can I set up my workflow so that the 'Notification' task starts at the specific time defined by the GetTime task?


Solution

  • With your current setup, you would need to loop, repeatedly checking until the sunset time arrives and the condition becomes true, before your notification notebook could execute. But keeping the pipeline running just to poll for a condition is neither recommended nor ideal.

    Also note that the time you get in the first notebook is in the local time zone of the cluster or compute it runs on.

    So the solution is to get the time in your local time zone and put a schedule on the notification task in a separate job, instead of running the pipeline in a loop waiting for the condition to match.

    Below is the script, which needs to run every day to update the schedule of the notification task.

    import requests
    from datetime import datetime
    import pytz
    import json

    def datetime_to_cron(dt: datetime) -> str:
        # Quartz cron format: second minute hour day-of-month month day-of-week
        return f'{dt.second} {dt.minute} {dt.hour} * * ?'

    url = 'https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={appid}'
    response = requests.get(url)
    weather = response.json()
    sunset_utc = weather["sys"]["sunset"]

    # Convert the epoch timestamp to a timezone-aware datetime in IST
    ist = pytz.timezone('Asia/Kolkata')
    sunset_utc_formatted = datetime.fromtimestamp(sunset_utc, ist)


    Here, I am getting the time in my local time zone (IST), and I use the same time zone when setting the schedule.
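    For reference, here is a minimal standalone sketch of the epoch-to-IST-to-cron conversion; the epoch value below is a made-up example standing in for `weather["sys"]["sunset"]`:

```python
from datetime import datetime
import pytz

def datetime_to_cron(dt: datetime) -> str:
    # Quartz cron format: second minute hour day-of-month month day-of-week
    return f'{dt.second} {dt.minute} {dt.hour} * * ?'

ist = pytz.timezone('Asia/Kolkata')
# Example epoch timestamp (2024-08-12 13:15:30 UTC)
sunset_ist = datetime.fromtimestamp(1723468530, ist)
print(sunset_ist.strftime('%Y-%m-%d %H:%M:%S'))  # → 2024-08-12 18:45:30
print(datetime_to_cron(sunset_ist))              # → 30 45 18 * * ?
```

    Passing the `ist` tzinfo to `datetime.fromtimestamp` is what makes the result independent of the cluster's local time zone.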

    Next, update the schedule through the Jobs API.

    
    databricks_instance = "https://<Your databricks instance>.azuredatabricks.net"
    api_token = "dapi2871....."  # apiToken
    job_id = "872001410877658"  # Your notification job id, which is created separately

    cron_expression = datetime_to_cron(sunset_utc_formatted)

    payload = {
        "job_id": job_id,
        "new_settings": {
            "schedule": {
                "quartz_cron_expression": cron_expression,
                "timezone_id": "Asia/Kolkata",  # Same time zone as the sunset datetime
                "pause_status": "UNPAUSED"
            }
        }
    }

    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }

    url = f"{databricks_instance}/api/2.1/jobs/update"
    response = requests.post(url, headers=headers, data=json.dumps(payload))

    # Check the response
    if response.status_code == 200:
        print(f"Job scheduled at {sunset_utc_formatted.strftime('%Y-%m-%d %H:%M:%S')}.")
    else:
        print(f"Failed to update job schedule: {response.status_code}")
        print(response.json())
    

    Run this notebook every day and it updates the notification job's schedule daily. The `jobs/update` endpoint only modifies the fields supplied in `new_settings`, so the rest of the job definition is left untouched.
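    If the updater notebook is itself a Databricks job, you can put it on a fixed daily schedule that fires well before sunset; the 6 AM time below is an arbitrary illustration, not something from the original setup:

```json
"schedule": {
    "quartz_cron_expression": "0 0 6 * * ?",
    "timezone_id": "Asia/Kolkata",
    "pause_status": "UNPAUSED"
}
```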

    Output:

    The run prints the scheduled sunset time, and the same updated cron schedule appears on the notification job in the Workflows UI.