I am working on a simple Airflow-Docker project where I want to implement an ML model and use Flask to expose it on a port, so that any user can run a curl command to get a prediction from the model. This is what I have done so far:
Added a Dockerfile to the root directory:
FROM apache/airflow:latest
USER airflow
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r /requirements.txt
Added a docker-compose.yml to the root directory:
version: '3'
services:
  sleek-airflow:
    image: pythonairflow:latest
    volumes:
      - ./airflow:/opt/airflow
    ports:
      - "8080:8080"
    command: airflow standalone
Added a DAG file to the root directory:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime
import requests


def train():
    # Import necessary libraries
    from sklearn.datasets import fetch_california_housing
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error

    # Step 1: Fetch the California housing dataset
    data = fetch_california_housing()

    # Step 2: Split the data into features (X) and target (y)
    X = data.data
    y = data.target

    # Step 3: Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Step 4: Preprocess the data using StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Step 5: Prepare the model using Linear Regression
    model = LinearRegression()

    # Step 6: Train the model on the training data
    model.fit(X_train_scaled, y_train)

    # Step 7: Use the trained model for prediction
    y_pred = model.predict(X_test_scaled)

    # Step 8: Evaluate the model (e.g., calculate Mean Squared Error)
    mse = mean_squared_error(y_test, y_pred)
    print(f"Mean Squared Error: {mse}")


dag = DAG(
    'pipeline_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 23 * * *',
    catchup=False
)

pipeline_task = PythonOperator(
    task_id='train_model',
    python_callable=train,
    dag=dag
)

pipeline_task
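Note that train() only prints the MSE and never saves the fitted model, so a Flask endpoint would have nothing to load yet. A minimal sketch of how the end of train() could persist the artifacts, assuming joblib is available (it is installed with scikit-learn) and /opt/airflow/models is a made-up path inside the mounted ./airflow volume:
import os
import joblib

# Persist the fitted scaler and model so a separate process (e.g. Flask) can load them later
os.makedirs('/opt/airflow/models', exist_ok=True)
joblib.dump(scaler, '/opt/airflow/models/scaler.joblib')
joblib.dump(model, '/opt/airflow/models/model.joblib')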
Also added a requirements.txt file to the root directory; it only has scikit-learn in it. I right-click the Dockerfile in VS Code and click Build. Once done, I right-click docker-compose.yml and click Compose Up. I put the DAG file inside the airflow/dags folder and restart the container from Docker Desktop. I open the web UI and, once the DAG is visible, I trigger it manually.
Can someone help me with integrating Flask into this? Also, let me know if I am doing anything wrong so far.
I recommend creating a plugin and extending Airflow's routes. It should look something like this (just an example):
from flask import jsonify, request
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
from airflow.api.client.local_client import Client
from airflow.www.auth import has_access


class MyMLRunner(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            ...,
        ]
    )
    def test(self):
        client = Client(None, None)
        client.trigger_dag(
            dag_id='{DAG_ID_HERE}',
            run_id='from_http',
            conf=request.get_json(),  # with config from request
        )
        # blablabla...
        return jsonify(dict(status='ok'))
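To make the webserver actually load this view, it also has to be registered as a plugin (e.g. in a file under the plugins/ folder). A minimal sketch with illustrative names:
from airflow.plugins_manager import AirflowPlugin

ml_runner_view = MyMLRunner()

class MyMLRunnerPlugin(AirflowPlugin):
    name = 'my_ml_runner'
    # Register the view and give it a menu entry in the Airflow UI
    appbuilder_views = [
        {'name': 'ML Runner', 'category': 'ML', 'view': ml_runner_view},
    ]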
Benefits: you reuse the Airflow webserver that is already exposed on port 8080, so you do not need to run and expose a separate Flask server. By the way, you can also use Airflow's REST API for this.
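For example, a sketch that triggers the same DAG through the stable REST API using Python's requests, assuming Airflow 2.x with the basic-auth API backend enabled and placeholder credentials:
import requests

response = requests.post(
    'http://localhost:8080/api/v1/dags/pipeline_dag/dagRuns',
    auth=('admin', '<password printed by airflow standalone>'),
    json={'conf': {}},  # plays the same role as conf in trigger_dag above
)
print(response.status_code, response.json())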