---
title: Distributed Ingestion Pipeline with Ray | Developer Documentation
---

In this notebook, we demonstrate how to execute ingestion pipelines using Ray.

```
%pip install llama-index-ingestion-ray llama-index-embeddings-huggingface
```

Start a new cluster, or connect to an existing one. See <https://docs.ray.io/en/latest/ray-core/configure.html> for details about Ray cluster configurations.

```
import ray


ray.init()
```
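As a hedged sketch of the two options above: `ray.init()` with no arguments starts a local cluster (or uses `RAY_ADDRESS` if it is set), while `address="auto"` attaches to a cluster that is already running. The helper names `init_kwargs` and `connect` are hypothetical, and Ray is imported lazily so the first helper can be tried without it.

```python
from typing import Optional


def init_kwargs(address: Optional[str] = None) -> dict:
    """Build keyword arguments for ray.init() (hypothetical helper)."""
    # ignore_reinit_error lets a notebook cell be re-run without raising.
    kwargs = {"ignore_reinit_error": True}
    if address is not None:
        # "auto" attaches to a running cluster;
        # a "ray://<head-node>:10001" URL goes through Ray Client.
        kwargs["address"] = address
    return kwargs


def connect(address: Optional[str] = None) -> dict:
    """Start or attach to a cluster and return its total resources."""
    import ray  # lazy import: only needed when actually connecting

    ray.init(**init_kwargs(address))
    return ray.cluster_resources()


print(init_kwargs())        # arguments for a local cluster
print(init_kwargs("auto"))  # arguments for an existing cluster
```

Calling `connect("auto")` on a machine with no running cluster raises an error, so a notebook that should work in both settings may prefer plain `connect()`.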

### Load data

For this notebook, we’ll load the `PatronusAIFinanceBenchDataset` llama-dataset from [llamahub](https://llamahub.ai).

```
!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data
```

```
from llama_index.core import SimpleDirectoryReader


documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()
```

### Define the RayIngestionPipeline

First, we define our transformations. Each `TransformComponent` is wrapped in a `RayTransformComponent`, which runs the transformation logic inside stateful [Ray Actors](https://docs.ray.io/en/latest/ray-core/actors.html). The transformations themselves are executed with [Ray Data](https://docs.ray.io/en/latest/data/data.html). For details on configuring hardware requirements and actor pool strategies, see the [ray.data.Dataset.map\_batches documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html).

```
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.ingestion.ray import RayTransformComponent


transformations = [
    RayTransformComponent(
        transform_class=SentenceSplitter,
        chunk_size=1024,
        chunk_overlap=20,
        map_batches_kwargs={
            "batch_size": 100,  # Rows per batch passed to each transform call
            "num_cpus": 1,  # Request 1 CPU per actor
            "compute": ray.data.ActorPoolStrategy(
                size=20
            ),  # Fixed Pool of 20 actors
        },
    ),
    RayTransformComponent(
        transform_class=HuggingFaceEmbedding,
        model_name="BAAI/bge-small-en-v1.5",
        map_batches_kwargs={
            "batch_size": 100,
            # Fractional GPU usage:
            # each actor requests 25% of a GPU.
            # With 1 physical GPU, Ray can scale up to 4 actors.
            # With 4 physical GPUs, Ray can scale up to 16 actors.
            "num_gpus": 0.25,
        },
    ),
]
```
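The actor-based execution described above follows the Ray Data pattern of passing a callable class to `map_batches`: the constructor runs once per actor, and `__call__` runs once per batch. The sketch below imitates that pattern in plain Python, without Ray, to show why expensive setup such as loading an embedding model belongs in the constructor. `ToyEmbedder` and `make_batches` are hypothetical names, the "embedding" is just the text length, and none of this is meant to describe how `RayTransformComponent` is implemented internally.

```python
from typing import Dict, Iterator, List


class ToyEmbedder:
    """Hypothetical stand-in for a stateful transform hosted by an actor."""

    def __init__(self, model_name: str):
        # Runs once per actor: the place for expensive setup such as model loading.
        self.model_name = model_name
        self.batches_seen = 0

    def __call__(self, batch: Dict[str, List[str]]) -> Dict[str, list]:
        # Runs once per batch and reuses the state created in __init__.
        self.batches_seen += 1
        embeddings = [[float(len(text))] for text in batch["text"]]
        return {"text": batch["text"], "embedding": embeddings}


def make_batches(rows: List[str], batch_size: int) -> Iterator[Dict[str, List[str]]]:
    """Yield column-oriented batches of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield {"text": rows[start:start + batch_size]}


actor = ToyEmbedder(model_name="BAAI/bge-small-en-v1.5")  # built once
outputs = [actor(batch) for batch in make_batches(["a", "bb", "ccc"], batch_size=2)]
print(actor.batches_seen, [out["embedding"] for out in outputs])
```

With three rows and a batch size of 2, the single `ToyEmbedder` instance handles two batches while its constructor runs only once.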

Then, we create the ingestion pipeline.

```
from llama_index.ingestion.ray import RayIngestionPipeline


pipeline = RayIngestionPipeline(transformations=transformations)
```

### Run the Pipeline

Finally, we run the pipeline on our Ray cluster.

```
nodes = pipeline.run(documents=documents)
```

```
2026-01-02 19:45:57,691  INFO logging.py:397 -- Registered dataset logger for dataset dataset_8_0
2026-01-02 19:45:57,692  INFO logging.py:405 -- dataset_8_0 registers for logging while another dataset dataset_2_0 is also logging. For performance reasons, we will not log to the dataset dataset_8_0 until it is the only active dataset.
2026-01-02 19:45:57,694  INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_8_0. Full logs are in /tmp/ray/session_2026-01-02_19-32-39_779796_94512/logs/ray-data
2026-01-02 19:45:57,694  INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_8_0: InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(TransformActor)] -> ActorPoolMapOperator[MapBatches(TransformActor)]
2026-01-02 19:45:58,180  WARNING resource_manager.py:761 -- Cluster resources are not enough to run any task from ActorPoolMapOperator[MapBatches(TransformActor)]. The job may hang forever unless the cluster scales up.
2026-01-02 19:45:58,296  INFO progress_bar.py:213 -- === Ray Data Progress {MapBatches(TransformActor)} ===
2026-01-02 19:45:58,297  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 20 (running=0, restarting=0, pending=20); Queued blocks: 200 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:45:58,298  INFO progress_bar.py:213 -- === Ray Data Progress {MapBatches(TransformActor)} ===
2026-01-02 19:45:58,304  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:45:58,305  INFO progress_bar.py:213 -- === Ray Data Progress {Running Dataset} ===
2026-01-02 19:45:58,305  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.0B/4.4GiB object store (pending: 20 CPU, 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:03,340  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 20 (running=0, restarting=0, pending=20); Queued blocks: 200 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:03,344  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:03,345  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.0B/4.4GiB object store (pending: 20 CPU, 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead [repeated 20x across cluster]
2026-01-02 19:46:08,362  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 23.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:08,363  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 40 (23.0MiB); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:08,364  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 23.0MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 0 / ?
2026-01-02 19:46:13,437  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 23.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:13,439  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 1 (running=0, restarting=0, pending=1); Queued blocks: 40 (23.0MiB); Resources: 0.0 CPU, 0.0B object store; [all objects local]: Progress Completed 0 / ?
2026-01-02 19:46:13,440  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 23.0MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 0 / ?
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:18,521  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 2; Actors: 2 (running=1, restarting=0, pending=1); Queued blocks: 34 (20.3MiB); Resources: 0.0 CPU, 0.2 GPU, 774.2KiB object store; [all objects local]: Progress Completed 564 / 4512
2026-01-02 19:46:18,523  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 20.8MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:18,524  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.25/1 GPU, 22.4MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 450 / 4500
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:23,604  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 18.9MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:23,605  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 4; Actors: 3 (running=2, restarting=0, pending=1); Queued blocks: 27 (16.6MiB); Resources: 0.0 CPU, 0.5 GPU, 1.6MiB object store; [all objects local]: Progress Completed 1038 / 4613
2026-01-02 19:46:23,607  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.5/1 GPU, 20.5MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 1038 / 4613
2026-01-02 19:46:28,700  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 16.6MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:28,701  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 4; Actors: 3 (running=2, restarting=0, pending=1); Queued blocks: 23 (14.1MiB); Resources: 0.0 CPU, 0.5 GPU, 1.7MiB object store; [all objects local]: Progress Completed 1581 / 4865
2026-01-02 19:46:28,702  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.5/1 GPU, 18.3MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 1581 / 4865
(MapWorker(MapBatches(TransformActor)) pid=4897) [2026-01-02 19:46:31,556 E 4897 5527] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
(raylet) warning: `VIRTUAL_ENV=/home/flobacho/llama_index/llama-index-integrations/ingestion/llama-index-ingestion-ray/.venv` does not match the project environment path `.venv` and will be ignored; use `--active` to target the active environment instead
2026-01-02 19:46:33,731  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 14.6MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:33,732  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 6; Actors: 4 (running=3, restarting=0, pending=1); Queued blocks: 18 (10.7MiB); Resources: 0.0 CPU, 0.8 GPU, 2.7MiB object store; [all objects local]: Progress Completed 2009 / 5022
2026-01-02 19:46:33,733  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.75/1 GPU, 17.3MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 2009 / 5022
(MapWorker(MapBatches(TransformActor)) pid=6056) [2026-01-02 19:46:37,795 E 6056 6092] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:38,743  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 12.9MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:38,744  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 6; Actors: 4 (running=3, restarting=0, pending=1); Queued blocks: 15 (9.1MiB); Resources: 0.0 CPU, 0.8 GPU, 2.7MiB object store; [all objects local]: Progress Completed 2397 / 5046
2026-01-02 19:46:38,745  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 0.75/1 GPU, 15.6MiB/4.4GiB object store (pending: 0.25 GPU): Progress Completed 2397 / 5046
2026-01-02 19:46:43,837  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 10.2MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:43,839  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 9 (5.5MiB); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 3023 / 5257
2026-01-02 19:46:43,839  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 14.0MiB/4.4GiB object store: Progress Completed 3023 / 5257
(MapWorker(MapBatches(TransformActor)) pid=6154) [2026-01-02 19:46:45,914 E 6154 6229] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:48,841  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 8.0MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:48,850  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 5 (3.1MiB); Resources: 0.0 CPU, 1.0 GPU, 3.7MiB object store; [all objects local]: Progress Completed 3523 / 5219
2026-01-02 19:46:48,851  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 11.8MiB/4.4GiB object store: Progress Completed 3523 / 5219
(MapWorker(MapBatches(TransformActor)) pid=6305) [2026-01-02 19:46:53,768 E 6305 6340] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
2026-01-02 19:46:53,928  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 5.5MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:53,929  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 8; Actors: 4; Queued blocks: 1 (572.2KiB); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 4105 / 5297
2026-01-02 19:46:53,930  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 9.3MiB/4.4GiB object store: Progress Completed 4105 / 5297
2026-01-02 19:46:59,019  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 0; Actors: 0; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 3.1MiB object store; [all objects local]: Progress Completed 5378 / 5378
2026-01-02 19:46:59,020  INFO progress_bar.py:215 -- MapBatches(TransformActor): Tasks: 5; Actors: 4; Queued blocks: 0 (0.0B); Resources: 0.0 CPU, 1.0 GPU, 3.8MiB object store; [all objects local]: Progress Completed 4673 / 5341
2026-01-02 19:46:59,021  INFO progress_bar.py:215 -- Running Dataset: dataset_8_0. Active & requested resources: 0/20 CPU, 1/1 GPU, 6.9MiB/4.4GiB object store: Progress Completed 4673 / 5341
2026-01-02 19:47:03,409  INFO streaming_executor.py:304 -- ✔️  Dataset dataset_8_0 execution finished in 66.02 seconds
```
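The resource figures in the log can be reproduced with simple arithmetic. This sketch assumes that each fractional-GPU actor must fit on a single physical GPU, that each actor requests at most one GPU, and that a fixed-size pool requests the CPUs for all of its actors; the function names are hypothetical.

```python
from fractions import Fraction


def max_gpu_actors(total_gpus: int, gpus_per_actor: str) -> int:
    """Upper bound on actors when each requests a fraction (<= 1) of one GPU."""
    # Fraction avoids float rounding for values such as "0.1".
    per_gpu = Fraction(1) // Fraction(gpus_per_actor)  # actors that fit on one GPU
    return total_gpus * int(per_gpu)


def pool_cpu_demand(pool_size: int, cpus_per_actor: int) -> int:
    """Total CPUs requested by a fixed-size actor pool."""
    return pool_size * cpus_per_actor


print(max_gpu_actors(1, "0.25"))  # embedding stage on 1 GPU
print(max_gpu_actors(4, "0.25"))  # embedding stage on 4 GPUs
print(pool_cpu_demand(20, 1))     # SentenceSplitter pool of 20 actors
```

With one GPU the bound is 4 actors, which matches the `Actors: 4` and `1/1 GPU` lines near the end of the log, and the splitter pool of 20 one-CPU actors accounts for the `pending: 20 CPU` reported at the start of the run.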
