Custom plugins extend the warehouse’s Decompose layer with your own feature extraction logic.
What You’ll Build
A custom text embedding plugin that:
- Generates 128-dimensional embeddings from text
- Processes documents via batch pipeline
- Supports search via a retriever
Prerequisites
- A Mixpeek API key
- A namespace (create one via the API if needed)
- curl for API calls
Step 1: Create Plugin Files
Create a directory text_embed/ with the following files:
manifest.py
feature_extractor_name = "text_embed"
version = "1.0.0"
description = "Text embedding plugin"
dependencies = []
features = [
{
"feature_type": "embedding",
"feature_name": "text_embed_v1_embedding",
"embedding_dim": 128,
"distance_metric": "cosine",
}
]
output_schema = {
"text_embed_v1_embedding": {
"type": "array",
"items": {"type": "number"},
"description": "128-dim text embedding",
},
}
input_mappings = {"text": "text"}
tier = 1
tier_label = "SIMPLE"
# Skip GPU — this plugin is CPU-only
compute_profile = {"resource_type": "cpu"}
# Optional: BYO container image for native dependencies (Enterprise)
# container_image = "us-east1-docker.pkg.dev/mixpeek-inference-463103/plugins-<org-id>/my-image:1.0.0"
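Because wrong key names fail silently, it can be worth checking the manifest locally before packaging. A minimal sketch, assuming nothing beyond the plain Python dicts above (validate_features is an illustrative helper, not part of any Mixpeek SDK):

```python
# Hypothetical pre-upload check: verify each manifest feature uses the
# exact key names the platform expects. Not an official SDK function.
REQUIRED_FEATURE_KEYS = {"feature_type", "feature_name", "embedding_dim", "distance_metric"}

def validate_features(features: list) -> list:
    """Return a list of human-readable problems; empty means OK."""
    problems = []
    for i, feat in enumerate(features):
        missing = REQUIRED_FEATURE_KEYS - feat.keys()
        if missing:
            problems.append(f"features[{i}] missing keys: {sorted(missing)}")
        # Common mistake: the un-prefixed variants are silently ignored
        for bad in ("name", "type", "dimensions", "distance"):
            if bad in feat:
                problems.append(f"features[{i}] uses unsupported key {bad!r}")
    return problems

features = [
    {
        "feature_type": "embedding",
        "feature_name": "text_embed_v1_embedding",
        "embedding_dim": 128,
        "distance_metric": "cosine",
    }
]
print(validate_features(features))  # []
```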
Use the exact key names: feature_type, feature_name, embedding_dim, distance_metric. Using name/type/dimensions/distance will silently fail.
Need native binaries or system packages? Set container_image in your manifest to use a custom container. See BYO Container Image for details.
pipeline.py
import hashlib
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd


def text_to_embedding(text: str, dim: int = 128) -> List[float]:
    """Generate a deterministic embedding from text."""
    hash_bytes = hashlib.sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(hash_bytes[:4], byteorder="big")
    rng = np.random.default_rng(seed)
    embedding = rng.standard_normal(dim).astype(np.float32)
    norm = np.linalg.norm(embedding)
    if norm > 0:
        embedding = embedding / norm
    return embedding.tolist()


class TextEmbedBatchProcessor:
    def __init__(self, config=None, **kwargs):
        config = config or {}
        # IMPORTANT: Custom plugins receive data in the 'data' column
        self.text_column = "data"
        self.output_column = "text_embed_v1_embedding"
        self.embedding_dim = 128

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        if batch.empty:
            return batch
        batch = batch.reset_index(drop=True)
        texts = []
        valid_indices = []
        for idx, v in enumerate(batch.get(self.text_column, [])):
            text = "" if v is None else str(v)
            if text.strip():
                texts.append(text)
                valid_indices.append(idx)
        batch[self.output_column] = None
        if texts:
            embeddings = [text_to_embedding(t, self.embedding_dim) for t in texts]
            for i, orig_idx in enumerate(valid_indices):
                batch.at[orig_idx, self.output_column] = embeddings[i]
        return batch


def build_steps(extractor_request=None, container=None,
                base_steps=None, dataset_size=None, content_flags=None):
    processor = TextEmbedBatchProcessor()
    steps = list(base_steps or [])
    steps.append(processor)
    return {"steps": steps, "prepare": lambda ds: ds}


def extract(extractor_request=None, base_steps=None,
            dataset_size=None, content_flags=None):
    result = build_steps(extractor_request=extractor_request,
                         base_steps=base_steps, dataset_size=dataset_size,
                         content_flags=content_flags)

    class PipelineResult:
        def __init__(self, steps, prepare):
            self.steps = steps
            self.prepare = prepare

    return PipelineResult(result["steps"], result["prepare"])
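Before packaging, you can sanity-check the embedding function's two key properties, determinism and unit norm, with a standalone script. This sketch inlines the same logic as text_to_embedding in pipeline.py so it runs on its own:

```python
import hashlib
import numpy as np

def text_to_embedding(text: str, dim: int = 128) -> list:
    # Same logic as pipeline.py: a hash-seeded RNG yields a deterministic,
    # unit-normalized pseudo-embedding for any input text.
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
    vec = np.random.default_rng(seed).standard_normal(dim).astype(np.float32)
    norm = np.linalg.norm(vec)
    return (vec / norm if norm > 0 else vec).tolist()

a = text_to_embedding("quantum computing")
b = text_to_embedding("quantum computing")
c = text_to_embedding("classical computing")

print(len(a))   # 128
print(a == b)   # True: same text, same embedding
print(a == c)   # False: different texts diverge
print(abs(np.linalg.norm(a) - 1.0) < 1e-5)  # True: unit norm
```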
Always read from batch["data"] — NOT batch["text"] or any other column name. The data column contains raw text for text blobs and S3 URLs for binary blobs.
Step 2: Package and Upload
# Package
cd /path/to/parent && zip -r text_embed.zip text_embed/
# Request presigned upload URL
UPLOAD=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{"name": "text_embed", "version": "1.0.0", "file_size_bytes": 5000}')
UPLOAD_ID=$(echo $UPLOAD | jq -r '.upload_id')
PRESIGNED_URL=$(echo $UPLOAD | jq -r '.presigned_url')
# Upload archive
curl -s -X PUT "$PRESIGNED_URL" \
-H "Content-Type: application/zip" \
--data-binary @text_embed.zip
# Confirm upload
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads/$UPLOAD_ID/confirm" \
-H "Authorization: Bearer $API_KEY"
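The curl example hard-codes file_size_bytes as 5000; in practice you would size the archive on disk. A small sketch of building the same JSON body in Python (build_upload_request is an illustrative helper; the payload shape mirrors the request above):

```python
import json
import os

def build_upload_request(archive_path: str, name: str, version: str) -> str:
    """Build the JSON body for the presigned-upload request, reading
    file_size_bytes from the archive instead of hard-coding it."""
    return json.dumps({
        "name": name,
        "version": version,
        "file_size_bytes": os.path.getsize(archive_path),
    })

# Example with a dummy 5000-byte archive
with open("text_embed.zip", "wb") as f:
    f.write(b"\x00" * 5000)
body = build_upload_request("text_embed.zip", "text_embed", "1.0.0")
print(body)
```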
Step 3: Deploy
# Deploy for batch processing (works on all tiers)
PLUGIN_ID="text_embed_1_0_0"
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy?deployment_type=batch_only" \
-H "Authorization: Bearer $API_KEY"
Use ?deployment_type=batch_only unless you have Enterprise tier. Realtime endpoints require dedicated infrastructure.
Step 4: Create Bucket and Upload Data
# Create a bucket with text schema
BUCKET=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{"name": "articles", "bucket_schema": {"properties": {"text": {"type": "text", "required": true}}}}')
BUCKET_ID=$(echo $BUCKET | jq -r '.bucket_id')
# Upload objects
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/objects" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"blobs": [{"property": "text", "type": "text", "data": "Quantum computing uses qubits to perform calculations exponentially faster than classical computers."}]
}'
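Since blobs must be a list of dicts with specific fields, a client-side check can fail fast before hitting the API. A sketch under that assumption (validate_blobs is a hypothetical helper, not an API call):

```python
def validate_blobs(blobs):
    """Hypothetical client-side check mirroring the documented blob rules:
    blobs must be a list, and each blob needs property, type, and data."""
    if not isinstance(blobs, list):
        raise TypeError(f"blobs must be a list, not {type(blobs).__name__}")
    for i, blob in enumerate(blobs):
        missing = {"property", "type", "data"} - blob.keys()
        if missing:
            raise ValueError(f"blob {i} missing fields: {sorted(missing)}")

# Passes silently: a well-formed single-blob payload
validate_blobs([{"property": "text", "type": "text", "data": "Quantum computing uses qubits."}])
```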
Blobs must be a list, not a dict. Each blob needs property, type, and data fields.
Step 5: Create Collection and Process
# Create collection with your plugin
COLLECTION=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/collections" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"name\": \"text_embed_articles\",
\"feature_extractors\": [{\"feature_extractor_name\": \"text_embed\", \"version\": \"1.0.0\"}],
\"source\": {\"type\": \"bucket\", \"bucket_ids\": [\"$BUCKET_ID\"]}
}")
COLLECTION_ID=$(echo $COLLECTION | jq -r '.collection_id')
# Trigger batch processing
BATCH=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/batches/trigger" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{\"collection_ids\": [\"$COLLECTION_ID\"]}")
BATCH_ID=$(echo $BATCH | jq -r '.batch_id')
# Poll for completion
while true; do
STATUS=$(curl -s "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/batches/$BATCH_ID" \
-H "Authorization: Bearer $API_KEY" | jq -r '.status')
echo "Batch status: $STATUS"
[ "$STATUS" = "COMPLETED" ] || [ "$STATUS" = "FAILED" ] && break
sleep 10
done
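The same polling loop can be written in Python with a timeout and an injectable get_status callable, so it is testable without the API. A sketch; the terminal status strings match the shell loop above:

```python
import time

def wait_for_batch(get_status, poll_seconds=10, timeout_seconds=600):
    """Poll get_status() until a terminal status or timeout.

    get_status is any zero-argument callable returning the batch status
    string, e.g. a function that GETs the batches endpoint and reads
    .status from the response.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = get_status()
        if status in ("COMPLETED", "FAILED"):
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("batch did not finish in time")

# Simulated run: the batch reports PROCESSING twice, then COMPLETED
statuses = iter(["PROCESSING", "PROCESSING", "COMPLETED"])
print(wait_for_batch(lambda: next(statuses), poll_seconds=0))  # COMPLETED
```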
Step 6: Create Retriever and Search
# Create retriever
RETRIEVER=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/retrievers" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"name\": \"text_search\",
\"stages\": [{
\"stage_name\": \"vector_search\",
\"stage_type\": \"filter\",
\"config\": {
\"stage_id\": \"feature_search\",
\"parameters\": {
\"searches\": [{
\"feature_uri\": \"mixpeek://text_embed@1.0.0/text_embed_v1_embedding\",
\"query\": \"{{INPUT.query}}\"
}]
}
}
}]
}")
RETRIEVER_ID=$(echo $RETRIEVER | jq -r '.retriever_id')
# Search
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/retrievers/$RETRIEVER_ID/execute" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{"input": {"query": "quantum computing"}}' | jq '.results[:3]'
The execute format is {"input": {"query": "..."}} — not {"query": {"input": {...}}}.
Step 7: Classify with Taxonomies
Auto-classify articles using your custom embeddings:
# Create a taxonomy that uses your plugin's embeddings
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/taxonomies" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"taxonomy_name\": \"article-topics\",
\"taxonomy_type\": \"flat\",
\"retriever_id\": \"$RETRIEVER_ID\",
\"collection_id\": \"$COLLECTION_ID\",
\"input_mappings\": [{\"source\": \"payload.text\", \"target\": \"query\"}],
\"enrichment_fields\": [{\"source\": \"payload.topic\", \"target\": \"auto_topic\"}],
\"threshold\": 0.7,
\"execution_mode\": \"materialize\"
}"
Matching objects are enriched with auto_topic based on similarity to labeled references. See Taxonomies.
Step 8: Discover Clusters
Find topic groups in your articles:
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/clusters" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"cluster_name\": \"article-themes\",
\"collection_id\": \"$COLLECTION_ID\",
\"feature_uri\": \"mixpeek://text_embed@1.0.0/text_embed_v1_embedding\",
\"algorithm\": {\"name\": \"hdbscan\", \"params\": {\"min_cluster_size\": 3}},
\"llm_labeling\": {\"enabled\": true},
\"dimension_reduction\": {\"method\": \"umap\", \"n_components\": 2}
}"
Step 9: Set Up Alerts and Webhooks
Monitor new content and processing events:
# Alert on new articles matching a condition
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/alerts" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{
\"alert_name\": \"quantum-articles\",
\"collection_id\": \"$COLLECTION_ID\",
\"condition\": {\"field\": \"metadata.auto_topic\", \"operator\": \"eq\", \"value\": \"quantum\"},
\"notification\": {\"type\": \"webhook\", \"url\": \"https://example.com/webhook\"}
}"
# Webhook for batch processing events
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/webhooks" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"webhook_name": "pipeline-events",
"url": "https://example.com/webhook",
"events": ["batch.completed", "batch.failed"]
}'
Next Steps
- Read the full Plugin documentation for advanced features
- Add a realtime.py for query-time inference (Enterprise)
- Configure compute_profile in your manifest to optimize resource allocation
- Explore retriever stages for advanced search

