Browse the extractor catalog on GitHub

Runnable reference for every built-in Mixpeek extractor — inputs, parameters, output fields, embedding models, and copy-paste examples. Auto-generated from the live registry, so it always matches production.
Custom extractor flow: a single extractor archive powers both the ingestion pipeline (batch Ray Data) and the retrieval realtime endpoint (Ray Serve HTTP), used by retriever stages like feature_search, rerank, and agentic_enrich
Custom extractors let you run your own code on Mixpeek infrastructure — inside the same Ray cluster that powers the built-in extractors and retriever stages. You keep full control of the logic, model, and I/O; Mixpeek handles packaging, scheduling, GPU allocation, caching, and observability.

Availability

The upload → deploy → real-time lifecycle runs on dedicated Enterprise infrastructure — it is not exposed on the shared public API at api.mixpeek.com. What works where:
| Capability | Shared API (api.mixpeek.com) | Dedicated deployment |
| --- | --- | --- |
| Discover extractors (GET /v1/namespaces/{ns}/extractors, …/{id}) | Yes | Yes |
| Author + lint + test locally (no API key) | Yes | Yes |
| Contribute via the Submission workflow (reviewed → merged as built-in) | Yes | Yes |
| Upload / deploy / undeploy / real-time inference | No | Yes |
| Version management (push / pull / rollback / diff) | No | Yes |
On the shared API the upload/deploy/realtime endpoints return 404/405 by design. Contact your account team to provision a dedicated deployment for self-service custom-extractor uploads, or use Submissions to ship an extractor into the built-in catalog. The dedicated upload/deploy/realtime HTTP contract is documented in the Custom Extractor API (Dedicated Infrastructure) reference.
Want your extractor available to everyone without dedicated infra? Submit it for review to be merged into the built-in catalog — see Extractor Submissions.

What You Can Build

Custom extractors plug into two places in the warehouse:

1. Feature Extractors (Decomposition)

Attach custom logic to a collection’s feature_extractor so every ingested object flows through your pipeline during decomposition. Use this to:
  • Embed domain-specific content with your own model (fine-tuned CLIP, proprietary audio encoder, etc.)
  • Extract structured attributes via a VLM you manage (brand compliance, regulated content classification)
  • Transcribe, OCR, or segment media with a custom pre/post-processing chain
  • Produce multiple named vector indexes from a single pass
Outputs land in MVS and MongoDB with the same feature URI scheme as built-in extractors (mixpeek://my_extractor@1.0.0/my_embedding), so retrievers, taxonomies, and clusters can reference them.
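
The feature URI above follows the pattern mixpeek://<extractor>@<version>/<feature>. As a minimal sketch, the helpers below build and parse such a URI so the same string can be reused in a manifest and in a retriever stage. The names FeatureURI, build_feature_uri, and parse_feature_uri are hypothetical and not part of the Mixpeek SDK; the pattern is inferred from the single example URI shown above.

```python
import re
from typing import NamedTuple


class FeatureURI(NamedTuple):
    """Parts of a feature URI (hypothetical helper type, not an SDK class)."""
    extractor: str
    version: str
    feature: str


# Pattern inferred from the example mixpeek://my_extractor@1.0.0/my_embedding
_URI_RE = re.compile(
    r"^mixpeek://(?P<extractor>[^@/]+)@(?P<version>[^/]+)/(?P<feature>.+)$"
)


def build_feature_uri(extractor: str, version: str, feature: str) -> str:
    """Assemble a feature URI from its three parts."""
    return f"mixpeek://{extractor}@{version}/{feature}"


def parse_feature_uri(uri: str) -> FeatureURI:
    """Split a feature URI into extractor, version, and feature name."""
    match = _URI_RE.match(uri)
    if match is None:
        raise ValueError(f"not a feature URI: {uri!r}")
    return FeatureURI(**match.groupdict())
```

Building the URI in one place helps keep the extractor name, version, and feature_name in the retriever identical to the values declared in the manifest.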

2. Retriever Operations (Query Time)

An extractor’s realtime.py exposes a Ray Serve HTTP endpoint that retriever stages can call during execution. Use this to power:
  • feature_search — embed queries at search time with the same model you used during ingestion, so the query vector lives in the same space as the indexed vectors
  • Inference operations — on-the-fly classification, scoring, or re-ranking against your model
  • LLM calls — wrap a hosted or private LLM behind a stable contract, with platform-managed secrets and cost tracking
  • Classifiers — apply your own classifier to candidate results mid-pipeline
This is what lets a single custom extractor own both halves of a retrieval flow: it encodes documents on the way in, and encodes the query on the way out.

Delivery Formats

Ship your extractor in either of two formats:
| Format | When to Use |
| --- | --- |
| Zip archive (.zip) | Pure-Python extractors. Mixpeek resolves dependencies against the managed runtime. Scanned by the security linter before deploy. Limit: 500 MB / 1,000 files. |
| Container image (OCI) | Extractors that need system packages, custom CUDA builds, compiled binaries, or non-Python runtimes. Base your image on the Mixpeek engine image, push to your org-scoped Artifact Registry repo, and set container_image in manifest.py. See BYO Container Image. |
Both formats expose the same runtime APIs (batch __call__, real-time run_inference, platform LLM/secret accessors).

Extractor Structure

Every extractor has the same layout:
my_extractor/
├── manifest.py      # Schemas, metadata, vector indexes
├── pipeline.py      # Batch processing pipeline
├── realtime.py      # Real-time HTTP endpoint (optional)
└── processors/
    └── core.py      # Your processing logic
  • manifest.py declares what your extractor accepts, produces, and which vector indexes to create
  • pipeline.py wires your processor into the Ray Data batch pipeline
  • realtime.py exposes a Ray Serve endpoint for query-time inference (e.g., embedding queries for feature_search)
  • processors/ contains your actual logic — model loading, embedding, classification, etc.

Manifest

The manifest is your extractor’s contract with the platform.
# manifest.py
from pydantic import BaseModel, Field
from typing import List

class MyInput(BaseModel):
    text: str = Field(..., description="Input text to process")

class MyOutput(BaseModel):
    embedding: List[float] = Field(..., description="384-dim embedding vector")

class MyParams(BaseModel):
    model_size: str = Field(default="base", description="base or large")

metadata = {
    "feature_extractor_name": "my_extractor",
    "version": "1.0.0",
    "description": "Custom text embedding extractor",
    "category": "text",
    "inference_type": "embedding",  # declares real-time inference capability
}

input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]

features = [
    {
        "feature_type": "embedding",
        "feature_name": "my_embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]

Vector Index Keys

Use the exact key names below. Wrong keys silently create a collection with no vector indexes — your batch will show COMPLETED but produce 0 documents.
| Key | Required | Description |
| --- | --- | --- |
| feature_type | Yes | Must be "embedding" |
| feature_name | Yes | Name of the vector index |
| embedding_dim | Yes | Vector dimensionality |
| distance_metric | Yes | "cosine", "euclid", or "dot" |
Multiple vectors are supported — add one entry per embedding your extractor produces.
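
Because a misspelled key fails silently, it can help to check the features list yourself before uploading. The sketch below is a local pre-check written for illustration only; it is not the platform's lint command. The function name check_features is hypothetical, and treating any extra key as a likely typo is an assumption.

```python
# Key names and allowed metrics are taken from the table above.
REQUIRED_KEYS = {"feature_type", "feature_name", "embedding_dim", "distance_metric"}
VALID_METRICS = {"cosine", "euclid", "dot"}


def check_features(features):
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems = []
    for i, feat in enumerate(features):
        missing = REQUIRED_KEYS - feat.keys()
        unexpected = feat.keys() - REQUIRED_KEYS
        if missing:
            problems.append(f"features[{i}]: missing keys {sorted(missing)}")
        if unexpected:
            # Assumption: an unknown key is most likely a misspelling (e.g. "name").
            problems.append(f"features[{i}]: unexpected keys {sorted(unexpected)}")
        if "feature_type" in feat and feat["feature_type"] != "embedding":
            problems.append(f"features[{i}]: feature_type must be 'embedding'")
        if "distance_metric" in feat and feat["distance_metric"] not in VALID_METRICS:
            problems.append(f"features[{i}]: unknown distance_metric {feat['distance_metric']!r}")
        dim = feat.get("embedding_dim")
        if dim is not None and not (isinstance(dim, int) and dim > 0):
            problems.append(f"features[{i}]: embedding_dim must be a positive integer")
    return problems
```

Calling check_features(features) with the list from manifest.py and failing fast on a non-empty result surfaces the problem before a batch silently produces 0 documents.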

Inference Type

Declare what kind of real-time inference your extractor provides by setting inference_type in metadata. This lets retriever stages validate that an extractor is compatible with the stage slot.
| Value | Contract | Compatible Stages |
| --- | --- | --- |
| embedding | Returns {vector: [float]} | feature_search |
| rerank | Accepts {pairs: [[q, d]]}, returns {scores: [float]} | rerank |
| classify | Accepts {text: str}, returns {labels: [{label, confidence}]} | classify |
| generate | Accepts {prompt: str}, returns {text: str} | llm_filter, llm_enrich |
| general | No specific contract | Raw /inference endpoint only |
If inference_type is omitted, the extractor can only be called via the raw inference endpoint.
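
The compatibility rules in the table can be expressed as a small lookup. This is only an illustration of the mapping, not the validation code the platform runs; STAGE_COMPATIBILITY and can_fill_stage are hypothetical names.

```python
# inference_type -> retriever stages it can serve, copied from the table above.
STAGE_COMPATIBILITY = {
    "embedding": {"feature_search"},
    "rerank": {"rerank"},
    "classify": {"classify"},
    "generate": {"llm_filter", "llm_enrich"},
    "general": set(),  # raw /inference endpoint only
}


def can_fill_stage(inference_type, stage_id):
    """True if an extractor declaring inference_type fits the given stage slot."""
    if inference_type is None:
        # An omitted inference_type is only reachable via the raw inference endpoint.
        return False
    return stage_id in STAGE_COMPATIBILITY.get(inference_type, set())
```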

Compute Profile

Control resource allocation by adding compute_profile to your manifest:
compute_profile = {
    "resource_type": "cpu",      # "cpu", "gpu", or "api"
    "batch_size": 32,            # Rows per __call__ (default: 64)
    "max_concurrency": 4,        # Parallel Ray actors (default: 2)
}
For API-based or hash-based extractors that don’t need GPU, set resource_type: "cpu" to skip GPU allocation — saves ~3 minutes startup time and costs ~6x less.

BYO Container Image

If your extractor needs native binaries or system packages, specify a custom container image:
# manifest.py
container_image = "us-east1-docker.pkg.dev/mixpeek-inference-463103/extractors-<your-org-id>/my-image:1.0.0"
Base your image on the Mixpeek engine image to get Ray, FFmpeg, and all SDK helpers:
FROM us-east1-docker.pkg.dev/mixpeek-inference-463103/mixpeek-engine/engine-base:latest
RUN apt-get update && apt-get install -y libcustom-sdk ...
Images must be pushed to your org-scoped Artifact Registry repo. GKE Workload Identity handles pull auth. Contact your account team to provision access.

Batch Processor

Your processor receives a pandas DataFrame and returns it with new columns added.
# processors/core.py
import pandas as pd

class MyProcessor:
    def __init__(self, config, **kwargs):
        self.config = config
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        self._ensure_model_loaded()
        texts = batch["data"].fillna("").tolist()
        embeddings = self._model.encode(texts).tolist()
        batch["my_embedding"] = embeddings
        return batch

DataFrame Columns

Your __call__ receives these columns:
| Column | Description |
| --- | --- |
| data | For text blobs: the raw string. For binary blobs: an S3 URL (not raw bytes). |
| document_id | Unique document ID |
| object_id | Source object in the bucket |
| blob_type | "image", "video", "audio", "text" |
| blob_property | Property name from your bucket schema |
| mime_type | MIME type (e.g. image/jpeg) |
Always read from batch["data"] — not a column named after your blob property. If your bucket has a text property, the content is still in batch["data"]. (The local test harness feeds the same data column, so an extractor that passes test behaves the same in production.)

Batched Processing

Process all rows together — never call a model or API inside a per-row loop:
# WRONG — one GPU call per row
for idx, row in batch.iterrows():
    embedding = self._model.encode(row["data"])
    batch.at[idx, "embedding"] = embedding

# RIGHT — single batched call
texts = batch["data"].fillna("").tolist()
batch["embedding"] = self._model.encode(texts).tolist()
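
To make the cost difference concrete, the sketch below counts how many times a model is invoked under each pattern. CountingModel is a hypothetical stand-in with an encode method shaped like the one used above; it returns a dummy one-element vector and does not reproduce any real model's output.

```python
class CountingModel:
    """Stand-in model that records how many encode() calls it receives."""

    def __init__(self):
        self.calls = 0

    def encode(self, texts):
        self.calls += 1
        # Dummy "embedding": one float per text, just to have something to compare.
        return [[float(len(t))] for t in texts]


rows = ["alpha", "", "gamma delta"]

# Per-row pattern: one model call for every row in the batch.
per_row_model = CountingModel()
per_row = [per_row_model.encode([text])[0] for text in rows]

# Batched pattern: a single model call for the whole batch.
batched_model = CountingModel()
batched = batched_model.encode(rows)
```

Both patterns produce the same vectors, but the per-row version makes one call per row while the batched version makes one call in total, which is what matters once each call carries GPU launch or network overhead.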

Loading Assets from S3

For binary blobs (images, video, audio), the data column contains S3 URLs. Use the Extractor SDK to download them:
import os

import pandas as pd
from PIL import Image
from shared.extractors.sdk import parallel_io, download_asset

class ImageProcessor:
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        def load_image(url):
            # download_asset returns a local path and a flag marking temp files
            path, is_temp = download_asset(url, suffix=".jpg")
            try:
                # convert() decodes the full image, so the file can be removed afterwards
                with Image.open(path) as img:
                    return img.convert("RGB")
            finally:
                if is_temp:
                    os.unlink(path)

        # Download and decode all rows in parallel before any model call
        images = parallel_io(batch["data"].tolist(), load_image, max_workers=8)
        # batch-process all images on GPU...
        return batch

Pipeline

Wire your processor into the Ray Data pipeline:
# pipeline.py
from engine.extractors.my_extractor.pipeline import (
    PipelineDefinition, ResourceType, StepDefinition, build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyProcessor

def build_steps(extractor_request, container=None, **kwargs):
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    pipeline = PipelineDefinition(
        name=metadata["feature_extractor_name"],
        version=metadata["version"],
        steps=[
            StepDefinition(
                service_class=MyProcessor,
                resource_type=ResourceType.CPU,
                config={"model_size": params.model_size},
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Row Conditions

Filter which rows a step processes:
| Condition | Matches |
| --- | --- |
| RowCondition.IS_TEXT | text/* MIME types |
| RowCondition.IS_IMAGE | image/* MIME types |
| RowCondition.IS_VIDEO | video/* MIME types |
| RowCondition.IS_AUDIO | audio/* MIME types |
| RowCondition.IS_PDF | application/pdf |
| RowCondition.ALWAYS | All rows (default) |
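
The matching rules in the table amount to a prefix test on the row's mime_type. The sketch below shows one way that test could look, purely to illustrate the semantics; RowConditionSketch and matches are hypothetical and are not the SDK's RowCondition implementation, whose internals are not documented here.

```python
from enum import Enum


class RowConditionSketch(Enum):
    """Illustrative copy of the conditions in the table (not the SDK enum)."""
    IS_TEXT = "text/"
    IS_IMAGE = "image/"
    IS_VIDEO = "video/"
    IS_AUDIO = "audio/"
    IS_PDF = "application/pdf"
    ALWAYS = ""


def matches(condition, mime_type):
    """Return True if a row with this mime_type would pass the condition."""
    if condition is RowConditionSketch.ALWAYS:
        return True
    if not mime_type:
        return False
    # Drop parameters such as "; charset=utf-8" and normalise case.
    mime = mime_type.split(";")[0].strip().lower()
    if condition is RowConditionSketch.IS_PDF:
        return mime == condition.value
    return mime.startswith(condition.value)
```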

Using Built-in Models

Compose existing Mixpeek services instead of loading models yourself:
from shared.inference.registry import get_batch_service

WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")
E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")

# Use in your pipeline steps
StepDefinition(service_class=E5Batch, resource_type=ResourceType.CPU, config=e5_config)
| Service | Type | Dimensions |
| --- | --- | --- |
| intfloat/multilingual-e5-large-instruct | Embedding | 1024 |
| google/siglip-base-patch16-224 | Embedding | 512 |
| jinaai/jina-embeddings-v2-base-code | Embedding | 768 |
| BAAI/bge-reranker-v2-m3 | Reranker | n/a |
| openai/whisper-large-v3-turbo | Transcription | n/a |
For HuggingFace and your own fine-tuned weights, see the Model Registry.

Real-time Endpoint

Add realtime.py to expose an HTTP endpoint for query-time inference. This is what lets retriever feature_search stages embed queries with your model.
# realtime.py
from shared.extractors.inference.serve import BaseInferenceService

class InferenceService(BaseInferenceService):
    def __init__(self):
        super().__init__()
        self._model = None

    async def run_inference(self, inputs: dict, parameters: dict) -> dict:
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

        text = inputs.get("text", "")
        embedding = self._model.encode([text])[0].tolist()
        return {"embedding": embedding}
The return dict must include an "embedding" key — this is what feature_search uses as the query vector. For multi-vector extractors, include additional keys matching your feature_name values.
realtime.py handles embedding only, not retrieval. If your pipeline needs retrieval context (comparing against stored references), configure retriever stages to handle that logic. The real-time endpoint serves on dedicated infrastructure (see Availability).
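
Since feature_search depends on the "embedding" key, a quick shape check on the dict returned by run_inference can catch mistakes in a unit test. This is a sketch under stated assumptions: check_embedding_response is a hypothetical helper, and the expected dimension is whatever embedding_dim you declared in the manifest.

```python
def check_embedding_response(response, expected_dim, extra_features=()):
    """Return a list of problems found in a run_inference result dict."""
    problems = []
    vector = response.get("embedding")
    if not isinstance(vector, list):
        problems.append('missing "embedding" list')
    elif len(vector) != expected_dim:
        problems.append(f"expected {expected_dim} dimensions, got {len(vector)}")
    elif not all(isinstance(x, (int, float)) for x in vector):
        problems.append("embedding contains non-numeric values")
    # Multi-vector extractors: one additional key per declared feature_name.
    for name in extra_features:
        if name not in response:
            problems.append(f'missing key for feature "{name}"')
    return problems
```

For the 384-dimensional example above, check_embedding_response(result, 384) returning an empty list means the response has the shape this section describes.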

Platform Services

LLM Access

Use container.llm to call platform-managed LLMs with built-in cost tracking and caching:
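# pipeline.py: pass container.llm to your processor through the step config
config = {"llm_service": container.llm}

# processors/core.py: call the LLM concurrently from inside the processor
from shared.extractors.sdk import concurrent_api_calls

class EntityProcessor:
    def __init__(self, config, **kwargs):
        # Assumes the step config dict arrives unchanged, as in the Batch Processor example
        self._llm = config["llm_service"]

    def __call__(self, batch):
        async def analyze(text):
            return await self._llm.generate(
                instruction="Extract entities from this text",
                text=text,
                provider="google",
                model="gemini-2.5-flash",
                schema={"type": "object", "properties": {"entities": {"type": "array"}}}
            )

        texts = batch["data"].fillna("").tolist()
        results = concurrent_api_calls(texts, analyze, max_concurrent=10)
        # Attach results to the batch in whichever output columns your manifest declares
        return batch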
| Provider | Models |
| --- | --- |
| google | gemini-2.5-flash, gemini-2.5-pro |
| openai | gpt-4o, gpt-4o-mini |
| anthropic | claude-sonnet-4-20250514 |

Secrets

Access encrypted org secrets at runtime via container.secrets:
api_key = await container.secrets.get("EXTERNAL_API_KEY")
For platform LLMs, use container.llm instead — it handles API keys automatically.

CLI Tools

Custom extractors can’t import subprocess directly. Use run_tool for whitelisted CLI tools:
from shared.extractors.sdk import run_tool

result = run_tool("ffmpeg", ["-y", "-i", input_path, "-c:v", "libx264", output_path], timeout=600)
Available tools: ffmpeg, ffprobe, convert, identify, magick, exiftool, mediainfo, sox, soxi, REDline, art-cmd

Pre-installed Tools

The engine runtime includes these media tools, available via run_tool:
| Tool | Format | Description |
| --- | --- | --- |
| ffmpeg / ffprobe | Standard video/audio | Transcode, extract frames, probe metadata |
| REDline | RED R3D | Decode RED cinema camera raw files to ProRes/DPX/EXR |
| art-cmd | ARRI RAW | Decode ARRI raw (.ari/.arriraw/.arx) to ProRes |
| exiftool | All media | Read/write EXIF and XMP metadata |
| mediainfo | All media | Detailed format and codec inspection |
| convert / identify | Images | ImageMagick image processing |
| sox / soxi | Audio | Audio processing and info |
RED R3D and ARRI RAW decode examples:
run_tool("REDline", ["--i", input_path, "--o", output_path, "--format", "201", "--resize", "2"], timeout=600)
run_tool("art-cmd", ["--input", input_path, "--output", output_dir, "--format", "prores"], timeout=600)
# Chain with FFmpeg for web playback:
run_tool("ffmpeg", ["-y", "-i", prores_path, "-c:v", "libx264", "-crf", "23", output_mp4], timeout=600)

Typed SDK

The Extractor SDK provides typed base classes that replace bare Python variables with validated, IDE-friendly types — autocompletion, validation at upload time, and output column checking in the test harness. Import from shared.extractors.sdk or shared.extractors.
# manifest.py
from shared.extractors.sdk import ExtractorManifest, Feature

manifest = ExtractorManifest(
    feature_extractor_name="my_embedder",
    version="v1",
    description="Custom text embedder",
    dependencies=["sentence-transformers>=2.2"],
    system_packages=["libmagic1"],  # apt packages installed in the container
    features=[Feature.embedding("my_embedder_v1", dim=384)],
    inference_type="embedding",
)

# Backward-compat: AST parser reads these module-level variables
feature_extractor_name = manifest.feature_extractor_name
version = manifest.version
description = manifest.description
dependencies = manifest.dependencies
features = manifest.features_as_dicts()
feature_uri = manifest.feature_uri
# pipeline.py
from shared.extractors.sdk import BatchProcessor, prefetch_hf_model

class MyProcessor(BatchProcessor):
    def setup(self):
        prefetch_hf_model("sentence-transformers/all-MiniLM-L6-v2")
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    def process(self, batch):
        texts = batch["data"].fillna("").tolist()
        batch["my_embedder_v1_embedding"] = self.model.encode(texts).tolist()
        return batch
setup() runs once on first batch (lazy model loading); process() runs on each batch. prefetch_hf_model() pre-downloads the model to the HF cache to reduce cold start.
# realtime.py
from shared.extractors.sdk import InferenceService

class MyInference(InferenceService):
    def setup(self):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer("all-MiniLM-L6-v2")

    async def infer(self, inputs, parameters):
        text = inputs.get("text", "")
        return {"embedding": self.model.encode([text])[0].tolist()}
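
The setup()/process() lifecycle described above (setup once on the first batch, process on every batch) can be pictured with a few lines of plain Python. LazySetupProcessor below is a hypothetical stand-in written for illustration; it is not the SDK's BatchProcessor, and it uses a list of strings in place of a DataFrame.

```python
class LazySetupProcessor:
    """Illustrative base class: run setup() once, then process() per batch."""

    def __init__(self):
        self._ready = False

    def setup(self):
        """Override to load models or other heavy resources."""

    def process(self, batch):
        raise NotImplementedError

    def __call__(self, batch):
        # Defer setup until the first batch arrives, then never repeat it.
        if not self._ready:
            self.setup()
            self._ready = True
        return self.process(batch)


class UppercaseProcessor(LazySetupProcessor):
    """Toy subclass that counts setup calls to show the lifecycle."""

    def __init__(self):
        super().__init__()
        self.setup_calls = 0

    def setup(self):
        self.setup_calls += 1
        self.suffix = "!"

    def process(self, batch):
        return [text.upper() + self.suffix for text in batch]
```

Constructing the object stays cheap because nothing heavy happens until the first call, which is the reason model loading belongs in setup() rather than in __init__.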

SDK Reference

| Function / Class | Purpose |
| --- | --- |
| ExtractorManifest | Typed manifest with validation |
| Feature.embedding(name, dim) | Create an embedding feature definition |
| Feature.classification(name, labels) | Create a classification feature definition |
| BatchProcessor | Base class with setup() / process() lifecycle |
| InferenceService | Base class with setup() / infer() lifecycle |
| prefetch_hf_model(model_id) | Pre-download HF model to cache (cold start mitigation) |
| parallel_io(items, fn, max_workers) | Parallel file downloads and I/O |
| concurrent_api_calls(items, async_fn, max_concurrent) | Concurrent LLM/API calls |
| open_asset(url, suffix) | Context manager for S3 downloads |
| download_asset(url, suffix) | Manual S3 download with cleanup flag |
| run_tool(tool, args, timeout) | Execute whitelisted CLI tools |
| upload_asset(path, namespace_id, internal_id, resource_id) | Upload processed files back to S3 |

Security Rules

Custom extractors are scanned before deployment. Code violating these rules is rejected.
  • Allowed: numpy, pandas, torch, transformers, sentence_transformers, onnxruntime, PIL, cv2, requests, httpx, os (safe functions only), json, re, pydantic, logging, getattr, hasattr
  • Blocked: subprocess, os.system, os.popen, os.exec*, eval, exec, ctypes, socket, multiprocessing, open, setattr, delattr
import os is allowed — only dangerous functions are blocked. Library-internal file I/O (torch.load, transformers.from_pretrained, pd.read_csv) is fine since the scanner only inspects your extractor’s source code.
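
For a quick check before running lint, the blocked list above can be approximated with Python's ast module. This is a rough sketch only: it is not the platform's scanner, it covers just the direct patterns shown (imports of blocked modules, bare calls to blocked builtins, and os.system / os.popen / os.exec* calls), and find_violations is a hypothetical name.

```python
import ast

# Subsets of the blocked list above, grouped by how they appear in source code.
BLOCKED_MODULES = {"subprocess", "ctypes", "socket", "multiprocessing"}
BLOCKED_CALLS = {"eval", "exec", "open", "setattr", "delattr"}
BLOCKED_OS_ATTRS = {"system", "popen"}


def find_violations(source):
    """Return sorted (line_number, description) pairs for blocked constructs."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in BLOCKED_MODULES:
                    violations.append((node.lineno, f"import {alias.name}"))
        elif isinstance(node, ast.ImportFrom):
            # node.module is None for relative imports such as "from . import x".
            if node.module and node.module.split(".")[0] in BLOCKED_MODULES:
                violations.append((node.lineno, f"from {node.module} import ..."))
        elif isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Name) and func.id in BLOCKED_CALLS:
                violations.append((node.lineno, f"{func.id}()"))
            elif (
                isinstance(func, ast.Attribute)
                and isinstance(func.value, ast.Name)
                and func.value.id == "os"
                and (func.attr in BLOCKED_OS_ATTRS or func.attr.startswith("exec"))
            ):
                violations.append((node.lineno, f"os.{func.attr}()"))
    return sorted(violations)
```

A static pass like this misses aliased or dynamically constructed calls, so treat an empty result as a hint rather than a guarantee that the upload will pass the real scan.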

Local Development

Validate and test your extractor locally with the CLI before uploading. No API key needed — these run fully offline and work on any plan.
# Validate manifest and run the security scanner
python server/scripts/api/plugins.py lint path/to/my_extractor

# Run the pipeline through the Ray Data test harness (real map_batches + Arrow)
python server/scripts/api/plugins.py test path/to/my_extractor
lint catches common mistakes before upload:
  • Wrong feature key names (name instead of feature_name)
  • Missing required fields
  • Security scanner violations
test runs your processor through real Ray Data map_batches with Arrow serialization — the same path used in production. Sample rows are fed in the data column (matching production), and the harness validates that your output columns match the manifest features.

Version Management

On a dedicated deployment, the same CLI manages deployed versions with a git-like workflow:
| Command | Description |
| --- | --- |
| pull | Download the active version’s source files to a local directory |
| push | Zip, upload, and confirm a new version (auto-bumps patch version if omitted) |
| log | Show version history with deploy timestamps and commit messages |
| status | Show active version, extractor ID, and deployment status |
| rollback | Restore a previous version as active |
| diff | Compare source files between two versions |
The CLI reads MIXPEEK_API_KEY, MIXPEEK_NAMESPACE, and MIXPEEK_API_URL from the environment (or --api-key / --namespace).

Archive Limits

| Limit | Value |
| --- | --- |
| Upload size | 500 MB |
| Max files | 1,000 |
Don’t bundle model weights — download from HuggingFace Hub at init time, or use the Model Registry for custom weights.
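
The two limits can be checked locally before uploading. A minimal sketch using only the standard library follows; check_archive is a hypothetical helper, and reading "500 MB" as 500,000,000 bytes is an assumption (the more conservative interpretation).

```python
import os
import zipfile

MAX_ARCHIVE_BYTES = 500 * 1000 * 1000  # assumption: "500 MB" read as decimal megabytes
MAX_FILES = 1000


def check_archive(path):
    """Return a list of limit violations for the zip archive at path."""
    problems = []
    size = os.path.getsize(path)
    if size > MAX_ARCHIVE_BYTES:
        problems.append(f"archive is {size} bytes, limit is {MAX_ARCHIVE_BYTES}")
    with zipfile.ZipFile(path) as archive:
        # Count regular entries only; directory entries are skipped.
        file_count = sum(1 for info in archive.infolist() if not info.is_dir())
    if file_count > MAX_FILES:
        problems.append(f"archive has {file_count} files, limit is {MAX_FILES}")
    return problems
```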

End-to-End: Extractor → Collection → Retriever

This walkthrough connects all the pieces. Set MIXPEEK_API_KEY, MIXPEEK_NAMESPACE, and MIXPEEK_API_URL first — the same variables the plugins.py CLI reads.
Steps 1 and 5 (the extractor upload/deploy) require a dedicated deployment. On the shared API, use a built-in extractor (e.g. text_extractor) for the feature_extractor in step 3 and skip steps 1 and 5.

1. Deploy the Extractor (dedicated infra)

The simplest path is the CLI — python server/scripts/api/plugins.py push then deploy. The equivalent raw HTTP (custom extractors are addressed as /plugins on this surface; see the Custom Extractor API reference):
zip -r my_extractor.zip my_extractor/

# Presigned upload → confirm → deploy
UPLOAD=$(curl -s -X POST "$MIXPEEK_API_URL/v1/namespaces/$MIXPEEK_NAMESPACE/plugins/uploads" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "Content-Type: application/json" \
  -d '{"name":"my_extractor","version":"1.0.0","file_size_bytes":5000}')
UPLOAD_ID=$(echo "$UPLOAD" | jq -r '.upload_id')
PRESIGNED_URL=$(echo "$UPLOAD" | jq -r '.presigned_url')

curl -s -X PUT "$PRESIGNED_URL" -H "Content-Type: application/zip" --data-binary @my_extractor.zip

curl -s -X POST "$MIXPEEK_API_URL/v1/namespaces/$MIXPEEK_NAMESPACE/plugins/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "Content-Type: application/json" -d '{}'

curl -s -X POST "$MIXPEEK_API_URL/v1/namespaces/$MIXPEEK_NAMESPACE/plugins/my_extractor_1_0_0/deploy" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY"
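
The curl example hardcodes file_size_bytes as 5000. The sketch below builds the same request body from the real archive size and derives the extractor ID used in the deploy path. Both helpers are hypothetical: that file_size_bytes should equal the actual archive size is an assumption based on the field name, and the ID rule (dots replaced by underscores) is inferred from the single my_extractor_1_0_0 example above, so confirm both against the Custom Extractor API reference.

```python
import os


def build_upload_request(archive_path, name, version):
    """Build the JSON body for the uploads call, using the real archive size."""
    return {
        "name": name,
        "version": version,
        "file_size_bytes": os.path.getsize(archive_path),
    }


def extractor_id(name, version):
    """Derive the ID used in the deploy path (rule inferred from one example)."""
    return f"{name}_{version.replace('.', '_')}"
```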

2. Create a Bucket and Upload Data

Buckets, collections, retrievers, and batches are top-level resources addressed by the X-Namespace header — not nested under /namespaces/{ns}/. (Only extractors and models are path-scoped.)
BUCKET=$(curl -s -X POST "https://api.mixpeek.com/v1/buckets" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d '{"bucket_name":"sec-filings","bucket_schema":{"properties":{"filing_text":{"type":"text"}}}}')
BUCKET_ID=$(echo "$BUCKET" | jq -r '.bucket_id')

curl -s -X POST "https://api.mixpeek.com/v1/buckets/$BUCKET_ID/objects" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d '{"blobs":[{"property":"filing_text","type":"text","data":"Revenue increased 22% year-over-year..."}],"metadata":{"ticker":"AAPL","form":"10-K"}}'

3. Create a Collection with the Extractor

Every object uploaded to the bucket flows through this extractor’s batch processor.
COLLECTION=$(curl -s -X POST "https://api.mixpeek.com/v1/collections" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d "{
    \"collection_name\":\"sec-filing-chunks\",
    \"source\":{\"type\":\"bucket\",\"bucket_ids\":[\"$BUCKET_ID\"]},
    \"feature_extractor\":{\"feature_extractor_name\":\"my_extractor\",\"version\":\"1.0.0\",\"input_mappings\":{\"text\":\"filing_text\"}}
  }")
COLLECTION_ID=$(echo "$COLLECTION" | jq -r '.collection_id')

4. Process the Data (two-step batch)

A batch is created (with the objects + target collections) and then submitted:
# List object IDs, then create the batch
OBJIDS=$(curl -s "https://api.mixpeek.com/v1/buckets/$BUCKET_ID/objects" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  | jq -c '[.results[].object_id]')

BATCH=$(curl -s -X POST "https://api.mixpeek.com/v1/buckets/$BUCKET_ID/batches" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d "{\"batch_name\":\"run-1\",\"object_ids\":$OBJIDS,\"collection_ids\":[\"$COLLECTION_ID\"]}")
BATCH_ID=$(echo "$BATCH" | jq -r '.batch_id')

curl -s -X POST "https://api.mixpeek.com/v1/buckets/$BUCKET_ID/batches/$BATCH_ID/submit" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d "{\"collection_ids\":[\"$COLLECTION_ID\"]}"

# Poll GET /v1/buckets/$BUCKET_ID/batches/$BATCH_ID until status is COMPLETED
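
The polling step can be sketched as a loop with a timeout. The HTTP call is left to a caller-supplied fetch_status function so the sketch stays independent of any client library; wait_for_batch is a hypothetical helper, and treating FAILED as a terminal status is an assumption (only COMPLETED and PROCESSING are named in this document).

```python
import time

# COMPLETED comes from the text above; FAILED is an assumed terminal status.
TERMINAL_STATUSES = {"COMPLETED", "FAILED"}


def wait_for_batch(fetch_status, timeout_s=1800.0, interval_s=5.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Call fetch_status() until it returns a terminal status or time runs out."""
    deadline = clock() + timeout_s
    while True:
        status = str(fetch_status()).upper()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"batch still {status} after {timeout_s} seconds")
        sleep(interval_s)
```

In practice fetch_status would issue the GET request shown in the comment above and return the status field of the response; a generous timeout is sensible because a cold engine can keep the first batch in PROCESSING for several minutes.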

5. Create and Execute the Retriever (dedicated infra)

At query time the retriever calls your extractor’s realtime.py (dedicated infra) to embed the query, then searches the vectors your batch processor produced.
RETRIEVER=$(curl -s -X POST "https://api.mixpeek.com/v1/retrievers" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d "{
    \"retriever_name\":\"filing-search\",
    \"collection_identifiers\":[\"$COLLECTION_ID\"],
    \"input_schema\":{\"query\":{\"type\":\"text\",\"required\":true}},
    \"stages\":[{\"stage_name\":\"semantic\",\"stage_type\":\"filter\",\"config\":{\"stage_id\":\"feature_search\",\"parameters\":{
      \"searches\":[{\"feature_uri\":\"mixpeek://my_extractor@1.0.0/my_embedding\",\"query\":{\"input_mode\":\"text\",\"value\":\"{{INPUT.query}}\"},\"top_k\":20}],
      \"final_top_k\":20,\"fusion\":\"rrf\"}}}]
  }")
RETRIEVER_ID=$(echo "$RETRIEVER" | jq -r '.retriever_id')

curl -s -X POST "https://api.mixpeek.com/v1/retrievers/$RETRIEVER_ID/execute" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" \
  -H "Content-Type: application/json" \
  -d '{"inputs":{"query":"revenue growth year over year"}}'

Troubleshooting

Upload, deploy, or realtime endpoints return 404/405: The upload/deploy/realtime lifecycle is only available on a dedicated deployment; the shared api.mixpeek.com exposes only GET list/details for extractors. Develop, lint, and test locally, then either provision a dedicated deployment or ship via Submissions.
Batch shows COMPLETED but produces 0 documents: Usually wrong features key names in manifest.py. Run python server/scripts/api/plugins.py lint path/to/my_extractor to validate. Check that your collection has non-empty vector_indexes via GET /v1/collections/{id}. See Vector Index Keys.
Rows fail during batch processing: The most common cause is reading from the wrong column. Always use batch["data"], not batch["text"] or other property names. Check Ray logs for [FailureAggregator] entries.
Upload rejected by the security scanner: Check validation_errors. Common issues: using subprocess (use run_tool), using open() directly (use library I/O), using eval/exec (use json.loads).
Slow cold starts: Use prefetch_hf_model() in your setup() method to pre-download models to the HF cache. On GKE, HF_HOME points to a shared PVC so models persist across pod restarts. The first cold start downloads the model (~1-2 min); subsequent starts use the cache.
First retriever execute returns 0 results: On a cold engine, the embedding model loads on demand. The first batch can take several minutes to leave PROCESSING, and the first retriever execute afterward may return 0 results (status completed or degraded) while the query-side model warms. This is a cold-start artifact, not a real no-match: retry after a few seconds and results appear. A warm namespace responds immediately. Keep a namespace warm by issuing a periodic lightweight query, or contact your account team about a warm replica floor for latency-sensitive workloads.
feature_search fails because realtime.py is missing: If your manifest includes a feature_uri, the system expects a corresponding realtime.py. Without it, feature_search queries against that URI will fail. Omit both if you only need batch processing.
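
For the cold-start case described in this section, where the first retriever execute can return 0 results while the query-side model warms, a small retry wrapper is one way to follow the "retry after a few seconds" advice. The sketch below is an assumption-laden illustration: execute_with_warmup_retry is a hypothetical helper, execute is any caller-supplied function that returns the list of results, and the attempt count and delay are arbitrary defaults.

```python
import time


def execute_with_warmup_retry(execute, attempts=3, delay_s=5.0, sleep=time.sleep):
    """Re-run execute() while it returns no results, up to a fixed number of attempts."""
    results = []
    for attempt in range(attempts):
        results = execute()
        if results:
            return results
        # Empty result: may be a cold model rather than a true no-match.
        if attempt < attempts - 1:
            sleep(delay_s)
    return results
```

Keep the attempt count small: once the namespace is warm, an empty result is a genuine no-match and retrying only adds latency.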

Next Steps

Quickstart

Build, test, and query a minimal text embedding extractor end-to-end.

Model Registry

Load HuggingFace models or your own fine-tuned weights inside an extractor.

Extractor Submissions

Submit your extractor for review to be merged into the built-in catalog.

Multi-Tier Extraction

Chain collections into a DAG — transcribe, then embed, then classify.

Reprocess Existing Content

Run a new extractor over an already-ingested corpus — scoped, cost-safe, priced before you run.