Tags: flask, airflow, orchestration

How to run a DAG using the API?


I am working on a simple Airflow/Docker project in which I want to train an ML model and use Flask to expose it on a port, so that any user can generate a prediction from the model with a curl command. This is what I have done so far:

Added a Dockerfile to the root directory:

FROM apache/airflow:latest

USER airflow

# Install extra Python dependencies; re-pinning apache-airflow prevents pip
# from up- or downgrading the Airflow version that ships with the base image
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt

Added a docker-compose.yml to the root directory:

version: '3'

services:
  sleek-airflow:
    image: pythonairflow:latest   # the image built from the Dockerfile above

    volumes:
      - ./airflow:/opt/airflow    # mount AIRFLOW_HOME so dags/ and logs/ live on the host

    ports:
      - "8080:8080"               # Airflow web UI

    command: airflow standalone   # dev all-in-one: db setup, webserver, and scheduler

Added a DAG file to the root directory:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def train():
    # Import necessary libraries
    from sklearn.datasets import fetch_california_housing
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error

    # Step 1: Fetch the California housing dataset
    data = fetch_california_housing()

    # Step 2: Split the data into features (X) and target (y)
    X = data.data
    y = data.target

    # Step 3: Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Step 4: Preprocess the data using StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Step 5: Prepare the model using Linear Regression
    model = LinearRegression()

    # Step 6: Train the model on the training data
    model.fit(X_train_scaled, y_train)

    # Step 7: Use the trained model for prediction
    y_pred = model.predict(X_test_scaled)

    # Step 8: Evaluate the model (e.g., calculate Mean Squared Error)
    mse = mean_squared_error(y_test, y_pred)
    print(f"Mean Squared Error: {mse}")


dag = DAG(
    'pipeline_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 23 * * *',
    catchup=False
)

pipeline_task = PythonOperator(
    task_id='train_model',
    python_callable=train,
    dag=dag
)

pipeline_task
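
Note that, as written, train() only prints the MSE and then discards the fitted model and scaler, so a Flask endpoint would have nothing to serve later. A minimal sketch of persisting both at the end of train() (the /opt/airflow/models path is an assumption; any location the serving process can read would work):

import os
import joblib

# Hypothetical shared location; adjust to wherever the serving process can read
MODEL_DIR = "/opt/airflow/models"
os.makedirs(MODEL_DIR, exist_ok=True)
joblib.dump(model, os.path.join(MODEL_DIR, "model.joblib"))    # fitted LinearRegression
joblib.dump(scaler, os.path.join(MODEL_DIR, "scaler.joblib"))  # fitted StandardScaler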

I also added a requirements.txt file to the root directory; it contains only scikit-learn. To build, I right-click the Dockerfile in VS Code and click Build. Once that finishes, I right-click docker-compose.yml and click Compose Up. I then put the DAG file inside the airflow/dags folder, restart the container from Docker Desktop, open the web UI, and, once the DAG is visible, trigger it manually.
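
For reference, the equivalent terminal commands (run from the root directory, assuming the image tag matches the one referenced in docker-compose.yml) would be:

docker build -t pythonairflow:latest .
docker compose up -d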

Can someone help me integrate Flask into this? Also, let me know if I am doing anything wrong so far.


Solution

  • I recommend creating a plugin and extending Airflow's routes. It should look something like this (just an example):

    from flask import jsonify, request
    from flask_appbuilder import expose, BaseView as AppBuilderBaseView

    from airflow.api.client.local_client import Client
    from airflow.www.auth import has_access

    class MyMLRunner(AppBuilderBaseView):
        default_view = "test"

        @expose("/", methods=["POST"])
        @has_access(
            [
                ...,  # the (action, resource) permission pairs to require
            ]
        )
        def test(self):
            # Trigger the DAG in-process through the local API client
            client = Client(None, None)
            client.trigger_dag(
                dag_id='{DAG_ID_HERE}',
                run_id='from_http',  # note: run_ids must be unique per DAG, so a fixed value only works once
                conf=request.get_json(),  # pass the request body as the DAG run conf
            )
            # ... any extra handling here ...
            return jsonify(dict(status='ok'))
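
    For Airflow to actually register the route, the view also has to be wired up through an AirflowPlugin subclass placed in the plugins folder. A minimal sketch (the plugin name, menu name, and category below are placeholders):

    from airflow.plugins_manager import AirflowPlugin

    class MyMLRunnerPlugin(AirflowPlugin):
        name = "my_ml_runner_plugin"
        # Expose MyMLRunner in the webserver; name/category control the UI menu entry
        appbuilder_views = [
            {
                "name": "ML Runner",
                "category": "Custom",
                "view": MyMLRunner(),
            }
        ]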

    Benefits: the route lives inside the Airflow webserver, which is itself a Flask application, so you get the Flask integration you asked about plus Airflow's authentication, without running a separate service.

    By the way, you can also trigger the DAG through Airflow's stable REST API instead of writing a plugin.
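
    For example, the stable REST API can trigger the DAG above directly. This assumes an API auth backend such as basic_auth is enabled and that you have valid credentials (airflow standalone prints a generated admin password at startup); {PASSWORD} is a placeholder:

    curl -X POST "http://localhost:8080/api/v1/dags/pipeline_dag/dagRuns" \
        -H "Content-Type: application/json" \
        --user "admin:{PASSWORD}" \
        -d '{"conf": {}}'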