Advanced Ingestion Pipeline
In this notebook, we implement an IngestionPipeline with the following features:
- Redis transformation caching
- Automatic vector database insertion
- A custom transformation
Redis Cache Setup
All node + transformation combinations will have their outputs cached, which will save time on duplicate runs.
```python
from llama_index.core.ingestion.cache import RedisCache
from llama_index.core.ingestion import IngestionCache

ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_test_cache",
)
```
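The cache above assumes a Redis server is reachable at `127.0.0.1:6379`. If you want to verify connectivity before running the pipeline, a minimal check using the redis-py client (an extra dependency, not used by the notebook itself) might look like this:

```python
import redis  # pip install redis

# Ping the local Redis server the cache points at; raises if unreachable
r = redis.Redis(host="127.0.0.1", port=6379)
print(r.ping())  # True when the server is up
```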
Vector DB Setup
For this example, we use Weaviate as a vector store.
```python
!pip install weaviate-client
```
```python
import weaviate

auth_config = weaviate.AuthApiKey(api_key="...")
client = weaviate.Client(url="https://...", auth_client_secret=auth_config)
```
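Optionally, you can confirm the Weaviate instance is reachable before continuing; the v3 `weaviate-client` used above exposes a readiness check:

```python
# Sanity check: returns True when the Weaviate instance is up and reachable
print(client.is_ready())
```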
```python
from llama_index.vector_stores.weaviate import WeaviateVectorStore

vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name="CachingTest"
)
```
Transformation Setup
```python
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

text_splitter = TokenTextSplitter(chunk_size=512)
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
```
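As a quick standalone sanity check (purely illustrative, not part of the pipeline), you can split a string and embed one of the resulting chunks; bge-small-en-v1.5 produces 384-dimensional vectors:

```python
# Illustrative only: exercise the splitter and embedding model directly
chunks = text_splitter.split_text("An example document body. " * 200)
vector = embed_model.get_text_embedding(chunks[0])
print(len(chunks), len(vector))  # several chunks, 384-dim embeddings
```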
Custom Transformation
```python
import re

from llama_index.core.schema import TransformComponent


class TextCleaner(TransformComponent):
    def __call__(self, nodes, **kwargs):
        # Keep only letters, digits, and spaces in each node's text
        for node in nodes:
            node.text = re.sub(r"[^0-9A-Za-z ]", "", node.text)
        return nodes
```
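Since a TransformComponent is just a callable over nodes, you can try the cleaner on a throwaway node before putting it in the pipeline (illustrative):

```python
from llama_index.core.schema import TextNode

# The cleaner strips everything except letters, digits, and spaces
cleaned = TextCleaner()([TextNode(text="Hello, world! (2024)")])
print(cleaned[0].text)  # Hello world 2024
```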
Running the pipeline
```python
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[
        TextCleaner(),
        text_splitter,
        embed_model,
        TitleExtractor(),
    ],
    vector_store=vector_store,
    cache=ingest_cache,
)
```
```python
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("../data/paul_graham/").load_data()
nodes = pipeline.run(documents=documents)
```
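Because the pipeline includes TitleExtractor, each output node should carry the extracted title in its metadata (assuming the extractor's default `document_title` metadata key):

```python
# Inspect the pipeline output
print(len(nodes))
print(nodes[0].metadata.get("document_title"))
```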
Using our populated vector store
```python
import os

# needed for the LLM in the query engine
os.environ["OPENAI_API_KEY"] = "sk-..."
```
```python
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store,
    embed_model=embed_model,
)

query_engine = index.as_query_engine()
print(query_engine.query("What did the author do growing up?"))
```
The author worked on writing and programming growing up. They wrote short stories and also tried programming on an IBM 1401 computer using an early version of Fortran.
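To see which chunks back that answer, you can also query the index through a retriever instead of the full query engine (a minimal sketch):

```python
# Retrieve the top-scoring chunks directly, without LLM synthesis
retriever = index.as_retriever(similarity_top_k=2)
for result in retriever.retrieve("What did the author do growing up?"):
    print(result.score, result.node.text[:100])
```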
Re-run Ingestion to test Caching
The next code block will execute almost instantly due to caching.
```python
pipeline = IngestionPipeline(
    transformations=[TextCleaner(), text_splitter, embed_model],
    cache=ingest_cache,
)

nodes = pipeline.run(documents=documents)
```
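You can make the speed-up concrete by timing the re-run (a minimal sketch; exact numbers depend on your machine):

```python
import time

start = time.time()
nodes = pipeline.run(documents=documents)
print(f"re-run took {time.time() - start:.2f}s")  # near-instant on cache hits
```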
Clear the cache
```python
ingest_cache.clear()
```