Data Indexing
This showcase demonstrates how to use Pathway to deploy a live data indexing pipeline that can be queried like a typical vector store. Under the hood, however, Pathway updates the index on every data change, so you always get up-to-date answers.
Pathway VectorStore enables building an index on top of your documents without the complexity of ETL pipelines or managing separate containers for storing, embedding, and serving. It gives you easy-to-manage, always up-to-date LLM pipelines accessible through a RESTful API, with integrations to popular LLM toolkits such as LangChain and LlamaIndex.
In this article, we will use a simple document processing pipeline that:
- Monitors several data sources (files, S3 folders, cloud storages) for data changes.
- Parses, splits and embeds the documents.
- Builds a vector index for the data.
However, if you prefer not to build the pipeline from the ground up and would like to check out the functionality, take a look at our managed pipelines in action.
We will connect to the index using a VectorStore
client, which allows retrieval of semantically similar documents.
Prerequisites
Install the pathway package. You can also install the unstructured package to use the most powerful, unstructured.io-based parser.
Then download sample data.
!pip install pathway litellm
!pip install unstructured[all-docs]
!mkdir -p sample_documents
![ -f sample_documents/repo_readme.md ] || wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'sample_documents/repo_readme.md'
import logging
import sys
import time
logging.basicConfig(stream=sys.stderr, level=logging.WARN, force=True)
Building the data pipeline
First, make sure you have an API key with an LLM provider such as OpenAI.
import getpass
import os
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
We will now assemble the data vectorization pipeline, using a simple UTF-8 file parser, a character splitter, and an embedder from the Pathway LLM xpack.
First, we define the data sources. We use the filesystem connector for simplicity, but any supported Pathway connector, such as S3 or Google Drive, will also work (an S3 example is sketched after the code below).
Then, we define the embedder and splitter.
Last, we assemble the data pipeline. We will start it running in a background thread to be able to query it immediately from the demonstration. Please note that in a production deployment, the server will run in another process, possibly on another machine. For the quick-start, we keep the server and client as different threads of the same Python process.
import pathway as pw
data_sources = []
data_sources.append(
pw.io.fs.read(
"./sample_documents",
format="binary",
mode="streaming",
with_metadata=True,
)
)
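For example, an S3 bucket could be added as an additional source. The bucket name, region, and credentials below are placeholders, and the exact pw.io.s3.read / AwsS3Settings arguments are assumptions to adapt to your own setup:
# Hypothetical S3 source -- bucket name, region, and credentials are placeholders.
data_sources.append(
    pw.io.s3.read(
        "documents/",
        format="binary",
        mode="streaming",
        with_metadata=True,
        aws_s3_settings=pw.io.s3.AwsS3Settings(
            bucket_name="my-documents-bucket",
            region="eu-central-1",
            access_key=os.environ.get("AWS_ACCESS_KEY_ID", ""),
            secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        ),
    )
)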
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.splitters import TokenCountSplitter
from pathway.xpacks.llm.vector_store import VectorStoreClient, VectorStoreServer
PATHWAY_PORT = 8765
text_splitter = TokenCountSplitter()
embedder = OpenAIEmbedder(api_key=os.environ["OPENAI_API_KEY"])
vector_server = VectorStoreServer(
*data_sources,
embedder=embedder,
splitter=text_splitter,
)
vector_server.run_server(host="127.0.0.1", port=PATHWAY_PORT, threaded=True, with_cache=False)
time.sleep(30) # Workaround for Colab - messages from threads are not visible unless a cell is running
We now instantiate and configure the client:
client = VectorStoreClient(
host="127.0.0.1",
port=PATHWAY_PORT,
)
And we can start asking queries:
query = "What is Pathway?"
docs = client(query)
docs
Your turn! Now make a change to the source documents, or add a fresh one, and retry the query!
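For example, you can drop a new file into the monitored folder and query again; the file name, contents, and wait time below are only illustrative:
# Add a new document to the monitored folder; the running pipeline picks it up automatically.
with open("sample_documents/new_note.md", "w") as f:
    f.write("Pathway is a Python framework for building live, reactive data processing pipelines.")

time.sleep(5)  # give the indexer a moment to parse and embed the new file
client("What is Pathway?")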
Integrations
Langchain
You can use a Pathway Vector Store in LangChain pipelines with PathwayVectorClient
and configure a VectorStoreServer
using LangChain components. For more information, see our article or the LangChain documentation.
!pip install langchain
!pip install langchain-openai
!pip install langchain-community
from langchain_community.vectorstores import PathwayVectorClient
client = PathwayVectorClient(host="127.0.0.1", port=PATHWAY_PORT)
docs = client.similarity_search("What is Pathway?")
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
embeddings_model = OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_API_KEY"])
vector_server = VectorStoreServer.from_langchain_components(
*data_sources,
embedder=embeddings_model,
splitter=text_splitter,
)
vector_server.run_server(host="127.0.0.1", port=PATHWAY_PORT+1, threaded=True, with_cache=False)
time.sleep(30) # colab workaround
client = VectorStoreClient(
host="127.0.0.1",
port=PATHWAY_PORT+1,
)
client.query("pathway")
LlamaIndex
Pathway is fully integrated with LlamaIndex! We show below how to instantiate a LlamaIndex retriever that queries the Pathway VectorStoreServer, and how to configure a server using LlamaIndex components.
For more information, see the Pathway Retriever cookbook.
!pip install llama-index llama-index-retrievers-pathway llama-index-embeddings-openai
from llama_index.retrievers.pathway import PathwayRetriever
pr = PathwayRetriever(host="127.0.0.1", port=PATHWAY_PORT)
pr.retrieve(str_or_query_bundle="What is Pathway?")
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter
embed_model = OpenAIEmbedding(embed_batch_size=10)
transformations_example = [
TokenTextSplitter(
chunk_size=150,
chunk_overlap=10,
separator=" ",
),
embed_model,
]
vector_server = VectorStoreServer.from_llamaindex_components(
*data_sources,
transformations=transformations_example,
)
vector_server.run_server(host="127.0.0.1", port=PATHWAY_PORT+2, threaded=True, with_cache=False)
time.sleep(30) # colab workaround
client = VectorStoreClient(
host="127.0.0.1",
port=PATHWAY_PORT+2,
)
client.query("pathway")
Advanced topics
Getting information on indexed files
PathwayVectorClient.get_vectorstore_statistics()
gives essential statistics on the state of the vector store, like the number of indexed files and the timestamp of the last updated one. You can use it in your chains to tell the user how fresh your knowledge base is.
client.get_vectorstore_statistics()
You can also use PathwayVectorClient.get_input_files()
to get the list of indexed files along with the associated metadata.
client.get_input_files()
Filtering based on file metadata
We support document filtering using JMESPath expressions, for instance:
docs = client(query, metadata_filter="modified_at >= `1702672093`")
docs = client(query, metadata_filter="owner == `james`")
docs = client(query, metadata_filter="contains(path, 'repo_readme')")
docs = client(query, metadata_filter="owner == `james` && modified_at >= `1702672093`")
docs = client(query, metadata_filter="owner == `james` || modified_at >= `1702672093`")
Configuring the parser
The vectorization pipeline supports pluggable parsers. If none is provided, it defaults to a UTF-8 parser. You can find the available parsers here.
An example parser that can read PDFs, Word documents, and other formats is provided with parsers.ParseUnstructured:
from pathway.xpacks.llm import parsers
vector_server = VectorStoreServer(
*data_sources,
parser=parsers.ParseUnstructured(),
embedder=embeddings_model,
splitter=text_splitter,
)
Configuring the cache
The Pathway vectorizing pipeline comes with an embeddings cache:
vector_server.run_server(..., with_cache=True)
The default cache configuration is a locally hosted disk cache, stored in the ./Cache directory. However, you can customize it by explicitly specifying a caching backend, chosen from among several persistent backend options.
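For instance, a filesystem cache in a custom directory might be configured as sketched below; the cache_backend argument and pw.persistence.Backend.filesystem are assumed to be available in your Pathway version:
# Sketch: keep the embeddings cache in a custom directory instead of ./Cache
# (assumes run_server accepts a cache_backend built from pw.persistence.Backend).
vector_server.run_server(
    host="127.0.0.1",
    port=PATHWAY_PORT,
    threaded=True,
    with_cache=True,
    cache_backend=pw.persistence.Backend.filesystem("./my_embedding_cache"),
)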
Running in production
A production deployment will typically run the server in a separate process. We recommend running the Pathway data indexing pipeline in a container-based deployment environment like Docker or Kubernetes. For more info, see Pathway's deployment guide.
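As a sketch, a standalone entrypoint for such a deployment could run the server in blocking mode (threaded=False), so the process keeps serving until the container stops; the paths and port below are illustrative:
# run_vector_store.py -- illustrative standalone entrypoint for a container
import os
import pathway as pw
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.splitters import TokenCountSplitter
from pathway.xpacks.llm.vector_store import VectorStoreServer

docs = pw.io.fs.read("./documents", format="binary", mode="streaming", with_metadata=True)

server = VectorStoreServer(
    docs,
    embedder=OpenAIEmbedder(api_key=os.environ["OPENAI_API_KEY"]),
    splitter=TokenCountSplitter(),
)

# Blocking call: the process keeps running and serving requests until stopped.
server.run_server(host="0.0.0.0", port=8765, threaded=False, with_cache=True)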
Join our Discord community and dive into discussions on tricks and tips for mastering Retrieval Augmented Generation!