
Distributed Ingestion Pipeline with Ray

In this notebook, we demonstrate how to execute ingestion pipelines using Ray.

%pip install llama-index-ingestion-ray llama-index-embeddings-huggingface

Start a new Ray cluster or connect to an existing one with ray.init(). See https://docs.ray.io/en/latest/ray-core/configure.html for details about Ray cluster configurations.

import ray
ray.init()

For this notebook, we’ll load the PatronusAIFinanceBenchDataset llama-dataset from LlamaHub.

!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()

First, we define our transformations. Each transformation is wrapped in a RayTransformComponent. We pass it the TransformComponent class (for example, SentenceSplitter) together with that class's constructor arguments, and the wrapper runs the transformation logic inside stateful Ray Actors. All of the processing is performed with Ray Data. The map_batches_kwargs dictionary controls hardware requirements and Actor Pool strategies. For details on these options, see the ray.data.Dataset.map_batches documentation.
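The "stateful actor" idea works as follows. Ray Data's map_batches accepts a callable class, instantiates it once per actor, and then calls it on every batch that actor receives. Expensive setup, such as loading an embedding model, therefore happens once per actor rather than once per batch. The pure-Python sketch below models that pattern in a single process. ExpensiveTransform and TransformWorker are hypothetical stand-ins, not the actual RayTransformComponent internals, and batches are modeled as plain dicts of lists for simplicity.

```python
from typing import Callable, Dict, List


class ExpensiveTransform:
    """Hypothetical stand-in for a transformation with costly setup (e.g. a model)."""

    init_count = 0  # counts how many times the costly setup ran

    def __init__(self, suffix: str) -> None:
        ExpensiveTransform.init_count += 1
        self.suffix = suffix  # pretend this is a loaded model

    def __call__(self, texts: List[str]) -> List[str]:
        return [t + self.suffix for t in texts]


class TransformWorker:
    """Mimics a stateful actor: builds the transform once, reuses it for every batch."""

    def __init__(self, transform_class: Callable[..., ExpensiveTransform], **kwargs) -> None:
        # Constructor arguments are forwarded, like chunk_size or model_name above
        self.transform = transform_class(**kwargs)

    def __call__(self, batch: Dict[str, List[str]]) -> Dict[str, List[str]]:
        return {"text": self.transform(batch["text"])}


worker = TransformWorker(ExpensiveTransform, suffix="!")
batches = [{"text": ["a", "b"]}, {"text": ["c"]}]
results = [worker(b) for b in batches]
print(results)  # [{'text': ['a!', 'b!']}, {'text': ['c!']}]
print(ExpensiveTransform.init_count)  # 1: setup ran once for two batches
```

In the real pipeline, each actor in the pool holds its own instance, so a pool of N actors performs the setup N times.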

from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.ingestion.ray import RayTransformComponent
transformations = [
    RayTransformComponent(
        transform_class=SentenceSplitter,
        chunk_size=1024,
        chunk_overlap=20,
        map_batches_kwargs={
            "batch_size": 100,  # Number of items per batch
            "num_cpus": 1,  # Request 1 CPU per actor
            "compute": ray.data.ActorPoolStrategy(size=20),  # Fixed pool of 20 actors
        },
    ),
    RayTransformComponent(
        transform_class=HuggingFaceEmbedding,
        model_name="BAAI/bge-small-en-v1.5",
        map_batches_kwargs={
            "batch_size": 100,
            # Fractional GPU usage: each actor reserves 25% of a GPU.
            # With 1 physical GPU, Ray can autoscale up to 4 actors;
            # with 4 physical GPUs, up to 16 actors.
            "num_gpus": 0.25,
        },
    ),
]
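The fractional-GPU comment above comes down to simple division. Each embedding actor reserves num_gpus of a device, so at most total_gpus / num_gpus actors can run at once. The helper below is a hypothetical name, not part of Ray. It computes that upper bound and ignores other constraints, such as CPU or memory, that may limit concurrency further. In the run log below, the embedding stage tops out at 4 actors using 1/1 GPU on a single-GPU machine, which is consistent with this bound.

```python
import math
from fractions import Fraction


def max_concurrent_actors(total: float, per_actor: float) -> int:
    """Upper bound on actors that fit when each reserves `per_actor` units of a resource."""
    # Fraction(str(...)) avoids floating-point surprises such as 0.1 * 3 != 0.3
    return math.floor(Fraction(str(total)) / Fraction(str(per_actor)))


# Embedding stage: num_gpus=0.25 per actor
print(max_concurrent_actors(1, 0.25))  # 4
print(max_concurrent_actors(4, 0.25))  # 16

# Splitter stage: a fixed pool of 20 actors at num_cpus=1 requests 20 CPUs in total
pool_size, cpus_per_actor = 20, 1
print(pool_size * cpus_per_actor)  # 20
```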

Then, we create the ingestion pipeline.

from llama_index.ingestion.ray import RayIngestionPipeline
pipeline = RayIngestionPipeline(transformations=transformations)
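Judging from the execution plan reported in the run log (InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(TransformActor)] -> ActorPoolMapOperator[MapBatches(TransformActor)]), the pipeline chains one map_batches stage per transformation. Batches stream from one stage into the next. The single-process sketch below models that chaining with Python generators, with no parallelism involved. The functions split and embed are toy, hypothetical stand-ins for SentenceSplitter and HuggingFaceEmbedding.

```python
from typing import Callable, Iterable, Iterator, List

Batch = List[str]


def batched(items: List[str], batch_size: int) -> Iterator[Batch]:
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def map_stage(fn: Callable[[Batch], Batch], batches: Iterable[Batch]) -> Iterator[Batch]:
    """Apply `fn` to each batch lazily, like one MapBatches operator in a streaming plan."""
    for batch in batches:
        yield fn(batch)


def split(batch: Batch) -> Batch:
    # Toy splitter: break each document on whitespace into "chunks"
    return [chunk for doc in batch for chunk in doc.split()]


def embed(batch: Batch) -> Batch:
    # Toy embedder: tag each chunk with its length instead of computing a vector
    return [f"{chunk}:{len(chunk)}" for chunk in batch]


docs = ["alpha beta", "gamma", "delta epsilon zeta"]
# Input -> split -> embed, chained lazily; nothing runs until the result is consumed
plan = map_stage(embed, map_stage(split, batched(docs, batch_size=2)))
chunks_out = [item for batch in plan for item in batch]
print(chunks_out)
```

Because each stage pulls batches as they become available, the second stage can start before the first has finished. The log reflects this: the embedding operator reports progress while the splitter is still draining its output.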

Finally, we run the pipeline on our Ray cluster.

nodes = pipeline.run(documents=documents)
2026-01-02 19:45:57,691 INFO logging.py:397 -- Registered dataset logger for dataset dataset_8_0
2026-01-02 19:45:57,692 INFO logging.py:405 -- dataset_8_0 registers for logging while another dataset dataset_2_0 is also logging. For performance reasons, we will not log to the dataset dataset_8_0 until it is the only active dataset.
2026-01-02 19:45:57,694 INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_8_0. Full logs are in /tmp/ray/session_2026-01-02_19-32-39_779796_94512/logs/ray-data
2026-01-02 19:45:57,694 INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_8_0: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(TransformActor)] -> ActorPoolMapOperator[MapBatches(TransformActor)]
2026-01-02 19:45:58,180 WARNING resource_manager.py:761 -- Cluster resources are not enough to run any task from ActorPoolMapOperator[MapBatches(TransformActor)]. The job may hang forever unless the cluster scales up.
2026-01-02 19:45:58,296 INFO progress_bar.py:213 -- === Ray Data Progress {MapBatches(TransformActor)} ===
2026-01-02 19:45:58,297 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 20 (running=0, restarting=0, pending=20); Queued blocks: 200 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:45:58,298 INFO progress_bar.py:213 -- === Ray Data Progress {MapBatches(TransformActor)} ===
2026-01-02 19:45:58,304 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:45:58,305 INFO progress_bar.py:213 -- === Ray Data Progress {Running Dataset} ===
2026-01-02 19:45:58,305 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.0B/4.4GiB object store (pending: 20 CPU, 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:03,340 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 20 (running=0, restarting=0, pending=20); Queued blocks: 200 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:03,344 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:03,345 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.0B/4.4GiB object store (pending: 20 CPU, 0.25 GPU): Progress Completed 0 / ?
2026-01-02 19:46:08,362 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 23.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:08,363 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 40 (23.0MiB); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:08,364 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 23.0MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 0 / ?
2026-01-02 19:46:13,437 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 23.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:13,439 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 40 (23.0MiB); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:13,440 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 23.0MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 0 / ?
2026-01-02 19:46:18,521 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 2; Actors: 2 (running=1, restarting=0, pending=1); Queued blocks: 34 (20.3MiB); Resources: 0.0 CPU, 0.2 GPU, 774.2KiB object store; [all objects local]: Progress Completed 564 / 4512
2026-01-02 19:46:18,523 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 20.8MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:18,524 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.25/1 GPU, 22.4MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 450 / 4500
2026-01-02 19:46:23,604 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 18.9MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:23,605 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 4; Actors: 3 (running=2, restarting=0, pending=1); Queued blocks: 27 (16.6MiB); Resources: 0.0 CPU, 0.5 GPU, 1.6MiB object store; [all objects local]: Progress Completed 1038 / 4613
2026-01-02 19:46:23,607 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.5/1 GPU, 20.5MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 1038 / 4613
2026-01-02 19:46:28,700 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 16.6MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:28,701 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 4; Actors: 3 (running=2, restarting=0, pending=1); Queued blocks: 23 (14.1MiB); Resources: 0.0 CPU, 0.5 GPU, 1.7MiB object store; [all objects local]: Progress Completed 1581 / 4865
2026-01-02 19:46:28,702 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.5/1 GPU, 18.3MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 1581 / 4865
(MapWorker(MapBatches(TransformActor)) pid=4897) [2026-01-02 19:46:31,556 E 4897 5527] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:33,731 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 14.6MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:33,732 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 6; Actors: 4 (running=3, restarting=0, pending=1); Queued blocks: 18 (10.7MiB); Resources: 0.0 CPU, 0.8 GPU, 2.7MiB object store; [all objects local]: Progress Completed 2009 / 5022
2026-01-02 19:46:33,733 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.75/1 GPU, 17.3MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 2009 / 5022
(MapWorker(MapBatches(TransformActor)) pid=6056) [2026-01-02 19:46:37,795 E 6056 6092] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:38,743 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 12.9MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:38,744 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 6; Actors: 4 (running=3, restarting=0, pending=1); Queued blocks: 15 (9.1MiB); Resources: 0.0 CPU, 0.8 GPU, 2.7MiB object store; [all objects local]: Progress Completed 2397 / 5046
2026-01-02 19:46:38,745 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.75/1 GPU, 15.6MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 2397 / 5046
2026-01-02 19:46:43,837 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 10.2MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:43,839 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 9 (5.5MiB); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 3023 / 5257
2026-01-02 19:46:43,839 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 14.0MiB/4.4GiB object store: Progress Completed 3023 / 5257
(MapWorker(MapBatches(TransformActor)) pid=6154) [2026-01-02 19:46:45,914 E 6154 6229] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:48,841 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 8.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:48,850 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 5 (3.1MiB); Resources: 0.0 CPU, 1.0 GPU, 3.7MiB object store; [all objects local]: Progress Completed 3523 / 5219
2026-01-02 19:46:48,851 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 11.8MiB/4.4GiB object store: Progress Completed 3523 / 5219
(MapWorker(MapBatches(TransformActor)) pid=6305) [2026-01-02 19:46:53,768 E 6305 6340] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:53,928 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 5.5MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:53,929 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 1 (572.2KiB); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 4105 / 5297
2026-01-02 19:46:53,930 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 9.3MiB/4.4GiB object store: Progress Completed 4105 / 5297
2026-01-02 19:46:59,019 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 3.1MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:59,020 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 5; Actors: 4; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 4673 / 5341
2026-01-02 19:46:59,021 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 6.9MiB/4.4GiB object store: Progress Completed 4673 / 5341
2026-01-02 19:47:03,409 INFO streaming_executor.py:304 -- ✔️ Dataset dataset_8_0 execution finished in 66.02 seconds