---
title: Redis Ingestion Pipeline
 | Developer Documentation
---

This walkthrough shows how to use Redis for both the vector store, cache, and docstore in an Ingestion Pipeline.

## Dependencies

Install and start redis, setup OpenAI API key

```
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
```

```
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
```

```
338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b
```

```
import os


os.environ["OPENAI_API_KEY"] = "sk-..."
os.environ["TOKENIZERS_PARALLELISM"] = "false"
```

## Create Seed Data

```
# Make some test data
!rm -rf test_redis_data
!mkdir -p test_redis_data
!echo "This is a test file: one!" > test_redis_data/test1.txt
!echo "This is a test file: two!" > test_redis_data/test2.txt
```

```
from llama_index.core import SimpleDirectoryReader


# load documents with deterministic IDs
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()
```

## Run the Redis-Based Ingestion Pipeline

With a vector store attached, the pipeline will handle upserting data into your vector store.

However, if you only want to handle duplcates, you can change the strategy to `DUPLICATES_ONLY`.

```
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
    DocstoreStrategy,
    IngestionPipeline,
    IngestionCache,
)
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore


from redisvl.schema import IndexSchema




embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")


custom_schema = IndexSchema.from_dict(
    {
        "index": {"name": "redis_vector_store", "prefix": "doc"},
        # customize fields that are indexed
        "fields": [
            # required fields for llamaindex
            {"type": "tag", "name": "id"},
            {"type": "tag", "name": "doc_id"},
            {"type": "text", "name": "text"},
            # custom vector field for bge-small-en-v1.5 embeddings
            {
                "type": "vector",
                "name": "vector",
                "attrs": {
                    "dims": 384,
                    "algorithm": "hnsw",
                    "distance_metric": "cosine",
                },
            },
        ],
    }
)
```

```
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "localhost", 6379, namespace="document_store"
    ),
    vector_store=RedisVectorStore(
        schema=custom_schema,
        redis_url="redis://localhost:6379",
    ),
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port("localhost", 6379),
        collection="redis_cache",
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)
```

```
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
```

```
Ingested 2 Nodes
```

## Confirm documents are ingested

We can create a vector index using our vector store, and quickly ask which documents are seen.

```
from llama_index.core import VectorStoreIndex


index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)
```

```
print(
    index.as_query_engine(similarity_top_k=10).query(
        "What documents do you see?"
    )
)
```

```
I see two documents.
```

## Add data and Ingest

Here, we can update an existing file, as well as add a new one!

```
!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt
```

```
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()


nodes = pipeline.run(documents=documents)


print(f"Ingested {len(nodes)} Nodes")
```

```
13:32:07 redisvl.index.index INFO   Index already exists, not overwriting.
Ingested 2 Nodes
```

```
index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)


response = index.as_query_engine(similarity_top_k=10).query(
    "What documents do you see?"
)


print(response)


for node in response.source_nodes:
    print(node.get_text())
```

```
You see three documents: test3.txt, test1.txt, and test2.txt.
This is a test file: three!
This is a NEW test file: one!
This is a test file: two!
```

As we can see, the data was deduplicated and upserted correctly! Only three nodes are in the index, even though we ran the full pipeline twice.
