Prerequisites
Before you begin, you will need:
pip install mixpeek
Initialize the client:
# Import the Mixpeek SDK and build the client used by every example below.
from mixpeek import Mixpeek
client = Mixpeek(api_key="your_api_key")  # substitute your real API key
Step 1: Create a Namespace
A namespace is the top-level container in a multimodal data warehouse. It is analogous to a database in SQL or a collection in Qdrant. All collections, documents, and retrieval pipelines live within a namespace.
# Create the top-level namespace that will contain all collections,
# buckets, triggers, and retrievers used in the rest of this guide.
namespace = client.namespaces.create(
    namespace_name="my-warehouse",
    description="Production multimodal data warehouse"
)
# The response carries the server-assigned id, referenced by later calls.
print(f"Namespace created: {namespace.namespace_id}")
A namespace provides isolation for your data and configurations. You might use separate namespaces for development, staging, and production, or for different business units.
Step 2: Define Collections
Collections are processing pipelines. Each collection defines how incoming objects are decomposed into features. You configure one feature extractor per collection.
Face Detection Collection
# Collection whose single extractor detects faces and emits ArcFace
# embeddings; detections under the confidence floor are discarded.
face_collection = client.collections.create(
    namespace_id=namespace.namespace_id,
    collection_name="faces",
    description="Face detection and recognition",
    feature_extractors=[{
        "extractor_type": "face_detection",
        "model": "arcface",
        "config": {
            "min_confidence": 0.8,      # drop detections scoring below 0.8
            "embedding_dimension": 512  # size of each face embedding vector
        }
    }]
)
Logo Detection Collection
# Collection for logo/trademark detection backed by a SigLIP model.
logo_collection = client.collections.create(
    namespace_id=namespace.namespace_id,
    collection_name="logos",
    description="Logo and trademark detection",
    feature_extractors=[{
        "extractor_type": "logo_detection",
        "model": "siglip",
        "config": {
            "min_confidence": 0.7  # lower floor than faces: logos vary more in appearance
        }
    }]
)
Audio Fingerprint Collection
# Collection that fingerprints audio with an AST model over sliding windows.
audio_collection = client.collections.create(
    namespace_id=namespace.namespace_id,
    collection_name="audio",
    description="Audio fingerprinting",
    feature_extractors=[{
        "extractor_type": "audio_fingerprint",
        "model": "ast",
        "config": {
            "window_seconds": 5,  # fingerprint in 5-second windows
            "overlap": 0.5        # presumably the fraction of overlap between
                                  # consecutive windows — confirm in API docs
        }
    }]
)
Text Extraction Collection
# Collection that transcribes speech to text with Whisper.
text_collection = client.collections.create(
    namespace_id=namespace.namespace_id,
    collection_name="transcripts",
    description="Speech-to-text transcription",
    feature_extractors=[{
        "extractor_type": "text_extraction",
        "model": "whisper",
        "config": {
            "language": "en"  # transcription language hint
        }
    }]
)
Step 3: Ingest Objects
Ingestion in a multimodal data warehouse follows the pattern: upload to a bucket, which triggers collection processing. You never insert directly into the feature store.
Create a Bucket and Upload
# Create a bucket — the raw-object landing zone. Files are never inserted
# into collections directly; they arrive here and triggers fan them out.
bucket = client.buckets.create(
    namespace_id=namespace.namespace_id,
    bucket_name="raw-assets"
)

# Upload files into the bucket by local path.
client.buckets.upload(
    namespace_id=namespace.namespace_id,
    bucket_name="raw-assets",
    file_path="/path/to/video.mp4"
)
Configure Triggers
Triggers connect buckets to collections. When a file is uploaded to a bucket, the trigger initiates processing through the associated collection.
# Trigger face detection on video uploads: any upload to "raw-assets"
# whose MIME type matches file_types is routed into the "faces" collection.
client.triggers.create(
    namespace_id=namespace.namespace_id,
    bucket_name="raw-assets",
    collection_name="faces",
    file_types=["video/mp4", "image/jpeg", "image/png"]
)
Monitor Processing
After upload, objects are processed asynchronously. Monitor batch status:
# Check processing status. Ingestion is asynchronous, so poll the batch
# list to see how far each upload has progressed.
batches = client.batches.list(
    namespace_id=namespace.namespace_id
)
for batch in batches:
    # Each batch reports a lifecycle status plus a percent-complete figure.
    print(f"Batch {batch.batch_id}: {batch.status} ({batch.progress}%)")
