Integrate Ray Tune with Optuna, Imblearn, MLflow and MinIO¶
This guide walks you through how to use Ray Tune for hyperparameter tuning in a fraud detection model. The workflow includes:
- Loading training data from MinIO.
- Defining a search space with Optuna, using under-sampling and over-sampling techniques like AllKNN and SMOTE from the imblearn package to handle class imbalance.
- Training an XGBoost binary classifier with boosters like gbtree, gblinear, and dart, and tuning hyperparameters such as lambda, alpha, and eta.
- Logging metrics including accuracy, precision, recall, F1, and ROC AUC to both Ray Tune and MLflow.
- Manually configuring MLflow to support parent-child runs, instead of using the default MLflowLoggerCallback and setup_mlflow.
- Retraining and saving the best model with the optimal hyperparameters after tuning.
Here is the full training script.
Full Training Script
# Import packages
import time
from datetime import datetime, timedelta
from pprint import pprint
from typing import Any, Dict, Optional
import pandas as pd
import pyarrow.fs
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import AllKNN
from sklearn.metrics import (accuracy_score, average_precision_score, f1_score,
log_loss, precision_score, recall_score,
roc_auc_score)
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
import mlflow
import ray
from feast import FeatureStore
from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
fs = pyarrow.fs.S3FileSystem(
access_key="minio_user",
secret_key="minio_password",
scheme="http",
endpoint_override="minio-api.minio.svc.cluster.local:9000"
)
# Get Training Data
with fs.open_input_file("ray/training_data.csv") as f:
data = pd.read_csv(f)
# Alternative 1: Ray Data
# ds = ray.data.read_csv(
# "s3://ray/training_data.csv",
# filesystem=fs
# )
# Alternative 2: Feast
# now = datetime.now()
# two_days_ago = datetime.now() - timedelta(days=2)
# store = FeatureStore('.')
# fs_fraud_detection_v1 = store.get_feature_service('fraud_detection_v1')
# data = store.get_historical_features(
# entity_df=f"""
# select
# src_account as entity_id,
# timestamp as event_timestamp,
# is_fraud
# from
# feast-oss.fraud_tutorial.transactions
# where
# timestamp between timestamp('{two_days_ago.isoformat()}')
# and timestamp('{now.isoformat()}')""",
# features=fs_fraud_detection_v1,
# full_feature_names=False
# ).to_df()
# Configure Ray Tune
def space(trial) -> Optional[Dict[str, Any]]:
"""Define-by-run function to construct a conditional search space.
Ensure no actual computation takes place here.
Args:
trial: Optuna Trial object
Returns:
Dict containing constant parameters or None
"""
# Resampler
resampler = trial.suggest_categorical("resampler", ["allknn", "smote", "passthrough"])
# Booster
booster = trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"])
lmbd = trial.suggest_float("lambda", 1e-8, 1.0, log=True)
alpha = trial.suggest_float("alpha", 1e-8, 1.0, log=True)
if booster in ["gbtree", "dart"]:
max_depth = trial.suggest_int("max_depth", 3, 10)
eta = trial.suggest_float("eta", 1e-3, 0.3, log=True)
gamma = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
grow_policy = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
# Constants
return {
"objective": "binary:logistic",
"random_state": 1025
}
def training_function(
config, data,
run_id, mlflow_tracking_uri, experiment_name
):
# Set up mlflow
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment(experiment_name=experiment_name)
# Split data
X = data[[
'has_fraud_7d',
'num_transactions_7d',
'credit_score',
'account_age_days',
'has_2fa_installed'
]]
y = data['is_fraud']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, stratify=y,
random_state=config["random_state"]
)
# Define the resampler
if config["resampler"] == "allknn":
resampler = AllKNN()
elif config["resampler"] == "smote":
resampler = SMOTE()
else:
resampler = "passthrough"
# Define the classifier
new_config = {k: v for k, v in config.items() if k != "resampler"}
classifier = XGBClassifier(**new_config)
# Combine the resampler and classifier together
model = Pipeline(steps=[
("resampler", resampler),
("classifier", classifier)
])
# Train the model
with mlflow.start_run(run_id=run_id):
with mlflow.start_run(
run_name=f"{config['resampler']}-{config['booster']}-{config['lambda']:.2f}-{config['alpha']:.2f}",
nested=True
):
model.fit(X_train, y_train)
# Evaluate the model
y_prob = model.predict_proba(X_test)
y_prob = y_prob[:, 1]
y_pred = (y_prob > 0.5).astype(int)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred),
"recall": recall_score(y_test, y_pred),
"f1": f1_score(y_test, y_pred),
"roc_auc": roc_auc_score(y_test, y_prob),
"log_loss": log_loss(y_test, y_prob),
"average_precision": average_precision_score(y_test, y_prob)
}
# Log the metrics and hyperparameters
mlflow.log_params(config)
mlflow.log_metrics(metrics)
tune.report(metrics)
search_alg = OptunaSearch(space=space, metric="f1", mode="max")
search_alg = ConcurrencyLimiter(search_alg, max_concurrent=4)
tune_config = tune.TuneConfig(
search_alg=search_alg,
num_samples=100,
)
EXPERIMENT_NAME = 'fraud_detection'
RUN_NAME = 'first'
TRACKING_URI = "http://tracking-server.mlflow.svc.cluster.local:5000"
mlflow.set_tracking_uri(TRACKING_URI)
if mlflow.get_experiment_by_name(EXPERIMENT_NAME) is None:
mlflow.create_experiment(EXPERIMENT_NAME)
mlflow.set_experiment(EXPERIMENT_NAME)
run_config = tune.RunConfig(
name=EXPERIMENT_NAME,
storage_path="ray/",
storage_filesystem=fs
)
# Run Ray Tune
ray.init()
with mlflow.start_run(run_name=RUN_NAME, nested=True) as run:
tuner = tune.Tuner(
tune.with_parameters(
training_function,
data=data,
run_id=run.info.run_id,
mlflow_tracking_uri=TRACKING_URI,
experiment_name=EXPERIMENT_NAME
),
tune_config=tune_config,
run_config=run_config
)
results = tuner.fit()
# Retrain the model with the hyperparameters with best result
config = results.get_best_result(metric='f1', mode='max').config
# 1. Split data
X = data[[
'has_fraud_7d',
'num_transactions_7d',
'credit_score',
'account_age_days',
'has_2fa_installed'
]]
y = data['is_fraud']
# 2. Define resampler
if config["resampler"] == "allknn":
resampler = AllKNN()
elif config["resampler"] == "smote":
resampler = SMOTE()
else:
resampler = "passthrough"
# 3. Define classifier
new_config = {k: v for k, v in config.items() if k != "resampler"}
classifier = XGBClassifier(**new_config)
# 4. Combine the resampler and classifier together
model = Pipeline(steps=[
("resampler", resampler),
("classifier", classifier)
])
# 5. Train and evaluate the model
model.fit(X, y)
y_prob = model.predict_proba(X)
y_prob = y_prob[:, 1]
y_pred = (y_prob > 0.5).astype(int)
metrics = {
"accuracy": accuracy_score(y, y_pred),
"precision": precision_score(y, y_pred),
"recall": recall_score(y, y_pred),
"f1": f1_score(y, y_pred),
"roc_auc": roc_auc_score(y, y_prob),
"log_loss": log_loss(y, y_prob),
"average_precision": average_precision_score(y, y_prob)
}
# 6. Log the hyperparameters, metrics and the model
mlflow.log_params(config)
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
input_example=X.iloc[[0]],
metadata={"version": f"{EXPERIMENT_NAME}_v1"}
)
Import Packages¶
First, let’s import the required packages.
# Import packages
import time
from datetime import datetime, timedelta
from pprint import pprint
from typing import Any, Dict, Optional
import pandas as pd
import pyarrow.fs
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import AllKNN
from sklearn.metrics import (accuracy_score, average_precision_score, f1_score,
log_loss, precision_score, recall_score,
roc_auc_score)
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
import mlflow
import ray
from feast import FeatureStore
from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
MinIO Integration¶
We’ll use pyarrow.fs.S3FileSystem to interact with MinIO deployed on Kubernetes. There are two main tasks here.
- Load training data stored in MinIO.
- Save Ray Tune metadata (like checkpoints and logs) back to MinIO during the tuning process.
Here's how we configure the connection to MinIO using S3FileSystem, including the access key, secret key, and endpoint.
fs = pyarrow.fs.S3FileSystem(
access_key="minio_user",
secret_key="minio_password",
scheme="http",
endpoint_override="minio-api.minio.svc.cluster.local:9000"
)
These values should match what you specified when deploying MinIO on Kubernetes. For more details, refer to the configuration section below or revisit this article.
Info
apiVersion: apps/v1
kind: Deployment
metadata:
name: minio
namespace: minio
spec:
replicas: 1
selector:
matchLabels:
app: minio
template:
metadata:
labels:
app: minio
spec:
containers:
- name: minio
image: minio/minio
args:
- server
- /data
- --console-address
- :9001
env:
- name: MINIO_ROOT_USER
value: minio_user
- name: MINIO_ROOT_PASSWORD
value: minio_password
ports:
- containerPort: 9000
protocol: TCP
- containerPort: 9001
protocol: TCP
livenessProbe:
httpGet:
path: /minio/health/live
port: 9000
initialDelaySeconds: 30
periodSeconds: 20
timeoutSeconds: 15
failureThreshold: 6
readinessProbe:
httpGet:
path: /minio/health/ready
port: 9000
initialDelaySeconds: 15
periodSeconds: 10
timeoutSeconds: 10
failureThreshold: 3
volumeMounts:
- name: storage
mountPath: /data
volumes:
- name: storage
hostPath:
path: /home/docker/data/minio
type: DirectoryOrCreate
restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
name: minio-console
namespace: minio
spec:
selector:
app: minio
type: NodePort
ports:
- name: console
port: 9001
targetPort: 9001
nodePort: 30901
---
apiVersion: v1
kind: Service
metadata:
name: minio-api
namespace: minio
spec:
selector:
app: minio
type: ClusterIP
ports:
- name: api
port: 9000
targetPort: 9000
---
apiVersion: batch/v1
kind: Job
metadata:
name: minio-create-bucket
namespace: minio
spec:
backoffLimit: 6
completions: 1
template:
metadata:
labels:
job: minio-create-bucket
spec:
initContainers:
- name: wait-for-minio
image: busybox
command:
- sh
- -c
- |
until nc -z minio-api.minio.svc.cluster.local 9000; do
echo "Waiting for MinIO..."
sleep 2
done
echo "MinIO is ready!"
containers:
- name: minio-create-buckets
image: minio/mc
command:
- sh
- -c
- |
mc alias set minio http://minio-api.minio.svc.cluster.local:9000 minio_user minio_password &&
for bucket in mlflow dbt sqlmesh ray; do
if ! mc ls minio/$bucket >/dev/null 2>&1; then
echo "Creating bucket: $bucket"
mc mb minio/$bucket
echo "Bucket created: $bucket"
else
echo "Bucket already exists: $bucket"
fi
done
restartPolicy: OnFailure
terminationGracePeriodSeconds: 30
For other custom storage configurations, see here1.
Get the Training Data¶
Since this is a demo project with a small dataset that fits into memory, we’ll use Pandas to read the CSV file directly through the configured filesystem.
If the dataset were larger or didn't fit in memory, we would use Ray Data instead. In the future, this could also integrate with Feast Offline Feature Server2 for more advanced feature management.
# Get Training Data
with fs.open_input_file("ray/training_data.csv") as f:
data = pd.read_csv(f)
# Alternative 1: Ray Data
# ds = ray.data.read_csv(
# "s3://ray/training_data.csv",
# filesystem=fs
# )
# Alternative 2: Feast
# now = datetime.now()
# two_days_ago = datetime.now() - timedelta(days=2)
# store = FeatureStore('.')
# fs_fraud_detection_v1 = store.get_feature_service('fraud_detection_v1')
# data = store.get_historical_features(
# entity_df=f"""
# select
# src_account as entity_id,
# timestamp as event_timestamp,
# is_fraud
# from
# feast-oss.fraud_tutorial.transactions
# where
# timestamp between timestamp('{two_days_ago.isoformat()}')
# and timestamp('{now.isoformat()}')""",
# features=fs_fraud_detection_v1,
# full_feature_names=False
# ).to_df()
Define the RunConfig¶
Next, we configure where Ray Tune stores its metadata by setting the storage_path and storage_filesystem fields in tune.RunConfig().
run_config = tune.RunConfig(
name=EXPERIMENT_NAME,
storage_path="ray/",
storage_filesystem=fs
)
Optuna Integration (TuneConfig)¶
Ray Tune supports OptunaSearch3, which we’ll use to define the hyperparameter search strategy. A common way to define the search space is by passing a dictionary directly via the param_space argument in tune.Tuner().
tuner = tune.Tuner(
objective,
tune_config=tune.TuneConfig(
metric="mean_loss",
mode="min",
search_alg=OptunaSearch(),
num_samples=1000,
),
param_space={
"steps": 100,
"width": tune.uniform(0, 20),
"height": tune.uniform(-100, 100),
"activation": tune.choice(["relu", "tanh"]),
},
)
results = tuner.fit()
Define the Search Space¶
Sometimes we want a more flexible search space, especially one with conditional logic. In such cases, we can pass a define-by-run function to OptunaSearch(), which dynamically defines the search space at runtime.3
This function, typically called space, takes a trial object as input. We use trial.suggest_*() methods from Optuna, along with conditionals and loops, to construct the space.4
This setup is helpful for handling more complex scenarios, such as including or excluding hyperparameters based on earlier choices.
def space(trial) -> Optional[Dict[str, Any]]:
"""Define-by-run function to construct a conditional search space.
Ensure no actual computation takes place here.
Args:
trial: Optuna Trial object
Returns:
Dict containing constant parameters or None
"""
# Resampler
resampler = trial.suggest_categorical("resampler", ["allknn", "smote", "passthrough"])
# Booster
booster = trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"])
lmbd = trial.suggest_float("lambda", 1e-8, 1.0, log=True)
alpha = trial.suggest_float("alpha", 1e-8, 1.0, log=True)
if booster in ["gbtree", "dart"]:
max_depth = trial.suggest_int("max_depth", 3, 10)
eta = trial.suggest_float("eta", 1e-3, 0.3, log=True)
gamma = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
grow_policy = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
# Constants
return {
"objective": "binary:logistic",
"random_state": 1025
}
This space() function defines a conditional hyperparameter search space using Optuna's define-by-run API. Instead of declaring all parameters upfront, the search space is built dynamically as the trial runs. The function suggests different values for categorical and numerical hyperparameters, such as the resampler method (allknn5, smote6, or passthrough)5 and the booster type (gbtree, gblinear, or dart). Based on the chosen booster, additional parameters like max_depth, eta, and grow_policy are conditionally added.
Importantly, no actual model training or heavy computation is done inside this function; it only defines the search space structure.3 The function returns a dictionary of constant parameters (like the learning objective and random_state) to be merged later with the sampled hyperparameters. This design keeps the search logic modular and clean, separating the definition of the search space from the training logic.
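To make the merge concrete, a single gbtree trial might hand the trainable a config like the dictionary below. The numeric values are illustrative only, not taken from an actual run.
# Illustrative only: the sampled values below are made up.
# Optuna's sampled parameters are merged with the constants returned by
# space(), producing the config dictionary each trial receives.
example_config = {
    "resampler": "smote",            # sampled
    "booster": "gbtree",             # sampled
    "lambda": 3.2e-4,                # sampled (log scale)
    "alpha": 1.7e-6,                 # sampled (log scale)
    "max_depth": 6,                  # sampled (gbtree/dart only)
    "eta": 0.05,                     # sampled (gbtree/dart only)
    "gamma": 1.1e-3,                 # sampled (gbtree/dart only)
    "grow_policy": "lossguide",      # sampled (gbtree/dart only)
    "objective": "binary:logistic",  # constant from space()
    "random_state": 1025             # constant from space()
}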
Define the Search Algorithm¶
Now we configure the search algorithm using Optuna. We pass our space() function into OptunaSearch, specifying that we want to maximize the F1 score. To avoid exhausting system resources, we wrap it in a ConcurrencyLimiter that restricts parallel trials to 4. Finally, the TuneConfig object ties everything together, specifying the search algorithm and the total number of trials (num_samples=100) to explore.
search_alg = OptunaSearch(space=space, metric="f1", mode="max")
search_alg = ConcurrencyLimiter(search_alg, max_concurrent=4)
tune_config = tune.TuneConfig(
search_alg=search_alg,
num_samples=100,
)
MLflow Integration¶
Ray offers built-in integration with MLflow through MLflowLoggerCallback7 and setup_mlflow7. These are convenient options, but they don't support parent-child runs8, which are essential for organizing experiments hierarchically. I've tried the Databricks approach9 for setting up parent-child runs, but it didn't work.
Thankfully, it's not difficult to manually integrate MLflow. So instead of using the built-in methods, we manually set up MLflow tracking inside the script. This integration spans multiple parts of the pipeline:
- Set up the tracking URI and experiment in the driver process.
- Start a parent run in the driver.
- Set up and log to MLflow from within each trial (i.e., in the worker process).
- After all trials finish, retrain the best model and log it under the parent run.
Set up the Tracking URI and the Experiment in the Driver Process¶
We begin by setting the experiment name, the run name for this tuning session, and the address of the MLflow tracking server running inside the Kubernetes cluster.
EXPERIMENT_NAME = 'fraud_detection'
RUN_NAME = 'first'
TRACKING_URI = "http://tracking-server.mlflow.svc.cluster.local:5000"
mlflow.set_tracking_uri(TRACKING_URI)
if mlflow.get_experiment_by_name(EXPERIMENT_NAME) is None:
mlflow.create_experiment(EXPERIMENT_NAME)
mlflow.set_experiment(EXPERIMENT_NAME)
These values should match what you specified when deploying MLflow on Kubernetes. For more details, refer to the configuration section below or revisit this article.
Info
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.trackingServer.name }}
namespace: {{ .Release.Namespace }}
spec:
type: NodePort
selector:
app: {{ .Values.trackingServer.name }}
ports:
- port: {{ .Values.trackingServer.port }}
targetPort: {{ .Values.trackingServer.port }}
nodePort: 30500
Start the Parent Run in the Driver Process¶
When we launch Tuner.fit(), we also start an MLflow parent run inside the Ray driver process. Since each trial runs in a worker process, it won’t automatically inherit the MLflow context. So we need to explicitly pass the MLflow tracking URI, experiment name, and parent run ID into each worker so they can log their results correctly under the parent run.
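In the script, this hand-off happens through tune.with_parameters() when constructing the Tuner inside the parent run (shown in full in the Run Ray Tune section below):
with mlflow.start_run(run_name=RUN_NAME, nested=True) as run:
    tuner = tune.Tuner(
        tune.with_parameters(
            training_function,
            data=data,                          # training DataFrame
            run_id=run.info.run_id,             # parent run ID for the workers
            mlflow_tracking_uri=TRACKING_URI,   # tracking server address
            experiment_name=EXPERIMENT_NAME     # experiment to log under
        ),
        tune_config=tune_config,
        run_config=run_config
    )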
Integrate with MLflow in the Worker Process¶
Each trial starts by configuring MLflow to point to the correct tracking server and parent run. Inside the trial, we begin a nested (child) run under the parent run. After training, we log hyperparameters and evaluation metrics, which will be associated with this specific trial.
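Excerpted (and trimmed) from training_function, the worker-side MLflow calls look like this:
# Point the worker at the same tracking server and experiment as the driver.
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment(experiment_name=experiment_name)

# Re-attach to the parent run by ID, then open a nested child run for this trial.
with mlflow.start_run(run_id=run_id):
    with mlflow.start_run(run_name=f"{config['resampler']}-{config['booster']}", nested=True):
        model.fit(X_train, y_train)
        # ... evaluate the model ...
        mlflow.log_params(config)    # hyperparameters for this trial
        mlflow.log_metrics(metrics)  # evaluation metrics for this trial
        tune.report(metrics)         # also report the metrics back to Ray Tune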
Retrain and Save the Model with Best Params in the Driver Process¶
Once all trials finish, we return to the driver process, where we access the ResultGrid. This object contains all trial results. We then select the best set of hyperparameters (e.g., the one with the highest F1 score), retrain the model with those parameters, and log the final model to MLflow under the original parent run.
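In code, selecting the best configuration is a single call on the ResultGrid, exactly as in the full script:
# Hyperparameters of the trial with the highest F1 score.
config = results.get_best_result(metric='f1', mode='max').config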
Training Function (Trainable)¶
This is the training logic executed inside each worker process. Here's the typical workflow:
- Retrieve hyperparameters and the dataset (from config and data).
- Split the dataset into training and validation sets.
- Set up a pipeline with the selected resampling method and booster.
- Train the model and log evaluation metrics and hyperparameters.
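For reference, here is training_function from the full script above, which carries out these steps inside each worker:
def training_function(
    config, data,
    run_id, mlflow_tracking_uri, experiment_name
):
    # Set up mlflow
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name=experiment_name)

    # Split data
    X = data[[
        'has_fraud_7d',
        'num_transactions_7d',
        'credit_score',
        'account_age_days',
        'has_2fa_installed'
    ]]
    y = data['is_fraud']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y,
        random_state=config["random_state"]
    )

    # Define the resampler
    if config["resampler"] == "allknn":
        resampler = AllKNN()
    elif config["resampler"] == "smote":
        resampler = SMOTE()
    else:
        resampler = "passthrough"

    # Define the classifier
    new_config = {k: v for k, v in config.items() if k != "resampler"}
    classifier = XGBClassifier(**new_config)

    # Combine the resampler and classifier together
    model = Pipeline(steps=[
        ("resampler", resampler),
        ("classifier", classifier)
    ])

    # Train the model inside a child run nested under the parent run
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(
            run_name=f"{config['resampler']}-{config['booster']}-{config['lambda']:.2f}-{config['alpha']:.2f}",
            nested=True
        ):
            model.fit(X_train, y_train)

            # Evaluate the model
            y_prob = model.predict_proba(X_test)[:, 1]
            y_pred = (y_prob > 0.5).astype(int)
            metrics = {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred),
                "recall": recall_score(y_test, y_pred),
                "f1": f1_score(y_test, y_pred),
                "roc_auc": roc_auc_score(y_test, y_prob),
                "log_loss": log_loss(y_test, y_prob),
                "average_precision": average_precision_score(y_test, y_prob)
            }

            # Log the metrics and hyperparameters
            mlflow.log_params(config)
            mlflow.log_metrics(metrics)
            tune.report(metrics)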
Run Ray Tune¶
Run the Hyperparameter Optimization¶
With the data ready, search space and Optuna strategy defined, and MLflow properly configured, we’re all set to launch Ray Tune via tune.Tuner().
# Run Ray Tune
ray.init()
with mlflow.start_run(run_name=RUN_NAME, nested=True) as run:
tuner = tune.Tuner(
tune.with_parameters(
training_function,
data=data,
run_id=run.info.run_id,
mlflow_tracking_uri=TRACKING_URI,
experiment_name=EXPERIMENT_NAME
),
tune_config=tune_config,
run_config=run_config
)
results = tuner.fit()
Retrain and Save the Model with the Best Hyperparameters¶
After Tuner.fit() completes and all trials are evaluated, we move on to retraining the best model and logging it, just as explained in the previous section.
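Condensed from the full script, the retraining and logging step looks like this:
# 1. Select the best hyperparameters from the tuning results
config = results.get_best_result(metric='f1', mode='max').config

# 2. Rebuild the pipeline with those hyperparameters
if config["resampler"] == "allknn":
    resampler = AllKNN()
elif config["resampler"] == "smote":
    resampler = SMOTE()
else:
    resampler = "passthrough"
classifier = XGBClassifier(**{k: v for k, v in config.items() if k != "resampler"})
model = Pipeline(steps=[("resampler", resampler), ("classifier", classifier)])

# 3. Train on the full dataset and log the final model under the parent run
model.fit(X, y)
mlflow.log_params(config)
mlflow.sklearn.log_model(
    sk_model=model,
    artifact_path="model",
    input_example=X.iloc[[0]],
    metadata={"version": f"{EXPERIMENT_NAME}_v1"}
)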
Once everything is in place, the next step is to submit the script to a Ray cluster. There are several ways to do that, and we’ll cover them in the next article.