
Overview

Plugins are the core processing units in Mixpeek. They define how your data is transformed, embedded, and indexed. Mixpeek provides two types of plugins:
  • Builtin plugins: Pre-built extractors maintained by Mixpeek (text, image, video, document, etc.)
  • Custom plugins: Your own extraction logic running on Mixpeek infrastructure
Deployment modes:
| Mode | Description | Use Case |
| --- | --- | --- |
| Batch processing | High-throughput Ray Data pipelines | Processing collections, indexing documents |
| Real-time inference | Ray Serve HTTP endpoints | Live API requests, synchronous embedding |

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        Plugin System                            │
├──────────────────────────┬──────────────────────────────────────┤
│   Builtin Plugins        │   Custom Plugins                     │
│   (engine/plugins/)      │   (S3 uploaded archives)             │
├──────────────────────────┴──────────────────────────────────────┤
│                     Pipeline Builder                            │
│   - Declarative step definitions                                │
│   - Resource allocation (CPU/GPU/API)                           │
│   - Content-type filtering                                      │
├─────────────────────────────────────────────────────────────────┤
│              Ray Data (Batch)    │    Ray Serve (Real-time)     │
│   - map_batches() processing     │    - HTTP deployment         │
│   - DataFrame input/output       │    - Auto-scaling            │
│   - Parallel execution           │    - Load balancing          │
├─────────────────────────────────────────────────────────────────┤
│                      Model Registry                             │
│   - HuggingFace models (cluster-cached)                         │
│   - Custom S3 models (namespace-scoped)                         │
│   - Lazy loading (on-demand)                                    │
└─────────────────────────────────────────────────────────────────┘

Quick Start: Custom Plugin

1. Create Plugin Structure

my_extractor/
├── manifest.py      # Schemas + metadata
├── pipeline.py      # Batch processing pipeline
├── realtime.py      # HTTP endpoint (optional, Enterprise)
└── processors/
    └── core.py      # Your processing logic
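One way to scaffold this layout from a shell (file names as above; the file contents come in the following steps):

```shell
# Create the plugin directory layout
mkdir -p my_extractor/processors
touch my_extractor/manifest.py \
      my_extractor/pipeline.py \
      my_extractor/realtime.py \
      my_extractor/processors/core.py
```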

2. Define Your Plugin (manifest.py)

from pydantic import BaseModel, Field
from typing import List

# Input schema - what your plugin accepts
class MyInput(BaseModel):
    text: str = Field(..., description="Input text to process")

# Output schema - what your plugin produces
class MyOutput(BaseModel):
    embedding: List[float] = Field(..., description="384-dim embedding vector")
    sentiment: str = Field(..., description="positive/negative/neutral")

# Parameters - user-configurable options
class MyParams(BaseModel):
    threshold: float = Field(default=0.5, ge=0, le=1, description="Confidence threshold")
    model_size: str = Field(default="base", description="Model size: base or large")

# Plugin metadata
metadata = {
    "feature_extractor_name": "my_extractor",
    "version": "1.0.0",
    "description": "Custom text embedding with sentiment analysis",
    "category": "text",
}

input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]

# Vector index definitions for Qdrant
features = [
    {
        "feature_name": "my_embedding",
        "feature_type": "embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]
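Because these are plain Pydantic models, you can sanity-check validation behavior locally before packaging (no Mixpeek dependencies needed; this just exercises the schemas defined above):

```python
from pydantic import BaseModel, Field, ValidationError

class MyInput(BaseModel):
    text: str = Field(..., description="Input text to process")

class MyParams(BaseModel):
    threshold: float = Field(default=0.5, ge=0, le=1)
    model_size: str = Field(default="base")

# Defaults apply when parameters are omitted from the request
params = MyParams()
assert params.threshold == 0.5 and params.model_size == "base"

# Out-of-range values are rejected at parse time,
# before the pipeline ever runs
try:
    MyParams(threshold=2.0)
    raised = False
except ValidationError:
    raised = True
assert raised
```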

3. Implement Batch Processing (processors/core.py)

from dataclasses import dataclass
import pandas as pd

@dataclass
class MyConfig:
    threshold: float = 0.5
    model_size: str = "base"

class MyProcessor:
    """Batch processor for Ray Data pipelines."""

    def __init__(self, config: MyConfig, progress_actor=None):
        self.config = config
        self._model = None  # Lazy loading

    def _ensure_model_loaded(self):
        """Load model on first batch (lazy loading)."""
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            model_name = "all-MiniLM-L6-v2" if self.config.model_size == "base" else "all-mpnet-base-v2"
            self._model = SentenceTransformer(model_name)

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Process a batch of rows."""
        self._ensure_model_loaded()

        # IMPORTANT: Custom plugins always read from the 'data' column,
        # NOT a column named after the blob property.
        # For text blobs, 'data' contains the raw string.
        # For binary blobs, 'data' contains an S3 URL.
        texts = batch["data"].fillna("").tolist()

        # Generate embeddings
        embeddings = self._model.encode(texts).tolist()
        batch["my_embedding"] = embeddings

        # Simple sentiment (replace with your logic)
        batch["sentiment"] = ["positive" if len(t) > 50 else "neutral" for t in texts]

        return batch
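You can smoke-test the processor contract (DataFrame in, same row count out, new columns added) by stubbing the model. The stub below is purely illustrative; in production `_model` is the real SentenceTransformer:

```python
import pandas as pd

class StubModel:
    """Stand-in for SentenceTransformer: fixed-size vector per text."""
    def encode(self, texts):
        return [[float(len(t))] * 4 for t in texts]

class MyProcessor:
    def __init__(self):
        self._model = StubModel()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        texts = batch["data"].fillna("").tolist()
        batch["my_embedding"] = self._model.encode(texts)
        batch["sentiment"] = ["positive" if len(t) > 50 else "neutral" for t in texts]
        return batch

batch = pd.DataFrame({"data": ["short text", "x" * 60]})
out = MyProcessor()(batch)
# Row count is preserved and both output columns are present
assert len(out) == 2 and out["sentiment"].tolist() == ["neutral", "positive"]
```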

4. Wire Into Pipeline (pipeline.py)

from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    StepDefinition,
    build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyConfig, MyProcessor

def build_steps(extractor_request, container=None, base_steps=None, **kwargs):
    """Build the extraction pipeline."""

    # Parse parameters from request
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    # Create processor config
    config = MyConfig(
        threshold=params.threshold,
        model_size=params.model_size,
    )

    # Define pipeline steps
    pipeline = PipelineDefinition(
        name=metadata["feature_extractor_name"],
        version=metadata["version"],
        steps=[
            StepDefinition(
                service_class=MyProcessor,
                resource_type=ResourceType.CPU,  # or GPU, API
                config=config,
            ),
        ]
    )

    # Build Ray Data steps
    steps = build_pipeline_steps(pipeline)

    return {"steps": steps}

5. Add Real-time Endpoint (realtime.py) - Enterprise Only

from shared.plugins.inference.serve import BaseInferenceService

class InferenceService(BaseInferenceService):
    """Real-time HTTP inference endpoint."""

    def __init__(self):
        super().__init__()
        self._model = None

    async def __call__(self, inputs: dict, parameters: dict) -> dict:
        """Handle inference request."""
        # Lazy load model
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

        text = inputs.get("text", "")
        embedding = self._model.encode([text])[0].tolist()

        return {
            "embedding": embedding,
            "sentiment": "positive" if len(text) > 50 else "neutral"
        }
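The handler can be exercised locally with asyncio to verify the input/output contract before deploying. The stubbed model and `LocalInferenceService` class here are illustrative stand-ins, not Mixpeek APIs:

```python
import asyncio

class StubModel:
    def encode(self, texts):
        return [[0.1, 0.2, 0.3] for _ in texts]

class LocalInferenceService:
    """Minimal stand-in mirroring InferenceService.__call__ above."""
    def __init__(self):
        self._model = StubModel()

    async def __call__(self, inputs: dict, parameters: dict) -> dict:
        text = inputs.get("text", "")
        embedding = self._model.encode([text])[0]
        return {
            "embedding": embedding,
            "sentiment": "positive" if len(text) > 50 else "neutral",
        }

result = asyncio.run(LocalInferenceService()({"text": "hello"}, {}))
assert set(result) == {"embedding", "sentiment"}
```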

6. Upload and Deploy

# Package your plugin
zip -r my_extractor.zip my_extractor/

# Get upload URL
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "my_extractor", "version": "1.0.0"}'

# Upload
curl -X PUT "$PRESIGNED_URL" --data-binary @my_extractor.zip

# Confirm
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $API_KEY"

# Deploy for batch processing only (all tiers)
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy?deployment_type=batch_only" \
  -H "Authorization: Bearer $API_KEY"

# Deploy with real-time endpoint (Enterprise only)
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy" \
  -H "Authorization: Bearer $API_KEY"
Batch vs Realtime deployment: All account tiers can deploy plugins for batch processing using ?deployment_type=batch_only. Realtime inference endpoints require Enterprise tier with dedicated infrastructure. If you get a 403 error, add ?deployment_type=batch_only to the deploy URL.
feature_uri and realtime.py: If your manifest includes a feature_uri (e.g., mixpeek://my_plugin@1.0.0/my_embedding), the system expects a corresponding realtime.py for query-time inference. If you only need batch processing, you can omit both feature_uri and realtime.py. If you include feature_uri without realtime.py, retriever feature_search queries against that URI will fail.

Manifest Features Format

The features list in manifest.py tells Mixpeek which Qdrant vector indexes to create for your plugin. Use the exact key names below. Wrong key names are a common pitfall: they produce no error at upload time, but silently create a collection with no vector indexes, so 0 documents end up indexed.
# ✅ CORRECT — these exact key names are required
features = [
    {
        "feature_type": "embedding",       # must be "embedding"
        "feature_name": "my_embedding",    # the vector index name
        "embedding_dim": 768,              # vector dimensionality
        "distance_metric": "cosine",       # "cosine", "euclid", or "dot"
    },
]

# ❌ WRONG — intuitive but silently ignored (0 vectors created)
features = [
    {
        "type": "vector",          # wrong key: use feature_type
        "name": "my_embedding",    # wrong key: use feature_name
        "dimensions": 768,         # wrong key: use embedding_dim
        "distance": "cosine",      # wrong key: use distance_metric
    },
]
Multiple vectors are supported — add one entry per embedding your plugin produces:
features = [
    {
        "feature_type": "embedding",
        "feature_name": "visual_embedding",
        "embedding_dim": 768,
        "distance_metric": "cosine",
    },
    {
        "feature_type": "embedding",
        "feature_name": "semantic_embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]
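Because wrong key names fail silently, a small pre-upload check can catch them early. This is a hypothetical helper for your own test suite, not part of the Mixpeek SDK:

```python
REQUIRED_KEYS = {"feature_type", "feature_name", "embedding_dim", "distance_metric"}

def lint_features(features: list) -> list:
    """Return a list of human-readable problems; empty list means OK."""
    problems = []
    for i, entry in enumerate(features):
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            problems.append(f"features[{i}] missing keys: {sorted(missing)}")
        if entry.get("feature_type") not in (None, "embedding"):
            problems.append(f"features[{i}] feature_type must be 'embedding'")
    return problems

good = [{"feature_type": "embedding", "feature_name": "e",
         "embedding_dim": 384, "distance_metric": "cosine"}]
bad = [{"type": "vector", "name": "e", "dimensions": 384, "distance": "cosine"}]

assert lint_features(good) == []
assert lint_features(bad)  # wrong keys are reported instead of silently ignored
```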

DataFrame Schema Reference

Your plugin’s __call__(batch: pd.DataFrame) receives a DataFrame with the following columns:
| Column | Type | Description |
| --- | --- | --- |
| data | str | For binary blobs: S3 URL (e.g. s3://bucket/org_.../file.jpg). For text blobs: raw string content. Never raw bytes. |
| document_id | str | Unique document ID (e.g. doc_abc123) |
| object_id | str | Source object ID in the bucket |
| blob_id | str | Blob identifier within the object |
| blob_property | str | Property name that produced this blob (matches your bucket schema key, e.g. "content") |
| blob_type | str | Asset type: "image", "video", "audio", "text" |
| collection_id | str | Collection that triggered processing |
| feature_extractor_id | str | ID of the feature extractor |
| mime_type | str | MIME type (e.g. image/jpeg, text/plain) |
| size_bytes | int | File size in bytes (0 for inline text) |
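For local testing it helps to build a fixture batch with this schema. The values below are made up; note that a text blob carries the raw string in data and size_bytes of 0:

```python
import pandas as pd

def make_test_batch() -> pd.DataFrame:
    """Minimal fixture matching the documented DataFrame schema."""
    return pd.DataFrame([{
        "data": "hello world",          # raw string for text blobs
        "document_id": "doc_abc123",
        "object_id": "obj_001",
        "blob_id": "blob_001",
        "blob_property": "content",
        "blob_type": "text",
        "collection_id": "col_001",
        "feature_extractor_id": "fe_001",
        "mime_type": "text/plain",
        "size_bytes": 0,                # 0 for inline text
    }])

batch = make_test_batch()
assert "data" in batch.columns and batch.loc[0, "blob_type"] == "text"
```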
input_mappings does NOT rename columns. The data column is always data regardless of any input_mappings configuration in manifest.py. input_mappings is metadata only — it does not transform the DataFrame. Always read from batch["data"].

Loading Assets from S3 URLs

The data column contains s3:// URLs, not raw bytes. Use the stable Plugin SDK to resolve them to local paths. Download all assets in parallel, then batch-process:
import pandas as pd
import torch

from shared.plugins import download_asset
from shared.plugins.sdk import parallel_io

class MyImageProcessor:
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        # Step 1: Download ALL images in parallel (I/O-bound)
        def load_image(url):
            from PIL import Image
            path, is_temp = download_asset(url, suffix=".jpg")
            img = Image.open(path).convert("RGB")
            if is_temp:
                import os
                os.unlink(path)
            return img

        images = parallel_io(
            items=batch["data"].tolist(),
            process_fn=load_image,
            max_workers=8,
        )

        # Step 2: Batch-process all images at once (GPU-bound)
        tensors = [self._preprocess(images[i]) for i in sorted(images)]
        embeddings = self._model(torch.stack(tensors))  # Single GPU call
        batch["embedding"] = embeddings.tolist()

        return batch
For a single asset (e.g., in a context manager), use open_asset:
from PIL import Image
from shared.plugins import open_asset

with open_asset(url, suffix=".jpg") as local_path:
    img = Image.open(local_path)
For text blobs, the data column contains the raw string — no download needed:
class MyTextProcessor:
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        texts = batch["data"].fillna("").tolist()
        batch["embedding"] = self._model.encode(texts).tolist()
        return batch
If you need the raw (path, is_temp) tuple rather than a context manager, use download_asset:
from shared.plugins import download_asset

local_path, is_temp = download_asset(row["data"], suffix=".mp4")
try:
    result = process(local_path)
finally:
    if is_temp:
        import os
        os.unlink(local_path)
Backward compatibility: The internal path from shared.utilities.files import resolve_url_to_local_path still works but may change without notice. Prefer from shared.plugins import open_asset for future-proof plugins.

Using Built-in Services

Instead of implementing models from scratch, compose existing Mixpeek services:

Option 1: Import Batch Services Directly

from shared.inference.registry import get_batch_service

# Get the service class
E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")
SigLIPBatch = get_batch_service("google/siglip-base-patch16-224")
WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")

# Use in your pipeline
def build_steps(extractor_request, **kwargs):
    from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs

    config = InferenceConfigs(
        text_column="text",
        output_column_name="embedding",
    )

    pipeline = PipelineDefinition(
        name="my_extractor",
        version="v1",
        steps=[
            StepDefinition(
                service_class=E5Batch,
                resource_type=ResourceType.CPU,
                config=config,
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Option 2: Call Real-time Services via HTTP

import httpx

async def embed_text(text: str) -> list[float]:
    """Call the E5 embedding service."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8001/multilingual_e5_large_instruct/v1",
            json={"inputs": {"texts": [text]}, "parameters": {}}
        )
        return response.json()["embeddings"][0]

Available Services

| Service ID | Type | Dimensions | Use Case |
| --- | --- | --- | --- |
| intfloat/multilingual-e5-large-instruct | Embedding | 1024 | Multilingual text embeddings |
| google/siglip-base-patch16-224 | Embedding | 512 | Image embeddings |
| jinaai/jina-embeddings-v2-base-code | Embedding | 768 | Code embeddings |
| BAAI/bge-reranker-v2-m3 | Reranker | - | Search result reordering |
| openai/whisper-large-v3-turbo | Transcription | - | Audio to text |

Loading Models

Mixpeek provides four ways to load models in your plugins:

1. HuggingFace Models (Cluster-Cached)

Models are downloaded once and cached across all workers:
from engine.models import load_hf_model
from transformers import AutoConfig

class MyProcessor:
    def __init__(self, config):
        self.config = config
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            # Load from cluster cache (zero-copy sharing)
            cached = load_hf_model(
                hf_model_id="intfloat/multilingual-e5-large-instruct",
                model_class="AutoModel",
                tokenizer_class="AutoTokenizer",
                torch_dtype="float16",
            )

            # Instantiate model from cached state_dict
            from transformers import AutoModel, AutoTokenizer
            config = AutoConfig.from_dict(cached["config"])
            self._model = AutoModel(config)
            self._model.load_state_dict(cached["state_dict"])
            self._tokenizer = AutoTokenizer.from_pretrained(
                cached["tokenizer_config"]["tokenizer_dir"]
            )

2. Custom S3 Models (Namespace-Scoped)

Use your own uploaded model weights:
from engine.models import load_namespace_model
import torch

class MyProcessor:
    def __init__(self, config):
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            # Load pre-uploaded weights from S3
            weights = load_namespace_model("my-fine-tuned-bert_1_0_0")

            # Initialize your model architecture
            self._model = torch.nn.Sequential(
                torch.nn.Linear(768, 256),
                torch.nn.ReLU(),
                torch.nn.Linear(256, 128),
            )
            self._model.load_state_dict(weights)
            self._model.eval()

3. LazyModelMixin (Automatic Lazy Loading)

Automatic lazy loading with cluster-wide caching:
from engine.models import LazyModelMixin
from shared.plugins.inference.batch import BaseBatchInferenceService

class MyEmbedder(LazyModelMixin, BaseBatchInferenceService):
    """Batch embedder with automatic lazy loading."""

    # Configure model (class attributes)
    model_id = "intfloat/multilingual-e5-large-instruct"
    model_class = "AutoModel"
    tokenizer_class = "AutoTokenizer"
    torch_dtype = "float16"

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        # Model NOT loaded here - loaded on first batch

    def _process_batch(self, batch):
        # Model automatically loaded on first call
        model, tokenizer = self.get_model()

        texts = batch["text"].tolist()
        inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
        outputs = model(**inputs)

        batch["embedding"] = outputs.last_hidden_state.mean(dim=1).tolist()
        return batch

4. @lazy_model Decorator (Quick Pattern)

For simpler cases:
from engine.models import lazy_model

class MyProcessor:
    def __init__(self, config):
        self.config = config
        self._model = None
        self._tokenizer = None

    @lazy_model(
        model_id="intfloat/multilingual-e5-large-instruct",
        model_class="AutoModel",
        tokenizer_class="AutoTokenizer",
    )
    def __call__(self, batch):
        # self._model and self._tokenizer are automatically available
        texts = batch["text"].tolist()
        inputs = self._tokenizer(texts, return_tensors="pt", padding=True)
        outputs = self._model(**inputs)
        batch["embedding"] = outputs.last_hidden_state.mean(dim=1).tolist()
        return batch

Pipeline Step Configuration

Resource Types

| Type | Description | Use Case |
| --- | --- | --- |
| ResourceType.CPU | CPU-only workers | Text processing, lightweight models |
| ResourceType.GPU | GPU-allocated workers | Large models, image/video |
| ResourceType.API | External API calls | OpenAI, Vertex AI, Anthropic |

Row Conditions

Filter which rows your step processes:
from engine.plugins.extractors.pipeline import RowCondition

StepDefinition(
    service_class=ImageProcessor,
    resource_type=ResourceType.GPU,
    condition=RowCondition.IS_IMAGE,  # Only process images
)
| Condition | Matches |
| --- | --- |
| RowCondition.IS_TEXT | text/* MIME types |
| RowCondition.IS_IMAGE | image/* MIME types |
| RowCondition.IS_VIDEO | video/* MIME types |
| RowCondition.IS_AUDIO | audio/* MIME types |
| RowCondition.IS_PDF | application/pdf |
| RowCondition.ALWAYS | All rows (default) |
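Conceptually, a row condition is just a MIME-type filter over the batch. A simplified pandas sketch of what IS_IMAGE implies (not the engine's actual implementation):

```python
import pandas as pd

def filter_is_image(batch: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows whose MIME type is image/* (sketch of IS_IMAGE)."""
    mask = batch["mime_type"].fillna("").str.startswith("image/")
    return batch[mask]

batch = pd.DataFrame({
    "mime_type": ["image/jpeg", "text/plain", "image/png", "video/mp4"],
})
# Only the two image/* rows reach a step guarded by IS_IMAGE
assert len(filter_is_image(batch)) == 2
```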

Conditional Steps

Enable/disable steps based on parameters:
pipeline = PipelineDefinition(
    name="my_extractor",
    version="v1",
    steps=[
        StepDefinition(
            service_class=TextChunker,
            resource_type=ResourceType.CPU,
            enabled=params.enable_chunking,  # Conditional
            config=chunker_config,
        ),
        StepDefinition(
            service_class=E5Batch,
            resource_type=ResourceType.CPU,
            config=embedding_config,
        ),
    ]
)

Batch Processing Optimization

Your plugin’s __call__(batch) method is called by Ray Data’s map_batches() with a DataFrame of 64 rows by default. How you process those rows determines whether your plugin runs 10x-100x faster or falls back to sequential processing.
Several examples in this guide use sequential patterns for simplicity. In production, always use the batched patterns described in this section. The performance difference is dramatic — a 64-row batch of API calls drops from ~64 seconds (sequential) to ~7 seconds (10-concurrent).

The Core Rule

Collect all items first, then process them together. Never call a model, API, or I/O operation inside a for loop over rows.
# WRONG — sequential GPU call per row (64 rows = 64 GPU calls)
def __call__(self, batch):
    for idx, row in batch.iterrows():
        embedding = self._model.encode(row["text"])  # One at a time!
        batch.at[idx, "embedding"] = embedding
    return batch

# CORRECT — single batched GPU call (64 rows = 1 GPU call)
def __call__(self, batch):
    texts = batch["text"].fillna("").tolist()          # Collect all
    embeddings = self._model.encode(texts)              # Batch process
    batch["embedding"] = [e.tolist() for e in embeddings]
    return batch

Pattern 1: GPU/Model Operations

Pass all inputs to the model at once. Most ML frameworks (PyTorch, SentenceTransformers, HuggingFace) support batched input natively.
class EmbeddingProcessor:
    def __call__(self, batch):
        self._ensure_model_loaded()

        # Collect all text from the batch
        texts = batch["data"].fillna("").tolist()

        # Single batched model call
        embeddings = self._model.encode(texts, batch_size=32)

        batch["embedding"] = embeddings.tolist()
        return batch
For image/video models, preprocess all tensors then stack:
import torch

class ImageProcessor:
    def __call__(self, batch):
        self._ensure_model_loaded()

        # Preprocess all images into tensors
        tensors = []
        for _, row in batch.iterrows():
            img = load_image(row["data"])
            tensors.append(self._preprocess(img))

        # Single batched GPU forward pass
        stacked = torch.stack(tensors)
        with torch.no_grad():
            outputs = self._model(stacked)

        batch["embedding"] = outputs.cpu().tolist()
        return batch

Pattern 2: I/O Operations (File Downloads, FFmpeg)

Use parallel_io from the Plugin SDK to download/process files in parallel threads:
from shared.plugins import download_asset
from shared.plugins.sdk import parallel_io

class VideoFrameProcessor:
    def __call__(self, batch):
        # Download ALL videos in parallel (8 concurrent threads)
        def extract_frames(url):
            path, is_temp = download_asset(url, suffix=".mp4")
            frames = ffmpeg_extract_frames(path)
            if is_temp:
                import os
                os.unlink(path)
            return frames

        frame_results = parallel_io(
            items=batch["data"].tolist(),
            process_fn=extract_frames,
            max_workers=8,
        )

        # Now batch-process all frames on GPU
        all_frames = []
        frame_map = []
        for i in sorted(frame_results):
            for frame in frame_results[i]:
                all_frames.append(self._preprocess(frame))
                frame_map.append(i)

        if all_frames:
            embeddings = self._model(torch.stack(all_frames))
            # Map embeddings back to rows...

        return batch

Pattern 3: API Calls (LLM, External Services)

Use concurrent_api_calls to make async calls with a concurrency semaphore:
from shared.plugins.sdk import concurrent_api_calls

class LLMEnrichmentProcessor:
    def __init__(self, config, **kwargs):
        self._llm = config.get("llm_service")

    def __call__(self, batch):
        texts = batch["data"].fillna("").tolist()

        # Define async function for one API call
        async def enrich(text):
            return await self._llm.generate(
                instruction="Extract key entities from this text",
                text=text,
                provider="google",
                model="gemini-2.5-flash",
            )

        # Call ALL rows concurrently (up to 10 at a time)
        results = concurrent_api_calls(
            items=texts,
            async_fn=enrich,
            max_concurrent=10,
        )

        batch["entities"] = [r.data if r else {} for r in results]
        return batch

Anti-Patterns to Avoid

| Anti-Pattern | Problem | Fix |
| --- | --- | --- |
| for row in batch.iterrows(): model.encode(row["text"]) | Sequential GPU calls | Collect texts, call model.encode(all_texts) |
| for row in batch.iterrows(): requests.get(row["url"]) | Sequential downloads | Use parallel_io(urls, download_fn) |
| for row in batch.iterrows(): await llm.generate(row["text"]) | Sequential API calls | Use concurrent_api_calls(texts, llm_fn) |
| asyncio.new_event_loop() + run_until_complete in loop | Creates event loop per row | Use concurrent_api_calls (handles event loop) |
| Processing only batch.iloc[0] | Silently drops all other rows | Process all rows, return DataFrame with same row count |
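The speedup from bounded concurrency is easy to demonstrate with plain asyncio. The helper below is a simplified stand-in for concurrent_api_calls (a semaphore-guarded gather), not the SDK implementation; the fake API call just sleeps to simulate latency:

```python
import asyncio
import time

async def fake_api_call(item: str) -> str:
    await asyncio.sleep(0.05)  # simulate network latency
    return item.upper()

def concurrent_calls(items, async_fn, max_concurrent=10):
    """Simplified stand-in for concurrent_api_calls: bounded-concurrency gather."""
    async def runner():
        sem = asyncio.Semaphore(max_concurrent)
        async def guarded(item):
            async with sem:
                return await async_fn(item)
        # gather preserves input order in its results
        return await asyncio.gather(*(guarded(i) for i in items))
    return asyncio.run(runner())

start = time.monotonic()
results = concurrent_calls([f"row{i}" for i in range(10)], fake_api_call)
elapsed = time.monotonic() - start

assert results[0] == "ROW0" and len(results) == 10
# ~0.05s when concurrent, vs ~0.5s if the 10 calls ran sequentially
assert elapsed < 0.4
```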

Compute Profile in Manifest

Declare resource requirements in your manifest.py to control batch size, concurrency, and resource allocation:
# manifest.py
compute_profile = {
    "resource_type": "gpu",      # "cpu", "gpu", or "api"
    "batch_size": 32,            # Rows per batch (default: 64)
    "max_concurrency": 4,        # Parallel Ray actors (default: 2)
}
| Field | Default | Description |
| --- | --- | --- |
| resource_type | "cpu" | Worker type: "cpu" (CPU-only), "gpu" (GPU-allocated), "api" (I/O-bound) |
| batch_size | 64 | Number of rows per __call__ invocation |
| max_concurrency | 2 | Number of parallel Ray actors processing batches |
Tuning guidance:
  • GPU models: batch_size=16-32 (limited by VRAM), max_concurrency=1-2
  • Text embeddings (CPU): batch_size=64-128, max_concurrency=2-4
  • API calls: batch_size=32-64, max_concurrency=2-4 (concurrency is within each batch via concurrent_api_calls)

Validating Your Plugin

Run the batch processor linter to detect sequential processing anti-patterns before deploying:
python scripts/engine/lint_batch_processors.py my_plugin/
This checks for:
  • iterrows() loops with model/API calls inside (SEQ002)
  • Sequential row iteration without parallelization (SEQ001)
  • Per-item tokenization inside loops (SEQ003)

Plugin SDK Helpers Reference

Import from shared.plugins.sdk or shared.plugins:
| Function | Use Case | Import |
| --- | --- | --- |
| parallel_io(items, fn, max_workers) | Parallel file downloads, FFmpeg, I/O | from shared.plugins.sdk import parallel_io |
| concurrent_api_calls(items, async_fn, max_concurrent) | Concurrent LLM/API calls | from shared.plugins.sdk import concurrent_api_calls |
| open_asset(url, suffix) | Context manager for S3 downloads | from shared.plugins import open_asset |
| download_asset(url, suffix) | Manual S3 download with cleanup flag | from shared.plugins import download_asset |
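The earlier examples index parallel_io results with sorted(results), which implies an index-to-result mapping. A thread-pool sketch of that assumed contract (illustrative only; the real SDK helper may differ in signature and error handling):

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_io_sketch(items, process_fn, max_workers=8):
    """Illustrative stand-in for parallel_io: runs process_fn over items in
    threads and returns {original_index: result}, matching the
    `for i in sorted(results)` usage shown earlier."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {i: pool.submit(process_fn, item) for i, item in enumerate(items)}
        return {i: f.result() for i, f in futures.items()}

results = parallel_io_sketch(["a.jpg", "b.jpg", "c.jpg"], lambda u: u.upper())
# Results can be re-ordered back to row order via sorted keys
assert [results[i] for i in sorted(results)] == ["A.JPG", "B.JPG", "C.JPG"]
```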

Builtin Plugin Development

This section is for Mixpeek internal developers creating new builtin plugins.

Directory Structure

engine/plugins/builtin/my_extractor/v1/
├── __init__.py
├── definition.py     # Single source of truth
├── pipeline.py       # Batch processing pipeline
├── realtime.py       # Optional: Ray Serve endpoint
└── processors/
    ├── __init__.py
    └── core.py       # Processing logic

definition.py (Single Source of Truth)

"""My extractor plugin definition."""

from enum import IntEnum
from typing import List, Literal
from pydantic import BaseModel, Field

from shared.billing.models import CostRate, CostUnit
from shared.collection.features.extractors.models import (
    CostsInfo,
    FeatureExtractorModel,
)
from shared.collection.features.vectors.models import (
    VectorIndex,
    VectorIndexDefinition,
    VectorType,
)

# =============================================================================
# COST CONFIGURATION
# =============================================================================

class MyExtractorCosts(IntEnum):
    PER_1K_TOKENS = 1

TIER = 1
TIER_LABEL = "SIMPLE"

RATES: List[CostRate] = [
    CostRate(
        unit=CostUnit.TOKENS_1K,
        credits_per_unit=MyExtractorCosts.PER_1K_TOKENS,
        description="Per 1K tokens processed",
    ),
]

# =============================================================================
# PARAMETER SCHEMA
# =============================================================================

class MyExtractorParams(BaseModel):
    extractor_type: Literal["my_extractor"] = "my_extractor"
    threshold: float = Field(default=0.5, ge=0, le=1)
    enable_feature_x: bool = Field(default=True)

# =============================================================================
# INPUT/OUTPUT SCHEMAS
# =============================================================================

class MyExtractorInput(BaseModel):
    text: str = Field(..., min_length=1)

class MyExtractorOutput(BaseModel):
    embedding: List[float] = Field(..., min_length=1024, max_length=1024)

# =============================================================================
# VECTOR INDEX CONFIGURATION
# =============================================================================

VECTOR_INDEXES = [
    VectorIndexDefinition(
        name="my_embedding",
        description="Dense embedding vector",
        type="single",
        index=VectorIndex(
            name="my_extractor_v1_embedding",
            description="Dense vector embedding",
            dimensions=1024,
            type=VectorType.DENSE,
            distance="Cosine",
            inference_service_id="intfloat/multilingual-e5-large-instruct",
        ),
    ),
]

# =============================================================================
# PLUGIN DEFINITION
# =============================================================================

definition = FeatureExtractorModel(
    feature_extractor_name="my_extractor",
    version="v1",
    description="My custom extractor for specialized processing",
    icon="wand-2",
    input_schema=MyExtractorInput,
    output_schema=MyExtractorOutput,
    parameter_schema=MyExtractorParams,
    required_vector_indexes=VECTOR_INDEXES,
    costs=CostsInfo(tier=TIER, tier_label=TIER_LABEL, rates=RATES),
)

metadata = {
    "name": "my_extractor",
    "version": "v1",
    "description": definition.description,
}

Register in Plugin Registry

Add to engine/plugins/registry.py:
BUILTIN_PLUGINS = {
    # ... existing plugins
    "my_extractor_v1": {
        "name": "my_extractor",
        "version": "v1",
        "module_path": "engine.plugins.builtin.my_extractor.v1",
    },
}

Security Requirements

Custom plugins are scanned before deployment. Code violating these rules is rejected.

Allowed

| Category | Libraries |
| --- | --- |
| Data | numpy, pandas, polars, pyarrow |
| ML/AI | torch, transformers, sentence_transformers, onnxruntime |
| Image | PIL, cv2, imageio |
| Audio | librosa, soundfile, ffmpeg-python |
| HTTP | requests, httpx, aiohttp |
| Utils | os, json, re, typing, dataclasses, pydantic, logging |

Forbidden

| Category | Blocked | Reason |
| --- | --- | --- |
| Execution | subprocess, os.system, os.popen, os.exec*, eval, exec | Shell/code execution |
| System | ctypes, socket, multiprocessing | Low-level access |
| Builtins | open, setattr, delattr, __import__ | File/attribute mutation |
import os is allowed. Only specific dangerous functions (os.system, os.popen, os.exec*, os.spawn*) are blocked. Safe uses like os.environ["KEY"] and os.path.join(...) work fine.

Allowed Read-Only Builtins

getattr(), hasattr(), and dir() are allowed in custom plugins. These are read-only operations commonly used by ML libraries and configuration patterns:
# All of these are fine in custom plugins
model_name = getattr(config, "model_name", "default")
if hasattr(self, "_model"):
    self._model.eval()

# os.environ is allowed for device/runtime configuration
import os
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

File I/O in Plugins

Direct open() calls in your plugin .py files are blocked by the security scanner.
However, library-internal file I/O is allowed. The scanner only analyzes your plugin’s Python source code via AST inspection — it does not scan compiled C extensions or library internals. This means:
| Pattern | Allowed? | Why |
| --- | --- | --- |
| onnxruntime.InferenceSession("model.onnx") | Yes | C++ internal file I/O |
| transformers.AutoModel.from_pretrained(...) | Yes | Library-internal download + read |
| torch.load("weights.pt") | Yes | Library-internal file I/O |
| open("config.json", "r") in your .py | No | Direct open() in plugin code |
| pd.read_csv("data.csv") | Yes | Library-internal file I/O |
For reading configuration files, embed your config as Python dicts/dataclasses in your plugin code, or use json.loads() on a string constant.
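Following that advice, a config that would otherwise live in a JSON file can be embedded directly in your plugin source; the names below are illustrative:

```python
import json
from dataclasses import dataclass

# Option 1: config as a plain dict constant (no file I/O needed)
LABEL_CONFIG = {
    "min_confidence": 0.5,
    "labels": ["car", "person", "animal"],
}

# Option 2: json.loads() on a string constant, handy when the config
# originated as a JSON file you would otherwise have open()'ed
CONFIG_JSON = '{"min_confidence": 0.5, "labels": ["car", "person", "animal"]}'
CONFIG = json.loads(CONFIG_JSON)

# Option 3: a dataclass for typed access with defaults
@dataclass
class PluginConfig:
    min_confidence: float = 0.5
    model_name: str = "google/siglip-base-patch16-224"
```

All three patterns pass the scanner because no `open()` call appears in your plugin code.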

Using Platform LLM Services

Custom plugins can access Mixpeek’s platform LLM services (Gemini, OpenAI, Anthropic) through the container.llm accessor. This gives you:
  • Platform-managed API keys — no need to bring your own
  • Response caching for repeated calls
  • Cost tracking integrated with your org billing
  • Error handling with typed failures

Quick Example

# pipeline.py
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, StepDefinition, build_pipeline_steps,
)
from .processors.compliance import BrandComplianceProcessor

def build_steps(extractor_request, container=None, **kwargs):
    config = {
        "llm_service": container.llm,  # Pass to your processor
        "threshold": 0.8,
    }

    pipeline = PipelineDefinition(
        name="brand_compliance",
        version="v1",
        steps=[
            StepDefinition(
                service_class=BrandComplianceProcessor,
                resource_type=ResourceType.API,
                config=config,
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}
# processors/compliance.py
from shared.plugins.sdk import concurrent_api_calls

class BrandComplianceProcessor:
    def __init__(self, config, **kwargs):
        self._llm = config.get("llm_service")
        self.threshold = config.get("threshold", 0.8)

    def __call__(self, batch):
        image_urls = batch["image_url"].tolist()

        # Define async function for a single API call
        async def evaluate_image(url):
            return await self._llm.generate(
                instruction="Evaluate this image for brand compliance",
                images=[url],
                provider="google",
                model="gemini-2.5-flash",
                schema={
                    "type": "object",
                    "properties": {
                        "compliant": {"type": "boolean"},
                        "violations": {"type": "array", "items": {"type": "string"}},
                        "score": {"type": "number"}
                    }
                }
            )

        # Call ALL images concurrently (up to 10 at a time)
        results = concurrent_api_calls(
            items=image_urls,
            async_fn=evaluate_image,
            max_concurrent=10,
        )

        batch["compliance_result"] = [r.data if r else {} for r in results]
        return batch

Available LLM Methods

| Method | Description |
| --- | --- |
| container.llm.generate(instruction, text, images, provider, model, schema) | Convenience method for text/image generation |
| container.llm.process(LLMRequest(...)) | Full control with LLMRequest object |
| container.llm.service | Direct access to CentralLLMService instance |

Supported Providers

| Provider | Models | Best For |
| --- | --- | --- |
| google | gemini-2.5-flash, gemini-2.5-pro | Fast multimodal, structured output |
| openai | gpt-4o, gpt-4o-mini | Text generation, function calling |
| anthropic | claude-sonnet-4-20250514 | Complex reasoning, long context |

Plugin Parameters and Secrets

How Parameters Flow

Plugin parameters are configured when enabling the plugin for a namespace. They flow through the pipeline as:
Enable Plugin API → MongoDB namespace.feature_extractors[].params → extractor_request.extractor_config.parameters → build_steps()
# When enabling a plugin via API:
curl -X POST "https://api.mixpeek.com/v1/organizations/$ORG_ID/plugins/$PLUGIN_ID/enable" \
  -H "Authorization: Bearer $API_KEY" \
  -d '{
    "params": {
      "model_name": "gemini-2.5-flash",
      "confidence_threshold": 0.8,
      "api_key": "your-external-api-key"
    }
  }'
# In your pipeline.py, read params:
def build_steps(extractor_request, container=None, **kwargs):
    params = extractor_request.extractor_config.parameters or {}
    model_name = params.get("model_name", "gemini-2.5-flash")
    threshold = params.get("confidence_threshold", 0.8)

Managing Secrets

Custom plugins can access encrypted organization secrets via container.secrets. Secrets are stored encrypted at rest using the Organization Secrets API and decrypted on demand at plugin runtime.
# In your pipeline.py:
def build_steps(extractor_request, container=None, **kwargs):
    processor = MyProcessor(
        config={},
        secrets=container.secrets,  # Pass secrets accessor to your processor
    )
    return {"steps": [processor], "prepare": lambda ds: ds}

class MyProcessor:
    def __init__(self, config, secrets=None):
        self._secrets = secrets

    async def process(self, data):
        # Read a secret at runtime (decrypted automatically)
        api_key = await self._secrets.get("EXTERNAL_API_KEY")
        # Use for external service calls
| Method | Description |
| --- | --- |
| await container.secrets.get("KEY") | Get a decrypted secret value by name |
| await container.secrets.list() | List available secret names (not values) |
For platform LLM services (Gemini, OpenAI, Anthropic), use container.llm instead — it handles API keys automatically. Use container.secrets for your own external API keys, tokens, and credentials.
You can also pass non-secret configuration as plugin parameters:
# Plugin parameters (stored in MongoDB, not encrypted)
{
  "params": {
    "webhook_url": "https://your-service.com/callback",
    "threshold": 0.8
  }
}

Archive Size and Model Loading

Upload Limits

| Limit | Value |
| --- | --- |
| Archive upload size | 500 MB |
| Extracted archive size | 500 MB |
| Max files in archive | 1,000 |
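These limits can be sanity-checked locally before uploading. A hedged sketch (the same limits are enforced server-side regardless):

```python
import os
import zipfile

MAX_ARCHIVE_MB = 500
MAX_FILES = 1000

def check_archive(path: str) -> list:
    """Return a list of limit violations for a plugin .zip archive."""
    problems = []
    size_mb = os.path.getsize(path) / (1024 * 1024)
    if size_mb > MAX_ARCHIVE_MB:
        problems.append(f"archive is {size_mb:.0f} MB (limit {MAX_ARCHIVE_MB} MB)")
    with zipfile.ZipFile(path) as zf:
        infos = zf.infolist()
        if len(infos) > MAX_FILES:
            problems.append(f"{len(infos)} files (limit {MAX_FILES})")
        extracted_mb = sum(i.file_size for i in infos) / (1024 * 1024)
        if extracted_mb > MAX_ARCHIVE_MB:
            problems.append(
                f"extracts to {extracted_mb:.0f} MB (limit {MAX_ARCHIVE_MB} MB)"
            )
    return problems
```

An empty return list means the archive is within all three limits.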

Loading Large Models

Most ML models exceed 500 MB. The recommended pattern is lazy downloading from HuggingFace Hub at init time:
class MyProcessor:
    def __init__(self, config, **kwargs):
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            # Downloaded once, cached on disk for subsequent calls
            from transformers import AutoModel
            self._model = AutoModel.from_pretrained(
                "google/siglip-base-patch16-224",
                cache_dir="/tmp/hf_cache"
            )

    def __call__(self, batch):
        self._ensure_model_loaded()
        # ... process batch
For custom model weights, use the namespace model API:
from shared.models.loader import load_namespace_model

weights = load_namespace_model("my-fine-tuned-model_1_0_0")
First cold start will download models (~1-2 min depending on size). Subsequent starts use the HuggingFace cache. Use LazyModelMixin for automatic cluster-wide caching.

Plugins vs Retrievers: Architecture

Understanding the boundary between plugins and retrievers is key to building retrieval-augmented pipelines.

Plugin Responsibility

Plugins handle data transformation: embedding, classification, extraction.
Raw Data → Plugin (batch/realtime) → Embeddings + Structured Output → Qdrant

Retriever Responsibility

Retrievers handle search and retrieval context: query processing, vector search, reranking, enrichment.
Query → Retriever Stages → [semantic_search → rerank → agentic_enrich] → Results

How They Interact

┌───────────────────────────────────────────────────┐
│                    Ingestion                       │
│  Object → Plugin pipeline.py → Embeddings → Qdrant│
└───────────────────────────────────────────────────┘

┌───────────────────────────────────────────────────┐
│                    Retrieval                       │
│  Query → Plugin realtime.py (embed query)          │
│       → Retriever semantic_search (Qdrant search)  │
│       → Retriever rerank (sort results)            │
│       → Retriever agentic_enrich (LLM analysis)    │
│       → Results with scores                        │
└───────────────────────────────────────────────────┘

What realtime.py Receives

The run_inference(inputs, parameters) method receives:
  • inputs["texts"] or inputs["text"] — text to embed
  • inputs["vector_index"] — which vector index to use
  • parameters — inference options
It does not receive retrieval context or search results. If your pipeline needs retrieval context (e.g., “compare this image against 5 nearest references”), use retriever stages:
  1. Plugin pipeline.py: Embed reference images during ingestion
  2. Plugin realtime.py: Embed the query image at search time
  3. Retriever semantic_search stage: Find nearest references in Qdrant
  4. Retriever agentic_enrich or code_execution stage: Run VLM comparison with retrieved references
For retrieval-dependent analysis (like brand compliance with reference comparison), the VLM evaluation belongs in a retriever stage, not in the plugin’s realtime.py.

Realtime Response Contract

Your run_inference() method must return a dictionary. The retriever’s feature_search stage uses the "embedding" key as the primary vector for Qdrant search.
# Single embedding (most common) — return "embedding" key
async def run_inference(self, inputs: dict, parameters: dict) -> dict:
    text = inputs.get("text", "")
    vector = self._model.encode([text])[0].tolist()
    return {"embedding": vector}  # ← "embedding" is the primary key
The key must be "embedding" (singular) for the primary vector. This is what feature_search extracts to query Qdrant.
Multi-vector plugins that produce multiple embeddings (e.g., text + image):
async def run_inference(self, inputs: dict, parameters: dict) -> dict:
    text_vec = self._text_model.encode([inputs["text"]])[0].tolist()
    image_vec = self._image_model.encode(inputs["image"]).tolist()
    return {
        "embedding": text_vec,         # Primary vector (used by feature_search)
        "image_embedding": image_vec,  # Additional vectors accessible by name
    }
The feature_search stage extracts the vector matching the vector_index name from the manifest features. If no match is found, it falls back to "embedding".
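As a simplified illustration of that selection behavior (a sketch, not the actual feature_search implementation):

```python
def select_vector(response: dict, vector_index: str) -> list:
    """Pick the vector matching the requested index name,
    falling back to the primary "embedding" key."""
    if vector_index in response:
        return response[vector_index]
    return response["embedding"]

# A multi-vector run_inference() response:
response = {"embedding": [0.1, 0.2], "image_embedding": [0.3, 0.4]}
select_vector(response, "image_embedding")  # -> [0.3, 0.4]
select_vector(response, "unknown_index")    # -> [0.1, 0.2]
```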

Compute Profile

Control resource allocation for your plugin by adding a compute_profile dict in manifest.py:
# manifest.py
compute_profile = {
    "resource_type": "cpu",      # "cpu", "gpu", or "api"
    "batch_size": 32,            # Rows per batch
    "max_concurrency": 4,        # Max parallel actors
    "actor_memory_gb": 2.0,      # RAM per actor (GB)
}
| Field | Default | Description |
| --- | --- | --- |
| resource_type | "cpu" | Primary resource type. "cpu" skips GPU worker allocation. "gpu" requests GPU nodes. "api" for external API-based plugins. |
| batch_size | 64 | Rows per batch. Smaller = more overhead, larger = more memory. |
| max_concurrency | 2 | Max parallel Ray actors. |
| actor_memory_gb | 2.0 | System RAM per actor in GB. |
| gpu_memory_gb | None | GPU VRAM per actor (only for resource_type: "gpu"). |
For hash-based or API-based plugins that don’t need GPU, set resource_type: "cpu" to avoid allocating expensive GPU nodes. This reduces batch startup time by 3+ minutes and costs ~6x less.

Example: Image Classification Plugin

Here’s a complete image processing plugin (addresses the common need for non-text templates):
# manifest.py
feature_extractor_name = "image_classifier"
version = "1.0.0"
description = "Image classification with structured labels"

dependencies = ["torch", "torchvision", "pillow"]

features = [
    {
        "feature_name": "image_embedding",
        "feature_type": "embedding",
        "embedding_dim": 512,
        "distance_metric": "cosine",
    }
]

feature_uri = "mixpeek://image_classifier@1.0.0/image_embedding"

output_schema = {
    "image_embedding": {"type": "array", "items": {"type": "number"}},
    "labels": {"type": "array", "items": {"type": "string"}},
    "confidence": {"type": "number"},
}
# processors/classifier.py
import io
import pandas as pd
from shared.plugins.sdk import parallel_io

class ImageClassifierProcessor:
    def __init__(self, config=None, **kwargs):
        self._model = None
        self._preprocess = None
        self.config = config or {}

    def _ensure_model_loaded(self):
        if self._model is None:
            import torch
            from torchvision import models, transforms

            self._model = models.mobilenet_v2(pretrained=True)
            self._model.eval()
            self._preprocess = transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406],
                    std=[0.229, 0.224, 0.225]
                ),
            ])

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        self._ensure_model_loaded()
        import torch
        from PIL import Image

        urls = [
            row.get("source_url") or row.get("image_url", "")
            for _, row in batch.iterrows()
        ]

        # Step 1: Download ALL images in parallel (I/O-bound)
        def download_image(url):
            import requests
            resp = requests.get(url, timeout=30)
            return Image.open(io.BytesIO(resp.content)).convert("RGB")

        downloaded = parallel_io(urls, download_image, max_workers=8)

        # Step 2: Preprocess into tensor batch
        tensors = []
        valid_indices = []
        for i in range(len(urls)):
            if i in downloaded:
                tensors.append(self._preprocess(downloaded[i]))
                valid_indices.append(i)

        # Step 3: Single batched GPU inference
        embeddings = [[0.0] * 512 for _ in urls]
        labels_list = [[] for _ in urls]

        if tensors:
            stacked = torch.stack(tensors)
            with torch.no_grad():
                outputs = self._model(stacked)
                for j, orig_idx in enumerate(valid_indices):
                    embeddings[orig_idx] = outputs[j].tolist()[:512]
                    labels_list[orig_idx] = [
                        str(c) for c in torch.topk(outputs[j:j+1], 5).indices[0].tolist()
                    ]

        batch["image_embedding"] = embeddings
        batch["labels"] = labels_list
        return batch
# pipeline.py
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, RowCondition,
    StepDefinition, build_pipeline_steps,
)
from .processors.classifier import ImageClassifierProcessor

def build_steps(extractor_request, container=None, **kwargs):
    params = (extractor_request.extractor_config.parameters or {})

    pipeline = PipelineDefinition(
        name="image_classifier",
        version="1.0.0",
        steps=[
            StepDefinition(
                service_class=ImageClassifierProcessor,
                resource_type=ResourceType.GPU,
                condition=RowCondition.IS_IMAGE,
                config={"threshold": params.get("threshold", 0.5)},
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Example: VLM Analyzer Plugin

A plugin that uses platform LLM services for vision-language analysis:
# manifest.py
feature_extractor_name = "vlm_analyzer"
version = "1.0.0"
description = "Vision-language analysis using Gemini"

dependencies = ["pillow", "httpx"]

features = [
    {
        "feature_name": "vlm_embedding",
        "feature_type": "embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    }
]

output_schema = {
    "vlm_embedding": {"type": "array", "items": {"type": "number"}},
    "analysis": {"type": "object"},
    "tags": {"type": "array", "items": {"type": "string"}},
}
# processors/analyzer.py
import hashlib
import pandas as pd
from shared.plugins.sdk import concurrent_api_calls

class VLMAnalyzerProcessor:
    def __init__(self, config=None, **kwargs):
        self.config = config or {}
        self._llm = self.config.get("llm_service")
        self.analysis_prompt = self.config.get(
            "prompt",
            "Describe this image. Return tags and a detailed description."
        )

    def _text_to_embedding(self, text, dim=384):
        """Deterministic text-to-embedding for analysis results."""
        h = hashlib.sha256(text.encode()).digest()
        return [((b % 200) - 100) / 100.0 for b in (h * (dim // 32 + 1))[:dim]]

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        image_urls = batch.get("source_url", pd.Series([""] * len(batch))).tolist()

        # Call Gemini concurrently for ALL rows (up to 10 at a time)
        async def analyze_image(url):
            return await self._llm.generate(
                instruction=self.analysis_prompt,
                images=[url],
                provider="google",
                model="gemini-2.5-flash",
                schema={
                    "type": "object",
                    "properties": {
                        "description": {"type": "string"},
                        "tags": {"type": "array", "items": {"type": "string"}},
                    }
                }
            )

        results = concurrent_api_calls(
            items=image_urls,
            async_fn=analyze_image,
            max_concurrent=10,
        )

        # Map results back to DataFrame columns
        analyses = [r.data if r else {} for r in results]
        batch["vlm_embedding"] = [
            self._text_to_embedding(a.get("description", "")) if a else [0.0] * 384
            for a in analyses
        ]
        batch["analysis"] = analyses
        batch["tags"] = [a.get("tags", []) if a else [] for a in analyses]
        return batch
# pipeline.py
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, RowCondition,
    StepDefinition, build_pipeline_steps,
)
from .processors.analyzer import VLMAnalyzerProcessor

def build_steps(extractor_request, container=None, **kwargs):
    params = (extractor_request.extractor_config.parameters or {})

    config = {
        "llm_service": container.llm,
        "prompt": params.get("prompt", "Describe this image in detail."),
    }

    pipeline = PipelineDefinition(
        name="vlm_analyzer",
        version="1.0.0",
        steps=[
            StepDefinition(
                service_class=VLMAnalyzerProcessor,
                resource_type=ResourceType.API,
                condition=RowCondition.IS_IMAGE,
                config=config,
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Local Testing

Unit Testing Your Plugin

Test your processors locally without the full Mixpeek stack:
# tests/test_my_processor.py
import pandas as pd
from my_plugin.processors.core import MyProcessor

def test_processor_basic():
    processor = MyProcessor(config={"threshold": 0.5})
    batch = pd.DataFrame({
        "text": ["Hello world", "Test input"],
        "document_id": ["doc_1", "doc_2"],
    })

    result = processor(batch)

    assert "my_embedding" in result.columns
    assert len(result) == 2
    assert all(len(e) == 384 for e in result["my_embedding"])

Testing with Mock LLM Service

# tests/test_with_llm.py
import pandas as pd
from unittest.mock import AsyncMock, MagicMock

from my_plugin.processors.analyzer import VLMAnalyzerProcessor

def test_vlm_processor():
    mock_llm = MagicMock()
    mock_llm.generate = AsyncMock(return_value=MagicMock(
        data={"description": "A red car", "tags": ["car", "red", "vehicle"]}
    ))

    processor = VLMAnalyzerProcessor(config={
        "llm_service": mock_llm,
        "prompt": "Describe this image",
    })

    batch = pd.DataFrame({
        "source_url": ["https://example.com/car.jpg"],
    })
    result = processor(batch)

    assert result["tags"].iloc[0] == ["car", "red", "vehicle"]
    mock_llm.generate.assert_called_once()

Testing build_steps

# tests/test_pipeline.py
from unittest.mock import MagicMock
from my_plugin.pipeline import build_steps

def test_build_steps_returns_steps():
    mock_request = MagicMock()
    mock_request.extractor_config.parameters = {"threshold": 0.8}

    mock_container = MagicMock()

    result = build_steps(mock_request, container=mock_container)

    assert "steps" in result
    assert len(result["steps"]) > 0

Running the Security Scanner Locally

Before uploading, validate your plugin passes the security scan:
# test_security.py
from api.namespaces.plugins.security.scanner import PluginSecurityScanner

scanner = PluginSecurityScanner(strict_mode=False)
result = scanner.scan_directory("my_plugin/")

if not result.passed:
    for v in result.violations:
        print(f"{v.filename}:{v.line}: [{v.severity}] {v.message}")
else:
    print("Security scan passed!")

Deployment Lifecycle

Deploy Real-time Endpoint

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy" \
  -H "Authorization: Bearer $API_KEY"

Check Status

curl "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/status"
| Status | Description |
| --- | --- |
| QUEUED | Waiting in deployment queue |
| PENDING | Deployment triggered |
| IN_PROGRESS | Blue-green rollout in progress |
| DEPLOYED | Ready for real-time inference |
| FAILED | Check error field |
| NOT_DEPLOYED | Batch-only mode |
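A simple polling helper built on this endpoint might look like the following sketch. It assumes the status response body carries "status" and "error" fields, per the table above; how you fetch the body (requests, httpx, etc.) is up to you:

```python
import time

def wait_for_deployment(fetch_status, timeout_s=600, poll_s=10):
    """Poll until the plugin reports DEPLOYED or FAILED.

    fetch_status: any zero-arg callable returning the parsed /status body,
    e.g. lambda: requests.get(status_url, headers=headers).json()
    """
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        body = fetch_status()
        status = body.get("status")
        if status == "DEPLOYED":
            return True
        if status == "FAILED":
            raise RuntimeError(body.get("error", "deployment failed"))
        time.sleep(poll_s)  # QUEUED / PENDING / IN_PROGRESS: keep waiting
    raise TimeoutError("deployment did not complete in time")
```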

Undeploy

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/undeploy"

Delete

curl -X DELETE "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID"

Example: Multi-Stage Pipeline

Here’s a complete example combining transcription and embedding:
# pipeline.py
from shared.inference.registry import get_batch_service
from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    RowCondition,
    StepDefinition,
    build_pipeline_steps,
)

def build_steps(extractor_request, **kwargs):
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    # Get built-in services
    WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")
    E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")

    pipeline = PipelineDefinition(
        name="audio_to_embedding",
        version="v1",
        steps=[
            # Step 1: Transcribe audio
            StepDefinition(
                service_class=WhisperBatch,
                resource_type=ResourceType.API,
                condition=RowCondition.IS_AUDIO,
                config=WhisperConfig(output_column="transcription"),
            ),
            # Step 2: Embed transcription
            StepDefinition(
                service_class=E5Batch,
                resource_type=ResourceType.CPU,
                config=E5Config(
                    text_column="transcription",
                    output_column_name="embedding",
                ),
            ),
        ]
    )

    return {"steps": build_pipeline_steps(pipeline)}

Troubleshooting

Plugin validation failed

Check the validation_errors array. Common issues:
  • Using setattr() or delattr() — use class attributes or constructor assignment instead
  • Importing subprocess — use requests or httpx for HTTP
  • Using open() directly — library-internal file I/O (torch.load, transformers, etc.) is allowed
  • Using eval() or exec() — use json.loads() for parsing, dataclasses for config
import os and getattr() / hasattr() are all allowed. Only the dangerous functions (os.system, os.popen, os.exec*) are blocked.

Task shows COMPLETED but 0 documents were written

This is the most confusing failure mode. The task API returns COMPLETED even when the Qdrant upsert fails for every row — for example, because your plugin declared new vector names that weren’t registered in the namespace’s Qdrant schema. Symptoms:
Task status: COMPLETED
Documents found: 0
Most common cause — wrong features format in manifest.py. If you used the wrong key names (type/name/dimensions instead of feature_type/feature_name/embedding_dim), the collection is created with vector_indexes: [] in MongoDB. The engine then tries to write embeddings to vector names that don’t exist in Qdrant and fails silently.
Fix: correct the features format (see Manifest Features Format) and recreate the collection.
Diagnosis steps:
  1. Check that vector_indexes is non-empty on your collection: GET /v1/collections/{collection_id}
  2. If vector_indexes is empty, delete and recreate the collection with corrected manifest.py
  3. Check Ray worker logs for Qdrant upsert errors (Not existing vector name)
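Step 1 can be automated against the parsed collection response. A hedged sketch, assuming the GET /v1/collections/{collection_id} body exposes vector_indexes as described:

```python
def diagnose_collection(collection: dict) -> str:
    """Interpret the parsed body of GET /v1/collections/{collection_id}.
    Field names assume the response shape described above."""
    indexes = collection.get("vector_indexes") or []
    if not indexes:
        return ("vector_indexes is empty: the manifest features were likely "
                "malformed; recreate the collection with a corrected manifest.py")
    return f"ok: {len(indexes)} vector index(es) registered"

diagnose_collection({"vector_indexes": []})  # flags the empty-schema case
```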

Retriever creation fails with collection_identifiers error

The retriever create endpoint requires collection_identifiers, not collection_ids. The error response hints at this:
{"hint": "Add collection_identifiers to your retriever"}
Use:
{
  "collection_identifiers": ["col_abc123"]
}
Not "collection_ids" (used by other endpoints).

Model loading is slow

  • Use LazyModelMixin for automatic cluster caching
  • Pre-deploy models via the Model Registry API
  • For HuggingFace models, set cache_dir="/tmp/hf_cache" to reuse across restarts
  • Check cached: true in deployment response

Archive too large

The upload limit is 500 MB. For plugins with large models:
  • Don’t bundle model weights in the archive
  • Download from HuggingFace Hub at init time using lazy loading
  • Use load_namespace_model() for custom weights uploaded via the Models API

Batch produces 0 documents

The most common cause: your plugin reads from the wrong column.
Custom plugins always receive data in the data column — NOT a column named after your blob property. If your bucket has a text property, the content is still in batch["data"], not batch["text"].
Check Ray job logs for [FailureAggregator] entries:
  • embed=N means N rows had null embedding columns (your plugin returned None)
  • Coverage 0.0% means all rows were filtered — likely wrong column name

Batch processing fails

  1. Read from batch["data"] — not batch["text"] or other column names
  2. Ensure __call__() returns a DataFrame with the same number of rows
  3. Reset DataFrame index: batch = batch.reset_index(drop=True)
  4. Handle None/empty values: batch["data"].fillna("")
  5. Use concurrent_api_calls() for async LLM calls (not manual event loop creation)
  6. Run the linter to detect sequential processing anti-patterns: python scripts/engine/lint_batch_processors.py my_plugin/
  7. See Batch Processing Optimization for correct patterns
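Rules 1 through 4 above condense into a defensive processor skeleton (illustrative; the 384-dim placeholder embedding stands in for your real model output):

```python
import pandas as pd

class SafeProcessor:
    """Minimal skeleton applying the batch-processing rules above."""

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        n_rows = len(batch)
        batch = batch.reset_index(drop=True)       # rule 3: clean index
        texts = batch["data"].fillna("").tolist()  # rules 1 and 4: read "data", no nulls
        # ... run your model here; placeholder vectors shown instead
        batch["embedding"] = [[0.0] * 384 for _ in texts]
        assert len(batch) == n_rows                # rule 2: same row count out
        return batch
```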

Python API client blocked by Cloudflare

If your Python scripts get Cloudflare error 1010 (“banned based on browser signature”), use the requests library with a proper User-Agent header instead of urllib:
import requests

headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json",
    "User-Agent": "MyApp/1.0",
}
response = requests.post(url, json=payload, headers=headers)
curl works fine. The issue only affects Python’s built-in urllib which sends a user-agent that Cloudflare blocks.

Version format rejected

Plugin versions must be semantic versioning format (X.Y.Z). Common formats are normalized automatically:
  • v1 → 1.0.0
  • 1.0 → 1.0.0
  • v2.1 → 2.1.0
  • 1.0.0 → 1.0.0 (already valid)
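The normalization shown above could be implemented roughly like this (a sketch of the documented behavior, not the platform's actual validator):

```python
import re

def normalize_version(v: str) -> str:
    """Normalize common version formats to semantic X.Y.Z."""
    v = v.lstrip("vV")                      # "v1" -> "1", "v2.1" -> "2.1"
    parts = v.split(".")
    if not all(re.fullmatch(r"\d+", p) for p in parts) or not 1 <= len(parts) <= 3:
        raise ValueError(f"invalid version: {v!r}")
    parts += ["0"] * (3 - len(parts))       # pad to three components
    return ".".join(parts)

normalize_version("v1")     # -> "1.0.0"
normalize_version("v2.1")   # -> "2.1.0"
normalize_version("1.0.0")  # -> "1.0.0"
```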

Real-time endpoint not responding

  1. Check deployment status via /status endpoint
  2. Verify plugin is DEPLOYED not NOT_DEPLOYED
  3. Check Ray Serve logs for errors

Can’t access retrieval context in realtime.py

realtime.py handles embedding, not retrieval. If your pipeline needs to compare against stored references, configure retriever stages (semantic_search → rerank → agentic_enrich) to handle the retrieval and comparison logic. See Plugins vs Retrievers above.