# Multi-Stage Retrieval

> The composable pipeline architecture that makes Mixpeek a warehouse, not a database

<Frame>
  <img src="https://mintcdn.com/mixpeek/pDBzbsnRaRIThJZv/assets/mixpeek-multi-stage.svg?fit=max&auto=format&n=pDBzbsnRaRIThJZv&q=85&s=0d2795ae26182450cd4d0d85a5cc7d43" alt="Multi-stage retrieval: query flows through filter (WHERE), sort (ORDER BY), reduce (LIMIT), enrich (JOIN), apply (SELECT) stages to produce ranked results" width="900" height="280" data-path="assets/mixpeek-multi-stage.svg" />
</Frame>

Single-query search returns a flat list of results ranked by one signal. That works for simple lookups. It falls apart the moment you need to combine signals, cross-reference collections, reshape output, or enforce business logic at query time. Multi-stage retrieval solves this by turning your search into a composable pipeline: a sequence of typed stages that filter, sort, reduce, enrich, and transform results in a single deterministic execution.

<Note>
  This is the definitive guide to multi-stage retrieval. For ready-to-copy pipeline configs, see the [Retrieval Cookbook](/retrieval/cookbook). For the full stage catalog and parameter schemas, see [Retrievers](/retrieval/retrievers).
</Note>

## Why Single-Query Search Isn't Enough

Traditional search systems give you one query, one index, one ranked list. This creates three problems that compound as your data grows:

**1. Signal collapse.** You want to find content that matches a face *and* contains a specific logo *and* has negative sentiment. A single vector query can only encode one of these signals. You end up running three separate queries and stitching results together in application code.

**2. N+1 enrichment.** After retrieving results, you need to join them with metadata from another collection, call an external API for licensing info, or classify each result against a taxonomy. Without pipeline-level enrichment, every result triggers a separate round-trip from your application.

**3. Brittle application logic.** Filtering, ranking, deduplication, and reshaping all live in your application layer. Every new use case means new glue code. Every change to ranking logic means a redeploy.

Multi-stage retrieval moves all of this into the retriever definition itself --- a declarative pipeline that the engine executes in a single pass.

## The SQL Analogy

If you know SQL, you already understand multi-stage retrieval. Each stage type maps to a SQL clause:

| Stage Type | SQL Equivalent         | What It Does                                                                                                 |
| ---------- | ---------------------- | ------------------------------------------------------------------------------------------------------------ |
| **filter** | `WHERE`                | Narrow the document set based on conditions --- semantic similarity, metadata predicates, feature thresholds |
| **sort**   | `ORDER BY`             | Reorder documents by score, attribute, or cross-encoder reranking                                            |
| **reduce** | `LIMIT` / `GROUP BY`   | Collapse results --- top-k sampling, deduplication, aggregation, summarization                               |
| **enrich** | `JOIN`                 | Add data from other collections, LLM-generated fields, or taxonomy classifications                           |
| **apply**  | `SELECT` / `TRANSFORM` | Reshape output, call external APIs, execute custom code, run web searches                                    |

A SQL query like:

```sql theme={null}
SELECT t.title, t.risk_score, r.license_type
FROM media_library t
JOIN rights_database r ON t.asset_id = r.asset_id
WHERE similarity(t.face_embedding, @query) > 0.72
  AND similarity(t.logo_embedding, @brand) > 0.6
ORDER BY t.risk_score DESC
LIMIT 10
```

Becomes a retriever pipeline:

```json theme={null}
{
  "stages": [
    {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.face_embedding}}", "top_k": 100, "min_score": 0.72}], "final_top_k": 100, "fusion": "rrf"}},
    {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.brand_embedding}}", "top_k": 100, "min_score": 0.6}], "final_top_k": 100, "fusion": "rrf"}},
    {"stage_type": "sort",   "stage_id": "score_linear",   "parameters": {"weights": {"risk_score": 1.0}}},
    {"stage_type": "enrich", "stage_id": "document_enrich", "parameters": {"target_collection_id": "col_rights", "source_field": "asset_id", "target_field": "asset_id"}},
    {"stage_type": "reduce", "stage_id": "sampling",        "parameters": {"limit": 10}}
  ]
}
```

The difference: this pipeline works over multimodal embeddings, not just relational columns. You can filter on face vectors, sort by sentiment scores, and enrich with LLM-generated classifications --- all in one execution.
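
To make the stage-to-clause mapping concrete, here is a minimal in-memory sketch that treats each stage type as a plain function over a list of documents. It is a conceptual model only, not how the Mixpeek engine is implemented. The sample data, the precomputed `face_sim` / `logo_sim` fields, and all helper names are hypothetical. Unlike the SQL inner `JOIN`, the enrich step here keeps documents that have no match.

```python
from typing import Callable

Doc = dict
Stage = Callable[[list[Doc]], list[Doc]]

# Toy data standing in for two collections (hypothetical values).
media_library = [
    {"asset_id": "a1", "title": "Launch clip", "face_sim": 0.91, "logo_sim": 0.70, "risk_score": 0.4},
    {"asset_id": "a2", "title": "Interview", "face_sim": 0.80, "logo_sim": 0.30, "risk_score": 0.9},
    {"asset_id": "a3", "title": "Backstage", "face_sim": 0.75, "logo_sim": 0.65, "risk_score": 0.8},
]
rights_database = {
    "a1": {"license_type": "exclusive"},
    "a3": {"license_type": "royalty-free"},
}


def filter_stage(field: str, min_score: float) -> Stage:  # WHERE
    return lambda docs: [d for d in docs if d[field] > min_score]


def sort_stage(field: str) -> Stage:  # ORDER BY ... DESC
    return lambda docs: sorted(docs, key=lambda d: d[field], reverse=True)


def enrich_stage(lookup: dict, key: str) -> Stage:  # JOIN (left join in this sketch)
    return lambda docs: [{**d, **lookup.get(d[key], {})} for d in docs]


def reduce_stage(limit: int) -> Stage:  # LIMIT
    return lambda docs: docs[:limit]


def apply_stage(fields: list[str]) -> Stage:  # SELECT
    return lambda docs: [{f: d.get(f) for f in fields} for d in docs]


def run(stages: list[Stage], docs: list[Doc]) -> list[Doc]:
    # Each stage consumes the previous stage's output.
    for stage in stages:
        docs = stage(docs)
    return docs


pipeline = [
    filter_stage("face_sim", 0.72),
    filter_stage("logo_sim", 0.6),
    sort_stage("risk_score"),
    enrich_stage(rights_database, "asset_id"),
    reduce_stage(10),
    apply_stage(["title", "risk_score", "license_type"]),
]
results = run(pipeline, media_library)
```

Because every stage has the same signature (documents in, documents out), stages can be reordered or swapped without touching the others, which is the property the retriever definition relies on.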

## The Five Stage Types

### Filter --- Narrow the Candidate Set

Filter stages reduce the number of documents flowing through the pipeline. They are the `WHERE` clause of your retrieval query. Every pipeline starts with at least one filter.

**Use filter stages to:**

* Run semantic similarity search against any extracted feature
* Apply metadata predicates (equality, range, set membership)
* Chain multiple filters for compound conditions (face match AND logo match AND date range)

<CodeGroup>
  ```python Python theme={null}
  from mixpeek import Mixpeek

  client = Mixpeek(api_key="your-api-key")

  # Semantic search filter: find faces matching a reference
  face_filter = {
      "stage_type": "filter",
      "stage_id": "feature_search",
      "parameters": {
          "searches": [
              {
                  "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                  "query": "{{INPUT.face_embedding}}",
                  "top_k": 100,
                  "min_score": 0.72
              }
          ],
          "final_top_k": 100,
          "fusion": "rrf"
      }
  }

  # Metadata filter: restrict to a date range
  date_filter = {
      "stage_type": "filter",
      "stage_id": "metadata",
      "parameters": {
          "where": {
              "published_date": {"$gte": "2025-01-01"},
              "status": "published"
          }
      }
  }

  # Chain them: both conditions must pass
  retriever = client.retrievers.create(
      name="filtered-face-search",
      stages=[face_filter, date_filter]
  )
  ```

  ```bash cURL theme={null}
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "filtered-face-search",
      "stages": [
        {
          "stage_type": "filter",
          "stage_id": "feature_search",
          "parameters": {
            "searches": [
              {
                "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                "query": "{{INPUT.face_embedding}}",
                "top_k": 100,
                "min_score": 0.72
              }
            ],
            "final_top_k": 100,
            "fusion": "rrf"
          }
        },
        {
          "stage_type": "filter",
          "stage_id": "metadata",
          "parameters": {
            "where": {
              "published_date": {"$gte": "2025-01-01"},
              "status": "published"
            }
          }
        }
      ]
    }'
  ```
</CodeGroup>
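
The examples above set `"fusion": "rrf"` but pass a single entry in `searches`, so the fusion step has nothing to combine. Assuming `rrf` refers to reciprocal rank fusion (the usual meaning of the abbreviation), the sketch below shows how several ranked lists could be merged into one. The constant `k=60` is the conventional default for this technique, and the function name and sample IDs are hypothetical. None of this is confirmed as Mixpeek's internal behavior.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60, final_top_k=None):
    """Fuse several ranked lists of document IDs into one ranking.

    Each document scores 1 / (k + rank) for every list it appears in;
    documents that rank well in several lists rise to the top.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Sort by fused score, breaking ties by ID so the output is deterministic.
    fused = sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))
    return fused if final_top_k is None else fused[:final_top_k]


face_hits = ["doc_a", "doc_b", "doc_c"]
logo_hits = ["doc_b", "doc_d", "doc_a"]
fused = reciprocal_rank_fusion([face_hits, logo_hits], final_top_k=3)
```

Rank-based fusion ignores raw similarity values, which is convenient when the fused searches use embeddings whose scores are not directly comparable.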

<Tip>
  Chain multiple filter stages to express AND logic. Each successive filter operates on the output of the previous one, progressively narrowing the candidate set.
</Tip>
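
The AND behavior of chained filters can be checked with a small in-memory sketch: applying predicates one after another yields the same documents as a single combined condition. The documents, the `face_score` field, and the helper names are hypothetical. Real feature searches also truncate to `top_k`, which this sketch ignores.

```python
def apply_filters(docs, predicates):
    """Run each predicate on the survivors of the previous one."""
    for keep in predicates:
        docs = [d for d in docs if keep(d)]
    return docs


def face_match(doc):
    return doc["face_score"] >= 0.72


def recent_and_published(doc):
    # ISO-8601 date strings compare correctly as plain strings.
    return doc["published_date"] >= "2025-01-01" and doc["status"] == "published"


docs = [
    {"id": 1, "face_score": 0.90, "published_date": "2025-03-01", "status": "published"},
    {"id": 2, "face_score": 0.80, "published_date": "2024-11-15", "status": "published"},
    {"id": 3, "face_score": 0.50, "published_date": "2025-02-10", "status": "published"},
    {"id": 4, "face_score": 0.95, "published_date": "2025-04-02", "status": "draft"},
]

chained = apply_filters(docs, [face_match, recent_and_published])
combined = [d for d in docs if face_match(d) and recent_and_published(d)]
```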

### Sort --- Control Ranking

Sort stages reorder the document set without adding or removing documents. They are the `ORDER BY` clause. Place them after filters to control which results appear first.

**Use sort stages to:**

* Apply weighted linear scoring across multiple signals
* Rerank results with a cross-encoder model for higher precision
* Sort by a metadata attribute (date, price, popularity)

<CodeGroup>
  ```python Python theme={null}
  # Weighted linear scoring across three signals
  sort_stage = {
      "stage_type": "sort",
      "stage_id": "score_linear",
      "parameters": {
          "weights": {
              "audio.sentiment": 0.6,
              "recency": 0.3,
              "engagement": 0.1
          }
      }
  }

  # Cross-encoder reranking for maximum precision
  rerank_stage = {
      "stage_type": "sort",
      "stage_id": "cross_encoder_rerank",
      "parameters": {
          "inference_name": "BAAI__bge_reranker_v2_m3",
          "query": "{{INPUT.query_text}}"
      }
  }
  ```

  ```bash cURL theme={null}
  # Weighted linear scoring
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "scored-results",
      "stages": [
        {
          "stage_type": "filter",
          "stage_id": "feature_search",
          "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.query}}", "top_k": 100}], "final_top_k": 100, "fusion": "rrf"}
        },
        {
          "stage_type": "sort",
          "stage_id": "score_linear",
          "parameters": {
            "weights": {
              "audio.sentiment": 0.6,
              "recency": 0.3,
              "engagement": 0.1
            }
          }
        }
      ]
    }'
  ```
</CodeGroup>
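
As a worked example of the arithmetic behind weighted linear scoring, the sketch below computes a weighted sum of the signals named in `weights` and sorts by it. It assumes dotted keys such as `audio.sentiment` address nested fields, that signals are already on comparable scales, and that missing signals count as zero. These are illustrative assumptions, and the helper names are hypothetical.

```python
def get_path(doc, path, default=0.0):
    """Resolve a dotted path such as 'audio.sentiment' in a nested dict."""
    value = doc
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return default
        value = value[key]
    return value


def linear_score(doc, weights):
    """Weighted sum of the document's signals."""
    return sum(weight * get_path(doc, path) for path, weight in weights.items())


weights = {"audio.sentiment": 0.6, "recency": 0.3, "engagement": 0.1}
docs = [
    {"id": "clip_1", "audio": {"sentiment": 0.5}, "recency": 1.0, "engagement": 0.0},
    {"id": "clip_2", "audio": {"sentiment": 1.0}, "recency": 0.0, "engagement": 0.5},
]

# clip_1: 0.6*0.5 + 0.3*1.0 + 0.1*0.0 = 0.60
# clip_2: 0.6*1.0 + 0.3*0.0 + 0.1*0.5 = 0.65
ranked = sorted(docs, key=lambda d: linear_score(d, weights), reverse=True)
```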

### Reduce --- Collapse and Limit

Reduce stages collapse the result set. They are the `LIMIT`, `GROUP BY`, and `DISTINCT` clauses. Use them to control result count, remove duplicates, or aggregate values.

**Use reduce stages to:**

* Sample the top-k results after sorting
* Deduplicate by a field (e.g., one result per source URL)
* Summarize results into an aggregated output

<CodeGroup>
  ```python Python theme={null}
  # Top-k sampling: keep the 10 highest-ranked results
  sampling_stage = {
      "stage_type": "reduce",
      "stage_id": "sampling",
      "parameters": {
          "limit": 10
      }
  }

  # Deduplication: one result per source URL
  dedup_stage = {
      "stage_type": "reduce",
      "stage_id": "dedup",
      "parameters": {
          "field": "metadata.source_url"
      }
  }

  # Combine: deduplicate first, then take top 10
  retriever = client.retrievers.create(
      name="deduped-top-10",
      stages=[
          {"stage_type": "filter", "stage_id": "feature_search",
           "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.image}}", "top_k": 100}], "final_top_k": 100, "fusion": "rrf"}},
          dedup_stage,
          sampling_stage
      ]
  )
  ```

  ```bash cURL theme={null}
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "deduped-top-10",
      "stages": [
        {
          "stage_type": "filter",
          "stage_id": "feature_search",
          "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.image}}", "top_k": 100}], "final_top_k": 100, "fusion": "rrf"}
        },
        {
          "stage_type": "reduce",
          "stage_id": "dedup",
          "parameters": {"field": "metadata.source_url"}
        },
        {
          "stage_type": "reduce",
          "stage_id": "sampling",
          "parameters": {"limit": 10}
        }
      ]
    }'
  ```
</CodeGroup>
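
Stage order matters when combining reduce stages. The sketch below illustrates why the example deduplicates before sampling: sampling first can leave fewer results than the limit once duplicates are removed. It assumes deduplication keeps the first (highest-ranked) document per field value. The helper names and data are hypothetical.

```python
def get_path(doc, path):
    """Resolve a dotted path such as 'metadata.source_url'."""
    for key in path.split("."):
        doc = doc[key]
    return doc


def dedup(docs, field):
    """Keep the first document seen for each distinct value of `field`."""
    seen, kept = set(), []
    for doc in docs:
        value = get_path(doc, field)
        if value not in seen:
            seen.add(value)
            kept.append(doc)
    return kept


def sample(docs, limit):
    """Keep the first `limit` documents of an already-ranked list."""
    return docs[:limit]


ranked = [
    {"id": "d1", "metadata": {"source_url": "https://a.example/post"}},
    {"id": "d2", "metadata": {"source_url": "https://a.example/post"}},
    {"id": "d3", "metadata": {"source_url": "https://b.example/post"}},
    {"id": "d4", "metadata": {"source_url": "https://c.example/post"}},
]

dedup_then_sample = sample(dedup(ranked, "metadata.source_url"), 2)
sample_then_dedup = dedup(sample(ranked, 2), "metadata.source_url")
```

Here `dedup_then_sample` fills both slots with distinct sources, while `sample_then_dedup` returns a single document because both sampled results share a URL.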

### Enrich --- Join External Knowledge

Enrich stages add data to each document without changing the result set size. They are the `JOIN` clause. Use them to attach metadata from other collections, generate LLM-powered annotations, or classify documents against taxonomies.

**Use enrich stages to:**

* Join across collections (product data + catalog info + pricing)
* Generate LLM annotations (summaries, extracted entities, risk assessments)
* Classify documents against a taxonomy (a controlled vocabulary of labels)

<CodeGroup>
  ```python Python theme={null}
  # Cross-collection join: attach rights/licensing data
  rights_enrich = {
      "stage_type": "enrich",
      "stage_id": "document_enrich",
      "parameters": {
          "target_collection_id": "col_rights_database",
          "source_field": "metadata.asset_id",
          "target_field": "asset_id",
          "fields_to_merge": ["license_type", "expiry_date", "rights_holder"],
          "output_field": "rights_info"
      }
  }

  # LLM enrichment: generate a risk assessment for each result
  llm_enrich = {
      "stage_type": "enrich",
      "stage_id": "llm_enrich",
      "parameters": {
          "prompt": "Assess the IP risk level (low/medium/high) for this content based on the match confidence score {{DOC.score}} and rights status {{DOC.rights_info.license_type}}. Return a JSON object with 'risk_level' and 'reasoning' fields.",
          "output_field": "risk_assessment",
          "model": "gpt-4o-mini"
      }
  }

  # Taxonomy classification: label by content category
  taxonomy_enrich = {
      "stage_type": "enrich",
      "stage_id": "taxonomy_enrich",
      "parameters": {
          "taxonomy_id": "tax_content_categories",
          "top_k": 5,
          "min_score": 0.5
      }
  }
  ```

  ```bash cURL theme={null}
  # Cross-collection join
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "enriched-search",
      "stages": [
        {
          "stage_type": "filter",
          "stage_id": "feature_search",
          "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.audio}}", "top_k": 100, "min_score": 0.8}], "final_top_k": 100, "fusion": "rrf"}
        },
        {
          "stage_type": "enrich",
          "stage_id": "document_enrich",
          "parameters": {
            "target_collection_id": "col_rights_database",
            "source_field": "metadata.asset_id",
            "target_field": "asset_id",
            "fields_to_merge": ["license_type", "expiry_date", "rights_holder"],
            "output_field": "rights_info"
          }
        }
      ]
    }'
  ```
</CodeGroup>

<Note>
  Enrich stages execute per-document but are batched internally. A `document_enrich` join resolves all lookups in a single batch query to the target collection, not one query per document.
</Note>
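
The batching described in the note can be sketched as follows: collect every join key, issue one lookup against the target collection, then merge the selected fields into each document under `output_field`. `FakeCollection` and its `find_in` method are hypothetical stand-ins used to count lookups. The parameter names mirror the `document_enrich` examples above, and the engine's actual implementation may differ.

```python
def get_path(doc, path):
    """Resolve a dotted path such as 'metadata.asset_id'; None if absent."""
    for key in path.split("."):
        if not isinstance(doc, dict) or key not in doc:
            return None
        doc = doc[key]
    return doc


class FakeCollection:
    """In-memory stand-in for a target collection that counts queries."""

    def __init__(self, rows):
        self.rows = rows
        self.queries = 0

    def find_in(self, field, values):
        self.queries += 1
        return [row for row in self.rows if row[field] in values]


def document_enrich(docs, collection, source_field, target_field,
                    fields_to_merge, output_field):
    # One batched lookup for all distinct keys instead of one per document.
    keys = {get_path(d, source_field) for d in docs} - {None}
    matches = {row[target_field]: row for row in collection.find_in(target_field, keys)}
    enriched = []
    for doc in docs:
        row = matches.get(get_path(doc, source_field))
        extra = {f: row.get(f) for f in fields_to_merge} if row else None
        enriched.append({**doc, output_field: extra})
    return enriched


rights = FakeCollection([
    {"asset_id": "a1", "license_type": "exclusive", "rights_holder": "Acme"},
    {"asset_id": "a2", "license_type": "royalty-free", "rights_holder": "Globex"},
])
docs = [
    {"document_id": f"doc_{i}", "metadata": {"asset_id": asset_id}}
    for i, asset_id in enumerate(["a1", "a2", "a1", "a9"])
]
result = document_enrich(
    docs, rights, "metadata.asset_id", "asset_id",
    ["license_type", "rights_holder"], "rights_info",
)
```

Four documents are enriched with a single query. The result set size is unchanged, and the document with no matching record simply gets an empty `rights_info`.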

### Apply --- Transform and Reshape

Apply stages transform the structure or content of each document. They are the `SELECT` and function-call layer of your pipeline. Use them to reshape output for downstream consumers, call external APIs, execute custom code, or search the web.

**Use apply stages to:**

* Reshape JSON output with Jinja2 templates
* Call external APIs (Stripe, Salesforce, internal services)
* Execute custom Python/TypeScript/JavaScript in sandboxed environments
* Run web searches to augment results with external context

<CodeGroup>
  ```python Python theme={null}
  # JSON transform: reshape output for a frontend
  json_transform = {
      "stage_type": "apply",
      "stage_id": "json_transform",
      "parameters": {
          "template": '{"id": "{{DOC.document_id}}", "title": "{{DOC.metadata.title}}", "risk": "{{DOC.risk_assessment.risk_level}}", "thumbnail": "{{DOC.metadata.thumbnail_url}}"}',
          "fail_on_error": False
      }
  }

  # External API call: check licensing status
  api_call = {
      "stage_type": "apply",
      "stage_id": "api_call",
      "parameters": {
          "url": "https://licensing.internal/v1/check/{{DOC.metadata.asset_id}}",
          "method": "GET",
          "allowed_domains": ["licensing.internal"],
          "auth": {
              "type": "bearer",
              "secret_ref": "licensing_api_key"
          },
          "output_field": "metadata.license_check",
          "on_error": "skip"
      }
  }

  # Custom code execution: compute a composite score
  code_exec = {
      "stage_type": "apply",
      "stage_id": "code_execution",
      "parameters": {
          "language": "python",
          "code": "output = {'composite_score': doc['score'] * 0.7 + doc.get('metadata', {}).get('popularity', 0) * 0.3}",
          "output_field": "computed"
      }
  }
  ```

  ```bash cURL theme={null}
  # JSON transform
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "transformed-output",
      "stages": [
        {
          "stage_type": "filter",
          "stage_id": "feature_search",
          "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.query}}", "top_k": 100}], "final_top_k": 100, "fusion": "rrf"}
        },
        {
          "stage_type": "apply",
          "stage_id": "json_transform",
          "parameters": {
            "template": "{\"id\": \"{{DOC.document_id}}\", \"title\": \"{{DOC.metadata.title}}\"}",
            "fail_on_error": false
          }
        }
      ]
    }'
  ```
</CodeGroup>
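
The `template` parameter is a JSON string nested inside a JSON request body, which is why the cURL example needs escaped quotes. One way to avoid hand-escaping, sketched here with only the standard library, is to build the template from a dictionary and let `json.dumps` handle the quoting at each level. The variable names are illustrative.

```python
import json

# Describe the desired output shape as a plain dict of placeholders.
fields = {
    "id": "{{DOC.document_id}}",
    "title": "{{DOC.metadata.title}}",
}

stage = {
    "stage_type": "apply",
    "stage_id": "json_transform",
    "parameters": {
        # First level of encoding: the template itself becomes a JSON string.
        "template": json.dumps(fields),
        "fail_on_error": False,
    },
}

# Second level: encoding the stage escapes the template's inner quotes.
encoded_stage = json.dumps(stage)
```

`stage["parameters"]["template"]` matches the unescaped template from the Python tab, and `encoded_stage` contains the backslash-escaped form shown in the cURL tab.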

## Building Multi-Stage Pipelines

The power of multi-stage retrieval is in composition. The three production-style pipelines below show how stages chain together to solve problems that no single query can address.

### Pipeline 1: Brand Safety Scanner

**Problem:** A media company needs to find scenes where their talent appears near competitor products in negative-sentiment content --- before the content goes live.

**Pipeline logic:** Find faces matching the talent roster, then check for competitor logos in the same scenes, rank by sentiment risk, take the worst offenders, and attach brand safety context.

<CodeGroup>
  ```python Python theme={null}
  retriever = client.retrievers.create(
      name="brand-safety-scanner",
      namespace="media-library",
      stages=[
          # Stage 1: Find scenes containing talent faces
          {
              "stage_type": "filter",
              "stage_id": "feature_search",
              "parameters": {
                  "searches": [
                      {
                          "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                          "query": "{{INPUT.talent_embedding}}",
                          "top_k": 500,
                          "min_score": 0.72
                      }
                  ],
                  "final_top_k": 500,
                  "fusion": "rrf"
              }
          },
          # Stage 2: Narrow to scenes that also contain competitor logos
          {
              "stage_type": "filter",
              "stage_id": "feature_search",
              "parameters": {
                  "searches": [
                      {
                          "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                          "query": "{{INPUT.competitor_logo_embedding}}",
                          "top_k": 500,
                          "min_score": 0.65
                      }
                  ],
                  "final_top_k": 500,
                  "fusion": "rrf"
              }
          },
          # Stage 3: Rank by weighted risk (sentiment + recency + engagement)
          {
              "stage_type": "sort",
              "stage_id": "score_linear",
              "parameters": {
                  "weights": {
                      "audio.sentiment": 0.6,
                      "recency": 0.3,
                      "engagement": 0.1
                  }
              }
          },
          # Stage 4: Take the 10 highest-risk scenes
          {
              "stage_type": "reduce",
              "stage_id": "sampling",
              "parameters": {"limit": 10}
          },
          # Stage 5: Attach brand safety scores from reference collection
          {
              "stage_type": "enrich",
              "stage_id": "document_enrich",
              "parameters": {
                  "target_collection_id": "col_brand_safety_scores",
                  "source_field": "metadata.content_id",
                  "target_field": "content_id",
                  "fields_to_merge": ["safety_rating", "advertiser_category", "risk_flags"],
                  "output_field": "brand_context"
              }
          }
      ]
  )

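  # Execute the pipeline. celebrity_face_vector and competitor_logo_vector are
  # embeddings computed beforehand; they are not defined in this snippet.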
  results = client.retrievers.execute(
      retriever_id=retriever.id,
      inputs={
          "talent_embedding": celebrity_face_vector,
          "competitor_logo_embedding": competitor_logo_vector
      }
  )
  ```

  ```bash cURL theme={null}
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "brand-safety-scanner",
      "stages": [
        {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.talent_embedding}}", "top_k": 500, "min_score": 0.72}], "final_top_k": 500, "fusion": "rrf"}},
        {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.competitor_logo_embedding}}", "top_k": 500, "min_score": 0.65}], "final_top_k": 500, "fusion": "rrf"}},
        {"stage_type": "sort", "stage_id": "score_linear", "parameters": {"weights": {"audio.sentiment": 0.6, "recency": 0.3, "engagement": 0.1}}},
        {"stage_type": "reduce", "stage_id": "sampling", "parameters": {"limit": 10}},
        {"stage_type": "enrich", "stage_id": "document_enrich", "parameters": {"target_collection_id": "col_brand_safety_scores", "source_field": "metadata.content_id", "target_field": "content_id", "fields_to_merge": ["safety_rating", "advertiser_category", "risk_flags"], "output_field": "brand_context"}}
      ]
    }'
  ```
</CodeGroup>

**Stage flow:** up to 500 face matches --> \~50 with competitor logos (illustrative count) --> sorted by risk --> top 10 --> enriched with brand context
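
Before sending a pipeline definition to the API, it can help to sanity-check its shape locally. The sketch below checks only what this page states: each stage carries `stage_type`, `stage_id`, and `parameters`, there are five stage types, and pipelines start with a filter. The function names are hypothetical, and the real API may enforce different or additional rules.

```python
STAGE_TYPES = {"filter", "sort", "reduce", "enrich", "apply"}


def check_pipeline(stages):
    """Return a list of problems; an empty list means the shape looks fine."""
    if not stages:
        return ["pipeline has no stages"]
    problems = []
    for i, stage in enumerate(stages, start=1):
        for key in ("stage_type", "stage_id", "parameters"):
            if key not in stage:
                problems.append(f"stage {i}: missing '{key}'")
        stage_type = stage.get("stage_type")
        if stage_type is not None and stage_type not in STAGE_TYPES:
            problems.append(f"stage {i}: unknown stage_type {stage_type!r}")
    if stages[0].get("stage_type") != "filter":
        problems.append("stage 1: pipelines are expected to start with a filter")
    return problems


def describe_flow(stages):
    """Summarize a pipeline as 'type:id --> type:id --> ...'."""
    return " --> ".join(f"{s['stage_type']}:{s['stage_id']}" for s in stages)


# Same stage sequence as the brand safety scanner, with parameters elided.
brand_safety = [
    {"stage_type": "filter", "stage_id": "feature_search", "parameters": {}},
    {"stage_type": "filter", "stage_id": "feature_search", "parameters": {}},
    {"stage_type": "sort", "stage_id": "score_linear", "parameters": {}},
    {"stage_type": "reduce", "stage_id": "sampling", "parameters": {}},
    {"stage_type": "enrich", "stage_id": "document_enrich", "parameters": {}},
]
problems = check_pipeline(brand_safety)
flow = describe_flow(brand_safety)
```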

***

### Pipeline 2: IP Clearance Pipeline

**Problem:** Before publishing new content, a legal team needs to check it against a database of copyrighted material across audio fingerprints, visual similarity, and metadata --- then attach licensing information for review.

**Pipeline logic:** Match audio fingerprints, check visual similarity for the same assets, filter by rights status, sort by match confidence, attach the full licensing record, and generate an LLM risk summary for each match.

<CodeGroup>
  ```python Python theme={null}
  retriever = client.retrievers.create(
      name="ip-clearance-pipeline",
      namespace="rights-catalog",
      stages=[
          # Stage 1: Audio fingerprint matching against known works
          {
              "stage_type": "filter",
              "stage_id": "feature_search",
              "parameters": {
                  "searches": [
                      {
                          "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                          "query": "{{INPUT.audio_fingerprint}}",
                          "top_k": 200,
                          "min_score": 0.8
                      }
                  ],
                  "final_top_k": 200,
                  "fusion": "rrf"
              }
          },
          # Stage 2: Visual similarity check on the same content
          {
              "stage_type": "filter",
              "stage_id": "feature_search",
              "parameters": {
                  "searches": [
                      {
                          "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                          "query": "{{INPUT.visual_frames}}",
                          "top_k": 200,
                          "min_score": 0.7
                      }
                  ],
                  "final_top_k": 200,
                  "fusion": "rrf"
              }
          },
          # Stage 3: Exclude content whose license status is already "cleared"
          {
              "stage_type": "filter",
              "stage_id": "metadata",
              "parameters": {
                  "where": {
                      "license_status": {"$ne": "cleared"}
                  }
              }
          },
          # Stage 4: Rank by match confidence weighted with rights severity
          {
              "stage_type": "sort",
              "stage_id": "score_linear",
              "parameters": {
                  "weights": {
                      "match_confidence": 0.8,
                      "rights_severity": 0.2
                  }
              }
          },
          # Stage 5: Attach full licensing records for legal review
          {
              "stage_type": "enrich",
              "stage_id": "document_enrich",
              "parameters": {
                  "target_collection_id": "col_licensing_records",
                  "source_field": "metadata.rights_id",
                  "target_field": "rights_id",
                  "fields_to_merge": ["rights_holder", "license_type", "territory", "expiry_date", "contact_email"],
                  "output_field": "licensing"
              }
          },
          # Stage 6: LLM-generated risk summary for each match
          {
              "stage_type": "enrich",
              "stage_id": "llm_enrich",
              "parameters": {
                  "prompt": "Based on the match confidence ({{DOC.score}}) and licensing status ({{DOC.licensing.license_type}}), provide a one-sentence risk assessment and recommended action (clear/review/block).",
                  "output_field": "legal_summary",
                  "model": "gpt-4o-mini"
              }
          }
      ]
  )
  ```

  ```bash cURL theme={null}
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "ip-clearance-pipeline",
      "stages": [
        {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.audio_fingerprint}}", "top_k": 200, "min_score": 0.8}], "final_top_k": 200, "fusion": "rrf"}},
        {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.visual_frames}}", "top_k": 200, "min_score": 0.7}], "final_top_k": 200, "fusion": "rrf"}},
        {"stage_type": "filter", "stage_id": "metadata", "parameters": {"where": {"license_status": {"$ne": "cleared"}}}},
        {"stage_type": "sort", "stage_id": "score_linear", "parameters": {"weights": {"match_confidence": 0.8, "rights_severity": 0.2}}},
        {"stage_type": "enrich", "stage_id": "document_enrich", "parameters": {"target_collection_id": "col_licensing_records", "source_field": "metadata.rights_id", "target_field": "rights_id", "fields_to_merge": ["rights_holder", "license_type", "territory", "expiry_date", "contact_email"], "output_field": "licensing"}},
        {"stage_type": "enrich", "stage_id": "llm_enrich", "parameters": {"prompt": "Based on the match confidence ({{DOC.score}}) and licensing status ({{DOC.licensing.license_type}}), provide a one-sentence risk assessment and recommended action (clear/review/block).", "output_field": "legal_summary", "model": "gpt-4o-mini"}}
      ]
    }'
  ```
</CodeGroup>

**Stage flow:** up to 200 audio matches --> \~30 with visual matches (illustrative count) --> exclude cleared --> sorted by risk --> licensing data attached --> LLM risk summary generated
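
Stage 3 of this pipeline relies on a `where` clause with a `$ne` operator, and the filter examples earlier use `$gte`. The sketch below shows one plausible reading of these predicates: MongoDB-style operators, with multiple fields combined by an implicit AND. The matcher is hypothetical, and its treatment of missing fields (a document with no `license_status` passes `$ne`) is an assumption rather than documented behavior.

```python
OPERATORS = {
    "$gte": lambda value, target: value is not None and value >= target,
    "$ne": lambda value, target: value != target,
}


def matches(doc, where):
    """True when `doc` satisfies every condition in `where` (implicit AND)."""
    for field, condition in where.items():
        value = doc.get(field)
        if isinstance(condition, dict):
            # Operator form, e.g. {"$gte": "2025-01-01"}.
            if not all(OPERATORS[op](value, target) for op, target in condition.items()):
                return False
        elif value != condition:
            # Bare value form means equality, e.g. "status": "published".
            return False
    return True


docs = [
    {"id": "m1", "license_status": "cleared"},
    {"id": "m2", "license_status": "pending"},
    {"id": "m3"},  # no license_status recorded
]
needs_review = [d["id"] for d in docs if matches(d, {"license_status": {"$ne": "cleared"}})]
```

If unlabeled documents should not reach legal review, an explicit equality or set-membership predicate on the statuses of interest is the safer choice.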

***

### Pipeline 3: Content Moderation

**Problem:** A platform needs to scan user-uploaded content across multiple safety dimensions (NSFW, violence, toxicity), aggregate risk scores, and route flagged content to a moderation queue.

**Pipeline logic:** Filter for NSFW content above a similarity threshold, check text toxicity, sort by combined risk, take the worst offenders, classify against a moderation taxonomy, and push to the review queue.

<CodeGroup>
  ```python Python theme={null}
  retriever = client.retrievers.create(
      name="content-moderation",
      namespace="user-uploads",
      stages=[
          # Stage 1: Flag visually unsafe content
          {
              "stage_type": "filter",
              "stage_id": "feature_search",
              "parameters": {
                  "searches": [
                      {
                          "feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding",
                          "query": "{{INPUT.unsafe_reference}}",
                          "top_k": 1000,
                          "min_score": 0.6
                      }
                  ],
                  "final_top_k": 1000,
                  "fusion": "rrf"
              }
          },
          # Stage 2: Check text-based toxicity in the same content
          {
              "stage_type": "filter",
              "stage_id": "feature_search",
              "parameters": {
                  "searches": [
                      {
                          "feature_uri": "mixpeek://text_extractor@v1/multilingual_e5_large_instruct_v1",
                          "query": {"input_mode": "text", "value": "{{INPUT.toxic_terms}}"},
                          "top_k": 1000,
                          "min_score": 0.5
                      }
                  ],
                  "final_top_k": 1000,
                  "fusion": "rrf"
              }
          },
          # Stage 3: Aggregate risk signals into a combined score
          {
              "stage_type": "sort",
              "stage_id": "score_linear",
              "parameters": {
                  "weights": {
                      "nsfw_score": 0.4,
                      "violence_score": 0.3,
                      "toxicity_score": 0.3
                  }
              }
          },
          # Stage 4: Take the top 50 highest-risk items
          {
              "stage_type": "reduce",
              "stage_id": "sampling",
              "parameters": {"limit": 50}
          },
          # Stage 5: Classify against moderation taxonomy
          {
              "stage_type": "enrich",
              "stage_id": "taxonomy_enrich",
              "parameters": {
                  "taxonomy_id": "tax_moderation_categories",
                  "top_k": 5,
                  "min_score": 0.5
              }
          },
          # Stage 6: Push to external moderation queue
          {
              "stage_type": "apply",
              "stage_id": "api_call",
              "parameters": {
                  "url": "https://moderation.internal/v1/review-queue",
                  "method": "POST",
                  "allowed_domains": ["moderation.internal"],
                  "auth": {
                      "type": "bearer",
                      "secret_ref": "moderation_api_key"
                  },
                  "output_field": "metadata.review_ticket",
                  "on_error": "skip"
              }
          }
      ]
  )
  ```

  ```bash cURL theme={null}
  curl -X POST https://api.mixpeek.com/v1/retrievers \
    -H "Authorization: Bearer $MP_API_KEY" \
    -H "Content-Type: application/json" \
    -H "X-Namespace: $MP_NAMESPACE" \
    -d '{
      "retriever_name": "content-moderation",
      "stages": [
        {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://multimodal_extractor@v1/vertex_multimodal_embedding", "query": "{{INPUT.unsafe_reference}}", "top_k": 1000, "min_score": 0.6}], "final_top_k": 1000, "fusion": "rrf"}},
        {"stage_type": "filter", "stage_id": "feature_search", "parameters": {"searches": [{"feature_uri": "mixpeek://text_extractor@v1/multilingual_e5_large_instruct_v1", "query": {"input_mode": "text", "value": "{{INPUT.toxic_terms}}"}, "top_k": 1000, "min_score": 0.5}], "final_top_k": 1000, "fusion": "rrf"}},
        {"stage_type": "sort", "stage_id": "score_linear", "parameters": {"weights": {"nsfw_score": 0.4, "violence_score": 0.3, "toxicity_score": 0.3}}},
        {"stage_type": "reduce", "stage_id": "sampling", "parameters": {"limit": 50}},
        {"stage_type": "enrich", "stage_id": "taxonomy_enrich", "parameters": {"taxonomy_id": "tax_moderation_categories", "top_k": 5, "min_score": 0.5}},
        {"stage_type": "apply", "stage_id": "api_call", "parameters": {"url": "https://moderation.internal/v1/review-queue", "method": "POST", "allowed_domains": ["moderation.internal"], "auth": {"type": "bearer", "secret_ref": "moderation_api_key"}, "output_field": "metadata.review_ticket", "on_error": "skip"}}
      ]
    }'
  ```
</CodeGroup>

**Stage flow:** 1000 NSFW candidates --> \~200 also toxic --> sorted by combined risk --> top 50 --> taxonomy labels attached --> pushed to moderation queue
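
To make Stage 3 concrete, here is a minimal sketch of weighted multi-signal ranking. It assumes `score_linear` behaves like a plain weighted sum over the listed fields; the actual stage may normalize or handle missing fields differently. The `combined_risk` helper and the sample documents are hypothetical and exist only for illustration.

```python
# Weights copied from the score_linear stage in the pipeline above.
WEIGHTS = {"nsfw_score": 0.4, "violence_score": 0.3, "toxicity_score": 0.3}


def combined_risk(doc: dict, weights: dict = WEIGHTS) -> float:
    """Weighted sum of per-signal scores.

    Assumption: a missing signal contributes 0.0. The real stage may differ.
    """
    return sum(weight * doc.get(field, 0.0) for field, weight in weights.items())


# Hypothetical documents carrying per-signal scores.
docs = [
    {"id": "a", "nsfw_score": 0.9, "violence_score": 0.2, "toxicity_score": 0.5},
    {"id": "b", "nsfw_score": 0.7, "violence_score": 0.8, "toxicity_score": 0.6},
    {"id": "c", "nsfw_score": 0.95, "violence_score": 0.1},  # no toxicity signal
]

# Sort by combined risk (highest first), then keep the top 2,
# mirroring the sort -> reduce hand-off in the pipeline.
ranked = sorted(docs, key=combined_risk, reverse=True)
top = ranked[:2]
```

Document `b` ranks first even though `c` has the highest single signal, which is the point of combining signals before the reduce stage cuts the list.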

## Performance Characteristics

Multi-stage pipelines avoid the N+1 problem that plagues application-level orchestration. Here is how:

**1. Filter stages execute server-side against indexes.** A `feature_search` filter runs directly against the MVS vector index. No data leaves the engine until the candidate set is narrowed. Chaining two filter stages does not mean two round-trips from your application: both execute within the engine in sequence.

**2. Enrich stages batch internally.** A `document_enrich` join across 50 results resolves in a single batch query to the target collection, not 50 separate lookups. LLM enrichment stages batch prompts where possible.

**3. Reduce stages shrink the working set early.** Place a `sampling` or `dedup` stage as early as possible to minimize the number of documents flowing through expensive downstream stages (LLM enrichment, API calls).

**4. The pipeline streams rather than materializes.** Documents flow through stages incrementally. A 6-stage pipeline does not create 6 intermediate copies of the full result set. Each stage processes documents and passes them forward.
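
The difference between N+1 lookups and a batched join (point 2) can be sketched in a few lines. This is an illustration of the access pattern only, not Mixpeek's internal implementation: `FakeCollection`, `get_one`, `get_many`, and the `license_id` field are all hypothetical names.

```python
class FakeCollection:
    """Hypothetical in-memory stand-in for a target collection. Counts queries."""

    def __init__(self, rows):
        self._rows = {row["id"]: row for row in rows}
        self.queries = 0

    def get_one(self, doc_id):
        self.queries += 1  # one round-trip per document
        return self._rows.get(doc_id)

    def get_many(self, doc_ids):
        self.queries += 1  # one round-trip for the whole batch
        return {i: self._rows[i] for i in doc_ids if i in self._rows}


def enrich_n_plus_one(results, collection):
    """Application-level pattern: one lookup per result."""
    return [{**r, "license": collection.get_one(r["license_id"])} for r in results]


def enrich_batched(results, collection):
    """Pipeline-level pattern: collect keys, resolve them in one query."""
    found = collection.get_many({r["license_id"] for r in results})
    return [{**r, "license": found.get(r["license_id"])} for r in results]


rows = [{"id": f"lic_{i}", "tier": "standard"} for i in range(50)]
results = [{"id": f"doc_{i}", "license_id": f"lic_{i}"} for i in range(50)]

naive, batched = FakeCollection(rows), FakeCollection(rows)
naive_out = enrich_n_plus_one(results, naive)
batched_out = enrich_batched(results, batched)
```

Both functions produce the same enriched documents; only the number of queries against the target collection differs (50 versus 1 in this sketch).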

<Warning>
  Stage ordering matters for performance. Place cheap, high-selectivity filters first (metadata filters, feature searches with high thresholds) and expensive stages last (LLM enrichment, external API calls). A pipeline that enriches 1000 documents and then filters to 10 does 100 times the enrichment work of one that filters to 10 and then enriches.
</Warning>
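
A back-of-the-envelope cost model makes the ordering effect visible. The sketch below is a toy: the per-document costs are made-up units, the keep fractions are assumed, and `pipeline_cost` is a hypothetical helper rather than anything Mixpeek exposes.

```python
def pipeline_cost(n_docs: int, stages: list[tuple[str, int, float]]) -> int:
    """Total work for a pipeline under a simple per-document cost model.

    Each stage is (name, cost_per_doc, keep_fraction). A stage pays
    cost_per_doc for every document it receives, then forwards
    keep_fraction of them to the next stage.
    """
    total = 0
    remaining = n_docs
    for _name, cost_per_doc, keep_fraction in stages:
        total += remaining * cost_per_doc
        remaining = round(remaining * keep_fraction)
    return total


# Assumed numbers: a cheap filter that keeps 1% of documents,
# and an enrichment step that costs 100x more per document.
FILTER = ("metadata_filter", 1, 0.01)
ENRICH = ("llm_enrich", 100, 1.0)

filter_first = pipeline_cost(1000, [FILTER, ENRICH])
enrich_first = pipeline_cost(1000, [ENRICH, FILTER])
```

Under these assumed numbers, the filter-first ordering costs 2,000 units and the enrich-first ordering costs 101,000, because the expensive stage sees 10 documents in one case and 1000 in the other.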

## When to Use Which Stage Type

Use this decision guide when designing your pipeline:

<AccordionGroup>
  <Accordion title="I need to narrow down results based on content or metadata">
    Use a **filter** stage. Start with `feature_search` for semantic/vector-based filtering, or `metadata` for structured attribute filtering. Chain multiple filters for compound conditions.
  </Accordion>

  <Accordion title="I need to reorder results by relevance or business logic">
    Use a **sort** stage. Choose `score_linear` for weighted multi-signal ranking, `cross_encoder_rerank` for high-precision reranking with a cross-encoder model, or `attribute_sort` for simple field-based ordering.
  </Accordion>

  <Accordion title="I need fewer results, or deduplicated results">
    Use a **reduce** stage. Choose `sampling` for top-k limits, `dedup` for deduplication by field, or `summarize` for LLM-powered aggregation of results into a single summary.
  </Accordion>

  <Accordion title="I need to add data from another collection, an LLM, or a taxonomy">
    Use an **enrich** stage. Choose `document_enrich` for cross-collection joins, `llm_enrich` for AI-generated fields, or `taxonomy_enrich` for classification against a controlled vocabulary.
  </Accordion>

  <Accordion title="I need to reshape output, call an API, or run custom logic">
    Use an **apply** stage. Choose `json_transform` for output reshaping, `api_call` for external service integration, `code_execution` for custom Python/TypeScript/JavaScript, or `external_web_search` for web augmentation.
  </Accordion>
</AccordionGroup>
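
The decision guide above can also be expressed as a lookup table, which is handy for sanity-checking a pipeline config before you submit it. This sketch covers only the stage IDs named on this page; see the Retrievers reference for the full catalog. `STAGE_CATALOG` and `stage_type_matches` are hypothetical helper names.

```python
# Stage IDs named in the decision guide, grouped by stage type.
# Not the full catalog: only what this page mentions.
STAGE_CATALOG = {
    "filter": ["feature_search", "metadata"],
    "sort": ["score_linear", "cross_encoder_rerank", "attribute_sort"],
    "reduce": ["sampling", "dedup", "summarize"],
    "enrich": ["document_enrich", "llm_enrich", "taxonomy_enrich"],
    "apply": ["json_transform", "api_call", "code_execution", "external_web_search"],
}

# Reverse index: stage_id -> stage_type.
STAGE_TYPE_BY_ID = {
    stage_id: stage_type
    for stage_type, stage_ids in STAGE_CATALOG.items()
    for stage_id in stage_ids
}


def stage_type_matches(stage: dict) -> bool:
    """True if the stage's stage_type agrees with the table for its stage_id."""
    return STAGE_TYPE_BY_ID.get(stage["stage_id"]) == stage["stage_type"]


ok = stage_type_matches({"stage_type": "reduce", "stage_id": "sampling"})
mismatch = stage_type_matches({"stage_type": "sort", "stage_id": "dedup"})
```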

### Stage Ordering Rules of Thumb

1. **Filter first.** Every pipeline should start with one or more filter stages to narrow the candidate set.
2. **Sort second.** Apply ranking after filtering so you are sorting a smaller set.
3. **Reduce third.** Cut the result set to a manageable size before enrichment.
4. **Enrich fourth.** Add external data only to the documents that survived filtering, sorting, and reduction.
5. **Apply last.** Reshape output and trigger side effects at the end of the pipeline.

<Note>
  These are guidelines, not hard rules. Some pipelines benefit from enriching before sorting (e.g., sort by a field that only exists after enrichment). Design your pipeline around your data flow, not a rigid template.
</Note>
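
If you want to check a pipeline against these rules of thumb automatically, a small advisory linter is enough. The sketch below is a hypothetical helper, not part of the Mixpeek SDK. It returns warnings rather than raising errors, because the ordering is a guideline and some pipelines legitimately break it.

```python
RECOMMENDED_ORDER = ["filter", "sort", "reduce", "enrich", "apply"]
RANK = {stage_type: i for i, stage_type in enumerate(RECOMMENDED_ORDER)}


def order_warnings(stages: list[dict]) -> list[str]:
    """Return advisory warnings for stages that deviate from the recommended order."""
    warnings = []
    if stages and stages[0]["stage_type"] != "filter":
        warnings.append("pipeline does not start with a filter stage")

    latest_rank, latest_type = -1, None
    for index, stage in enumerate(stages):
        stage_type = stage["stage_type"]
        rank = RANK[stage_type]
        if rank < latest_rank:
            # An earlier-category stage appears after a later-category one.
            warnings.append(
                f"stage {index} ({stage_type}) comes after a {latest_type} stage"
            )
        else:
            latest_rank, latest_type = rank, stage_type
    return warnings


# Stage types of the content-moderation pipeline, in order.
moderation = [
    {"stage_type": t}
    for t in ["filter", "filter", "sort", "reduce", "enrich", "apply"]
]
clean = order_warnings(moderation)

# Enrich-before-sort: flagged, but may be intentional (see the note above).
flagged = order_warnings(
    [{"stage_type": "filter"}, {"stage_type": "enrich"}, {"stage_type": "sort"}]
)
```

Treat a warning as a prompt to double-check the design, for example to confirm that the sort really depends on a field produced by the enrich stage.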

## Related Resources

<CardGroup cols={2}>
  <Card title="Retrievers" icon="magnifying-glass" href="/retrieval/retrievers">
    Full stage catalog, parameter schemas, and retriever configuration reference
  </Card>

  <Card title="Retrieval Cookbook" icon="book" href="/retrieval/cookbook">
    Ready-to-copy pipeline configurations for common use cases
  </Card>

  <Card title="Stage Reference" icon="layer-group" href="/retrieval/retrievers#stage-catalog">
    Detailed documentation for every stage type and stage ID
  </Card>

  <Card title="Caching" icon="bolt" href="/best-practices/caching-strategies">
    Configure retriever-level caching for repeated queries
  </Card>
</CardGroup>
