Mandi Express: Live Agricultural Commodity Price Prediction
Complete MLOps Case Study Report: Real-Time
Predictive System (RPS)
Project: Real-Time Predictive System (RPS) for Mandi Price Prediction
Domain: Financial/Logistics (Predicting Agricultural Commodity Prices)
Deadline: November 30, 2025
Core Goal: Implement an end-to-end, continuously trained, and monitored MLOps pipeline
using Airflow, DVC, MLflow (via Dagshub), Docker, GitHub Actions, Prometheus, and Grafana.
Phase I: Problem Definition and Data Ingestion
The objective of Phase I was to select a time-series problem and establish the core
orchestration and data versioning mechanism.
1. Predictive Challenge (Step 1)
● Selected Task: Mandi Price Prediction.
● Goal: Predict the final transaction price of a specific agricultural commodity (e.g., Onion)
based on historical data, location, and the current date (a short-term forecast problem).
● API Simulation: A daily scheduled Airflow DAG simulates connecting to a "Mandi Price
API" (in practice, the data is fetched from the AMIS Pakistan website using a custom
scraper) to fetch the latest price and volume data for the day.
2. Orchestration Core: Apache Airflow DAG (Step 2)
An Apache Airflow DAG, scheduled to run daily at midnight, orchestrates the entire ETL and
model retraining lifecycle.
Extraction (2.1):
● Operator: PythonOperator (extract_live_data_task)
● Action: Connects to the simulated API endpoint, fetches the latest 24 hours of data, and
saves the raw JSON/CSV file, timestamped with the run date, to a staging directory.
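The extraction callable can be sketched as follows. This is a minimal sketch, not the project's exact code: the staging directory, file-name pattern, and record format are assumptions, and the real task first pulls the records from the scraped AMIS feed.

```python
import json
from datetime import date
from pathlib import Path

def save_raw_extract(records, staging_dir="include/staging", run_date=None):
    """Persist the day's raw records as a JSON file timestamped with the run date.

    `records` is the list of row dicts returned by the (simulated) Mandi Price
    API; `staging_dir` is a hypothetical staging location.
    """
    run_date = run_date or date.today().isoformat()
    out_dir = Path(staging_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"raw_mandi_{run_date}.json"
    out_path.write_text(json.dumps(records, indent=2))
    return out_path
```

Timestamping each file with the run date keeps every extraction idempotent: re-running the DAG for a past date overwrites only that day's staging file.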
Mandatory Quality Gate (2.2):
● Operator: PythonOperator (data_quality_check_task)
● Logic: Immediately follows the extraction step. It checks the raw data for:
○ Schema Validation: Verifies the existence of key columns (Crop, City, Price, Volume).
○ Null Check: Ensures no more than 1% of values in the Price column are null.
● Failure Mechanism: If the check fails, the task raises an AirflowSkipException or
AirflowFailException, halting the entire DAG run.
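The two checks can be sketched as a pure-pandas function. Inside the DAG the same logic raises AirflowFailException to halt the run; ValueError stands in here so the sketch carries no Airflow dependency.

```python
import pandas as pd

REQUIRED_COLUMNS = {"Crop", "City", "Price", "Volume"}

def run_quality_gate(df: pd.DataFrame, max_null_ratio: float = 0.01) -> None:
    """Raise on schema or null-ratio violations (stand-in for the DAG's gate)."""
    # Schema validation: all key columns must be present
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Schema validation failed; missing columns: {sorted(missing)}")
    # Null check: at most 1% of Price values may be null
    null_ratio = df["Price"].isna().mean()
    if null_ratio > max_null_ratio:
        raise ValueError(f"Null check failed: {null_ratio:.1%} of Price values are null")
```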
Transformation (2.3):
● Operator: PythonOperator (feature_engineering_task)
● Action: Cleans the data (handling minor outliers, type conversion) and generates
time-series specific features.
● Key Features Engineered:
1. Lag Features: Price and Volume from $t-1$, $t-7$ days.
2. Rolling Means: 7-day rolling mean of price to capture short-term trends.
3. Time-of-Year Encoding: Sine/Cosine transforms on the month/day to capture
cyclical seasonality.
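The three feature families above can be sketched in pandas as follows; column names match this report, and one row per day per (Crop, City) group is assumed.

```python
import numpy as np
import pandas as pd

def add_time_series_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add lag, rolling-mean, and cyclical seasonality features."""
    df = df.sort_values("Date").copy()
    grp = df.groupby(["Crop", "City"])
    # 1. Lag features: Price/Volume at t-1 and t-7
    for lag in (1, 7):
        df[f"price_lag_{lag}"] = grp["Price"].shift(lag)
        df[f"volume_lag_{lag}"] = grp["Volume"].shift(lag)
    # 2. 7-day rolling mean of Price (short-term trend)
    df["price_roll_7"] = grp["Price"].transform(
        lambda s: s.rolling(7, min_periods=1).mean()
    )
    # 3. Sine/cosine encoding of day-of-year for cyclical seasonality
    doy = pd.to_datetime(df["Date"]).dt.dayofyear
    df["doy_sin"] = np.sin(2 * np.pi * doy / 365.25)
    df["doy_cos"] = np.cos(2 * np.pi * doy / 365.25)
    return df
```

Grouping by (Crop, City) before shifting ensures a lag never leaks a price from a different commodity or market into the feature row.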
Documentation Artifact (2.4):
● Action: Runs Pandas Profiling on the transformed dataset
(include/processed_combined.csv).
● Logging: The generated HTML report is logged as an artifact to the MLflow Tracking
Server (Dagshub) under the current Airflow run ID.
Loading & Versioning (2.5 & 3):
● Storage: The final, cleaned dataset is saved to the Dagshub DVC Remote (acting as
cloud object storage).
● Tool: DVC
● Commands Executed by Airflow Operator:
Bash
dvc add include/processed_combined.csv
dvc push include/processed_combined.csv
git commit -m "Airflow: Updated data version for processed_combined.csv"
git push
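Inside the PythonOperator, these commands can be assembled and executed roughly as below. A minimal sketch under assumptions: the dataset path is the one above, and a `git add` of the updated `.dvc` pointer file is included so the commit actually records the new hash.

```python
import subprocess

DATASET = "include/processed_combined.csv"

def dvc_version_commands(dataset=DATASET):
    """Return the shell commands the versioning task runs, in order."""
    return [
        ["dvc", "add", dataset],
        ["dvc", "push", dataset],
        # stage the updated .dvc pointer so the commit records the new hash
        ["git", "add", f"{dataset}.dvc"],
        ["git", "commit", "-m", f"Airflow: Updated data version for {dataset}"],
        ["git", "push"],
    ]

def run_versioning_step(dataset=DATASET):
    for cmd in dvc_version_commands(dataset):
        subprocess.run(cmd, check=True)  # any failure aborts the Airflow task
```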
Old dataset: (screenshot omitted)
After a successful Airflow run: (screenshot omitted)
Phase II: Experimentation and Model Management
This phase focuses on leveraging MLflow to track every training run triggered by the Airflow
DAG.
3. MLflow & Dagshub Integration (Step 4)
The model training script (scripts/train_model.py) is configured to use Dagshub as the central
MLOps platform for both MLflow tracking and DVC storage.
Configuration Environment Variables:
The following environment variables are set in the Airflow operator or the GitHub Actions
runner environment:
| Variable | Value | Purpose |
| --- | --- | --- |
| MLFLOW_TRACKING_URI | https://dagshub.com//.mlflow | Routes all MLflow logs and artifacts to Dagshub. |
| MLFLOW_TRACKING_USERNAME | (set from secrets) | Authentication for the MLflow Tracking Server. |
| MLFLOW_TRACKING_PASSWORD | (set from secrets) | Authentication for the MLflow Tracking Server. |
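Setting these variables can be sketched as a small helper. The `owner`/`repo` values below are hypothetical placeholders, and the common Dagshub convention of using the account name as the MLflow username and an access token as the password is assumed.

```python
import os

def configure_mlflow_tracking(owner: str, repo: str, token: str) -> None:
    """Point MLflow at the Dagshub tracking server for the given repo."""
    os.environ["MLFLOW_TRACKING_URI"] = f"https://dagshub.com/{owner}/{repo}.mlflow"
    os.environ["MLFLOW_TRACKING_USERNAME"] = owner   # Dagshub account name
    os.environ["MLFLOW_TRACKING_PASSWORD"] = token   # Dagshub access token
```

In CI, the same three variables are injected from GitHub Actions secrets rather than set in code.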
4. Training and Tracking (within train_model.py)
● Workflow: The training script loads the DVC-versioned data, splits it, trains a Scikit-learn
Pipeline (preprocessing + Gradient Boosting Regressor), and logs all results.
● Key MLflow Actions:
Python

import mlflow
import mlflow.sklearn
import matplotlib.pyplot as plt
from sklearn.ensemble import GradientBoostingRegressor

# model, X_train, y_train, X_test, y_test, and calculate_metrics
# are defined earlier in train_model.py
with mlflow.start_run(run_name="Scheduled_Training_Run") as run:
    # 1. Log Hyperparameters
    mlflow.log_params({
        "n_estimators": 200,
        "max_depth": 5,
        "loss_function": "huber",
    })

    # 2. Train Model and Calculate Metrics
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    rmse, mae = calculate_metrics(y_test, y_pred)

    # 3. Log Metrics
    mlflow.log_metric("RMSE", rmse)
    mlflow.log_metric("MAE", mae)

    # 4. Log Artifact (Residuals Plot)
    plt.savefig("residuals.png")
    mlflow.log_artifact("residuals.png")

    # 5. Log and Register the Model
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        registered_model_name="MandiPricePredictor",
    )
Phase III: Continuous Integration & Deployment
(CI/CD)
The CI/CD pipeline enforces a strict branching model (dev $\rightarrow$ test $\rightarrow$
master) using GitHub Actions for automation and CML for model quality gates.
5. CI Pipeline: Feature $\rightarrow$ dev Merge (ci-dev.yml)
This step ensures code hygiene before integration.
| Step | Commands Executed (from ci-dev.yml) | Purpose |
| --- | --- | --- |
| Install Tools | `pip install flake8 pytest black isort` | Installs dependencies for code quality checks. |
| Black Check | `black --check --diff dags/ api/ scripts/` | Verifies code formatting standards (fails if files need formatting). |
| Flake8 Linting | `flake8 dags/ api/ scripts/ --max-line-length=120` | Catches programmatic and stylistic errors. |
| Unit Tests | `pytest tests/ -v --tb=short` | Runs fast, non-ML-dependent tests. |
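A minimal pytest-style case for this stage might look like the following. This is illustrative only: `clip_price` is a hypothetical cleaning helper, not necessarily a function in the repo.

```python
# tests/test_cleaning.py -- illustrative example for the Unit Tests stage
def clip_price(price, low=0.0, high=10_000.0):
    """Clamp obviously bad price readings into a plausible range."""
    return max(low, min(price, high))

def test_clip_price_bounds():
    assert clip_price(-5.0) == 0.0       # negative reading clamped up
    assert clip_price(150.0) == 150.0    # in-range value untouched
    assert clip_price(1e7) == 10_000.0   # sensor spike clamped down
```

Tests like this stay fast because they exercise pure helpers and never load a model or touch the DVC remote.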
6. CI Pipeline: dev $\rightarrow$ test Merge (Continuous Training ci-test.yml)
This is the Model Retraining Test where a full pipeline run is triggered, and a quality gate is
enforced by CML.
| Step | Commands Executed (from ci-test.yml) | Purpose |
| --- | --- | --- |
| Install DVC | `pip install ... dvc[s3]` | Installs DVC with S3 support to connect to the Dagshub remote. |
| Setup DVC | `dvc remote modify dagshub --local auth basic` | Configures DVC authentication using runner secrets. |
| Pull Data | `dvc pull` | Pulls the DVC-versioned dataset from the Dagshub remote. |
| Train Model | `python scripts/train_model.py` | Executes the full training pipeline, logging metrics and the model to MLflow. |
| CML Report & Gate | `cml-send-comment report.md` | Generates a metric comparison report (e.g., current RMSE vs. production RMSE) and posts it to the PR. **Mandatory Gate:** the PR merge must be manually blocked if CML reports degradation (e.g., if new RMSE > old RMSE). |
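The gate's comparison logic can be sketched as below; function names, the tolerance parameter, and the report layout are assumptions about how `report.md` is assembled before `cml-send-comment` posts it.

```python
def passes_quality_gate(new_rmse: float, prod_rmse: float,
                        tolerance: float = 0.0) -> bool:
    """True when the retrained model does not degrade RMSE beyond tolerance."""
    return new_rmse <= prod_rmse + tolerance

def build_cml_report(new_rmse: float, prod_rmse: float) -> str:
    """Render the markdown body that CML posts to the pull request."""
    verdict = "PASS" if passes_quality_gate(new_rmse, prod_rmse) else "FAIL"
    return "\n".join([
        "## Model Retraining Report",
        "| Metric | Production | Candidate |",
        "| --- | --- | --- |",
        f"| RMSE | {prod_rmse:.3f} | {new_rmse:.3f} |",
        "",
        f"**Quality gate: {verdict}**",
    ])
```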
7. CD Pipeline: test $\rightarrow$ master Merge (Production
Deployment - cd-master.yml)
The final stage pushes a verified, production-ready image to Docker Hub.
| Step | Commands Executed (from cd-master.yml) | Purpose |
| --- | --- | --- |
| DVC Pull (Model) | `dvc pull models/` | Pulls the production model artifact from the Dagshub remote. |
| Docker Build | `uses: docker/build-push-action@v5` (internal) | Builds the Docker image based on api/Dockerfile. Crucially, this build includes the DVC-pulled model artifact. |
| Docker Push | `push: true`, `tags: app:latest, app:vYYYYMMDD-SHA` | Pushes the image to Docker Hub with both `latest` and a unique version tag. |
| Deployment Verification (Run) | `docker run -d -p 8000:8000 --name mandi-test ...` | Runs the new container on the GitHub runner's host for immediate health checks. |
| Health Check | `curl -f http://localhost:8000/health` | Confirms the container responds on the /health endpoint. |
| Prediction Test | `curl -X POST http://localhost:8000/predict ...` | Executes an end-to-end prediction to ensure the model pipeline and API logic are functional. |
Phase IV: Monitoring and Observability
The final phase integrates observability tools directly into the FastAPI application to ensure
the system remains reliable and stable in production.
8. Prometheus and Grafana Integration
Prometheus Data Collector
● Tool: A lightweight Python client (e.g., prometheus_client) is embedded in the FastAPI
service (api/app.py).
● Endpoint: The API exposes metrics on the /metrics endpoint.
● Service Metrics Exposed:
○ api_requests_total: A Counter tracking the total number of prediction requests.
○ api_inference_latency_seconds: A Histogram tracking the time taken for each
prediction (latency).
● Model/Data Drift Metrics:
○ data_drift_out_of_range_ratio: A Gauge that calculates the percentage of requests
where a key input feature (e.g., Volume) falls outside the range observed during
training (a basic proxy for concept drift).
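The drift ratio can be computed with a small stateful tracker like the sketch below. In api/app.py the running ratio would be published via a prometheus_client Gauge named data_drift_out_of_range_ratio; the class name and training bounds here are assumptions.

```python
class DriftMonitor:
    """Track the share of requests whose Volume falls outside the training range."""

    def __init__(self, train_min: float, train_max: float):
        # Bounds observed for the feature during training (assumed values)
        self.train_min, self.train_max = train_min, train_max
        self.total = 0
        self.out_of_range = 0

    def observe(self, volume: float) -> float:
        """Record one request's Volume and return the current drift ratio."""
        self.total += 1
        if not (self.train_min <= volume <= self.train_max):
            self.out_of_range += 1
        return self.out_of_range / self.total  # value to set on the Gauge
```

A rising ratio means live traffic is drifting away from the training distribution, which this report uses as a basic proxy for concept drift.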
Grafana Dashboard and Alerting
● Source: Grafana is configured to use the Prometheus instance as its data source.
● Live Dashboard: A three-panel dashboard visualizes the production system health:
1. Request Rate: Total requests per second.
2. p95 Latency: 95th percentile inference latency (critical service metric).
3. Data Drift: Time-series plot of the data_drift_out_of_range_ratio.
● Alerting Enhancement:
○ Configuration: A Grafana Alert is set on the p95 Latency metric.
○ Condition: Fires if api_inference_latency_seconds (95th percentile) is consistently
greater than 0.5 seconds (500ms) for a period of 5 minutes.
○ Action: The alert sends a notification (e.g., to a Slack channel or writes to a
persistent log file) to notify the MLOps team of performance degradation.
Summary of Integrated Tools
| Category | Tool | Function in RPS Project |
| --- | --- | --- |
| Orchestration | Apache Airflow | Schedules the daily ETL and training DAG. |
| Data/Model Mgmt | DVC, MLflow, Dagshub | DVC for data/model versioning; MLflow (hosted on Dagshub) for experiment tracking and the Model Registry. |
| CI/CD | GitHub Actions | Automates the dev (linting), test (CT/CML), and master (CD) pipelines. |
| Serving | Docker, FastAPI | Containerizes the prediction service for reliable deployment. |
| Monitoring | Prometheus, Grafana | Prometheus collects latency and drift metrics; Grafana visualizes the dashboard and configures the latency alert. |