Step 4: Build Retrieval Pipelines
Multi-stage retrieval pipelines are the query layer of your warehouse. They compose filter, sort, reduce, enrich, and apply stages into expressive queries.
Basic Feature Search
# Single-stage retriever: embedding similarity search over the "faces"
# collection, returning at most 50 candidates.
retriever = client.retrievers.create(
    namespace_id=namespace.namespace_id,
    retriever_name="face-search",
    description="Search for faces by similarity",
    stages=[
        {
            "stage_type": "filter",
            "stage_id": "feature_search",
            "collection": "faces",
            "query_type": "embedding",
            "limit": 50
        }
    ]
)

# Execute search with an image; the reference face is embedded server-side
# and matched against stored face features.
results = client.retrievers.execute(
    namespace_id=namespace.namespace_id,
    retriever_name="face-search",
    query_image="/path/to/reference-face.jpg"
)
Multi-Stage Pipeline
# Five-stage retriever composing filter -> sort -> reduce -> enrich -> apply.
# Stages run in order; each consumes the previous stage's result set.
pipeline = client.retrievers.create(
    namespace_id=namespace.namespace_id,
    retriever_name="ip-safety-check",
    description="Full IP safety check with face, logo, and audio detection",
    stages=[
        # Stage 1: Search for matching faces (wide net: 100 candidates)
        {
            "stage_type": "filter",
            "stage_id": "feature_search",
            "collection": "faces",
            "query_type": "embedding",
            "limit": 100
        },
        # Stage 2: Score and rank results by similarity alone (weight 1.0)
        {
            "stage_type": "sort",
            "stage_id": "score_linear",
            "weights": {"similarity": 1.0}
        },
        # Stage 3: Deduplicate near-identical results, keeping the top 20
        {
            "stage_type": "reduce",
            "stage_id": "sampling",
            "method": "top_k",
            "k": 20
        },
        # Stage 4: Enrich with logo detections joined semantically from
        # the related "logos" collection
        {
            "stage_type": "enrich",
            "stage_id": "document_enrich",
            "source_collection": "logos",
            "join_type": "semantic"
        },
        # Stage 5: Apply taxonomy classification to label each result's risk
        {
            "stage_type": "apply",
            "stage_id": "taxonomy_classify",
            "taxonomy": "ip-risk-level"
        }
    ]
)
Step 5: Apply Taxonomies
Taxonomies classify your unstructured data into structured categories. Configure them based on your use case.
Materialized Taxonomy (At Ingestion)
# Materialized taxonomy: classification happens at ingestion time, so every
# new object in the bound collection is labeled as it is processed.
taxonomy = client.taxonomies.create(
    namespace_id=namespace.namespace_id,
    taxonomy_name="content-type",
    description="Classify content by type",
    mode="materialized",
    categories=["sports", "news", "entertainment", "commercial", "documentary"],
    collection="transcripts"  # collection this taxonomy is bound to
)
Every new object processed through the transcripts collection will be automatically classified into one of these categories.
On-Demand Taxonomy (At Query Time)
# On-demand taxonomy: no collection binding — classification is computed
# lazily at query time (see the "apply" retriever stage).
taxonomy = client.taxonomies.create(
    namespace_id=namespace.namespace_id,
    taxonomy_name="brand-sentiment",
    description="Classify brand sentiment",
    mode="on_demand",
    categories=["positive", "neutral", "negative"]
)
On-demand taxonomies are applied at query time, making them ideal for exploratory analysis.
Retroactive Taxonomy (Over Historical Data)
# Retroactive taxonomy: applied over data already stored in the bound
# collection, reclassifying historical documents in bulk.
taxonomy = client.taxonomies.create(
    namespace_id=namespace.namespace_id,
    taxonomy_name="new-category-scheme",
    description="Reclassify historical data with updated categories",
    mode="retroactive",
    categories=["category_a", "category_b", "category_c"],
    collection="faces"
)
# This will batch-process all existing documents
Step 6: Configure Storage Tiering
Storage tiering manages the lifecycle of your data across cost tiers.
# Configure lifecycle policy for a collection.
# NOTE(review): the values read as cumulative day thresholds — hot until
# day 30, warm until day 90, cold until day 365, archive until day 730 —
# which matches the inline comments; confirm against the lifecycle API
# reference before relying on the exact boundaries.
client.collections.update(
    namespace_id=namespace.namespace_id,
    collection_name="faces",
    lifecycle={
        "hot_days": 30,      # Keep in Qdrant for 30 days
        "warm_days": 90,     # Move to S3 Vectors after 30 days
        "cold_days": 365,    # Move to S3 after 90 days
        "archive_days": 730  # Archive after 1 year
    }
)
Collections in hot storage deliver sub-millisecond search. Warm storage (S3 Vectors) serves as the canonical store and supports batch retrieval. Cold and archive tiers minimize cost for data you need to retain but rarely query.
Putting It Together: IP Safety Pipeline End-to-End
Here is a complete example that builds an IP safety pipeline from scratch:
# Complete end-to-end example: provision a namespace, one collection per
# detector, wire bucket triggers, load reference assets, then run a
# pre-publication check of new content against the protected material.
from mixpeek import Mixpeek
client = Mixpeek(api_key="your_api_key")

