
Integrate Ray Tune with Optuna, Imblearn, MLflow and MinIO

This guide walks you through how to use Ray Tune for hyperparameter tuning in a fraud detection model. The workflow includes:

  1. Loading training data from MinIO
  2. Defining a search space with Optuna, including resampling techniques from the imblearn package, such as SMOTE (over-sampling) and AllKNN (under-sampling), to handle class imbalance.
  3. Training an XGBoost binary classifier with boosters like gbtree, gblinear, and dart, and tuning hyperparameters such as lambda, alpha, and eta.
  4. Logging metrics including accuracy, precision, recall, F1, and ROC AUC to both Ray Tune and MLflow.
  5. Manually configuring MLflow to support parent-child runs, instead of using the default MLflowLoggerCallback and setup_mlflow.
  6. Retraining and saving the best model with the optimal hyperparameters after tuning.

Here is the full training script.

Full Training Script
training.py
# Import packages
import time
from datetime import datetime, timedelta
from pprint import pprint
from typing import Any, Dict, Optional

import pandas as pd
import pyarrow.fs
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import AllKNN
from sklearn.metrics import (accuracy_score, average_precision_score, f1_score,
                             log_loss, precision_score, recall_score,
                             roc_auc_score)
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

import mlflow
import ray
from feast import FeatureStore
from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch


fs = pyarrow.fs.S3FileSystem(
    access_key="minio_user",
    secret_key="minio_password",
    scheme="http",
    endpoint_override="minio-api.minio.svc.cluster.local:9000"
)

# Get Training Data
with fs.open_input_file("ray/training_data.csv") as f:
    data = pd.read_csv(f)

# Alternative 1: Ray Data
# ds = ray.data.read_csv(
#     "s3://ray/training_data.csv",
#     filesystem=fs
# )

# Alternative 2: Feast
# now = datetime.now()
# two_days_ago = datetime.now() - timedelta(days=2)
# store = FeatureStore('.')
# fs_fraud_detection_v1 = store.get_feature_service('fraud_detection_v1')
# data = store.get_historical_features(
#     entity_df=f"""
#     select 
#         src_account as entity_id,
#         timestamp as event_timestamp,
#         is_fraud
#     from
#         feast-oss.fraud_tutorial.transactions
#     where
#         timestamp between timestamp('{two_days_ago.isoformat()}') 
#         and timestamp('{now.isoformat()}')""",
#     features=fs_fraud_detection_v1,
#     full_feature_names=False
# ).to_df()

# Configure Ray Tune
def space(trial) -> Optional[Dict[str, Any]]:
    """Define-by-run function to construct a conditional search space.
    Ensure no actual computation takes place here.

    Args:
        trial: Optuna Trial object

    Returns:
        Dict containing constant parameters or None
    """
    # Resampler
    resampler = trial.suggest_categorical("resampler", ["allknn", "smote", "passthrough"])

    # Booster
    booster = trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"])
    lmbd = trial.suggest_float("lambda", 1e-8, 1.0, log=True)
    alpha = trial.suggest_float("alpha", 1e-8, 1.0, log=True)
    if booster in ["gbtree", "dart"]:
        max_depth = trial.suggest_int("max_depth", 3, 10)
        eta = trial.suggest_float("eta", 1e-3, 0.3, log=True)
        gamma = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
        grow_policy = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
    # Constants
    return {
        "objective": "binary:logistic",
        "random_state": 1025
    }

def training_function(
    config, data,
    run_id, mlflow_tracking_uri, experiment_name
):
    # Set up mlflow 
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name=experiment_name)

    # Split data
    X = data[[
        'has_fraud_7d',
        'num_transactions_7d',
        'credit_score',
        'account_age_days',
        'has_2fa_installed'
    ]]
    y = data['is_fraud']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y,
        random_state=config["random_state"]
    )

    # Define the resampler
    if config["resampler"] == "allknn":
        resampler = AllKNN()
    elif config["resampler"] == "smote":
        resampler = SMOTE()
    else:
        resampler = "passthrough"

    # Define the classifier
    new_config = {k: v for k, v in config.items() if k != "resampler"}
    classifier = XGBClassifier(**new_config)

    # Combine the resampler and classifier together
    model = Pipeline(steps=[
        ("resampler", resampler),
        ("classifier", classifier)
    ])

    # Train the model
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(
            run_name=f"{config['resampler']}-{config['booster']}-{config['lambda']:.2f}-{config['alpha']:.2f}",
            nested=True
        ):
            model.fit(X_train, y_train)
            # Evaluate the model
            y_prob = model.predict_proba(X_test)
            y_prob = y_prob[:, 1]
            y_pred = (y_prob > 0.5).astype(int)
            metrics = {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred),
                "recall": recall_score(y_test, y_pred),
                "f1": f1_score(y_test, y_pred),
                "roc_auc": roc_auc_score(y_test, y_prob),
                "log_loss": log_loss(y_test, y_prob),
                "average_precision": average_precision_score(y_test, y_prob)
            }
            # Log the metrics and hyperparameters
            mlflow.log_params(config)
            mlflow.log_metrics(metrics)
            tune.report(metrics)

search_alg = OptunaSearch(space=space, metric="f1", mode="max")
search_alg = ConcurrencyLimiter(search_alg, max_concurrent=4)
tune_config = tune.TuneConfig(
    search_alg=search_alg,
    num_samples=100,
)

EXPERIMENT_NAME = 'fraud_detection'
RUN_NAME = 'first'
TRACKING_URI = "http://tracking-server.mlflow.svc.cluster.local:5000"
mlflow.set_tracking_uri(TRACKING_URI)
if mlflow.get_experiment_by_name(EXPERIMENT_NAME) is None:
    mlflow.create_experiment(EXPERIMENT_NAME)
mlflow.set_experiment(EXPERIMENT_NAME)
run_config = tune.RunConfig(
    name=EXPERIMENT_NAME,
    storage_path="ray/",
    storage_filesystem=fs
)

# Run Ray Tune
ray.init()
with mlflow.start_run(run_name=RUN_NAME, nested=True) as run:
    tuner = tune.Tuner(
        tune.with_parameters(
            training_function,
            data=data,
            run_id=run.info.run_id,
            mlflow_tracking_uri=TRACKING_URI,
            experiment_name=EXPERIMENT_NAME
        ),
        tune_config=tune_config,
        run_config=run_config
    )
    results = tuner.fit()

    # Retrain the model with the hyperparameters with best result
    config = results.get_best_result(metric='f1', mode='max').config

    # 1. Split data
    X = data[[
        'has_fraud_7d',
        'num_transactions_7d',
        'credit_score',
        'account_age_days',
        'has_2fa_installed'
    ]]
    y = data['is_fraud']

    # 2. Define resampler
    if config["resampler"] == "allknn":
        resampler = AllKNN()
    elif config["resampler"] == "smote":
        resampler = SMOTE()
    else:
        resampler = "passthrough"

    # 3. Define classifier
    new_config = {k: v for k, v in config.items() if k != "resampler"}
    classifier = XGBClassifier(**new_config)

    # 4. Combine the resampler and classifier together
    model = Pipeline(steps=[
        ("resampler", resampler),
        ("classifier", classifier)
    ])

    # 5. Train and evaluate the model
    model.fit(X, y)
    y_prob = model.predict_proba(X)
    y_prob = y_prob[:, 1]
    y_pred = (y_prob > 0.5).astype(int)
    metrics = {
        "accuracy": accuracy_score(y, y_pred),
        "precision": precision_score(y, y_pred),
        "recall": recall_score(y, y_pred),
        "f1": f1_score(y, y_pred),
        "roc_auc": roc_auc_score(y, y_prob),
        "log_loss": log_loss(y, y_prob),
        "average_precision": average_precision_score(y, y_prob)
    }

    # 6. Log the hyperparameters, metrics and the model
    mlflow.log_params(config)
    mlflow.log_metrics(metrics)
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        input_example=X.iloc[[0]],
        metadata={"version": f"{EXPERIMENT_NAME}_v1"}
    )

Import Packages

First, let’s import the required packages.

training.py
# Import packages
import time
from datetime import datetime, timedelta
from pprint import pprint
from typing import Any, Dict, Optional

import pandas as pd
import pyarrow.fs
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import AllKNN
from sklearn.metrics import (accuracy_score, average_precision_score, f1_score,
                             log_loss, precision_score, recall_score,
                             roc_auc_score)
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

import mlflow
import ray
from feast import FeatureStore
from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch

MinIO Integration

We’ll use pyarrow.fs.S3FileSystem to interact with MinIO deployed on Kubernetes. There are two main tasks here.

  1. Load training data stored in MinIO.
  2. Save Ray Tune metadata (like checkpoints and logs) back to MinIO during the tuning process.

Here's how we configure the connection to MinIO using S3FileSystem, including the access key, secret key, and endpoint.

training.py
fs = pyarrow.fs.S3FileSystem(
    access_key="minio_user",
    secret_key="minio_password",
    scheme="http",
    endpoint_override="minio-api.minio.svc.cluster.local:9000"
)
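
As a quick optional check (not part of the training script), you can list the ray bucket to confirm that the credentials and endpoint are reachable:

# Optional connectivity check: list the "ray" bucket created by the
# minio-create-bucket Job to verify the credentials and endpoint above.
selector = pyarrow.fs.FileSelector("ray", recursive=False)
for info in fs.get_file_info(selector):
    print(info.path, info.size)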

These values should match what you specified when deploying MinIO on Kubernetes. For more details, refer to the configuration section below or revisit this article.

Info
minio.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: minio
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
        - name: minio
          image: minio/minio
          args:
            - server
            - /data
            - --console-address
            - :9001
          env:
            - name: MINIO_ROOT_USER
              value: minio_user
            - name: MINIO_ROOT_PASSWORD
              value: minio_password
          ports:
            - containerPort: 9000
              protocol: TCP
            - containerPort: 9001
              protocol: TCP
          livenessProbe:
            httpGet:
              path: /minio/health/live
              port: 9000
            initialDelaySeconds: 30
            periodSeconds: 20
            timeoutSeconds: 15
            failureThreshold: 6
          readinessProbe:
            httpGet:
              path: /minio/health/ready
              port: 9000
            initialDelaySeconds: 15
            periodSeconds: 10
            timeoutSeconds: 10
            failureThreshold: 3
          volumeMounts:
            - name: storage
              mountPath: /data
      volumes:
        - name: storage
          hostPath:
            path: /home/docker/data/minio
            type: DirectoryOrCreate
      restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
  name: minio-console
  namespace: minio
spec:
  selector:
    app: minio
  type: NodePort
  ports:
    - name: console
      port: 9001
      targetPort: 9001
      nodePort: 30901
---
apiVersion: v1
kind: Service
metadata:
  name: minio-api
  namespace: minio
spec:
  selector:
    app: minio
  type: ClusterIP
  ports:
    - name: api
      port: 9000
      targetPort: 9000
---
apiVersion: batch/v1
kind: Job
metadata:
  name: minio-create-bucket
  namespace: minio
spec:
  backoffLimit: 6
  completions: 1
  template:
    metadata:
      labels:
        job: minio-create-bucket
    spec:
      initContainers:
        - name: wait-for-minio
          image: busybox
          command:
            - sh
            - -c
            - |
              until nc -z minio-api.minio.svc.cluster.local 9000; do
                echo "Waiting for MinIO..."
                sleep 2
              done
              echo "MinIO is ready!"
      containers:
        - name: minio-create-buckets
          image: minio/mc
          command:
            - sh
            - -c
            - |
              mc alias set minio http://minio-api.minio.svc.cluster.local:9000 minio_user minio_password &&
              for bucket in mlflow dbt sqlmesh ray; do
                if ! mc ls minio/$bucket >/dev/null 2>&1; then
                  echo "Creating bucket: $bucket"
                  mc mb minio/$bucket
                  echo "Bucket created: $bucket"
                else
                  echo "Bucket already exists: $bucket"
                fi
              done
      restartPolicy: OnFailure
      terminationGracePeriodSeconds: 30

For other custom storage configurations, see here1.

Get the Training Data

Since this is a demo project with a small dataset that fits into memory, we’ll use Pandas to read the CSV file directly through the configured filesystem.

If the dataset were larger or didn't fit in memory, we would use Ray Data instead. In the future, this could also integrate with Feast Offline Feature Server2 for more advanced feature management.

training.py
# Get Training Data
with fs.open_input_file("ray/training_data.csv") as f:
    data = pd.read_csv(f)

# Alternative 1: Ray Data
# ds = ray.data.read_csv(
#     "s3://ray/training_data.csv",
#     filesystem=fs
# )

# Alternative 2: Feast
# now = datetime.now()
# two_days_ago = datetime.now() - timedelta(days=2)
# store = FeatureStore('.')
# fs_fraud_detection_v1 = store.get_feature_service('fraud_detection_v1')
# data = store.get_historical_features(
#     entity_df=f"""
#     select 
#         src_account as entity_id,
#         timestamp as event_timestamp,
#         is_fraud
#     from
#         feast-oss.fraud_tutorial.transactions
#     where
#         timestamp between timestamp('{two_days_ago.isoformat()}') 
#         and timestamp('{now.isoformat()}')""",
#     features=fs_fraud_detection_v1,
#     full_feature_names=False
# ).to_df()

Define the RunConfig

Next, we configure where Ray Tune stores its metadata by setting the storage_path and storage_filesystem fields in tune.RunConfig().

training.py
run_config = tune.RunConfig(
    name=EXPERIMENT_NAME,
    storage_path="ray/",
    storage_filesystem=fs
)
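
Other pyarrow-compatible filesystems plug into RunConfig the same way. Here's a rough sketch using pyarrow's GcsFileSystem; the bucket path and the reliance on application-default credentials are assumptions for illustration, not part of this deployment:

# Illustrative only: back Ray Tune storage with a different pyarrow filesystem.
# The bucket/prefix below is a hypothetical placeholder.
gcs = pyarrow.fs.GcsFileSystem()  # uses application-default credentials
run_config = tune.RunConfig(
    name=EXPERIMENT_NAME,
    storage_path="my-tune-results/ray",
    storage_filesystem=gcs,
)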

Optuna Integration (TuneConfig)

Ray Tune supports OptunaSearch3, which we’ll use to define the hyperparameter search strategy. A common way to define the search space is by passing a dictionary directly via the param_space argument in tune.Tuner().

tuner = tune.Tuner(
    objective,
    tune_config=tune.TuneConfig(
        metric="mean_loss",
        mode="min",
        search_alg=OptunaSearch(),
        num_samples=1000,
    ),
    param_space={
        "steps": 100,
        "width": tune.uniform(0, 20),
        "height": tune.uniform(-100, 100),
        "activation": tune.choice(["relu", "tanh"]),        
    },
)
results = tuner.fit()

Define the Search Space

Sometimes, we want a more flexible search space—especially one with conditional logic. In such cases, we can pass a define-by-run function to OptunaSearch(), which dynamically defines the search space at runtime.3

This function, typically called space, takes a trial object as input. We use trial.suggest_*() methods from Optuna, along with conditionals and loops, to construct the space.4

This setup is helpful for handling more complex scenarios—such as including or excluding hyperparameters based on earlier choices.

training.py
def space(trial) -> Optional[Dict[str, Any]]:
    """Define-by-run function to construct a conditional search space.
    Ensure no actual computation takes place here.

    Args:
        trial: Optuna Trial object

    Returns:
        Dict containing constant parameters or None
    """
    # Resampler
    resampler = trial.suggest_categorical("resampler", ["allknn", "smote", "passthrough"])

    # Booster
    booster = trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"])
    lmbd = trial.suggest_float("lambda", 1e-8, 1.0, log=True)
    alpha = trial.suggest_float("alpha", 1e-8, 1.0, log=True)
    if booster in ["gbtree", "dart"]:
        max_depth = trial.suggest_int("max_depth", 3, 10)
        eta = trial.suggest_float("eta", 1e-3, 0.3, log=True)
        gamma = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
        grow_policy = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
    # Constants
    return {
        "objective": "binary:logistic",
        "random_state": 1025
    }

This space() function defines a conditional hyperparameter search space using Optuna's define-by-run API. Instead of declaring all parameters upfront, the search space is built dynamically as the trial runs. The function suggests different values for categorical and numerical hyperparameters, such as the resampler method (allknn5, smote6, or passthrough) and the booster type (gbtree, gblinear, or dart). Based on the chosen booster, additional parameters like max_depth, eta, and grow_policy are conditionally added.

Importantly, no actual model training or heavy computation is done inside this function—it only defines the search space structure.3 The function returns a dictionary of constant parameters (like the learning objective and random_state) to be merged later with the sampled hyperparameters. This design keeps the search logic modular and clean, separating the definition of search space from the training logic.
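
To sanity-check what this space produces before launching a full tuning run, you can sample it directly with plain Optuna. This probe is purely illustrative; the _probe helper and dummy objective below are not part of the training script:

# Illustrative only: sample the define-by-run space with plain Optuna and print
# the configurations Ray Tune would hand to the trainable.
import optuna

def _probe(trial):
    constants = space(trial)              # registers the suggested params on the trial
    print({**trial.params, **constants})  # sampled params merged with the constants
    return 0.0                            # dummy objective; no training happens here

study = optuna.create_study()
study.optimize(_probe, n_trials=3)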

Define the Search Algorithm

Now we configure the search algorithm using Optuna. We pass our space() function into OptunaSearch, specifying that we want to maximize the F1 score. To avoid exhausting system resources, we wrap it in a ConcurrencyLimiter that restricts parallel trials to 4. Finally, the TuneConfig object ties everything together, specifying the search algorithm and the total number of trials (num_samples=100) to explore.

training.py
search_alg = OptunaSearch(space=space, metric="f1", mode="max")
search_alg = ConcurrencyLimiter(search_alg, max_concurrent=4)
tune_config = tune.TuneConfig(
    search_alg=search_alg,
    num_samples=100,
)
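
If you want the search itself to be reproducible, Optuna's sampler can be seeded. This is a minimal sketch; the TPESampler choice and seed value are assumptions rather than part of the original script:

# Optional: seed the Optuna sampler so the sequence of suggested trials is reproducible.
import optuna

search_alg = OptunaSearch(
    space=space,
    metric="f1",
    mode="max",
    sampler=optuna.samplers.TPESampler(seed=1025),
)
search_alg = ConcurrencyLimiter(search_alg, max_concurrent=4)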

MLflow Integration

Ray offers built-in integration with MLflow through MLflowLoggerCallback7 and setup_mlflow7. These are convenient options, but they don't support parent-child runs8, which are essential for organizing experiments hierarchically. I've tried the Databricks approach9 for setting up parent-child runs, but it didn't work.
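
For reference, the built-in callback route looks roughly like the sketch below. The import path follows recent Ray releases and may differ in older versions; note that every trial is logged as a flat, top-level MLflow run:

# Sketch of the built-in integration, for comparison only: each trial becomes a
# flat MLflow run, with no parent-child hierarchy.
from ray.air.integrations.mlflow import MLflowLoggerCallback

run_config = tune.RunConfig(
    name=EXPERIMENT_NAME,
    callbacks=[
        MLflowLoggerCallback(
            tracking_uri=TRACKING_URI,
            experiment_name=EXPERIMENT_NAME,
            save_artifact=True,
        )
    ],
)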

Thankfully, integrating MLflow manually isn't difficult. Instead of using the built-in methods, we set up MLflow tracking ourselves inside the script. This integration spans multiple parts of the pipeline:

  1. Set up the tracking URI and experiment in the driver process.
  2. Start a parent run in the driver.
  3. Set up and log to MLflow from within each trial (i.e., in the worker process).
  4. After all trials finish, retrain the best model and log it under the parent run.

Set up the Tracking URI and the Experiment in the Driver Process

We begin by setting the experiment name, the run name for this tuning session, and the address of the MLflow tracking server running inside the Kubernetes cluster.

training.py
EXPERIMENT_NAME = 'fraud_detection'
RUN_NAME = 'first'
TRACKING_URI = "http://tracking-server.mlflow.svc.cluster.local:5000"
training.py
mlflow.set_tracking_uri(TRACKING_URI)
if mlflow.get_experiment_by_name(EXPERIMENT_NAME) is None:
    mlflow.create_experiment(EXPERIMENT_NAME)
mlflow.set_experiment(EXPERIMENT_NAME)

These values should match what you specified when deploying MLflow on Kubernetes. For more details, refer to the configuration section below or revisit this article.

Info
tracking-server.yaml
apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.trackingServer.name }}
  namespace: {{ .Release.Namespace }}
spec:
  type: NodePort
  selector:
    app: {{ .Values.trackingServer.name }}
  ports:
    - port: {{ .Values.trackingServer.port }}
      targetPort: {{ .Values.trackingServer.port }}
      nodePort: 30500
values.yaml
trackingServer:
  name: tracking-server
  host: 0.0.0.0
  port: 5000

Start the Parent Run in the Driver Process

Before we launch Tuner.fit(), we start an MLflow parent run inside the Ray driver process. Since each trial runs in a worker process, it won't automatically inherit the MLflow context. So we need to explicitly pass the MLflow tracking URI, experiment name, and parent run ID into each worker so they can log their results correctly under the parent run.

training.py
# Run Ray Tune
ray.init()
with mlflow.start_run(run_name=RUN_NAME, nested=True) as run:
    tuner = tune.Tuner(
        tune.with_parameters(
            training_function,
            data=data,
            run_id=run.info.run_id,
            mlflow_tracking_uri=TRACKING_URI,
            experiment_name=EXPERIMENT_NAME
        ),
        tune_config=tune_config,
        run_config=run_config
    )
    results = tuner.fit()

Integrate with MLflow in the Worker Process

Each trial starts by configuring MLflow to point to the correct tracking server and parent run. Inside the trial, we begin a nested (child) run under the parent run. After training, we log hyperparameters and evaluation metrics, which will be associated with this specific trial.

training.py
def training_function(
    config, data,
    run_id, mlflow_tracking_uri, experiment_name
):
    # Set up mlflow 
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name=experiment_name)

    # Split data
    X = data[[
        'has_fraud_7d',
        'num_transactions_7d',
        'credit_score',
        'account_age_days',
        'has_2fa_installed'
    ]]
    y = data['is_fraud']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y,
        random_state=config["random_state"]
    )

    # Define the resampler
    if config["resampler"] == "allknn":
        resampler = AllKNN()
    elif config["resampler"] == "smote":
        resampler = SMOTE()
    else:
        resampler = "passthrough"

    # Define the classifier
    new_config = {k: v for k, v in config.items() if k != "resampler"}
    classifier = XGBClassifier(**new_config)

    # Combine the resampler and classifier together
    model = Pipeline(steps=[
        ("resampler", resampler),
        ("classifier", classifier)
    ])

    # Train the model
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(
            run_name=f"{config['resampler']}-{config['booster']}-{config['lambda']:.2f}-{config['alpha']:.2f}",
            nested=True
        ):
            model.fit(X_train, y_train)
            # Evaluate the model
            y_prob = model.predict_proba(X_test)
            y_prob = y_prob[:, 1]
            y_pred = (y_prob > 0.5).astype(int)
            metrics = {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred),
                "recall": recall_score(y_test, y_pred),
                "f1": f1_score(y_test, y_pred),
                "roc_auc": roc_auc_score(y_test, y_prob),
                "log_loss": log_loss(y_test, y_prob),
                "average_precision": average_precision_score(y_test, y_prob)
            }
            # Log the metrics and hyperparameters
            mlflow.log_params(config)
            mlflow.log_metrics(metrics)
            tune.report(metrics)

Retrain and Save the Model with Best Params in the Driver Process

Once all trials finish, we return to the driver process, where we access the ResultGrid. This object contains all trial results. We then select the best set of hyperparameters (e.g., the one with the highest F1 score), retrain the model with those parameters, and log the final model to MLflow under the original parent run.
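
Before selecting the best result, you can optionally inspect every trial as a DataFrame. This is an aside rather than part of the script below, and the "config/" column prefix reflects how ResultGrid.get_dataframe() typically names config entries (it may vary across Ray versions):

# Optional aside: look at all trials before picking the best one.
df = results.get_dataframe()
print(
    df.sort_values("f1", ascending=False)[
        ["f1", "roc_auc", "config/booster", "config/resampler"]
    ].head()
)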

training.py
    # Retrain the model with the hyperparameters with best result
    config = results.get_best_result(metric='f1', mode='max').config

    # 1. Split data
    X = data[[
        'has_fraud_7d',
        'num_transactions_7d',
        'credit_score',
        'account_age_days',
        'has_2fa_installed'
    ]]
    y = data['is_fraud']

    # 2. Define resampler
    if config["resampler"] == "allknn":
        resampler = AllKNN()
    elif config["resampler"] == "smote":
        resampler = SMOTE()
    else:
        resampler = "passthrough"

    # 3. Define classifier
    new_config = {k: v for k, v in config.items() if k != "resampler"}
    classifier = XGBClassifier(**new_config)

    # 4. Combine the resampler and classifier together
    model = Pipeline(steps=[
        ("resampler", resampler),
        ("classifier", classifier)
    ])

    # 5. Train and evaluate the model
    model.fit(X, y)
    y_prob = model.predict_proba(X)
    y_prob = y_prob[:, 1]
    y_pred = (y_prob > 0.5).astype(int)
    metrics = {
        "accuracy": accuracy_score(y, y_pred),
        "precision": precision_score(y, y_pred),
        "recall": recall_score(y, y_pred),
        "f1": f1_score(y, y_pred),
        "roc_auc": roc_auc_score(y, y_prob),
        "log_loss": log_loss(y, y_prob),
        "average_precision": average_precision_score(y, y_prob)
    }

    # 6. Log the hyperparameters, metrics and the model
    mlflow.log_params(config)
    mlflow.log_metrics(metrics)
    mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="model",
        input_example=X.iloc[[0]],
        metadata={"version": f"{EXPERIMENT_NAME}_v1"}
    )
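
As a quick follow-up check (not part of the original script), the logged model can be loaded back from the parent run once it finishes and used for a prediction. This assumes the tracking URI is still set in the current process:

# Optional check: reload the model logged under the parent run and score one row.
loaded = mlflow.sklearn.load_model(f"runs:/{run.info.run_id}/model")
print(loaded.predict_proba(X.iloc[[0]]))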

Training Function (Trainable)

This is the training logic executed inside each worker process. The full listing is below, followed by a summary of the typical workflow.

training.py
def training_function(
    config, data,
    run_id, mlflow_tracking_uri, experiment_name
):
    # Set up mlflow 
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name=experiment_name)

    # Split data
    X = data[[
        'has_fraud_7d',
        'num_transactions_7d',
        'credit_score',
        'account_age_days',
        'has_2fa_installed'
    ]]
    y = data['is_fraud']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, stratify=y,
        random_state=config["random_state"]
    )

    # Define the resampler
    if config["resampler"] == "allknn":
        resampler = AllKNN()
    elif config["resampler"] == "smote":
        resampler = SMOTE()
    else:
        resampler = "passthrough"

    # Define the classifier
    new_config = {k: v for k, v in config.items() if k != "resampler"}
    classifier = XGBClassifier(**new_config)

    # Combine the resampler and classifier together
    model = Pipeline(steps=[
        ("resampler", resampler),
        ("classifier", classifier)
    ])

    # Train the model
    with mlflow.start_run(run_id=run_id):
        with mlflow.start_run(
            run_name=f"{config['resampler']}-{config['booster']}-{config['lambda']:.2f}-{config['alpha']:.2f}",
            nested=True
        ):
            model.fit(X_train, y_train)
            # Evaluate the model
            y_prob = model.predict_proba(X_test)
            y_prob = y_prob[:, 1]
            y_pred = (y_prob > 0.5).astype(int)
            metrics = {
                "accuracy": accuracy_score(y_test, y_pred),
                "precision": precision_score(y_test, y_pred),
                "recall": recall_score(y_test, y_pred),
                "f1": f1_score(y_test, y_pred),
                "roc_auc": roc_auc_score(y_test, y_prob),
                "log_loss": log_loss(y_test, y_prob),
                "average_precision": average_precision_score(y_test, y_prob)
            }
            # Log the metrics and hyperparameters
            mlflow.log_params(config)
            mlflow.log_metrics(metrics)
            tune.report(metrics)

  1. Retrieve hyperparameters and the dataset (from config and data).
  2. Split the dataset into training and validation sets.
  3. Set up a pipeline with the selected resampling method and booster.
  4. Train the model and log evaluation metrics and hyperparameters.

Run Ray Tune

Run the Hyperparameter Optimization

With the data loaded, the search space and Optuna strategy defined, and MLflow configured, we're ready to launch Ray Tune via tune.Tuner().

training.py
# Run Ray Tune
ray.init()
with mlflow.start_run(run_name=RUN_NAME, nested=True) as run:
    tuner = tune.Tuner(
        tune.with_parameters(
            training_function,
            data=data,
            run_id=run.info.run_id,
            mlflow_tracking_uri=TRACKING_URI,
            experiment_name=EXPERIMENT_NAME
        ),
        tune_config=tune_config,
        run_config=run_config
    )
    results = tuner.fit()

Retrain and Save the Model with the Best Hyperparameters

After tuner.fit() completes and all trials are evaluated, we move on to retraining the best model and logging it, just as explained in the previous section.

Once everything is in place, the next step is to submit the script to a Ray cluster. There are several ways to do that, and we’ll cover them in the next article.