How Mixpeek runs distributed multimodal ML on Ray: architecture, patterns, and production lessons
We run 20+ ML models in parallel across video, image, and document pipelines. Here's the Ray architecture behind it -- custom resource isolation, flexible actor pools, distributed Qdrant writes, and the lessons we learned the hard way.

When you index a 10-minute video at Mixpeek, you don't run one model. You run a transcript model, a visual embedding model, a scene description model, a face detection model, an object detection model, a brand safety classifier, an IAB taxonomy tagger, and a shot boundary detector in parallel. Each has different compute requirements, different batch sizes, different GPU/CPU preferences, and different failure modes.
A single user-defined feature extractor chains several of these. A production batch job might run 15 extractors simultaneously across tens of thousands of files. Each extractor then fans out further to process individual frames, chunks, or pages in parallel before results converge into a searchable index.
We needed a distributed compute layer that could handle all of this without us building scheduling, retries, resource isolation, and fault tolerance from scratch. After evaluating Celery, Dask, and a bespoke gRPC approach, we chose Ray. This is a technical walkthrough of how we use it, the patterns we settled on, and the lessons we learned the hard way.
The architecture
We run a KubeRay cluster on GKE, deployed as a RayService custom resource. Two logical layers sit on top: Ray Serve for always-on model inference, and Ray Data for batch pipeline execution.
GKE / KubeRay
+----------------------------------------------------------+
|                                                          |
|  +----------------+      +----------------------------+  |
|  |   Head Node    |<---->|      Ray Serve Layer       |  |
|  |   (0 CPUs)     |      |   20+ model deployments    |  |
|  |  control only  |      |   per-model autoscaling    |  |
|  +----------------+      +----------------------------+  |
|          |                                               |
|  +------+-------+                                        |
|  v              v                                        |
| +----------+   +----------+                              |
| |   CPU    |   |   GPU    |  custom resource:            |
| | Workers  |   | Workers  |  {"batch": 1}                |
| |   1-5    |   |   0-3    |  isolates batch jobs         |
| |  pods    |   |  pods    |  from Serve replicas         |
| +----------+   +----------+                              |
+----------------------------------------------------------+
The head node is deliberately computation-free (num-cpus: "0") -- it only handles the control plane. This is a Ray best practice we ignored until a runaway batch job starved the scheduler. CPU and GPU worker groups scale independently via KubeRay's autoscaler.
# infra/gke/rayservice.yaml (abbreviated)
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: mixpeek-engine-svc
spec:
  rayClusterConfig:
    headGroupSpec:
      rayStartParams:
        num-cpus: "0"  # head handles control, not work
        dashboard-host: "0.0.0.0"
      template:
        spec:
          containers:
            - name: ray-head
              resources:
                requests: { cpu: "4", memory: "32Gi" }
                limits: { cpu: "8", memory: "64Gi" }
    workerGroupSpecs:
      - groupName: cpu-workers
        minReplicas: 1
        maxReplicas: 5
        template:
          spec:
            containers:
              - name: ray-worker
                resources:
                  requests: { cpu: "7", memory: "28Gi" }
                  limits: { cpu: "7", memory: "56Gi" }
      - groupName: gpu-workers
        minReplicas: 0  # scale to zero when idle
        maxReplicas: 3
        template:
          spec:
            tolerations:
              - key: nvidia.com/gpu
                operator: Exists
                effect: NoSchedule
            containers:
              - name: ray-worker
                resources:
                  requests:
                    cpu: "4"
                    memory: "16Gi"
                    nvidia.com/gpu: "1"
Ray Serve: 20+ models, one cluster
Every inference model runs as a named Ray Serve deployment -- text embedders, CLIP variants, transcript models, classifiers. We use the declarative serveConfigV2 YAML format rather than the imperative Python API, which makes deployments GitOps-friendly and lets KubeRay manage rollouts without custom deployment scripts.
The key design decision is per-model autoscaling. A text embedding model has very different throughput and memory characteristics than a video captioning model. Treating them as a homogeneous fleet wastes GPU memory and causes head-of-line blocking:
# serveConfigV2 (excerpt)
serveConfigV2: |
  applications:
    # Lightweight text embedder: scale wide, low memory
    - name: intfloat__multilingual_e5_large_instruct
      import_path: engine.inference.intfloat.multilingual_e5_large_instruct.routes:app
      deployments:
        - name: MultilingualE5LargeInstructV1Deployment
          autoscaling_config:
            min_replicas: 2
            max_replicas: 10
            target_ongoing_requests: 2
            upscale_delay_s: 5
            downscale_delay_s: 300
          ray_actor_options:
            num_cpus: 0.5
            num_gpus: 0
            memory: 2147483648  # 2GB
          max_ongoing_requests: 3
    # Heavy video captioner: scale conservatively, GPU required
    - name: video_captioner
      import_path: engine.inference.video.caption.routes:app
      deployments:
        - name: VideoCaptionerDeployment
          autoscaling_config:
            min_replicas: 0  # scale to zero when idle
            max_replicas: 2
            target_ongoing_requests: 1
            upscale_delay_s: 30
            downscale_delay_s: 600
          ray_actor_options:
            num_cpus: 2
            num_gpus: 0.5
            memory: 8589934592  # 8GB
          max_ongoing_requests: 1
target_ongoing_requests is the key lever. For high-throughput, low-latency models you target more concurrent requests per replica. For heavy models (video captioning, large-scale image encoders), you target 1 to avoid replica OOM from batched inputs piling up.
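The two configs above differ in only a handful of knobs, and the choice follows directly from the model's profile. A minimal sketch of that reasoning -- the autoscaling_for helper and its "heavy"/"light" profile names are hypothetical, not Mixpeek code:

```python
# Hypothetical helper: derive per-model autoscaling knobs from a coarse
# profile. Heavy GPU models target one ongoing request per replica so
# batched inputs can't pile up and OOM the replica; light CPU models
# pack more concurrency per replica and scale wider.
def autoscaling_for(profile: str) -> dict:
    if profile == "heavy":
        return {
            "min_replicas": 0,             # scale to zero when idle
            "max_replicas": 2,
            "target_ongoing_requests": 1,  # one request in flight per replica
            "upscale_delay_s": 30,         # don't add a GPU for a brief burst
            "downscale_delay_s": 600,
        }
    return {
        "min_replicas": 2,                 # keep warm capacity for latency
        "max_replicas": 10,
        "target_ongoing_requests": 2,
        "upscale_delay_s": 5,
        "downscale_delay_s": 300,
    }
```

The asymmetric upscale/downscale delays matter too: scale up fast enough to absorb bursts, but hold replicas long enough to avoid thrashing on cold model loads.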
Ray Data: the extraction pipeline
Batch processing uses Ray Data with map_batches and ActorPoolStrategy. Each pipeline stage is a Python callable operating on a batch of rows.
A naive implementation runs preprocessing (S3 download, format normalization, frame extraction) once per extractor. With 10 extractors on a 1,000-file batch, that's 10,000 S3 reads where 1,000 would do. We run preprocessing once, cache the result as a Ray Dataset in the object store, and fan it out to all extractors:
# engine/pipelines/helpers/job_builder.py
def run_preprocessing_pipeline(
    input_dataset: ray.data.Dataset,
) -> ray.data.Dataset:
    # 1 locally, 56 in prod -- detected at startup via hardware_config
    s3_concurrency = hardware_config.cpu_concurrency
    preprocessing_steps = [
        MapBatchesPipelineStep(
            S3MediaResolver,
            concurrency=s3_concurrency,
            batch_size=8,
            actor_options={"memory": 3 * 1024 * 1024 * 1024},  # 3GB
        ),
        MapBatchesPipelineStep(
            ContentPrep,
            concurrency=s3_concurrency,
            batch_size=16,
            actor_options={"memory": 3 * 1024 * 1024 * 1024},  # 3GB
        ),
    ]
    return BasePipeline(preprocessing_steps).run(input_dataset)
The 3GB memory actor reservation is production-profiled. Early on we ran without memory hints and workers were silently OOM-killed with no useful error. Ray's scheduler doesn't know your model weights are 2.4GB unless you tell it.
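Ray actor options take memory in raw bytes, which makes literals like 3221225472 easy to mistype. A tiny convenience helper -- gib here is a hypothetical name, not part of Ray or our codebase -- keeps the reservations readable:

```python
# Ray's memory option is specified in bytes. A helper avoids
# hand-multiplied magic numbers in actor_options dicts.
GiB = 1024 ** 3

def gib(n: float) -> int:
    """Convert gibibytes to the byte count Ray expects."""
    return int(n * GiB)

# actor_options={"memory": gib(3)} instead of {"memory": 3221225472}
```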
Flexible actor pools prevent deadlock
The original code used fixed-size actor pools:
# This deadlocks under concurrent batch jobs
compute = ray.data.ActorPoolStrategy(size=8)
With two concurrent batch jobs each requesting 8-actor pools on a 12-worker cluster, both jobs get stuck waiting for the other to release workers. Flexible pools fix it:
# engine/pipelines/steps.py
class MapBatchesPipelineStep(BasePipelineStep):
    DEFAULT_POOL_MAX_SIZE = 8

    def __init__(self, concurrency=None, pool_max_size=None, **kwargs):
        concurrency_val = concurrency if concurrency is not None else 1
        capped = min(concurrency_val, pool_max_size or self.DEFAULT_POOL_MAX_SIZE)
        # min_size=1: the job can always make progress with a single worker.
        # Ray fills in more workers as they become available. No deadlock.
        self.compute = ray.data.ActorPoolStrategy(min_size=1, max_size=capped)
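The deadlock arithmetic is simple enough to sketch. These helper functions are illustrative only (not Mixpeek code): fixed pools must be fully allocated before a job starts, while flexible pools only need their minimum to make progress.

```python
# Two jobs each demanding a fixed 8-actor pool on a 12-worker cluster
# can never both be satisfied, so each waits on the other forever.
def fixed_pools_schedulable(cluster_workers: int, *pool_sizes: int) -> bool:
    # A fixed pool blocks until every requested actor is placed.
    return sum(pool_sizes) <= cluster_workers

# Flexible pools start as soon as min_size actors exist; Ray grows
# each pool opportunistically toward max_size afterwards.
def flexible_pools_schedulable(cluster_workers: int, *min_sizes: int) -> bool:
    return sum(min_sizes) <= cluster_workers
```

With the numbers from the incident above: fixed_pools_schedulable(12, 8, 8) is False, while two flexible pools with min_size=1 schedule immediately and share the remaining 10 workers.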
Isolating batch jobs from Serve replicas
This is the pattern we're most pleased with. Ray Serve replicas and batch pipeline tasks run on the same cluster. Without isolation, a large batch job can starve Serve replicas of workers, causing inference timeouts for live API requests.
The solution is custom Ray resources. We declare a synthetic resource called batch on worker nodes, then require it for every batch task:
# engine/pipelines/tasks.py
def _batch_resource_options() -> dict:
    # Ray Serve replicas never request {"batch": 1},
    # so they physically cannot land on batch-reserved slots.
    return {"resources": {"batch": 1}}

@ray.remote(max_retries=3, **_batch_resource_options())
def process_feature_extractor(
    extractor_request: ExtractorRequest,
    input_dataset: ray.data.Dataset,
) -> None:
    registry = get_inference_registry()
    registry.add_packages(inference_pkg, plugins_pkg, taxonomies_pkg)
    # ... run extraction steps
In the KubeRay worker spec, CPU workers expose this resource:
# CPU worker group rayStartParams
rayStartParams:
  resources: '{"batch": 4}'  # 4 concurrent batch tasks per CPU worker node
A massive overnight batch job can saturate all batch slots without affecting P99 latency on live inference requests. The separation cost is zero -- it's a scheduler hint with no runtime overhead.
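The capacity math falls out of the worker spec directly. A back-of-envelope sketch (batch_slots is a hypothetical helper, numbers from the configs above):

```python
# Each CPU worker node advertises {"batch": 4}, so total batch
# capacity scales linearly with the autoscaler's replica count.
def batch_slots(cpu_workers: int, slots_per_worker: int = 4) -> int:
    return cpu_workers * slots_per_worker

# At maxReplicas=5 the cluster tops out at 20 concurrent batch
# tasks, regardless of how many Serve replicas are also running.
```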
Production patterns worth stealing
Non-blocking progress tracking with a Ray Actor
Ray Data pipelines are hard to introspect externally. Worker tasks can't push progress to an external DB without blocking the pipeline. We use a long-lived Ray actor as a shared counter that workers update fire-and-forget:
# engine/monitoring/performance/utils.py
@ray.remote
class ProgressActor:
    def __init__(self, total=None, job_id=None):
        self._processed = 0
        self._total = total
        self._job_id = job_id

    def incr(self, n=1):
        self._processed += n
        return self._processed

    def get_progress(self):
        return {
            "processed": self._processed,
            "total": self._total,
            "percent": (self._processed / self._total * 100) if self._total else None,
        }

# Instantiate once, pass handle into map_batches workers
progress_actor = ProgressActor.remote(total=dataset_size, job_id=batch_id)

# Inside a Ray Data worker -- fire-and-forget, no blocking:
progress_actor.incr.remote(len(batch))

# From the API layer:
progress = ray.get(progress_actor.get_progress.remote())
Workers call .remote() which returns immediately. The call is queued on the actor's mailbox and executed serially -- atomic increments without locks, without blocking the pipeline.
Custom Datasink for distributed Qdrant writes
Collecting all pipeline output on one node before writing forces full materialization in memory. Ray Data's Datasink API distributes writes across all workers with built-in backpressure:
# engine/databases/qdrant/datasink.py
class QdrantDatasink(Datasink):
    @property
    def supports_distributed_writes(self) -> bool:
        return True  # any worker node can write directly to Qdrant

    @property
    def min_rows_per_write(self) -> int:
        return self.config.batch_size  # Qdrant's optimal upsert batch size

    def write(self, blocks, ctx):
        qdrant = QdrantBaseSync(prefer_grpc=True, ...)
        for block in blocks:
            rows = BlockAccessor.for_block(block).to_pydict()
            # upsert with exponential backoff retry
Peak write throughput scales linearly with worker count. min_rows_per_write prevents a flood of tiny Qdrant upserts that tank performance.
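The batching behind that behavior is plain row chunking. A minimal sketch (chunked is a hypothetical helper, not the real sink internals):

```python
# Split a block's rows into Qdrant-friendly upsert batches instead of
# issuing one upsert per point. This is the effect min_rows_per_write
# enables: the sink receives blocks large enough to chunk sensibly.
def chunked(rows: list, batch_size: int):
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]
```

For example, 1,050 rows with a batch size of 256 become four full upserts of 256 points and one final upsert of 26.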
The LocalStack parquet workaround
Ray Data's native S3 parquet I/O uses PyArrow's S3 filesystem under the hood. It works great against AWS S3 in production but silently hangs against LocalStack in local dev. No error, no timeout. We wrapped all parquet I/O:
# engine/utils/ray_parquet.py
def write_parquet_safe(dataset: ray.data.Dataset, path: str) -> int:
    if is_localstack_env():
        # PyArrow + s3fs against LocalStack -- reliable
        rows = dataset.take_all()
        table = pa.Table.from_pylist(rows)
        with get_localstack_s3fs().open(path, "wb") as f:
            pq.write_table(table, f)
        return len(rows)
    else:
        # Production: Ray Data native (distributed across workers)
        dataset.write_parquet(path)
        return dataset.count()
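The environment probe itself is a one-liner. Our real is_localstack_env differs in detail; this sketch assumes the standard AWS_ENDPOINT_URL variable and LocalStack's default port 4566:

```python
import os

# Hypothetical probe: treat any custom S3 endpoint that points at
# LocalStack as local dev. Real AWS sets no custom endpoint, so this
# returns False in production.
def is_localstack_env() -> bool:
    endpoint = os.environ.get("AWS_ENDPOINT_URL", "")
    return "localstack" in endpoint or endpoint.endswith(":4566")
```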
End-to-end flow
User triggers batch job on a collection
                |
                v
FastAPI endpoint
                |  schedules as Ray remote task
                v
process_feature_extractor.remote()         requires {"batch": 1}
                |
                v
read_parquet_safe()                        reads manifest from S3
                |
                v
run_preprocessing_pipeline()               S3MediaResolver + ContentPrep
  (runs ONCE, shared across N              Ray Data map_batches
   extractors in this job)
                |
        +-------+--------+---- ... ----+
        v                v             v
  extractor_1      extractor_2    extractor_N
  map_batches()    map_batches()  map_batches()
  -> Ray Serve     -> Ray Serve   -> Ray Serve
        |                |             |
        +-------+--------+---- ... ----+
                |
                v
QdrantDatasink.write()                     distributed writes
ProgressActor.incr()                       fire-and-forget
                |
                v
Indexed and searchable via /retrievers
What we're excited about
Ray Compiled Graphs. Our pipeline has many small coordination steps between stages -- passing dataset handles, routing between CPU and GPU workers. Each carries overhead from Ray's default scheduling path. Compiled graphs pre-compile the execution plan and remove per-call overhead. For latency-sensitive single-document requests this matters more than for bulk batch jobs.
Streaming execution in Ray Data. The current pipeline materializes intermediate datasets between stages, so peak memory scales with dataset size. Ray Data's streaming execution (now the default in Ray 2.x) runs the pipeline lazily -- blocks flow through stages without full materialization. We're migrating progressively, starting with large video batches that currently hit memory pressure on the preprocessing stage.
Finer-grained scheduling policies. Right now we have a coarse split: CPU workers for batch, GPU workers for Serve. As we add more GPU-heavy feature extractors to the batch path, we'll need finer-grained policies. Ray's placement groups and resource bundles give us the primitives -- we just haven't built the logic yet.
Anyscale for managed Ray. We self-manage KubeRay on GKE today. The operational burden is real -- node pool configuration, KubeRay version upgrades, GKE autoscaler tuning, monitoring. We're watching Anyscale's managed offering closely. The make-vs-buy math is still in favor of self-managed at our scale, but that inflection point is approaching.
Lessons, briefly
- Zero-CPU head nodes from day one. The head is for control. If it runs user code, a runaway task will take down your scheduler.
- Flexible actor pools, not fixed. ActorPoolStrategy(min_size=1, max_size=N) instead of size=N. Fixed pools deadlock under concurrent jobs.
- Custom resources for workload isolation. Declaring synthetic resources lets you partition cluster capacity between workload types without separate clusters.
- Always reserve memory in actor options. Ray can't infer your model's memory footprint. Explicit memory= hints prevent silent OOM kills.
- Fire-and-forget progress via actors. Don't block workers on external DB writes. A Ray actor as a shared counter is cheap and reliable.
- Environment-aware I/O wrappers. Ray Data's S3 integration has edge cases in local dev. Thin wrappers that detect the environment save hours of debugging.
If you're building on Ray and want to compare notes, or are curious about how Mixpeek uses all of this to power multimodal search and extraction, we're always happy to talk.
If you want to see the output of this infrastructure in action, the retriever showcase has live multimodal search demos you can run against your own content -- no infrastructure required.
