Create your own real-time RAG with Pathway
In this guide, you will learn how to construct a dynamic, real-time RAG App using Pathway and OpenAI. Retrieval-Augmented Generation (RAG) is a powerful approach that combines the strengths of information retrieval and generative language models to provide accurate and contextually relevant answers to user queries.
Indexing documents in real time is the only way to make sure the answers you get from your RAG pipeline are based on the latest version of your documents and are not outdated. Pathway lets you easily index your documents in real time so your RAG is always up-to-date.
You can find ready-to-run LLM and RAG examples on our App Templates page.
RAG architecture
Here's how the RAG structure works:
- Document Indexing: The process begins with a collection of documents that are indexed and stored in a searchable format. Indexing involves analyzing the content of each document to identify key terms and phrases, which are then organized for efficient retrieval.
- User Query: A user inputs a query, which could be a question or a request for information. This query serves as the starting point for the RAG process.
- Document Retrieval: The retrieval system takes the user's query and searches through the indexed documents to find the most relevant pieces of information. This step uses advanced algorithms to quickly identify and retrieve documents that are likely to contain the answer to the query.
- Context Building: The retrieved documents are then used to build a context. This context includes the relevant information extracted from the documents, which will be used to generate a response.
- Prompt Construction: A prompt is constructed by combining the user's query with the context built from the retrieved documents. This prompt serves as the input for the generative language model.
- Answer Generation: The generative language model processes the prompt and generates a coherent and accurate response. This model is trained to understand and produce human-like text, ensuring that the response is informative and contextually appropriate.
- Final Output: The generated response is presented to the user.
By integrating retrieval and generation, RAG ensures that the responses are not only accurate but also contextually relevant, making it a robust solution for complex information retrieval tasks.
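To make this flow concrete, here is a minimal, framework-agnostic sketch of these steps in Python. Every name in it (answer_with_rag, index, llm, and their methods) is a hypothetical placeholder used for illustration only, not the Pathway API used later in this guide.
# Conceptual sketch of the RAG flow described above (placeholder names, not Pathway).
def answer_with_rag(query, index, llm, k=3):
    docs = index.retrieve(query, k=k)              # Document Retrieval
    context = "\n".join(doc.text for doc in docs)  # Context Building
    prompt = f"Given the following documents:\n{context}\nAnswer this query: {query}"  # Prompt Construction
    return llm.generate(prompt)                    # Answer Generation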
Prerequisites
Before you start, ensure you have the necessary package installed. You can install it using the following command:
pip install pathway[xpack-llm] python-dotenv
You can learn more about Pathway installation in the relevant article.
You will also need an OpenAI API key, which should be set in a .env file:
OPENAI_API_KEY="sk-..."
Imports
Import Pathway and the various components from the xpacks.llm package for handling language models.
import pathway as pw
from pathway.stdlib.indexing.nearest_neighbors import BruteForceKnnFactory
from pathway.xpacks.llm import llms
from pathway.xpacks.llm.document_store import DocumentStore
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.parsers import UnstructuredParser
from pathway.xpacks.llm.splitters import TokenCountSplitter
You will also need os and dotenv to load the API key from the .env file:
from dotenv import load_dotenv
import os
load_dotenv()
Document Indexing
The first step is to index your documents. You need to ingest them into the pipeline using Pathway connectors.
This example will focus on PDF documents stored locally on the file system, in a ./data/ directory.
You can load the documents from a specified directory using the file system connector.
documents = pw.io.fs.read("./data/", format="binary", with_metadata=True)
You can find the list of Pathway connectors here.
Now, you need to index those documents.
This can be easily done in Pathway using a DocumentStore which stores and manages the documents and their embeddings.
The DocumentStore handles the parsing, post-processing, and splitting. It then indexes the documents and provides a retrieval function, retrieve_query.
The document store requires the following parameters:
- Text Splitter: Splits documents into manageable chunks based on token count.
- Embedder: Converts text into embeddings, which are numerical representations that capture semantic meaning.
- Retriever Factory: Uses embeddings to find the most relevant documents for a given query.
- Parser: Extracts and structures text from documents.
In this example, you will use token-based chunking compatible with OpenAI's embedding models.
You can use Pathway's OpenAIEmbedder to access OpenAI's embedding services.
For the retrieval of the documents, let's use a simple brute force strategy.
text_splitter = TokenCountSplitter(
    min_tokens=100, max_tokens=500, encoding_name="cl100k_base"
)
embedder = OpenAIEmbedder(api_key=os.environ["OPENAI_API_KEY"])
retriever_factory = BruteForceKnnFactory(
    embedder=embedder,
)
parser = UnstructuredParser(
    chunking_mode="by_title",
    chunking_kwargs={
        "max_characters": 3000,
        "new_after_n_chars": 2000,
    },
)
With all components ready, you can create the document store:
document_store = DocumentStore(
    docs=documents,
    retriever_factory=retriever_factory,
    parser=parser,
    splitter=text_splitter,
)
You can learn more about document indexing in Pathway in our dedicated article.
User queries
To receive the users' queries and forward the answers, let's use a lightweight HTTP server.
You can configure the HTTP server using the PathwayWebserver class.
webserver = pw.io.http.PathwayWebserver(host="0.0.0.0", port=8011)
You also need to define the input schema using pw.Schema, which enforces the structure of the data being processed by Pathway. In our example, the queries will have a single field, messages.
class QuerySchema(pw.Schema):
    messages: str
Once everything is ready, you can create the server using Pathway REST connector.
queries, writer = pw.io.http.rest_connector(
    webserver=webserver,
    schema=QuerySchema,
    autocommit_duration_ms=50,
    delete_completed_queries=False,
)
Now users can send queries to Pathway via HTTP requests.
For document retrieval, you need to format the queries to fit the schema expected by the document store:
- query: the query sent by the user,
- k: the number of documents to retrieve for the query,
- metadata_filter (optional): filters using metadata,
- filepath_globpattern (optional): path pattern used to filter files.
Let's only retrieve a single document per query, with no filtering:
queries = queries.select(
    query=pw.this.messages,
    k=1,
    metadata_filter=None,
    filepath_globpattern=None,
)
Note that here we use a single value (k=1) for all the queries, but this can be adapted per query. For example, the user could specify it in the query with an additional parameter k in QuerySchema.
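As a sketch of that idea, you could add a k field to the query schema and forward it instead of the constant; this variant is only an illustration and is not used in the rest of this guide (the REST connector would then have to be created with this extended schema).
class QuerySchemaWithK(pw.Schema):
    messages: str
    k: int  # number of documents to retrieve, chosen by the user for each query

queries = queries.select(
    query=pw.this.messages,
    k=pw.this.k,  # per-query value instead of the constant k=1
    metadata_filter=None,
    filepath_globpattern=None,
)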
Document retrieval
Now that you have your index and your queries, you need to retrieve the most relevant documents for each query. This is easily done using the document store:
retrieved_documents = document_store.retrieve_query(queries)
The retrieved documents are stored in the result column.
This column contains a JSON with both the content and other metadata.
Let's rename this column to docs for clarity.
retrieved_documents = retrieved_documents.select(docs=pw.this.result)
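To give an idea of what this looks like, each docs value is a list of JSON-like entries; the text field is the one used below, while the other fields shown here are illustrative and may differ depending on your parser and Pathway version.
# Illustrative shape of one row's docs value (only "text" is relied on below):
# [
#     {
#         "text": "Extracted chunk of one of the PDFs ...",
#         "metadata": {"path": "./data/report.pdf"},
#     }
# ]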
Combine the original queries with the retrieved documents to create a context for generating answers.
queries_context = queries + retrieved_documents
Answer Generation
Using the query and its associated documents, you can build the prompt and generate an answer using an LLM.
Build the context and the prompt
The context is the information the LLM needs to answer the question: it's the content of the retrieved documents.
Create a context from the retrieved documents and build a prompt that includes both the context and the user's query. This prompt will be used to generate the final answer.
The context will simply be the content of the documents put one after the other.
The content is stored in the text part of the JSON.
def get_context(documents):
    content_list = []
    for doc in documents:
        content_list.append(str(doc["text"]))
    return " ".join(content_list)
Note: this function is applied to a single row, not to a whole table.
Also, in our example there is no need to collect the content in a list since only a single document is retrieved; this is done so that the function also works with higher values of k.
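For instance, applied to a hypothetical list of two retrieved entries, the helper simply concatenates their text fields:
sample_docs = [
    {"text": "Pathway indexes documents in real time."},
    {"text": "The index is updated whenever a file changes."},
]
print(get_context(sample_docs))
# Pathway indexes documents in real time. The index is updated whenever a file changes.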
You can build a UDF to create the prompt:
@pw.udf
def build_prompts_udf(documents, query) -> str:
    context = get_context(documents)
    prompt = (
        f"Given the following documents : \n {context} \nanswer this query: {query}"
    )
    return prompt

prompts = queries_context + queries_context.select(
    prompts=build_prompts_udf(pw.this.docs, pw.this.query)
)
Define the model
Pathway provides wrappers for most of the LLM providers.
You will use OpenAIChat to send your prompts to OpenAI.
You need to specify the language model and your API key:
model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"],  # Read the OpenAI API key from environment variables
)
This model is a UDF and can be directly applied to a column.
Generate the answers using an LLM API
Using OpenAIChat, you can ask OpenAI to answer your prompts with the chosen LLM.
Use prompt_chat_single_qa to wrap each prompt into the chat format expected by model.
responses = prompts.select(
    *pw.this.without(pw.this.query, pw.this.prompts, pw.this.docs),
    result=model(
        llms.prompt_chat_single_qa(pw.this.prompts),
    ),
)
Returning Answers
Send the generated answers back to the user via the HTTP server using the writer.
writer(responses)
Run the pipeline and try it out!
That's it!
Now that your pipeline is ready, you need to run it using pw.run()
pw.run()
Start the Python script to launch the web server and begin processing queries.
python main.py
You can send a test query using curl to see the pipeline in action.
curl --data '{ "messages": "What is the value of X?"}' http://localhost:8011
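If you prefer to query the pipeline from Python rather than curl, a minimal client might look like this, assuming the server runs locally on port 8011 and the requests package is installed:
import requests

# Send a query to the running Pathway pipeline and print the generated answer.
response = requests.post(
    "http://localhost:8011",
    json={"messages": "What is the value of X?"},
)
print(response.text)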
You can use more advanced tools like Streamlit to have a UI.
What about real-time?
Some of you might remember that you were promised a "real-time RAG", not a regular static one. No worries! Pathway automatically adapts to changes, ensuring your RAG stays updated in real time.
With Pathway, static and real-time data are treated the same way: your pipeline is already working on a data stream!
Once your documents are preprocessed and indexed, Pathway automatically detects any changes in the document directory and updates the document store accordingly. This real-time reactivity ensures that the RAG's responses are always based on the most recent and relevant information available.
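For example, the file system connector used at the beginning of this guide watches the ./data/ directory for new, modified, or deleted files. If you want to make this explicit, you can set its mode; the streaming behavior shown below is the default in recent Pathway versions, so this is mostly illustrative:
documents = pw.io.fs.read(
    "./data/",
    format="binary",
    with_metadata=True,
    mode="streaming",  # watch the directory and update the index on changes
)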
Conclusions and Pathway wrappers
In this tutorial, you have learned how to create a RAG pipeline from scratch with Pathway, implementing it step by step. The RAG is live by default, updating the index whenever your documents change.
In addition to our ready-to-run templates, Pathway provides pre-built wrappers for pre-selected prompts, servers, or even entire RAG pipelines.
Instead of doing everything from scratch, take a look at the documentation; you might find what you are trying to do!