# 1. Create namespace
ns = client.namespaces.create(namespace_name="ip-safety-prod")

# 2. Create collections for each detection type. Collection names use
# hyphens (e.g. "face-detection") so they line up with the trigger loop
# in step 3.
for extractor in ["face_detection", "logo_detection", "audio_fingerprint"]:
    client.collections.create(
        namespace_id=ns.namespace_id,
        collection_name=extractor.replace("_", "-"),
        feature_extractors=[{"extractor_type": extractor}]
    )

# 3. Create bucket and triggers — every upload fans out to all three
# detection collections.
client.buckets.create(namespace_id=ns.namespace_id, bucket_name="reference-assets")
for collection in ["face-detection", "logo-detection", "audio-fingerprint"]:
    client.triggers.create(
        namespace_id=ns.namespace_id,
        bucket_name="reference-assets",
        collection_name=collection
    )

# 4. Upload reference assets (protected content to detect)
# NOTE(review): `reference_assets` is assumed to be an iterable of local
# file paths defined elsewhere — it is not declared in this snippet.
for asset in reference_assets:
    client.buckets.upload(
        namespace_id=ns.namespace_id,
        bucket_name="reference-assets",
        file_path=asset
    )

# 5. Build retrieval pipeline for pre-publication checks:
# face search -> linear scoring -> top-10 sample -> logo enrichment.
client.retrievers.create(
    namespace_id=ns.namespace_id,
    retriever_name="pre-pub-check",
    stages=[
        {"stage_type": "filter", "stage_id": "feature_search", "collection": "face-detection", "limit": 50},
        {"stage_type": "sort", "stage_id": "score_linear"},
        {"stage_type": "reduce", "stage_id": "sampling", "method": "top_k", "k": 10},
        {"stage_type": "enrich", "stage_id": "document_enrich", "source_collection": "logo-detection"},
    ]
)

# 6. Check new content before publication
results = client.retrievers.execute(
    namespace_id=ns.namespace_id,
    retriever_name="pre-pub-check",
    query_file="/path/to/new-content.mp4"
)
if results.matches:
    # Any match means the new content overlaps protected reference assets.
    print(f"IP conflicts detected: {len(results.matches)} matches")
    for match in results.matches:
        print(f" - {match.reference_id}: {match.confidence:.2f}")
else:
    print("Content cleared for publication")
