# Custom Extractor Quickstart

> Build, test, and query a custom text embedding extractor

<Tip>Custom extractors extend the warehouse's Decompose layer with your own feature extraction logic. For the full reference, see [Custom Extractors](/processing/custom-extractors).</Tip>

## What You'll Build

A custom text embedding extractor that:

1. Generates 128-dimensional embeddings from text (batch + real-time)
2. Validates locally with `lint` and `test` — no API key needed
3. Powers search through a retriever

<Info>
  **Where this runs.** Authoring, `lint`, and `test` work anywhere. The **upload/deploy** steps (Step 3) require a [dedicated deployment](/processing/custom-extractors#availability); the shared `api.mixpeek.com` API does not expose custom-extractor uploads. On the shared API, skip Step 3 and swap in a built-in extractor (e.g. `text_extractor`) at Step 4, or ship your extractor via [Submissions](/processing/extractor-marketplace). The ingest and search steps (4–5) run on any plan.
</Info>

Set these first:

```bash theme={null}
export MIXPEEK_API_KEY="mxp_sk_..."
export MIXPEEK_NAMESPACE="ns_..."
export MIXPEEK_API_URL="https://api.mixpeek.com"   # dedicated deployments use your tenant URL
```

<Note>These are the same three variables the `plugins.py` CLI reads, so the CLI and the raw `curl` examples below use one consistent convention.</Note>

## Step 1: Create Extractor Files

Create a directory `text_embed/` with three files.

### manifest.py

```python theme={null}
feature_extractor_name = "text_embed"
version = "1.0.0"
description = "Text embedding extractor"

dependencies = []

features = [
    {
        "feature_type": "embedding",
        "feature_name": "text_embed_v1_embedding",
        "embedding_dim": 128,
        "distance_metric": "cosine",
    }
]

# Declares real-time inference capability (query embedding for feature_search)
inference_type = "embedding"

# Skip GPU — this extractor is CPU-only
compute_profile = {"resource_type": "cpu"}
```

<Warning>
  Use the exact key names: `feature_type`, `feature_name`, `embedding_dim`, `distance_metric`. Using `name`/`type`/`dimensions`/`distance` will silently produce a collection with no vector indexes.
</Warning>
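
Because the failure is silent, a quick local guard can help. The following is a minimal sketch, not part of the Mixpeek SDK or `plugins.py`: `check_feature_keys` is a hypothetical helper that only knows about the four key names and the four look-alikes listed in the warning above.

```python
REQUIRED_KEYS = {"feature_type", "feature_name", "embedding_dim", "distance_metric"}
# Look-alike keys named in the warning, mapped to the key they should be.
LOOKALIKES = {
    "type": "feature_type",
    "name": "feature_name",
    "dimensions": "embedding_dim",
    "distance": "distance_metric",
}


def check_feature_keys(features):
    """Return a list of readable problems; an empty list means the keys look right."""
    problems = []
    for i, feature in enumerate(features):
        for wrong, right in LOOKALIKES.items():
            if wrong in feature:
                problems.append(f"features[{i}]: rename '{wrong}' to '{right}'")
        for key in sorted(REQUIRED_KEYS - set(feature)):
            problems.append(f"features[{i}]: missing '{key}'")
    return problems


good = [{"feature_type": "embedding", "feature_name": "text_embed_v1_embedding",
         "embedding_dim": 128, "distance_metric": "cosine"}]
bad = [{"type": "embedding", "name": "text_embed_v1_embedding",
        "dimensions": 128, "distance": "cosine"}]

print(check_feature_keys(good))
for problem in check_feature_keys(bad):
    print(problem)
```

To run it against your own manifest, import `features` from `manifest.py` and pass it in place of `good`.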

### pipeline.py

```python theme={null}
import hashlib
from typing import List

import numpy as np
import pandas as pd


def text_to_embedding(text: str, dim: int = 128) -> List[float]:
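    """Generate a deterministic, L2-normalized embedding from text.

    Placeholder only: the vector is seeded from a SHA-256 hash, so identical
    text maps to an identical vector, but similar text does not map to nearby
    vectors. Swap in a real model for semantic search.
    """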
    hash_bytes = hashlib.sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(hash_bytes[:4], byteorder="big")
    rng = np.random.default_rng(seed)
    embedding = rng.standard_normal(dim).astype(np.float32)
    norm = np.linalg.norm(embedding)
    if norm > 0:
        embedding = embedding / norm
    return embedding.tolist()


class TextEmbedBatchProcessor:
    def __init__(self, config=None, **kwargs):
        # Custom extractors receive blob content in the 'data' column
        self.text_column = "data"
        self.output_column = "text_embed_v1_embedding"
        self.embedding_dim = 128

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        if batch.empty:
            return batch
        batch = batch.reset_index(drop=True)

        texts, valid = [], []
        for idx, v in enumerate(batch.get(self.text_column, [])):
            text = "" if v is None else str(v)
            if text.strip():
                texts.append(text)
                valid.append(idx)

        batch[self.output_column] = None
        if texts:
            embs = [text_to_embedding(t, self.embedding_dim) for t in texts]
            for i, orig_idx in enumerate(valid):
                batch.at[orig_idx, self.output_column] = embs[i]
        return batch


def build_steps(extractor_request=None, container=None,
                base_steps=None, dataset_size=None, content_flags=None):
    steps = list(base_steps or [])
    steps.append(TextEmbedBatchProcessor())
    return {"steps": steps, "prepare": lambda ds: ds}
```

<Warning>
  Always read from `batch["data"]` — NOT `batch["text"]`. The `data` column holds raw text for text blobs and S3 URLs for binary blobs. The local `test` harness feeds the same `data` column, so a passing `test` matches production.
</Warning>

### realtime.py

Embeds the query at search time so it lands in the same space as your indexed vectors.

```python theme={null}
import hashlib
import numpy as np


class InferenceService:
    async def run_inference(self, inputs: dict, parameters: dict) -> dict:
        text = inputs.get("text") or ""  # tolerate a missing or null "text" input
        h = hashlib.sha256(text.encode("utf-8")).digest()
        rng = np.random.default_rng(int.from_bytes(h[:4], "big"))
        v = rng.standard_normal(128).astype(np.float32)
        n = np.linalg.norm(v)
        return {"embedding": (v / n if n else v).tolist()}
```
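
Since the batch and real-time paths live in separate files, they can drift apart without any error. The sketch below shows one way to check parity locally. `compare_embedders` is a hypothetical helper, and the two toy functions are stand-ins so the example runs without `numpy`; they are not the extractor's real code.

```python
import math
from typing import Callable, List, Sequence


def compare_embedders(
    batch_fn: Callable[[str], List[float]],
    realtime_fn: Callable[[str], List[float]],
    texts: Sequence[str],
    dim: int = 128,
    tol: float = 1e-6,
) -> List[str]:
    """Return mismatches between two embedding functions (empty list means parity)."""
    problems = []
    for text in texts:
        a, b = batch_fn(text), realtime_fn(text)
        if len(a) != dim or len(b) != dim:
            problems.append(f"{text!r}: expected {dim} dims, got {len(a)} and {len(b)}")
            continue
        if max(abs(x - y) for x, y in zip(a, b)) > tol:
            problems.append(f"{text!r}: vectors differ by more than {tol}")
        norm = math.sqrt(sum(x * x for x in a))
        if abs(norm - 1.0) > 1e-3:
            problems.append(f"{text!r}: batch vector is not unit length (norm={norm:.4f})")
    return problems


# Toy stand-ins; swap in the real functions from pipeline.py and realtime.py.
def toy_batch(text: str) -> List[float]:
    return [1.0] + [0.0] * 127


def toy_realtime_drifted(text: str) -> List[float]:
    return [0.0, 1.0] + [0.0] * 126


print(compare_embedders(toy_batch, toy_batch, ["quantum computing"]))
print(len(compare_embedders(toy_batch, toy_realtime_drifted, ["quantum computing"])))
```

With the real code, `batch_fn` would be `text_to_embedding` and `realtime_fn` could be `lambda t: asyncio.run(InferenceService().run_inference({"text": t}, {}))["embedding"]`.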

## Step 2: Validate Locally (no API key)

```bash theme={null}
python server/scripts/api/plugins.py lint text_embed
python server/scripts/api/plugins.py test text_embed
```

`lint` validates your manifest and runs the security scanner. `test` runs the pipeline through real Ray Data `map_batches` and confirms that your output column is populated. Both commands run offline.

## Step 3: Deploy (dedicated infrastructure)

<Note>
  Upload/deploy is only available on a [dedicated deployment](/processing/custom-extractors#availability) — full HTTP contract in the [Custom Extractor API](/processing/custom-extractor-api) reference. On the shared API, skip to Step 4 with a built-in extractor, or use [Submissions](/processing/extractor-marketplace).
</Note>

The CLI does this for you (`plugins.py push`, then `deploy`). The equivalent raw HTTP calls follow; on this surface, custom extractors are addressed as `/plugins`:

```bash theme={null}
zip -r text_embed.zip text_embed/

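# file_size_bytes is computed from the archive rather than hardcoded
ZIP_SIZE=$(wc -c < text_embed.zip | tr -d ' ')
UPLOAD=$(curl -s -X POST "$MIXPEEK_API_URL/v1/namespaces/$MIXPEEK_NAMESPACE/plugins/uploads" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "Content-Type: application/json" \
  -d "{\"name\":\"text_embed\",\"version\":\"1.0.0\",\"file_size_bytes\":$ZIP_SIZE}")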
UPLOAD_ID=$(echo "$UPLOAD" | jq -r '.upload_id')
PRESIGNED_URL=$(echo "$UPLOAD" | jq -r '.presigned_url')

curl -s -X PUT "$PRESIGNED_URL" -H "Content-Type: application/zip" --data-binary @text_embed.zip
curl -s -X POST "$MIXPEEK_API_URL/v1/namespaces/$MIXPEEK_NAMESPACE/plugins/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY" -H "Content-Type: application/json" -d '{}'
curl -s -X POST "$MIXPEEK_API_URL/v1/namespaces/$MIXPEEK_NAMESPACE/plugins/text_embed_1_0_0/deploy" \
  -H "Authorization: Bearer $MIXPEEK_API_KEY"
```
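
If you script this flow in Python, two values are worth deriving rather than typing by hand: the archive size and the plugin ID in the deploy URL. The sketch below rests on two assumptions that the [Custom Extractor API](/processing/custom-extractor-api) reference should confirm: that `file_size_bytes` is the archive's size on disk, and that the plugin ID is the name plus the version with dots replaced by underscores (inferred only from `text_embed_1_0_0` above). Both helper names are hypothetical.

```python
import json
import os
import tempfile
import zipfile


def build_upload_payload(zip_path: str, name: str, version: str) -> dict:
    """Body for POST .../plugins/uploads, using the archive's real size on disk."""
    return {"name": name, "version": version, "file_size_bytes": os.path.getsize(zip_path)}


def plugin_id(name: str, version: str) -> str:
    """ASSUMPTION: ID format inferred from 'text_embed_1_0_0' in the deploy URL."""
    return f"{name}_{version.replace('.', '_')}"


# Demo with a throwaway archive so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    zip_path = os.path.join(tmp, "text_embed.zip")
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("text_embed/manifest.py", 'feature_extractor_name = "text_embed"\n')
    payload = build_upload_payload(zip_path, "text_embed", "1.0.0")

print(json.dumps(payload))
print(plugin_id("text_embed", "1.0.0"))
```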

## Step 4: Ingest Data

<Note>
  Buckets, collections, retrievers, and batches are **top-level** resources keyed by the **`X-Namespace` header** — not nested under `/namespaces/{ns}/`.
</Note>
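
The same convention in Python, as a minimal standard-library sketch. `namespace_headers` and `resource_url` are hypothetical helper names, and `bkt_123` is a placeholder ID; the header names and paths are the ones used in this guide.

```python
import os


def namespace_headers(api_key: str, namespace: str) -> dict:
    """Headers for top-level resources: the namespace travels in X-Namespace, not the path."""
    return {
        "Authorization": f"Bearer {api_key}",
        "X-Namespace": namespace,
        "Content-Type": "application/json",
    }


def resource_url(api_url: str, *parts: str) -> str:
    """Join path parts under /v1, e.g. resource_url(base, 'buckets', bucket_id, 'objects')."""
    return "/".join([api_url.rstrip("/"), "v1", *parts])


api_url = os.environ.get("MIXPEEK_API_URL", "https://api.mixpeek.com")
headers = namespace_headers(
    os.environ.get("MIXPEEK_API_KEY", "mxp_sk_..."),
    os.environ.get("MIXPEEK_NAMESPACE", "ns_..."),
)
print(resource_url(api_url, "buckets", "bkt_123", "objects"))  # bkt_123 is a placeholder
print(sorted(headers))
```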

```bash theme={null}
H=(-H "Authorization: Bearer $MIXPEEK_API_KEY" -H "X-Namespace: $MIXPEEK_NAMESPACE" -H "Content-Type: application/json")
B="$MIXPEEK_API_URL/v1"   # same base URL as above; dedicated deployments use your tenant URL

# Bucket
BUCKET_ID=$(curl -s -X POST "$B/buckets" "${H[@]}" \
  -d '{"bucket_name":"articles","bucket_schema":{"properties":{"text":{"type":"text"}}}}' | jq -r '.bucket_id')

# Object
curl -s -X POST "$B/buckets/$BUCKET_ID/objects" "${H[@]}" \
  -d '{"blobs":[{"property":"text","type":"text","data":"Quantum computing uses qubits to perform calculations exponentially faster than classical computers."}]}'

# Collection — use "text_embed" if deployed, or a built-in like "text_extractor" on the shared API
COLLECTION_ID=$(curl -s -X POST "$B/collections" "${H[@]}" -d "{
  \"collection_name\":\"text_embed_articles\",
  \"source\":{\"type\":\"bucket\",\"bucket_ids\":[\"$BUCKET_ID\"]},
  \"feature_extractor\":{\"feature_extractor_name\":\"text_embed\",\"version\":\"1.0.0\",\"input_mappings\":{\"text\":\"text\"}}
}" | jq -r '.collection_id')

# Batch is two-step: create (objects + collections) then submit
OBJIDS=$(curl -s "$B/buckets/$BUCKET_ID/objects" "${H[@]}" | jq -c '[.results[].object_id]')
BATCH_ID=$(curl -s -X POST "$B/buckets/$BUCKET_ID/batches" "${H[@]}" \
  -d "{\"batch_name\":\"run-1\",\"object_ids\":$OBJIDS,\"collection_ids\":[\"$COLLECTION_ID\"]}" | jq -r '.batch_id')
curl -s -X POST "$B/buckets/$BUCKET_ID/batches/$BATCH_ID/submit" "${H[@]}" \
  -d "{\"collection_ids\":[\"$COLLECTION_ID\"]}"

# Poll until COMPLETED
while true; do
  S=$(curl -s "$B/buckets/$BUCKET_ID/batches/$BATCH_ID" "${H[@]}" | jq -r '.status')
  echo "batch: $S"
  case "$S" in COMPLETED|COMPLETED_WITH_ERRORS|FAILED|CANCELED) break;; esac
  sleep 8
done
```
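
The shell loop polls indefinitely if the batch never reaches a terminal status. A bounded version might look like the sketch below. `wait_for_batch` is a hypothetical helper, the 900-second default is an arbitrary assumption, and `PENDING` / `PROCESSING` in the demo are placeholder names for non-terminal states; only the four terminal statuses and the 8-second interval come from the loop above.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"COMPLETED", "COMPLETED_WITH_ERRORS", "FAILED", "CANCELED"}


def wait_for_batch(
    fetch_status: Callable[[], str],
    timeout_s: float = 900.0,
    interval_s: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status() until it returns a terminal status or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch still {status!r} after {timeout_s}s")
        sleep(interval_s)


# Demo with a fake status source; in practice fetch_status would GET the batch
# and return its "status" field.
fake = iter(["PENDING", "PROCESSING", "COMPLETED"])
final_status = wait_for_batch(lambda: next(fake), sleep=lambda _: None)
print(final_status)
```

Injecting `fetch_status` and `sleep` keeps the loop testable without network access or real delays.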

## Step 5: Create a Retriever and Search

<Warning>
  **First-run cold start.** On a cold engine the embedding model can take **several minutes** to load. The batch poll in Step 4 waits for the ingest side, but the **first `execute` immediately after may return 0 results** (with `status` `completed` *or* `degraded`) while the **query-side** model warms up. This is expected on the first call only — **retry after a few seconds** and subsequent searches return ranked results. A warm namespace responds immediately.
</Warning>

```bash theme={null}
RETRIEVER_ID=$(curl -s -X POST "$B/retrievers" "${H[@]}" -d "{
  \"retriever_name\":\"text_search\",
  \"collection_identifiers\":[\"$COLLECTION_ID\"],
  \"input_schema\":{\"query\":{\"type\":\"text\",\"required\":true}},
  \"stages\":[{\"stage_name\":\"semantic\",\"stage_type\":\"filter\",\"config\":{\"stage_id\":\"feature_search\",\"parameters\":{
    \"searches\":[{\"feature_uri\":\"mixpeek://text_embed@1.0.0/text_embed_v1_embedding\",\"query\":{\"input_mode\":\"text\",\"value\":\"{{INPUT.query}}\"},\"top_k\":10}],
    \"final_top_k\":10,\"fusion\":\"rrf\"}}}]
}" | jq -r '.retriever_id')

curl -s -X POST "$B/retrievers/$RETRIEVER_ID/execute" "${H[@]}" \
  -d '{"inputs":{"query":"quantum computing"}}' | jq '.results[:3]'
```
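
To absorb the cold-start behavior described in the warning, a client can retry while the result list is empty. This is a minimal sketch under stated assumptions: `execute_with_warmup_retry` is a hypothetical helper, the attempt count and delay are arbitrary, and the `document_id` / `score` fields in the demo are made up for illustration.

```python
import time
from typing import Callable, List


def execute_with_warmup_retry(
    execute: Callable[[], List[dict]],
    attempts: int = 5,
    delay_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[dict]:
    """Call execute() until it returns results or attempts run out; return the last list."""
    results: List[dict] = []
    for attempt in range(1, attempts + 1):
        results = execute()
        if results:
            return results
        if attempt < attempts:
            sleep(delay_s)
    return results


# Demo: the first call comes back empty (cold query-side model), the second has a hit.
responses = iter([[], [{"document_id": "doc_1", "score": 0.91}]])
hits = execute_with_warmup_retry(lambda: next(responses), sleep=lambda _: None)
print(hits)
```

A query that legitimately matches nothing also returns an empty list, so keep `attempts` small and treat a still-empty result as a valid answer rather than an error.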

<Note>
  The execute body is `{"inputs": {"query": "..."}}`. The `feature_uri` must match your extractor + feature name (for a built-in, fetch it from `GET /v1/namespaces/$MIXPEEK_NAMESPACE/extractors/{extractor_id}`).
</Note>
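
For a custom extractor, the URI can be assembled from the manifest values so the retriever and the manifest stay in sync. The format below is an assumption inferred from the single example URI in this guide, and `feature_uri` is a hypothetical helper; check the full reference before relying on it.

```python
def feature_uri(extractor_name: str, version: str, feature_name: str) -> str:
    """Build a feature URI. ASSUMPTION: format inferred from the example in this guide."""
    return f"mixpeek://{extractor_name}@{version}/{feature_name}"


# Values copied from manifest.py in Step 1.
manifest = {
    "feature_extractor_name": "text_embed",
    "version": "1.0.0",
    "features": [{"feature_name": "text_embed_v1_embedding"}],
}
uri = feature_uri(
    manifest["feature_extractor_name"],
    manifest["version"],
    manifest["features"][0]["feature_name"],
)
print(uri)  # mixpeek://text_embed@1.0.0/text_embed_v1_embedding
```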

## Next Steps

* [Custom Extractors](/processing/custom-extractors) — full manifest, SDK, security, and platform-services reference
* [Model Registry](/processing/model-registry) — load HuggingFace or your own fine-tuned weights
* [Taxonomies](/enrichment/taxonomies) and [Clusters](/enrichment/clusters) — auto-classify and group your embeddings
* [Alerts](/enrichment/alerts) and [Webhooks](/operations/webhooks) — monitor new content and processing events
