
For a high-level overview of what custom extractors can do, see Custom Extractors. For the upload and deploy API, see the Extractor API reference.

Extractor Structure

Every extractor has the same layout:
my_extractor/
├── manifest.py      # Schemas, metadata, vector indexes
├── pipeline.py      # Batch processing pipeline
├── realtime.py      # Real-time HTTP endpoint (optional)
└── processors/
    └── core.py      # Your processing logic
  • manifest.py declares what your extractor accepts, produces, and which vector indexes to create
  • pipeline.py wires your processor into the Ray Data batch pipeline
  • realtime.py exposes a Ray Serve endpoint for query-time inference (e.g., embedding queries for feature_search)
  • processors/ contains your actual logic — model loading, embedding, classification, etc.

Manifest

The manifest is your extractor’s contract with the platform.
# manifest.py
from pydantic import BaseModel, Field
from typing import List

class MyInput(BaseModel):
    text: str = Field(..., description="Input text to process")

class MyOutput(BaseModel):
    embedding: List[float] = Field(..., description="384-dim embedding vector")

class MyParams(BaseModel):
    model_size: str = Field(default="base", description="base or large")

metadata = {
    "feature_extractor_name": "my_extractor",
    "version": "1.0.0",
    "description": "Custom text embedding extractor",
    "category": "text",
    "inference_type": "embedding",  # declares real-time inference capability
}

input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]

features = [
    {
        "feature_type": "embedding",
        "feature_name": "my_embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]

Vector Index Keys

Use the exact key names below. Wrong keys silently create a collection with no vector indexes — your batch will show COMPLETED but produce 0 documents.
Each entry in features requires these exact keys:
  • feature_type (required): must be "embedding"
  • feature_name (required): name of the vector index
  • embedding_dim (required): vector dimensionality
  • distance_metric (required): "cosine", "euclid", or "dot"
Multiple vectors are supported — add one entry per embedding your extractor produces.
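For example, an extractor that produces both a text and an image embedding would declare two entries (names and dimensions here are illustrative):

features = [
    {
        "feature_type": "embedding",
        "feature_name": "text_embedding",
        "embedding_dim": 768,
        "distance_metric": "cosine",
    },
    {
        "feature_type": "embedding",
        "feature_name": "image_embedding",
        "embedding_dim": 512,
        "distance_metric": "cosine",
    },
]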

Inference Type

Declare what kind of real-time inference your extractor provides by setting inference_type in metadata. This lets retriever stages validate that an extractor is compatible with the stage slot.
  • embedding: returns {vector: [float]}; compatible with feature_search
  • rerank: accepts {pairs: [[q, d]]}, returns {scores: [float]}; compatible with rerank
  • classify: accepts {text: str}, returns {labels: [{label, confidence}]}; compatible with classify
  • generate: accepts {prompt: str}, returns {text: str}; compatible with llm_filter and llm_enrich
  • general: no specific contract; raw /inference endpoint only
If inference_type is omitted, the extractor can only be called via the raw inference endpoint.
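As a sketch of a non-embedding contract, a rerank extractor's realtime service might look like this (a minimal example built on the BaseInferenceService base shown later in this guide; the cross-encoder model choice is illustrative):

# realtime.py
from shared.extractors.inference.serve import BaseInferenceService

class InferenceService(BaseInferenceService):
    def __init__(self):
        super().__init__()
        self._model = None

    async def run_inference(self, inputs: dict, parameters: dict) -> dict:
        if self._model is None:
            from sentence_transformers import CrossEncoder
            self._model = CrossEncoder("BAAI/bge-reranker-v2-m3")
        pairs = inputs.get("pairs", [])            # [[query, doc], ...]
        scores = self._model.predict(pairs).tolist()
        return {"scores": scores}                  # rerank contract: {scores: [float]}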

Compute Profile

Control resource allocation by adding compute_profile to your manifest:
compute_profile = {
    "resource_type": "cpu",      # "cpu", "gpu", or "api"
    "batch_size": 32,            # Rows per __call__ (default: 64)
    "max_concurrency": 4,        # Parallel Ray actors (default: 2)
}
For API-based or hash-based extractors that don’t need GPU, set resource_type: "cpu" to skip GPU allocation — saves ~3 minutes startup time and costs ~6x less.
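For example, a hash-based extractor doing cheap per-row work might declare (values are illustrative):

compute_profile = {
    "resource_type": "cpu",      # skip GPU allocation entirely
    "batch_size": 128,           # cheap rows can use larger batches
    "max_concurrency": 8,
}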

BYO Container Image

If your extractor needs native binaries or system packages, specify a custom container image:
# manifest.py
container_image = "us-east1-docker.pkg.dev/mixpeek-inference-463103/extractors-<your-org-id>/my-image:1.0.0"
Base your image on the Mixpeek engine image to get Ray, FFmpeg, and all SDK helpers:
FROM us-east1-docker.pkg.dev/mixpeek-inference-463103/mixpeek-engine/engine-base:latest
RUN apt-get update && apt-get install -y libcustom-sdk ...
Images must be pushed to your org-scoped Artifact Registry repo. GKE Workload Identity handles pull auth. Contact your account team to provision access.

Batch Processor

Your processor receives a pandas DataFrame and returns it with new columns added.
# processors/core.py
import pandas as pd

class MyProcessor:
    def __init__(self, config, **kwargs):
        self.config = config
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        self._ensure_model_loaded()
        texts = batch["data"].fillna("").tolist()
        embeddings = self._model.encode(texts).tolist()
        batch["my_embedding"] = embeddings
        return batch

DataFrame Columns

Your __call__ receives these columns:
  • data: for text blobs, the raw string; for binary blobs, an S3 URL (not raw bytes)
  • document_id: unique document ID
  • object_id: source object in the bucket
  • blob_type: "image", "video", "audio", or "text"
  • blob_property: property name from your bucket schema
  • mime_type: MIME type (e.g. image/jpeg)
Always read from batch["data"] — not a column named after your blob property. If your bucket has a text property, the content is still in batch["data"].
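A sketch of branching on blob_type inside __call__:

# text rows carry raw strings; binary rows carry S3 URLs
texts = batch.loc[batch["blob_type"] == "text", "data"]
image_urls = batch.loc[batch["blob_type"] == "image", "data"]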

Batched Processing

Process all rows together — never call a model or API inside a per-row loop:
# WRONG — one GPU call per row
for idx, row in batch.iterrows():
    embedding = self._model.encode(row["data"])
    batch.at[idx, "embedding"] = embedding

# RIGHT — single batched call
texts = batch["data"].fillna("").tolist()
batch["embedding"] = self._model.encode(texts).tolist()

Loading Assets from S3

For binary blobs (images, video, audio), the data column contains S3 URLs. Use the Extractor SDK to download them:
import pandas as pd

from shared.extractors import download_asset
from shared.extractors.sdk import parallel_io

class ImageProcessor:
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        def load_image(url):
            from PIL import Image
            path, is_temp = download_asset(url, suffix=".jpg")
            img = Image.open(path).convert("RGB")
            if is_temp:
                import os; os.unlink(path)
            return img

        images = parallel_io(batch["data"].tolist(), load_image, max_workers=8)
        # batch-process all images on GPU...
        return batch
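If you prefer automatic cleanup, the SDK also exposes open_asset as a context manager (see the Extractor SDK Reference below). A minimal sketch, assuming it yields a local file path:

from shared.extractors import open_asset

with open_asset(url, suffix=".jpg") as path:   # assumption: yields a local path
    from PIL import Image
    img = Image.open(path).convert("RGB")
# temporary files are cleaned up on exit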

Pipeline

Wire your processor into the Ray Data pipeline:
# pipeline.py
from engine.extractors.my_extractor.pipeline import (
    PipelineDefinition, ResourceType, StepDefinition, build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyProcessor

def build_steps(extractor_request, container=None, **kwargs):
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    pipeline = PipelineDefinition(
        name=metadata["feature_extractor_name"],
        version=metadata["version"],
        steps=[
            StepDefinition(
                service_class=MyProcessor,
                resource_type=ResourceType.CPU,
                config={"model_size": params.model_size},
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Row Conditions

Filter which rows a step processes:
  • RowCondition.IS_TEXT: text/* MIME types
  • RowCondition.IS_IMAGE: image/* MIME types
  • RowCondition.IS_VIDEO: video/* MIME types
  • RowCondition.IS_AUDIO: audio/* MIME types
  • RowCondition.IS_PDF: application/pdf
  • RowCondition.ALWAYS: all rows (default)
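A sketch of attaching a condition to a step, assuming StepDefinition accepts a row_condition argument (the exact keyword name is an assumption; the StepDefinition examples in this guide don't show it):

StepDefinition(
    service_class=MyTextProcessor,          # illustrative class name
    resource_type=ResourceType.CPU,
    config={},
    row_condition=RowCondition.IS_TEXT,     # assumption: exact kwarg name
)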

Using Built-in Models

Compose existing Mixpeek services instead of loading models yourself:
from shared.inference.registry import get_batch_service

WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")
E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")

# Use in your pipeline steps
StepDefinition(service_class=E5Batch, resource_type=ResourceType.CPU, config=e5_config)
  • intfloat/multilingual-e5-large-instruct: embedding, 1024 dims
  • google/siglip-base-patch16-224: embedding, 512 dims
  • jinaai/jina-embeddings-v2-base-code: embedding, 768 dims
  • BAAI/bge-reranker-v2-m3: reranker
  • openai/whisper-large-v3-turbo: transcription

Real-time Endpoint

Add realtime.py to expose an HTTP endpoint for query-time inference. This is what lets retriever feature_search stages embed queries with your model.
# realtime.py
from shared.extractors.inference.serve import BaseInferenceService

class InferenceService(BaseInferenceService):
    def __init__(self):
        super().__init__()
        self._model = None

    async def run_inference(self, inputs: dict, parameters: dict) -> dict:
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

        text = inputs.get("text", "")
        embedding = self._model.encode([text])[0].tolist()
        return {"embedding": embedding}
The return dict must include an "embedding" key — this is what feature_search uses as the query vector. For multi-vector extractors, include additional keys matching your feature_name values.
realtime.py handles embedding only, not retrieval. If your pipeline needs retrieval context (comparing against stored references), configure retriever stages to handle that logic.
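For a two-vector extractor, the return might look like this sketch (key names must match the feature_name values in your manifest; these are illustrative):

return {
    "text_embedding": text_vector,     # matches feature_name "text_embedding"
    "image_embedding": image_vector,   # matches feature_name "image_embedding"
}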

Platform Services

LLM Access

Use container.llm to call platform-managed LLMs (Gemini, OpenAI, Anthropic) with built-in cost tracking and caching:
# pipeline.py — pass to your processor
config = {"llm_service": container.llm}

# processors/core.py: call concurrently. This snippet lives inside a
# processor method, where self._llm was set from config["llm_service"].
from shared.extractors.sdk import concurrent_api_calls

async def analyze(text):
    return await self._llm.generate(
        instruction="Extract entities from this text",
        text=text,
        provider="google",
        model="gemini-2.5-flash",
        schema={"type": "object", "properties": {"entities": {"type": "array"}}}
    )

results = concurrent_api_calls(texts, analyze, max_concurrent=10)
  • google: gemini-2.5-flash, gemini-2.5-pro
  • openai: gpt-4o, gpt-4o-mini
  • anthropic: claude-sonnet-4-20250514

Secrets

Access encrypted org secrets at runtime via container.secrets:
api_key = await container.secrets.get("EXTERNAL_API_KEY")
For platform LLMs, use container.llm instead — it handles API keys automatically.
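A sketch of wiring a secret into a processor, mirroring the llm_service pattern above (the secret name and endpoint are illustrative):

# pipeline.py: pass the secrets service to your processor
config = {"secrets": container.secrets}

# processors/core.py: a method on your processor, where
# self._secrets was set from config["secrets"] in __init__
import httpx

async def call_external_api(self, text):
    api_key = await self._secrets.get("EXTERNAL_API_KEY")    # hypothetical secret name
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.example.com/v1/analyze",             # illustrative endpoint
            headers={"Authorization": f"Bearer {api_key}"},
            json={"text": text},
        )
        return resp.json()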

CLI Tools

Custom extractors can’t import subprocess directly. Use run_tool for whitelisted CLI tools:
from shared.extractors.sdk import run_tool

result = run_tool("ffmpeg", ["-y", "-i", input_path, "-c:v", "libx264", output_path], timeout=600)
Available tools: ffmpeg, ffprobe, convert, identify, magick, exiftool, mediainfo, sox, soxi, REDline, art-cmd. See Pre-installed Tools and Cinema Camera Raw examples for details.

Cinema Camera Raw Files

RED R3D — decode via REDline:
run_tool("REDline", ["--i", input_path, "--o", output_path, "--format", "201", "--resize", "2"], timeout=600)
ARRI RAW — decode via ART CMD:
run_tool("art-cmd", ["--input", input_path, "--output", output_dir, "--format", "prores"], timeout=600)
Both output ProRes. Chain with FFmpeg for web playback:
run_tool("ffmpeg", ["-y", "-i", prores_path, "-c:v", "libx264", "-crf", "23", output_mp4], timeout=600)

Typed SDK

The Extractor SDK provides typed base classes that replace bare Python variables with validated, IDE-friendly types. Use these for autocompletion, validation at upload time, and output column checking in the test harness.

Typed Manifest

# manifest.py
from shared.extractors.sdk import ExtractorManifest, Feature

manifest = ExtractorManifest(
    feature_extractor_name="my_embedder",
    version="v1",
    description="Custom text embedder",
    dependencies=["sentence-transformers>=2.2"],
    system_packages=["libmagic1"],  # apt packages installed in the container
    features=[
        Feature.embedding("my_embedder_v1", dim=384),
    ],
    inference_type="embedding",
)

# Backward-compat: AST parser reads these module-level variables
feature_extractor_name = manifest.feature_extractor_name
version = manifest.version
description = manifest.description
dependencies = manifest.dependencies
features = manifest.features_as_dicts()
feature_uri = manifest.feature_uri

Typed Batch Processor

# pipeline.py
from shared.extractors.sdk import BatchProcessor, prefetch_hf_model

class MyProcessor(BatchProcessor):
    def setup(self):
        prefetch_hf_model("sentence-transformers/all-MiniLM-L6-v2")
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    def process(self, batch):
        texts = batch["data"].fillna("").tolist()
        batch["my_embedder_v1_embedding"] = self.model.encode(texts).tolist()
        return batch
setup() runs once on first batch (lazy model loading). process() runs on each batch. prefetch_hf_model() pre-downloads the model to the HF cache to reduce cold start latency.

Typed Inference Service

# realtime.py
from shared.extractors.sdk import InferenceService

class MyInference(InferenceService):
    def setup(self):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    async def infer(self, inputs, parameters):
        text = inputs.get("text", "")
        embedding = self.model.encode([text])[0].tolist()
        return {"embedding": embedding}

System Packages

Declare apt packages your extractor needs in the manifest:
system_packages = ["ffmpeg", "libmagic1", "tesseract-ocr"]
These are installed in the container at deploy time. For complex native dependencies, use a BYO container image instead.

Extractor SDK Reference

Import from shared.extractors.sdk or shared.extractors:
  • ExtractorManifest: typed manifest with validation
  • Feature.embedding(name, dim): create an embedding feature definition
  • Feature.classification(name, labels): create a classification feature definition
  • BatchProcessor: base class with setup() / process() lifecycle
  • InferenceService: base class with setup() / infer() lifecycle
  • prefetch_hf_model(model_id): pre-download an HF model to cache (cold start mitigation)
  • parallel_io(items, fn, max_workers): parallel file downloads and I/O
  • concurrent_api_calls(items, async_fn, max_concurrent): concurrent LLM/API calls
  • open_asset(url, suffix): context manager for S3 downloads
  • download_asset(url, suffix): manual S3 download with cleanup flag
  • run_tool(tool, args, timeout): execute whitelisted CLI tools
  • upload_asset(path, namespace_id, internal_id, resource_id): upload processed files back to S3

Example: Text Document Extractor

A complete extractor for domain-specific text processing: chunking documents and embedding each chunk with a finance-tuned model (the processor below shows only the embedding step; chunking is elided for brevity). This pattern works for legal briefs, financial filings, medical records, or any text-heavy corpus.

Manifest

# manifest.py
from shared.extractors.sdk import ExtractorManifest, Feature

manifest = ExtractorManifest(
    feature_extractor_name="sec_filing_embedder",
    version="1.0.0",
    description="Chunk SEC filings by section and embed with finance-tuned model",
    dependencies=["sentence-transformers>=2.2"],
    features=[
        Feature.embedding("filing_embedding", dim=768),
    ],
    inference_type="embedding",
)

feature_extractor_name = manifest.feature_extractor_name
version = manifest.version
description = manifest.description
dependencies = manifest.dependencies
features = manifest.features_as_dicts()
feature_uri = manifest.feature_uri

Batch Processor

# processors/core.py
import pandas as pd

class FilingProcessor:
    def __init__(self, config, **kwargs):
        self._model = None

    def _ensure_model(self):
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("BAAI/bge-base-en-v1.5")

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        self._ensure_model()
        texts = batch["data"].fillna("").tolist()
        embeddings = self._model.encode(texts, normalize_embeddings=True).tolist()
        batch["filing_embedding"] = embeddings
        return batch

Pipeline

# pipeline.py
from engine.extractors.sec_filing_embedder.pipeline import (
    PipelineDefinition, ResourceType, StepDefinition, build_pipeline_steps
)
from .manifest import feature_extractor_name, version
from .processors.core import FilingProcessor

def build_steps(extractor_request, container=None, **kwargs):
    pipeline = PipelineDefinition(
        name=feature_extractor_name,
        version=version,
        steps=[
            StepDefinition(
                service_class=FilingProcessor,
                resource_type=ResourceType.CPU,
                config={},
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Real-time Endpoint

# realtime.py
from shared.extractors.inference.serve import BaseInferenceService

class InferenceService(BaseInferenceService):
    def __init__(self):
        super().__init__()
        self._model = None

    async def run_inference(self, inputs: dict, parameters: dict) -> dict:
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("BAAI/bge-base-en-v1.5")

        text = inputs.get("text", "")
        embedding = self._model.encode([text], normalize_embeddings=True)[0].tolist()
        return {"embedding": embedding}

End-to-End: Extractor → Collection → Retriever

This walkthrough connects all the pieces — from deploying a custom extractor to searching against it.

1. Deploy the Extractor

# Package and upload
zip -r sec_filing_embedder.zip sec_filing_embedder/

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/extractors/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID" \
  -H "Content-Type: application/json" \
  -d '{"name": "sec_filing_embedder", "version": "1.0.0"}'

curl -X PUT "$PRESIGNED_URL" --data-binary @sec_filing_embedder.zip

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/extractors/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID"

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/extractors/$EXTRACTOR_ID/deploy" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID"

2. Create a Bucket and Upload Data

# Create a bucket for SEC filings
curl -X POST "https://api.mixpeek.com/v1/buckets" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID" \
  -H "Content-Type: application/json" \
  -d '{
    "bucket_name": "sec-filings",
    "blob_schema": {
      "filing_text": { "type": "text" }
    }
  }'

# Upload objects
curl -X POST "https://api.mixpeek.com/v1/buckets/$BUCKET_ID/objects" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID" \
  -H "Content-Type: application/json" \
  -d '{
    "objects": [
      { "blob_data": { "filing_text": "Revenue increased 22% year-over-year..." }, "metadata": { "ticker": "AAPL", "form": "10-K" } }
    ]
  }'

3. Create a Collection with the Extractor

The collection references your extractor. Every object uploaded to the bucket will flow through your extractor’s batch processor.
curl -X POST "https://api.mixpeek.com/v1/collections" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID" \
  -H "Content-Type: application/json" \
  -d '{
    "collection_name": "sec-filing-chunks",
    "source": { "type": "bucket", "bucket_id": "'$BUCKET_ID'" },
    "feature_extractor": {
      "feature_extractor_name": "sec_filing_embedder",
      "version": "1.0.0",
      "input_mappings": { "text": "payload.filing_text" }
    }
  }'

4. Build a Retriever

The retriever uses feature_search with your extractor’s feature URI. At query time, the retriever calls your extractor’s realtime.py to embed the query, then searches against the vectors your batch processor produced.
curl -X POST "https://api.mixpeek.com/v1/retrievers" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID" \
  -H "Content-Type: application/json" \
  -d '{
    "retriever_name": "filing-search",
    "collection_ids": ["'$COLLECTION_ID'"],
    "stages": [
      {
        "stage_type": "search",
        "stage_id": "semantic",
        "feature_search": {
          "feature_uri": "mixpeek://sec_filing_embedder@1.0.0/filing_embedding",
          "limit": 20
        }
      }
    ],
    "fusion_strategy": { "type": "rrf" }
  }'
curl -X POST "https://api.mixpeek.com/v1/retrievers/$RETRIEVER_ID/execute" \
  -H "Authorization: Bearer $API_KEY" \
  -H "X-Namespace: $NS_ID" \
  -H "Content-Type: application/json" \
  -d '{
    "query": { "query_text": "revenue growth year over year" },
    "limit": 10
  }'
The response includes matched document chunks with scores, metadata, and lineage back to the original filing object.

Upload and Deploy

# 1. Package
zip -r my_extractor.zip my_extractor/

# 2. Get upload URL
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/extractors/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "my_extractor", "version": "1.0.0"}'

# 3. Upload
curl -X PUT "$PRESIGNED_URL" --data-binary @my_extractor.zip

# 4. Confirm
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/extractors/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $API_KEY"

# 5. Deploy
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/extractors/$EXTRACTOR_ID/deploy" \
  -H "Authorization: Bearer $API_KEY"
Deployment status values:
  • QUEUED: waiting in deployment queue
  • PENDING: deployment triggered
  • IN_PROGRESS: blue-green rollout in progress
  • DEPLOYED: ready for real-time inference
  • FAILED: check the error field
  • NOT_DEPLOYED: batch-only mode
See the Upload API, Deploy API, and Status API references.

Local Development

Before uploading, validate and test your extractor locally with the CLI:
# Validate manifest and run security scanner (no API key needed)
python scripts/api/extractors.py lint path/to/my_extractor

# Run pipeline through Ray Data test harness (no API key needed)
python scripts/api/extractors.py test path/to/my_extractor
lint catches common mistakes before upload:
  • Wrong feature key names (name instead of feature_name)
  • Missing required fields
  • Security scanner violations
test runs your processor through real Ray Data map_batches with Arrow serialization — the same path used in production. It validates that output columns match your manifest features.
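For quick iteration, you can also call your processor directly on a hand-built DataFrame with the documented columns. A rough sketch, not a substitute for the test harness, which exercises the real Ray Data path (the import path assumes you run from the directory containing my_extractor/):

import pandas as pd
from my_extractor.processors.core import MyProcessor

batch = pd.DataFrame({
    "data": ["hello world"],
    "document_id": ["doc_1"],
    "object_id": ["obj_1"],
    "blob_type": ["text"],
    "blob_property": ["text"],
    "mime_type": ["text/plain"],
})

out = MyProcessor(config={})(batch)
assert "my_embedding" in out.columns   # matches feature_name in manifest.py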

Version Management

Iterate on deployed extractors with a git-like workflow. See Version Management for the full command reference.

Archive Limits

  • Upload size: 500 MB
  • Max files: 1,000
Don’t bundle model weights — download from HuggingFace Hub at init time or use load_namespace_model() for custom weights.

Security Rules

Custom extractors are scanned before deployment. Code violating these rules is rejected.

Allowed

numpy, pandas, torch, transformers, sentence_transformers, onnxruntime, PIL, cv2, requests, httpx, os (safe functions only), json, re, pydantic, logging, getattr, hasattr

Blocked

subprocess, os.system, os.popen, os.exec*, eval, exec, ctypes, socket, multiprocessing, open, setattr, delattr
import os is allowed — only dangerous functions are blocked. Library-internal file I/O (torch.load, transformers.from_pretrained, pd.read_csv) is also fine since the scanner only inspects your extractor’s source code.
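For example, a direct open() call in your source is rejected, while the same work done through library I/O or run_tool passes the scanner. A sketch:

# Rejected: direct open() in extractor source
# text = open(path).read()

# Accepted: library-internal file I/O
import pandas as pd
df = pd.read_csv(path)

# Rejected: subprocess. Use run_tool for whitelisted CLI tools instead:
from shared.extractors.sdk import run_tool
result = run_tool("ffprobe", ["-show_format", path], timeout=60)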

Troubleshooting

Batch shows COMPLETED but produces 0 documents: usually wrong feature key names in manifest.py. Run python scripts/api/extractors.py lint path/to/my_extractor to validate your manifest keys. Check that your collection has non-empty vector_indexes via GET /v1/collections/{id}. If empty, fix the manifest keys and recreate the collection. See Vector Index Keys.

Processor produces wrong or empty output: the most common cause is reading from the wrong column. Always use batch["data"], not batch["text"] or other property names. Check Ray logs for [FailureAggregator] entries.

Upload rejected by the security scanner: check validation_errors. Common issues: using subprocess (use run_tool), using open() directly (use library I/O), using eval/exec (use json.loads).

Slow cold starts: use prefetch_hf_model() in your setup() method to pre-download models to the HF cache. On GKE, HF_HOME points to a shared PVC, so models persist across pod restarts. The first cold start downloads the model (~1-2 min); subsequent starts use the cache.

Retrieval logic in realtime.py: realtime.py handles embedding, not retrieval. Use retriever stages (semantic_search, rerank, agentic_enrich) for retrieval logic.

feature_search fails against your feature URI: if your manifest includes a feature_uri, the system expects a corresponding realtime.py. Without it, feature_search queries against that URI will fail. Omit both if you only need batch processing.

Outbound HTTP calls fail: use requests with a User-Agent header instead of urllib. curl works fine.