Introduction to the LLM xpack

The LLM xpack provides you with all the tools you need to use Large Language Models (LLMs) in Pathway. Wrappers for the most common LLM services and utilities are included, making working with LLMs as easy as possible.

You can find ready-to-run LLM and RAG examples on our App Templates page.

In order to install Pathway along with LLM xpack functionality simply run:

`pip install "pathway[xpack-llm]"`

or

`pip install "pathway[all]"`

Queries table

Let's start by preparing a Pathway table with the queries that we want to send to the LLM.

import pathway as pw
query = pw.debug.table_from_markdown('''
prompt
How many 'r's are there in 'strawberry'?
''')

Wrappers for LLMs

Out of the box, the LLM xpack provides wrappers for text generation and embedding LLMs. For text generation, you can use native wrappers for the OpenAI chat model and for HuggingFace models running locally. Many other popular models, including Azure OpenAI, HuggingFace (when using their API) or Gemini, can be used with the LiteLLM wrapper. For the full list of providers supported by LiteLLM, check the LiteLLM documentation.

UDFs

Each wrapper is a UDF (User-Defined Function) class, which allows users to define their own functions to interact with Pathway objects. A UDF, in general, is any function that takes some input, processes it, and returns an output. In the context of the Pathway library, UDFs enable seamless integration of custom logic, such as invoking LLMs for specific tasks.

In particular, a UDF can serve as a wrapper for LLM calls, allowing users to pass prompts or other inputs to a model and retrieve the corresponding outputs. This design makes it easy to interact with Pathway tables and columns while incorporating the power of LLMs.
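For illustration, here is a minimal sketch of a plain UDF applied to a Pathway column (the `shout` helper is a hypothetical toy function, not part of the xpack):

import pathway as pw

@pw.udf
def shout(text: str) -> str:
    # A toy UDF: transform each value of a column and return a new value
    return text.upper()

# Apply the UDF to the `prompt` column, just like you will apply the LLM wrappers below
shouted = query.select(result=shout(pw.this.prompt))

The LLM wrappers described below are applied to columns in exactly the same way.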

OpenAIChat

Pathway comes with many predefined UDFs, including those that manage connections with OpenAI models.

To use a wrapper, first create an instance of it, which you can then apply to a column containing prompts. For OpenAI, you create the wrapper using the OpenAIChat class.

import os

from pathway.xpacks.llm.llms import OpenAIChat

model = OpenAIChat(
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"], # Read the OpenAI API key from environment variables
)
# Send queries from column `messages` in table `query` to OpenAI.
# The column must already contain messages in the format expected by the OpenAI API (see below).
responses = query.select(result=model(pw.this.messages))

Preparing queries

OpenAIChat expects messages to be in the format required by the OpenAI API - that is, a list of dictionaries, where each dictionary is one message in the conversation so far. If you want to ask single questions, use pw.xpacks.llm.llms.prompt_chat_single_qa to wrap them so that they match the format expected by the OpenAI API.

from pathway.xpacks.llm.llms import prompt_chat_single_qa

# Column `prompt` holds strings with questions to be sent to OpenAI chat 
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
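If you prefer to build the message format yourself, a custom UDF can do the wrapping; a minimal sketch (the `wrap_single_question` helper is hypothetical and not part of the library):

import pathway as pw

@pw.udf
def wrap_single_question(question: str) -> pw.Json:
    # Wrap a plain question into the list-of-dictionaries format expected by the OpenAI API
    return pw.Json([{"role": "user", "content": question}])

responses = query.select(result=model(wrap_single_question(pw.this.prompt)))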

Model Parameters

The OpenAI API takes a number of parameters, including model and api_key used in the code snippets above. OpenAIChat allows you to set their default values during the initialization of the class, but you can also override them when the wrapper is applied.

model = OpenAIChat(
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"], # Read the OpenAI API key from environment variables
    max_tokens=200, # Set the default value of max_tokens to 200
)
# As max_tokens is not set here, the default value of 200 will be used
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))
# Now the value of max_tokens is taken from column `max_tokens`, overriding the default set when initializing OpenAIChat
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt), max_tokens=pw.this.max_tokens))

What about other models?

So far we have focused on the wrapper for the OpenAI chat model, but other wrappers work in the same way!

Pathway has two more wrappers for chat models - LiteLLMChat and HFPipelineChat. For example, to use Gemini with LiteLLM, create an instance of LiteLLMChat and then apply it to the column with messages to be sent over the API.

from pathway.xpacks.llm.llms import LiteLLMChat

model = LiteLLMChat(
    model="gemini/gemini-pro", # Choose the model you want
    api_key=os.environ["GEMINI_API_KEY"], # Read the Gemini API key from environment variables
)
# Ask Gemini questions from the `prompt` column
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

With the wrapper for LiteLLM, Pathway allows you to use many popular LLMs. For models from HuggingFace that you want to run locally, Pathway provides a separate wrapper called HFPipelineChat (to call HuggingFace through its API, use the LiteLLM wrapper). When an instance of this wrapper is created, it initializes a HuggingFace pipeline, so any arguments to the pipeline - including the name of the model - must be set during the initialization of HFPipelineChat. Any parameters of pipeline.__call__ can, as before, be set during initialization or overridden when the wrapper is applied.

from pathway.xpacks.llm.llms import HFPipelineChat

model = HFPipelineChat(
    model="gpt2", # Choose the model you want
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

Pathway also comes with wrappers for embedding models - OpenAIEmbedder, LiteLLMEmbedder and SentenceTransformersEmbedder. Each of them can be applied to a column of strings and returns a column of numpy arrays - the embeddings.

from pathway.xpacks.llm.embedders import OpenAIEmbedder

embedder = OpenAIEmbedder(
    model="text-embedding-ada-002", # Model used for embedding
    api_key=os.environ["OPENAI_API_KEY"], # Read the OpenAI API key from environment variables
)
# Calculate embeddings for column `text` in table `documents`
embeddings = documents.select(result=embedder(pw.this.text))

Asynchrony

The wrappers for OpenAI and LiteLLM, both for chats and embedders, are asynchronous, and Pathway allows you to set three parameters that control their behavior. These are:

  • capacity, which sets the number of concurrent operations allowed,
  • retry_strategy, which sets the strategy for handling retries in case of failures,
  • cache_strategy, which defines the cache mechanism.

These three parameters need to be set during the initialization of the wrapper. You can read more about them in the UDFs guide.

model = OpenAIChat(
    # maximum concurrent operations is 10
    capacity=10,
    # in case of failure, retry 5 times, each time waiting twice as long before retrying
    retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy(max_retries=5, initial_delay=1000, backoff_factor=2),
    # if PATHWAY_PERSISTENT_STORAGE is set, then it is used to cache the calls
    cache_strategy=pw.udfs.DefaultCache(),
    # select the model
    model="gpt-3.5-turbo",
    # read the OpenAI API key from environment variables
    api_key=os.environ["OPENAI_API_KEY"],
)
responses = query.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

Creating a Pathway LLM pipeline

You can now combine these wrappers to create an LLM pipeline using Pathway. To learn how to do this, read our tutorial.
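As a taste of what such a pipeline can look like, here is a minimal sketch that streams questions from a CSV file, sends them to OpenAI, and writes the answers to a JSON Lines file. The file names and the `prompt` column are assumptions made for this example:

import os

import pathway as pw
from pathway.xpacks.llm.llms import OpenAIChat, prompt_chat_single_qa

class QuerySchema(pw.Schema):
    prompt: str

# Stream questions from a (hypothetical) CSV file with a `prompt` column
queries = pw.io.csv.read("questions.csv", schema=QuerySchema, mode="streaming")

model = OpenAIChat(
    model="gpt-3.5-turbo",
    api_key=os.environ["OPENAI_API_KEY"],
)
answers = queries.select(result=model(prompt_chat_single_qa(pw.this.prompt)))

# Write the answers to a JSON Lines file and start the streaming pipeline
pw.io.jsonlines.write(answers, "answers.jsonl")
pw.run()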

Preparing documents for LLMs

The Pathway xpack for LLMs provides tools for preparing your documents and texts so that they can be used with LLMs. You can use UDFs such as ParseUnstructured for parsing your documents into texts and TokenCountSplitter for dividing texts into smaller chunks.

Parsing documents

Use the ParseUnstructured class to parse documents in Pathway. Under the hood, it utilizes the Unstructured library to parse your documents. To use it, you need to read the contents of a file into a Pathway Table using any connector of your choice. Then, apply an instance of the ParseUnstructured class to get a Pathway Table with the parsed content of the documents. ParseUnstructured has an argument mode which takes one of three values: single, paged or elements. If set to single, the whole document is returned as one string; if set to paged, there is a string for each page in the document; and if set to elements, Unstructured's division into elements is preserved. The mode argument can be set either during initialization or when ParseUnstructured is applied.

import os
import pathway as pw
from pathway.xpacks.llm.parsers import ParseUnstructured

files = pw.io.fs.read(
    os.environ.get("DATA_DIR"),
    mode="streaming",
    format="binary",
    autocommit_duration_ms=50,
)
parser = ParseUnstructured(mode="elements")
documents = files.select(elements=parser(pw.this.data))

For each document, ParseUnstructured returns a list of tuples with the parsed text and the associated metadata returned by Unstructured. If you want each text string in a separate row of the table, use the flatten function.

documents = documents.flatten(pw.this.elements) # flatten list into multiple rows
documents = documents.select(text=pw.this.elements[0], metadata=pw.this.elements[1]) # extract text and metadata from tuple

Splitting texts

Once you have some texts in a Pathway Table, you can use the TokenCountSplitter class to divide them into smaller chunks. It tries to split the text in such a way that each part has between min_tokens and max_tokens tokens, but without cutting sentences in half.

TokenCountSplitter has three parameters - min_tokens, max_tokens and encoding_name - and each of them can be overridden during the call of the function. min_tokens and max_tokens, as mentioned above, set the minimum and maximum length of each chunk, whereas encoding_name is the name of the tiktoken encoding to be used.

from pathway.xpacks.llm.splitters import TokenCountSplitter

splitter = TokenCountSplitter(min_tokens=100, max_tokens=300, encoding_name="cl100k_base")
texts = documents.select(chunk=splitter(pw.this.text))

TokenCountSplitter returns data in the same format as ParseUnstructured - that is, for each row it returns a list of tuples, where each tuple consists of a string with the text of a chunk and a dictionary with the associated metadata.
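As with the parser output, you can flatten the chunks into separate rows and unpack the tuples; a short sketch:

texts = texts.flatten(pw.this.chunk) # flatten list into multiple rows
texts = texts.select(chunk_text=pw.this.chunk[0], chunk_metadata=pw.this.chunk[1]) # extract text and metadata from tuple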

Ready-to-use Document Store

With these tools, it is easy to create a pipeline in Pathway that serves as a DocumentStore, which automatically indexes documents and updates as new data arrives.

To make interaction with the DocumentStore easier, you can also use the DocumentStoreServer, which handles API calls.
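For orientation, here is a rough sketch of how these pieces can be wired together. The exact constructor arguments (docs, retriever_factory, host, port, document_store) are assumptions made for this sketch, so verify them against the API reference:

from pathway.stdlib.indexing import BruteForceKnnFactory
from pathway.xpacks.llm.document_store import DocumentStore
from pathway.xpacks.llm.embedders import OpenAIEmbedder
from pathway.xpacks.llm.parsers import ParseUnstructured
from pathway.xpacks.llm.servers import DocumentStoreServer
from pathway.xpacks.llm.splitters import TokenCountSplitter

# Assumed constructor arguments - verify against the API reference
store = DocumentStore(
    docs=files, # the binary documents table read earlier
    retriever_factory=BruteForceKnnFactory(embedder=OpenAIEmbedder()),
    parser=ParseUnstructured(),
    splitter=TokenCountSplitter(),
)
server = DocumentStoreServer(host="127.0.0.1", port=8000, document_store=store)
server.run()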

You can learn more about Document Store in Pathway in a dedicated tutorial and check out a QA app example in the llm-app repository.

Integrating with LlamaIndex and LangChain

The Vector Store offers integrations with both LlamaIndex and LangChain. These allow you to incorporate the Vector Store Client in your LlamaIndex and LangChain pipelines, or to use LlamaIndex and LangChain components in the Vector Store. Read more about these integrations in the articles on LlamaIndex and on LangChain.

Rerankers

Rerankers evaluate the relevance of documents to a given query and are commonly used in a two-stage retrieval process. Initially, a vector store retrieves a broad set of documents, typically more than are needed for the query context, many of which may be irrelevant. This occurs because indexing flattens a document's entire meaning into a single vector, which can reduce accuracy. Rerankers refine this by reassessing each document's relevance. Running a reranker is more expensive than index-based retrieval, but it usually provides a significant improvement in accuracy.

Pathway offers three rerankers:

  • LLMReranker, which asks an LLM chat of your choice to rate the relevance of a document to a query,
  • CrossEncoderReranker, a wrapper for cross-encoder models from the sentence_transformers library,
  • EncoderReranker, which uses embeddings from the sentence_transformers library to score the relevance of documents.

Additionally, once you rank the documents, you can use rerank_topk_filter to choose the k best documents.

Modular RAGs

To combine all the pieces into a RAG (Retrieval Augmented Generation) pipeline, you can use one of the modular RAGs available in the LLM xpack. BaseRAGQuestionAnswerer is a standard RAG that, given a query, obtains the k best documents from the vector store and sends them, along with the question, to the LLM chat. AdaptiveRAGQuestionAnswerer tries to limit the number of documents sent to the LLM chat in order to save tokens: it initially sends only a small number of documents to the chat and increases that number until an answer is found. To read more about why this can save tokens without sacrificing accuracy, check our showcase on Adaptive RAG.

Both of these RAGs are customizable with the LLM model used to answer questions, the vector store used for retrieving documents, and templates for embedding context chunks in the question.
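For orientation, a rough sketch of standing up a BaseRAGQuestionAnswerer is shown below. The constructor and server arguments (llm, indexer, host, port) are assumptions based on the llm-app templates, so verify them against the API reference:

from pathway.xpacks.llm.llms import OpenAIChat
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer

# Assumed arguments - verify against the API reference
rag = BaseRAGQuestionAnswerer(
    llm=OpenAIChat(model="gpt-3.5-turbo"),
    indexer=store, # the DocumentStore built earlier
)
rag.build_server(host="127.0.0.1", port=8080)
rag.run_server()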

Discuss tricks & tips for RAG

Join our Discord community and dive into discussions on tricks and tips for mastering Retrieval Augmented Generation.