Write a Custom KServe Transformer with Feast Integration

This guide demonstrates how to integrate Feast with KServe, enabling feature retrieval and transformation during inference.

Extend the KServe Model Class

The kserve.Model base class defines three main handlers: preprocess, predict, and postprocess. They run in sequence, and the output of each handler is passed to the next as its input; in particular, the output of preprocess becomes the input to predict.
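The handler order can be illustrated without KServe installed. The following is a minimal sketch, assuming a simplified synchronous chain; SketchModel and its handle() method are hypothetical stand-ins and not part of the KServe API, whose real handlers also receive request headers.

```python
from typing import Any, Dict


class SketchModel:
    """Hypothetical stand-in for kserve.Model, used only to show handler order."""

    def preprocess(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        # A transformer would enrich the request here (for example, add features).
        return {"instances": [[x * 2] for x in payload["instances"]]}

    def predict(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        # A real transformer forwards the payload to the predictor instead.
        return {"predictions": [sum(row) for row in payload["instances"]]}

    def postprocess(self, response: Dict[str, Any]) -> Dict[str, Any]:
        # Last step: shape the predictor output for the caller.
        return {"predictions": list(response["predictions"])}

    def handle(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        # Each handler receives the previous handler's output.
        return self.postprocess(self.predict(self.preprocess(payload)))


result = SketchModel().handle({"instances": [1, 2, 3]})
```

In a transformer deployment, only preprocess (and optionally postprocess) is typically overridden, because the prediction step is delegated to the predictor.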

To create a custom transformer that retrieves features from Feast, extend the kserve.Model class and implement the preprocess method. This method fetches features for the entities named in the input data and builds the inference request from them.

feast_transformer.py
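import base64
import json
from typing import Dict, Union

import requests

import kserve
from kserve import InferInput, InferRequest
from kserve.logging import logger
from kserve.model import PredictorConfig, PredictorProtocol


class FeastTransformer(kserve.Model):
    """KServe transformer that enriches inference requests with features
    retrieved from a Feast online feature server and returns a
    KServe-compatible request for the predictor.
    """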

    def __init__(
        self,
        feast_url: str,
        feast_entity_id: str,
        feature_service: str,
        model_name: str,
        predictor_host: str,
        predictor_protocol: str,
    ):
        """Initialize the model name, predictor host, Feast serving URL,
           entity IDs, and feature references

        Args:
            feast_url (str): URL of the Feast online feature server, including the scheme, e.g. http://<host_name>:<port>.
            feast_entity_id (str): Name of the entity ID key for feature store lookups.
            feature_service (str): Name of the feature service to retrieve from the feature store.
            model_name (str): Name of the model used on the endpoint path.
            predictor_host (str): Hostname for calling the predictor from the transformer.
            predictor_protocol (str): Inference protocol for the predictor.
        """
        super().__init__(
            name=model_name,
            predictor_config=PredictorConfig(
                predictor_host=predictor_host,
                predictor_protocol=predictor_protocol,
            )
        )
        self.feast_url = feast_url
        self.feast_entity_id = feast_entity_id
        self.feature_service = feature_service

Logic of the preprocess() Method

The preprocess() method prepares the input data for prediction. It first parses the payload to extract the entity IDs (extract_entity_ids()), which are needed to query the Feast feature store.

Once the entity IDs are identified, the method sends a request to the Feast online feature server to fetch the corresponding features.

Finally, it converts the Feast response into the format expected by the predictor (create_inference_request()).
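To make the exchange with Feast concrete, the sketch below builds the JSON body that is posted to the get-online-features endpoint and shows the response fields the transformer reads. The helper name build_feast_request is hypothetical, and the feature values in sample_response are made up for illustration; a real response may carry further fields.

```python
import json
from typing import Dict, List


def build_feast_request(feature_service: str, entities: Dict[str, List[str]]) -> str:
    """Serialize the JSON body posted to <feast_url>/get-online-features."""
    return json.dumps({"feature_service": feature_service, "entities": entities})


body = build_feast_request(
    "fraud_detection_v1",
    {"entity_id": ["v5zlw0", "000q95"]},
)

# Illustrative response shape (values are made up): "results" holds one entry
# per name in "feature_names", each with one value per requested entity.
sample_response = {
    "metadata": {"feature_names": ["entity_id", "credit_score"]},
    "results": [
        {"values": ["v5zlw0", "000q95"]},
        {"values": [480, 720]},
    ],
}
```

The response is column-oriented: each entry in results lists the values of one feature across all entities, which is why the transformer later has to regroup the values per entity.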

Because the transformer can be deployed with different protocols, preprocess() checks the type of the incoming request and handles it accordingly: REST v1, REST v2, and gRPC are supported. The request sent on to the predictor is likewise built according to the predictor's protocol, so the transformer and the predictor do not have to use the same protocol.

feast_transformer.py
    def preprocess(
        self, payload: Union[Dict, InferRequest], headers: Dict[str, str] = None
    ) -> Union[Dict, InferRequest]:
        """Fetch features from Feast and build the request for the predictor.

        Args:
            payload (Dict|InferRequest): Body of the request, v2 endpoints pass InferRequest.
            headers (Dict): Request headers.

        Returns:
            output (Dict|InferRequest): Transformed payload to ``predict`` handler or return InferRequest for predictor call.
        """
        logger.info(f"Headers: {headers}")
        logger.info(f"Type of payload: {type(payload)}")
        logger.info(f"Payload: {payload}")

        # Prepare and send a request for the Feast online feature server
        entities = self.extract_entity_ids(payload)
        logger.info(f"Extracted entities: {entities}")
        feast_response = requests.post(
            f"{self.feast_url}/get-online-features",
            data=json.dumps({
                "feature_service": self.feature_service,
                "entities": entities
            }),
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json"
            }
        )

        # Parse the response from the Feast online feature server
        if feast_response.status_code != 200:
            # Report the raw body; an error response may not be valid JSON
            error_message = (
                f"Error fetching features from Feast "
                f"(status {feast_response.status_code}): {feast_response.text}"
            )
            logger.error(error_message)
            raise Exception(error_message)
        feast_results = feast_response.json()
        logger.info(f"Feast results: {feast_results}")

        output = self.create_inference_request(feast_results)
        logger.info(f"Type of output: {type(output)}")
        logger.info(f"Output of preprocess: {output}")
        return output
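The status check in preprocess() can be exercised without a running Feast server. This is a minimal sketch, assuming the response object exposes status_code, text, and json() as requests.Response does; FakeResponse and parse_feast_response are hypothetical names introduced only for this example.

```python
from typing import Any, Dict


class FakeResponse:
    """Hypothetical stand-in for requests.Response, so the sketch runs offline."""

    def __init__(self, status_code: int, body: Dict[str, Any], text: str = ""):
        self.status_code = status_code
        self._body = body
        self.text = text

    def json(self) -> Dict[str, Any]:
        return self._body


def parse_feast_response(response: Any) -> Dict[str, Any]:
    """Return the decoded body, or raise with the raw text on a non-200 status."""
    if response.status_code != 200:
        # Only the raw text is reported here; the body may not be JSON.
        raise RuntimeError(f"Feast returned {response.status_code}: {response.text}")
    return response.json()


ok = parse_feast_response(FakeResponse(200, {"results": []}))

try:
    parse_feast_response(FakeResponse(503, {}, text="unavailable"))
    error_message = ""
except RuntimeError as exc:
    error_message = str(exc)
```

Checking the status before decoding keeps the error path independent of the response body, which matters when the feature server returns a non-JSON error page.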

feast_transformer.py
    def extract_entity_ids(self, payload: Union[Dict, InferRequest]) -> Dict:
        """Extract entity IDs from the input payload.

        This method processes the input payload to extract entity IDs based on the 
        protocol (REST v1, REST v2, or gRPC v2) and returns them in a dictionary format.

        Args:
            payload (Dict|InferRequest): The input payload containing entity IDs.

        Returns:
            entities (Dict): A dictionary with the extracted entity IDs. For example:
            {
            "entity_id": ["v5zlw0", "000q95"]
            }
        """
        # The protocol here refers to the protocol used by the transformer deployment
        # v2
        if isinstance(payload, InferRequest):
            infer_input = payload.inputs[0]
            entity_ids = [
                # Decode each element based on the protocol: gRPC uses raw bytes, REST uses base64-encoded strings
                d.decode(
                    'utf-8') if payload.from_grpc else base64.b64decode(d).decode('utf-8')
                for d in infer_input.data
            ]
        # REST v1, type(payload) = Dict
        else:
            entity_ids = [
                instance[self.feast_entity_id]
                for instance in payload["instances"]
            ]

        return {self.feast_entity_id: entity_ids}
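The decoding paths in extract_entity_ids() can be tried in isolation. The sketch below re-implements them as standalone functions; entity_ids_from_v1 and entity_ids_from_v2 are hypothetical names, and plain lists stand in for the data of a KServe InferInput.

```python
import base64
from typing import Dict, List


def entity_ids_from_v1(payload: Dict, entity_key: str) -> List[str]:
    """REST v1: every instance is a dict that carries the entity ID."""
    return [instance[entity_key] for instance in payload["instances"]]


def entity_ids_from_v2(data: List, from_grpc: bool) -> List[str]:
    """v2: gRPC delivers raw bytes, REST delivers base64-encoded strings."""
    return [
        d.decode("utf-8") if from_grpc else base64.b64decode(d).decode("utf-8")
        for d in data
    ]


v1_ids = entity_ids_from_v1(
    {"instances": [{"entity_id": "v5zlw0"}, {"entity_id": "000q95"}]},
    "entity_id",
)
grpc_ids = entity_ids_from_v2([b"v5zlw0", b"000q95"], from_grpc=True)
rest_ids = entity_ids_from_v2(
    [base64.b64encode(b"v5zlw0").decode("ascii")],
    from_grpc=False,
)
```

All three paths end in the same list of plain strings, so the Feast lookup does not need to know which protocol the request arrived on.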

feast_transformer.py
    def create_inference_request(self, feast_results: Dict) -> Union[Dict, InferRequest]:
        """Build the predictor request from the Feast response.

        Args:
            feast_results (Dict): Parsed JSON response from the Feast online feature server.

        Returns:
            output (Dict|InferRequest): A v1 ``instances`` dict for a REST v1 predictor, otherwise an InferRequest.
        """
        feature_names = feast_results["metadata"]["feature_names"]
        results = feast_results["results"]
        num_datapoints = len(results[0]["values"])
        num_features = len(feature_names)

        # for v1 predictor protocol, we can directly pass the dict
        if self.protocol == PredictorProtocol.REST_V1.value:
            output = {
                "instances": [
                    {
                        feature_names[j]: results[j]['values'][i] for j in range(num_features) if feature_names[j] != self.feast_entity_id
                    }
                    for i in range(num_datapoints)
                ]
            }
        # for v2 predictor protocol, we need to build an InferRequest
        else:
            # TODO: find a way to not hardcode the data types
            type_map = {
                "has_fraud_7d": "BOOL",
                "num_transactions_7d": "INT64",
                "credit_score": "INT64",
                "account_age_days": "INT64",
                "has_2fa_installed": "BOOL",
            }
            map_datatype = lambda feature_name: type_map.get(feature_name, "BYTES")

            output = InferRequest(
                model_name=self.name,
                parameters={
                    "content-type": "pd"
                },
                infer_inputs=[
                    InferInput(
                        name=feature_names[j],
                        datatype=map_datatype(feature_names[j]),
                        shape=[num_datapoints],
                        data=[
                            results[j]["values"][i]
                            for i in range(num_datapoints)
                        ]
                    )
                    for j in range(num_features)
                    if feature_names[j] != self.feast_entity_id
                ]
            )

        return output
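The REST v1 branch regroups the column-oriented Feast response into one dict per entity. The following sketch reproduces that step on sample data; to_v1_instances and its entity_key parameter are hypothetical, and the feature values are made up for illustration.

```python
from typing import Any, Dict


def to_v1_instances(feast_results: Dict[str, Any], entity_key: str) -> Dict[str, Any]:
    """Turn per-feature value lists into per-entity instances, dropping the entity key."""
    feature_names = feast_results["metadata"]["feature_names"]
    results = feast_results["results"]
    num_datapoints = len(results[0]["values"])
    return {
        "instances": [
            {
                name: results[j]["values"][i]
                for j, name in enumerate(feature_names)
                if name != entity_key
            }
            for i in range(num_datapoints)
        ]
    }


sample = {
    "metadata": {"feature_names": ["entity_id", "credit_score", "has_2fa_installed"]},
    "results": [
        {"values": ["v5zlw0", "000q95"]},
        {"values": [480, 720]},
        {"values": [True, False]},
    ],
}
v1_request = to_v1_instances(sample, "entity_id")
```

With two entities and two features, the result contains two instances, each holding credit_score and has_2fa_installed for one entity. The v2 branch keeps the column orientation instead, because each InferInput carries all values of a single feature.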

Package the Custom Transformer

Packaging the custom transformer makes it deployable and reusable across environments. Organizing the code as a Python package and containerizing it yields a self-contained image with pinned dependencies that any KServe deployment can reference.

First, create a directory structure for your custom transformer:

.
├── __init__.py
├── __main__.py
└── feast_transformer.py

Then, extend the __main__.py file with custom arguments for the Feast-related settings. This lets you specify the Feast URL, entity ID, and feature service directly in the inference service YAML file, so the same image can be configured for different deployment environments.

__main__.py
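import argparse

import kserve
from kserve import logging

from .feast_transformer import FeastTransformer

parser = argparse.ArgumentParser(parents=[kserve.model_server.parser])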
parser.add_argument(
    "--feast_url",
    type=str,
    help="URL of the Feast online feature server, including the scheme, e.g. http://<host_name>:<port>",
    required=True,
)
parser.add_argument(
    "--feast_entity_id",
    type=str,
    help="Name of the entity ID key for feature store lookups.",
    required=True,
)
parser.add_argument(
    "--feature_service",
    type=str,
    help="Name of the feature service to retrieve from the feature store.",
    required=True,
)

When you run the transformer, you can specify the Feast URL, entity ID, and feature service as command-line arguments. This allows the transformer to dynamically connect to the Feast feature store and retrieve the necessary features for inference.

python -m feast_transformer \
  --feast_url http://example.com:6565 \
  --feast_entity_id entity_id \
  --feature_service fraud_detection_v1
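The argument handling can be checked without starting a server. This is a minimal sketch that leaves out the KServe parent parser so it runs without KServe installed; the argument list passed to parse_known_args() stands in for the command line, and --some_other_flag is a made-up flag that represents options the transformer does not define itself.

```python
import argparse

# Same three Feast flags as in __main__.py, without the KServe parent parser.
parser = argparse.ArgumentParser()
for flag in ("--feast_url", "--feast_entity_id", "--feature_service"):
    parser.add_argument(flag, type=str, required=True)

# parse_known_args() returns unrecognized options instead of failing on them.
args, unknown = parser.parse_known_args([
    "--feast_url", "http://example.com:6565",
    "--feast_entity_id", "entity_id",
    "--feature_service", "fraud_detection_v1",
    "--some_other_flag", "ignored",
])
```

argparse matches option names literally, so the flags must be spelled with underscores exactly as they were registered; a hyphenated variant such as --feature-service would not be recognized, and the required --feature_service would then be reported as missing.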

After the arguments are parsed, the __main__ block creates the FeastTransformer with the provided Feast configuration and starts the model server.

__main__.py
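# Parse the command line; flags not defined by the parser are ignored
args, _ = parser.parse_known_args()

if __name__ == "__main__":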
    if args.configure_logging:
        logging.configure_logging(args.log_config_file)
    transformer = FeastTransformer(
        feast_url=args.feast_url,
        feast_entity_id=args.feast_entity_id,
        feature_service=args.feature_service,
        model_name=args.model_name,
        predictor_host=args.predictor_host,
        predictor_protocol=args.predictor_protocol,
    )
    server = kserve.ModelServer()
    server.start(models=[transformer])

The following sequence diagram shows what happens when the KServe ModelServer starts and stops:

sequenceDiagram
    participant User as User
    participant ModelServer as ModelServer
    participant ModelRepo as Model Repository
    participant RESTServer as REST Server
    participant GRPCServer as GRPC Server
    participant EventLoop as Event Loop
    participant SignalHandler as Signal Handler

    User->>ModelServer: Instantiate ModelServer
    ModelServer->>ModelRepo: Initialize Model Repository
    ModelServer->>EventLoop: Setup Event Loop
    ModelServer->>SignalHandler: Register Signal Handlers

    User->>ModelServer: Call start(models)
    ModelServer->>ModelRepo: Register Models
    alt At least one model is ready
        ModelServer->>RESTServer: Start REST Server
        RESTServer-->>ModelServer: REST Server Running
        ModelServer->>GRPCServer: Start GRPC Server
        GRPCServer-->>ModelServer: GRPC Server Running
    else No models are ready
        ModelServer-->>User: Raise NoModelReady Exception
    end
    ModelServer->>EventLoop: Run Event Loop

    User->>ModelServer: Call stop(sig)
    ModelServer->>RESTServer: Stop REST Server
    RESTServer-->>ModelServer: REST Server Stopped
    ModelServer->>GRPCServer: Stop GRPC Server
    GRPCServer-->>ModelServer: GRPC Server Stopped
    ModelServer->>ModelRepo: Unload Models
    ModelRepo-->>ModelServer: Models Unloaded
    ModelServer-->>User: Server Stopped

Containerize the Transformer

kserve/docker
├── README.md
├── pyproject.toml
├── uv.lock
├── Dockerfile
├── .dockerignore
└── feast_transformer
    ├── __init__.py
    ├── __main__.py
    └── feast_transformer.py

First, create a pyproject.toml file to define the dependencies for your custom transformer. This file will be used by the uv package manager to install the necessary libraries.

pyproject.toml
[project]
name = "feast-transformer"
version = "0.1.0"
description = "Feast Transformer for KServe"
requires-python = "~=3.11"
authors = [
    { email = "kuanchoulai10@gmail.com" }
]
readme = "README.md"

dependencies = [
    "kserve==0.15.1",
    "requests>=2.22.0",
    "numpy>=1.16.3"
]


[build-system]
requires = ["setuptools>=80.0.0"]
build-backend = "setuptools.build_meta"

Next, use the uv package manager to resolve the dependencies and generate a lock file (uv.lock). The lock file pins the exact dependency versions, so the same versions are installed in every environment.

uv lock

Then, create a Dockerfile to build the custom transformer image. This Dockerfile uses a multi-stage build process to ensure that the final image is lightweight and contains only the necessary components.

Dockerfile
# Stage 1: Dependency installation using uv
# https://docs.astral.sh/uv/guides/integration/docker/#available-images
# https://github.com/astral-sh/uv-docker-example/blob/main/multistage.Dockerfile
FROM ghcr.io/astral-sh/uv:python3.11-bookworm-slim AS builder

ENV UV_COMPILE_BYTECODE=1 UV_LINK_MODE=copy UV_PYTHON_DOWNLOADS=0

WORKDIR /app

# Install build tools required for psutil
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc python3-dev && \
    apt-get clean && rm -rf /var/lib/apt/lists/*


RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --locked --no-install-project --no-dev
COPY . /app
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --locked --no-dev

# Stage 2: Final image
FROM python:3.11-slim-bookworm

WORKDIR /app

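# Create the user and group referenced by --chown below
RUN groupadd --system app && useradd --system --gid app app

COPY --from=builder --chown=app:app /app /app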

ENV PATH="/app/.venv/bin:$PATH"

# Set the entry point for the container
ENTRYPOINT ["python", "-m", "feast_transformer"]

The first stage (builder) uses the uv package manager to install all dependencies defined in the pyproject.toml file. It also caches dependencies to speed up subsequent builds and compiles Python bytecode for better performance. Additionally, it installs build tools like gcc and python3-dev to handle dependencies requiring compilation, such as psutil.

The uv sync command is run twice in this stage. The first uv sync installs only the dependencies without the project files, ensuring that the dependency installation is cached and reused across builds. The second uv sync installs the project files and finalizes the environment. This two-step process minimizes rebuild times by leveraging cached dependencies while still ensuring the application code is up-to-date.

The second stage creates the final image. It copies the application and its dependencies from the builder stage into a minimal Python base image. This ensures the final image is small and optimized for production use. The PATH environment variable is updated to include the virtual environment created by uv, and the entry point is set to run the transformer using the python -m feast_transformer command.

This approach ensures a clean, efficient, and portable container image for deploying the Feast transformer.

Finally, build the Docker image and load it into Minikube, so that the transformer can be deployed and tested with KServe and Feast in a local Kubernetes cluster.

docker buildx build \
  --platform linux/amd64 \
  -t feast-transformer:v0.1.0 \
  .
minikube image load feast-transformer:v0.1.0