Distributed Ingestion Pipeline with Ray
In this notebook, we demonstrate how to execute ingestion pipelines using Ray.
%pip install llama-index-ingestion-ray llama-index-embeddings-huggingface

Start a new cluster, or connect to an existing one. See https://docs.ray.io/en/latest/ray-core/configure.html for details about Ray cluster configurations.
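As a rough sketch of those two options, the helper below builds the keyword arguments for `ray.init()`. Without an address, Ray typically starts a local instance. With an address such as `"auto"` or a `ray://` URL, it connects to an existing cluster. The names `ray_init_kwargs` and `connect` are hypothetical and not part of Ray or LlamaIndex. `RAY_ADDRESS` is an environment variable Ray already reads on its own, so the helper only makes that lookup explicit.

```python
import os


def ray_init_kwargs(address=None):
    """Return keyword arguments for ray.init() (hypothetical helper)."""
    # An explicit argument wins; otherwise fall back to RAY_ADDRESS.
    address = address or os.environ.get("RAY_ADDRESS")
    # ignore_reinit_error tolerates re-running the notebook cell.
    kwargs = {"ignore_reinit_error": True}
    if address:
        # e.g. "auto" or "ray://<head-node-host>:10001" for an existing cluster
        kwargs["address"] = address
    return kwargs


def connect(address=None):
    """Start or connect to a cluster; ray is imported lazily on purpose."""
    import ray

    return ray.init(**ray_init_kwargs(address))
```

Calling `connect()` with no argument behaves like the plain `ray.init()` used in this notebook, unless `RAY_ADDRESS` is set.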
import ray
ray.init()

Load data
For this notebook, we’ll load the PatronusAIFinanceBenchDataset llama-dataset from llamahub.
!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data

from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()

Define the RayIngestionPipeline
First, we define our transformations. Each TransformComponent object is wrapped in a RayTransformComponent, which runs the transformation logic inside stateful Ray Actors. All transformations are executed with Ray Data. For details on configuring hardware requirements and Actor Pool strategies, see the ray.data.Dataset.map_batches documentation.
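To illustrate what "stateful" means here, the sketch below mimics the callable-class pattern that Ray Data's `map_batches` accepts: the constructor runs once per actor (where an expensive model would be loaded), and `__call__` runs once per batch. It is plain Python with no Ray dependency. `UppercaseTransform` and `run_in_batches` are hypothetical names, and this is not the actual RayTransformComponent implementation.

```python
class UppercaseTransform:
    """Hypothetical stand-in for a transformation with expensive setup."""

    instances_created = 0  # counts how often the setup cost is paid

    def __init__(self, suffix=""):
        # In a real actor this is where a model would be loaded, once.
        UppercaseTransform.instances_created += 1
        self.suffix = suffix

    def __call__(self, batch):
        # Called once per batch; reuses the state built in __init__.
        return [text.upper() + self.suffix for text in batch]


def run_in_batches(items, transform_class, batch_size, **init_kwargs):
    """Local stand-in for one actor: build the transform once, feed it batches."""
    transform = transform_class(**init_kwargs)
    results = []
    for start in range(0, len(items), batch_size):
        results.extend(transform(items[start:start + batch_size]))
    return results


chunks = ["alpha", "beta", "gamma", "delta", "epsilon"]
output = run_in_batches(chunks, UppercaseTransform, batch_size=2, suffix="!")
print(output)                                # five upper-cased strings
print(UppercaseTransform.instances_created)  # setup ran once for three batches
```

With Ray Data, the class itself would be passed to `map_batches` together with an actor-based compute strategy, so each actor in the pool pays the setup cost only once.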
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.ingestion.ray import RayTransformComponent

transformations = [
    RayTransformComponent(
        transform_class=SentenceSplitter,
        chunk_size=1024,
        chunk_overlap=20,
        map_batches_kwargs={
            "batch_size": 100,  # Batch Size
            "num_cpus": 1,  # Request 1 CPU per actor
            "compute": ray.data.ActorPoolStrategy(
                size=20
            ),  # Fixed Pool of 20 actors
        },
    ),
    RayTransformComponent(
        transform_class=HuggingFaceEmbedding,
        model_name="BAAI/bge-small-en-v1.5",
        map_batches_kwargs={
            "batch_size": 100,
            # Fractional GPU Usage
            # This tells Ray: "1 Actor needs 25% of a GPU".
            # If you have 1 physical GPU, Ray autoscales to 4 Actors.
            # If you have 4 physical GPUs, Ray autoscales to 16 Actors.
            "num_gpus": 0.25,
        },
    ),
]

Then, we create the ingestion pipeline.
from llama_index.ingestion.ray import RayIngestionPipeline
pipeline = RayIngestionPipeline(transformations=transformations)

Run the Pipeline
We can finally run the pipeline with our Ray cluster.
nodes = pipeline.run(documents=documents)

2026-01-02 19:45:57,691 INFO logging.py:397 -- Registered dataset logger for dataset dataset_8_0
2026-01-02 19:45:57,692 INFO logging.py:405 -- dataset_8_0 registers for logging while another dataset dataset_2_0 is also logging. For performance reasons, we will not log to the dataset dataset_8_0 until it is the only active dataset.
2026-01-02 19:45:57,694 INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_8_0. Full logs are in /tmp/ray/session_2026-01-02_19-32-39_779796_94512/logs/ray-data
2026-01-02 19:45:57,694 INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_8_0: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(TransformActor)] -> ActorPoolMapOperator[MapBatches(TransformActor)]
2026-01-02 19:45:58,180 WARNING resource_manager.py:761 -- Cluster resources are not enough to run any task from ActorPoolMapOperator[MapBatches(TransformActor)]. The job may hang forever unless the cluster scales up.
2026-01-02 19:45:58,296 INFO progress_bar.py:213 -- === Ray Data Progress {MapBatches(TransformActor)} ===
2026-01-02 19:45:58,297 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 20 (running=0, restarting=0, pending=20); Queued blocks: 200 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:45:58,298 INFO progress_bar.py:213 -- === Ray Data Progress {MapBatches(TransformActor)} ===
2026-01-02 19:45:58,304 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:45:58,305 INFO progress_bar.py:213 -- === Ray Data Progress {Running Dataset} ===
2026-01-02 19:45:58,305 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.0B/4.4GiB object store (pending: 20 CPU, 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:03,340 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 20 (running=0, restarting=0, pending=20); Queued blocks: 200 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:03,344 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:03,345 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.0B/4.4GiB object store (pending: 20 CPU, 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead [repeated 20x across cluster]
2026-01-02 19:46:08,362 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 23.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:08,363 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 40 (23.0MiB); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:08,364 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 23.0MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 0 / ?
2026-01-02 19:46:13,437 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 23.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:13,439 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 40 (23.0MiB); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:13,440 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 23.0MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:18,521 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 2; Actors: 2 (running=1, restarting=0, pending=1); Queued blocks: 34 (20.3MiB); Resources: 0.0 CPU, 0.2 GPU, 774.2KiB object store; [all objects local]: Progress Completed 564 / 4512
2026-01-02 19:46:18,523 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 20.8MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:18,524 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.25/1 GPU, 22.4MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 450 / 4500
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:23,604 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 18.9MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:23,605 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 4; Actors: 3 (running=2, restarting=0, pending=1); Queued blocks: 27 (16.6MiB); Resources: 0.0 CPU, 0.5 GPU, 1.6MiB object store; [all objects local]: Progress Completed 1038 / 4613
2026-01-02 19:46:23,607 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.5/1 GPU, 20.5MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 1038 / 4613
2026-01-02 19:46:28,700 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 16.6MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:28,701 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 4; Actors: 3 (running=2, restarting=0, pending=1); Queued blocks: 23 (14.1MiB); Resources: 0.0 CPU, 0.5 GPU, 1.7MiB object store; [all objects local]: Progress Completed 1581 / 4865
2026-01-02 19:46:28,702 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.5/1 GPU, 18.3MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 1581 / 4865
(MapWorker(MapBatches(TransformActor)) pid=4897) [2026-01-02 19:46:31,556 E 4897 5527] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:33,731 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 14.6MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:33,732 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 6; Actors: 4 (running=3, restarting=0, pending=1); Queued blocks: 18 (10.7MiB); Resources: 0.0 CPU, 0.8 GPU, 2.7MiB object store; [all objects local]: Progress Completed 2009 / 5022
2026-01-02 19:46:33,733 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.75/1 GPU, 17.3MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 2009 / 5022
(MapWorker(MapBatches(TransformActor)) pid=6056) [2026-01-02 19:46:37,795 E 6056 6092] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:38,743 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 12.9MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:38,744 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 6; Actors: 4 (running=3, restarting=0, pending=1); Queued blocks: 15 (9.1MiB); Resources: 0.0 CPU, 0.8 GPU, 2.7MiB object store; [all objects local]: Progress Completed 2397 / 5046
2026-01-02 19:46:38,745 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.75/1 GPU, 15.6MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 2397 / 5046
2026-01-02 19:46:43,837 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 10.2MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:43,839 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 9 (5.5MiB); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 3023 / 5257
2026-01-02 19:46:43,839 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 14.0MiB/4.4GiB object store: Progress Completed 3023 / 5257
(MapWorker(MapBatches(TransformActor)) pid=6154) [2026-01-02 19:46:45,914 E 6154 6229] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:48,841 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 8.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:48,850 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 5 (3.1MiB); Resources: 0.0 CPU, 1.0 GPU, 3.7MiB object store; [all objects local]: Progress Completed 3523 / 5219
2026-01-02 19:46:48,851 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 11.8MiB/4.4GiB object store: Progress Completed 3523 / 5219
(MapWorker(MapBatches(TransformActor)) pid=6305) [2026-01-02 19:46:53,768 E 6305 6340] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:53,928 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 5.5MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:53,929 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 1 (572.2KiB); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 4105 / 5297
2026-01-02 19:46:53,930 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 9.3MiB/4.4GiB object store: Progress Completed 4105 / 5297
2026-01-02 19:46:59,019 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 3.1MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:59,020 INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 5; Actors: 4; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 4673 / 5341
2026-01-02 19:46:59,021 INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 6.9MiB/4.4GiB object store: Progress Completed 4673 / 5341
2026-01-02 19:47:03,409 INFO streaming_executor.py:304 -- ✔️ Dataset dataset_8_0 execution finished in 66.02 seconds
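The final progress lines show the embedding stage settling at 4 actors with 1/1 GPU in use, which is what "num_gpus": 0.25 implies on a single GPU. The arithmetic behind that ceiling can be sketched as follows. `max_actors` is a hypothetical helper, not a Ray API, and it models only the GPU constraint, ignoring CPU, memory, and any explicit pool-size limits.

```python
from fractions import Fraction


def max_actors(physical_gpus, gpus_per_actor):
    """Upper bound on concurrently scheduled actors from the GPU constraint alone."""
    if gpus_per_actor <= 0:
        raise ValueError("gpus_per_actor must be positive")
    # Fraction(str(...)) avoids float artifacts such as 1 // 0.1 == 9.0.
    share = Fraction(str(gpus_per_actor))
    if share >= 1:
        # Whole-GPU actors: total GPUs divided by GPUs per actor.
        return int(Fraction(physical_gpus) // share)
    # Fractional actors must each fit on a single device.
    return physical_gpus * int(1 // share)


for gpus in (1, 4):
    print(gpus, "GPU(s) ->", max_actors(gpus, 0.25), "actors")
```

For 1 and 4 GPUs this yields 4 and 16 actors, matching the comments in the transformation definition.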