Ingestion Pipeline
An IngestionPipeline uses a concept of Transformations that are applied to your input data. The resulting nodes are either returned or inserted into a vector database (if one is given). Each node + transformation pair is cached, so subsequent runs (if the cache is persisted) with the same node + transformation combination can reuse the cached result and save you time.
To see an interactive example of the IngestionPipeline in use, check out the RAG CLI.
Usage Pattern
The simplest usage is to instantiate an IngestionPipeline like so:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# run the pipeline
nodes = pipeline.run(documents=[Document.example()])
Note that in a real-world scenario, you would get your documents from SimpleDirectoryReader or another reader from Llama Hub.
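For instance, a minimal sketch of loading files with SimpleDirectoryReader and feeding them into the same pipeline; the "./data" directory path is illustrative:

from llama_index.core import SimpleDirectoryReader

# load documents from a local folder (path is illustrative)
documents = SimpleDirectoryReader("./data").load_data()

# run the same pipeline over the loaded documents
nodes = pipeline.run(documents=documents)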
Connecting to Vector Databases
When running an ingestion pipeline, you can also choose to automatically insert the resulting nodes into a remote vector store.
Then, you can construct an index from that vector store later on.
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    vector_store=vector_store,
)

# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])

# Create your index
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)
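From there, the index behaves like any other VectorStoreIndex; for example, you can query it. A minimal sketch (the question string is just a placeholder):

# query the index built on top of the ingested nodes
query_engine = index.as_query_engine()
response = query_engine.query("What is this document about?")
print(response)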
Calculating embeddings in a pipeline
Note that in the above example, embeddings are calculated as part of the pipeline. If you are connecting your pipeline to a vector store, embeddings must be a stage of your pipeline or your later instantiation of the index will fail.
You can omit embeddings from your pipeline if you are not connecting to a vector store, i.e. just producing a list of nodes.
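For example, a sketch of an embedding-free pipeline that only splits and extracts titles, returning nodes without touching a vector store:

from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

# no embedding stage and no vector_store: run() simply returns nodes
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ]
)
nodes = pipeline.run(documents=[Document.example()])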
Caching
In an IngestionPipeline, each node + transformation combination is hashed and cached. This saves time on subsequent runs that use the same data.
The following sections describe some basic usage around caching.
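If you want to hold on to the cache object yourself (for example, to clear or persist it later), you can construct the IngestionCache explicitly and pass it in. A minimal sketch:

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.node_parser import SentenceSplitter

cache = IngestionCache()  # in-memory cache

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0)],
    cache=cache,
)

# the first run does the work; a second run with the same inputs hits the cache
nodes = pipeline.run(documents=[Document.example()])
nodes = pipeline.run(documents=[Document.example()])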
Local Cache Management
Once you have a pipeline, you may want to store and load the cache.
# save
pipeline.persist("./pipeline_storage")

# load and restore state
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ],
)
new_pipeline.load("./pipeline_storage")

# will run instantly due to the cache
nodes = new_pipeline.run(documents=[Document.example()])
If the cache becomes too large, you can clear it:

# delete all content of the cache
new_pipeline.cache.clear()
Remote Cache Management
We support multiple remote storage backends for caches:

- RedisCache
- MongoDBCache
- FirestoreCache

Here is an example using the RedisCache:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache

ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_test_cache",
)

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    cache=ingest_cache,
)

# run the pipeline; results are cached in the remote Redis collection
nodes = pipeline.run(documents=[Document.example()])
Here, no persist step is needed, since everything is cached as you go in the specified remote collection.
Async Support
The IngestionPipeline also supports async operation:
nodes = await pipeline.arun(documents=documents)
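A slightly fuller sketch, assuming you are driving the async call from a regular script with asyncio; the single example document stands in for your real input:

import asyncio

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0)]
)

async def main() -> None:
    documents = [Document.example()]
    # arun mirrors run, awaiting async-capable transformations
    nodes = await pipeline.arun(documents=documents)
    print(len(nodes))

asyncio.run(main())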
Document Management
Attaching a docstore to the ingestion pipeline will enable document management.
Using the document.doc_id or node.ref_doc_id as a grounding point, the ingestion pipeline will actively look for duplicate documents.
It works by:
- Storing a map of doc_id -> document_hash
- If a vector store is attached:
  - If a duplicate doc_id is detected, and the hash has changed, the document will be re-processed and upserted
  - If a duplicate doc_id is detected and the hash is unchanged, the node is skipped
- If a vector store is not attached:
  - Checks all existing hashes for each node
  - If a duplicate is found, the node is skipped
  - Otherwise, the node is processed
NOTE: If we do not attach a vector store, we can only check for and remove duplicate inputs.
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[...],
    docstore=SimpleDocumentStore(),
)
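A minimal sketch of the resulting de-duplication behavior, using an explicit doc_id so both runs refer to the same document; the text and id below are illustrative:

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore

# doc_id is set explicitly so both runs refer to the same document
doc = Document(text="The quick brown fox.", doc_id="doc-1")

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0)],
    docstore=SimpleDocumentStore(),
)

nodes_first = pipeline.run(documents=[doc])   # processed normally
nodes_second = pipeline.run(documents=[doc])  # duplicate detected, input skipped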
A full walkthrough is found in our demo notebook.
Also check out another guide using Redis as our entire ingestion stack.
Parallel Processing
The run method of IngestionPipeline can be executed with parallel processes. It does so by making use of multiprocessing.Pool, distributing batches of nodes across processors.
To execute with parallel processing, set num_workers to the number of processes you'd like to use:
from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)