LLM Chats

Out of the box, the LLM xpack provides wrappers for text generation and embedding LLMs. For text generation, you can use native wrappers for the OpenAI chat model and for Hugging Face models running locally. Many other popular models, including Azure OpenAI, Hugging Face (when accessed through its API), and Gemini, can be used through the LiteLLM wrapper. For the full list of providers supported by LiteLLM, see the LiteLLM documentation. Currently, Pathway provides chat wrappers for the following LLMs, each described in its own section below: OpenAI (OpenAIChat), LiteLLM (LiteLLMChat), Hugging Face pipeline (HFPipelineChat), and Cohere (CohereChat).

To use a wrapper, first create an instance of the wrapper, which you can then apply to a column containing prompts.

We create a Pathway table to be used in the examples below:

import pathway as pw
queries = pw.debug.table_from_markdown(
    """
questions | max_tokens
How many 'r' there are in 'strawberry'? | 400
""",
    split_on_whitespace=False,
)

UDFs

Each wrapper is a UDF (User Defined Function), which allows users to define their own functions to interact with Pathway objects. A UDF, in general, is any function that takes some input, processes it, and returns an output. In the context of the Pathway library, UDFs enable seamless integration of custom logic, such as invoking LLMs for specific tasks.

In particular, a UDF can serve as a wrapper for LLM calls: it takes prompts or other inputs from a column, passes them to a model, and returns the corresponding outputs as a new column. This design makes it easy to work with Pathway tables and columns while incorporating the power of LLMs.
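The "configure once, apply per row" pattern described above can be sketched in plain Python. This is only an analogy, not Pathway's implementation: `EchoWrapper` and `apply_to_column` are hypothetical names, and the list of dicts stands in for a Pathway table.

```python
from typing import Callable, Dict, List


class EchoWrapper:
    """Hypothetical stand-in for an LLM wrapper: configured once, called per row."""

    def __init__(self, prefix: str) -> None:
        # Settings fixed when the wrapper instance is created.
        self.prefix = prefix

    def __call__(self, prompt: str) -> str:
        # A real wrapper would send the prompt to a model here.
        return f"{self.prefix}{prompt}"


def apply_to_column(
    rows: List[Dict[str, str]], column: str, fn: Callable[[str], str]
) -> List[str]:
    """Call `fn` once per row on the value stored under `column`."""
    return [fn(row[column]) for row in rows]


# A list of dicts plays the role of a table with a `questions` column.
rows = [{"questions": "What is a UDF?"}, {"questions": "What is an LLM?"}]
wrapper = EchoWrapper(prefix="echo: ")
results = apply_to_column(rows, "questions", wrapper)
print(results)
```

In Pathway itself, the per-row application is expressed with `select`, as in the examples that follow.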

OpenAIChat

For OpenAI, you create a wrapper using the OpenAIChat class.

import os

from pathway.xpacks.llm import llms

model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"], # Read the OpenAI API key from environment variables
)
# Send queries from column `questions` in table `queries` to OpenAI
responses = queries.select(result=model(llms.prompt_chat_single_qa(pw.this.questions)))
# Run the computations (including sending requests to OpenAI) and print the output table
pw.debug.compute_and_print(responses)

Message format

OpenAIChat expects messages in the format required by the OpenAI API: a list of dictionaries, where each dictionary is one message in the conversation so far. To ask a single question, you can use pw.xpacks.llm.llms.prompt_chat_single_qa to wrap a string so that it matches this format, as in the example above.
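To make the message format concrete, here is a minimal plain-Python sketch of the list-of-dictionaries shape. The helpers `single_question_messages` and `conversation` are hypothetical and only mimic the idea of wrapping a string; in particular, the use of the "user" role for the question is an assumption, not a statement about what prompt_chat_single_qa emits.

```python
from typing import Dict, List

Message = Dict[str, str]


def single_question_messages(question: str) -> List[Message]:
    """Hypothetical helper: wrap one question as a one-message conversation."""
    # Assumption: the question is sent with the "user" role.
    return [{"role": "user", "content": question}]


def conversation(system: str, question: str) -> List[Message]:
    """Hypothetical helper: prepend a system message to the user question."""
    return [{"role": "system", "content": system}] + single_question_messages(question)


messages = conversation("You are a helpful assistant.", "What is RAG?")
for message in messages:
    # Each message is one dictionary with a role and its content.
    print(message["role"], "->", message["content"])
```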

If you, however, want to have more control over messages sent to OpenAI, you can prepare the messages yourself.

messages = pw.debug.table_from_rows(
    pw.schema_from_types(questions=list[dict]),
    rows=[
        (
            [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "How many 'r' there are in 'strawberry'?"},
            ],
        )
    ],
)
responses = messages.select(result=model(pw.this.questions))
pw.debug.compute_and_print(responses)

Model parameters

The OpenAI API takes a number of parameters, including the model and api_key used in the code above. OpenAIChat lets you set default values for them when initializing the class, and you can override those defaults when applying the wrapper.

model = llms.OpenAIChat(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"], # Read the OpenAI API key from environment variables
    max_tokens=200, # Set the default value of max_tokens to 200
)
# max_tokens is not passed here, so the default value of 200 is used
responses = queries.select(result=model(llms.prompt_chat_single_qa(pw.this.questions)))
# Here max_tokens is taken from column `max_tokens`, overriding the default set when initializing OpenAIChat
responses = queries.select(result=model(llms.prompt_chat_single_qa(pw.this.questions), max_tokens=pw.this.max_tokens))
pw.debug.compute_and_print(responses)
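The default-then-override behavior can be pictured as a dictionary merge in which per-call values win. The sketch below is an illustration under that assumption, not Pathway's actual code; `CallParameters` and `resolve` are hypothetical names.

```python
from typing import Any, Dict


class CallParameters:
    """Hypothetical helper that merges init-time defaults with per-call overrides."""

    def __init__(self, **defaults: Any) -> None:
        # Defaults are captured once, when the object is created.
        self.defaults: Dict[str, Any] = dict(defaults)

    def resolve(self, **overrides: Any) -> Dict[str, Any]:
        # Later keys win, so a per-call value replaces the default of the same name.
        return {**self.defaults, **overrides}


params = CallParameters(model="gpt-4o-mini", max_tokens=200)
print(params.resolve())                # no override: defaults are used
print(params.resolve(max_tokens=400))  # per-call value replaces the default
```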

LiteLLM

Pathway provides a wrapper for LiteLLM called LiteLLMChat. For example, to use Gemini through LiteLLM, create an instance of LiteLLMChat and apply it to the column containing the messages to be sent to the API.

import os

from pathway.xpacks.llm import llms

model = llms.LiteLLMChat(
    model="gemini/gemini-pro", # Choose the model you want
    api_key=os.environ["GEMINI_API_KEY"], # Read the Gemini API key from environment variables
)
# Ask Gemini the questions from the `questions` column
responses = queries.select(result=model(llms.prompt_chat_single_qa(pw.this.questions)))
pw.debug.compute_and_print(responses)

With the wrapper for LiteLLM, Pathway allows you to use many popular LLMs.

Hugging Face pipeline

For Hugging Face models that you want to run locally, Pathway provides a separate wrapper called HFPipelineChat (to call Hugging Face through its API, use the LiteLLM wrapper). Creating an instance of this wrapper initializes a Hugging Face pipeline, so any arguments to the pipeline, including the model name, must be set when HFPipelineChat is initialized. As with the other wrappers, parameters to pipeline.__call__ can be set during initialization or overridden during application.

from pathway.xpacks.llm import llms

model = llms.HFPipelineChat(
    model="gpt2", # Choose the model you want
)
responses = queries.select(result=model(pw.this.questions))
pw.debug.compute_and_print(responses)

Note that the format of the questions passed to a Hugging Face pipeline depends on the model. Some models, like gpt2, expect a prompt string, whereas conversational models also accept messages as a list of dicts. If a conversation is passed as a list of dicts, the model's prompt template is used. For more information, see the pipeline docs.

For example, the TinyLlama/TinyLlama-1.1B-Chat-v1.0 model accepts chat messages:

from pathway.xpacks.llm import llms

model = llms.HFPipelineChat(
    model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
)
responses = queries.select(result=model(llms.prompt_chat_single_qa(pw.this.questions)))
pw.debug.compute_and_print(responses)

Cohere

Pathway has a wrapper for the Cohere Chat Services. The wrapper allows for augmenting the query with documents. The result contains cited documents along with the response.

from pathway.xpacks.llm import llms

model = llms.CohereChat()
queries_with_docs = pw.debug.table_from_rows(
    schema=pw.schema_from_types(questions=str, docs=list[dict]),
    rows=[
        (
            "What is RAG?",
            [
                {
                    "text": "Pathway is a high-throughput, low-latency data processing framework that handles live data & streaming for you."
                },
                {"text": "RAG stands for Retrieval Augmented Generation."},
            ],
        )
    ],
)

r = queries_with_docs.select(
    ret=model(llms.prompt_chat_single_qa(pw.this.questions), documents=pw.this.docs)
)
parsed_table = r.select(response=pw.this.ret[0], citations=pw.this.ret[1])
pw.debug.compute_and_print(parsed_table)

Wrappers are asynchronous

The wrappers for OpenAI and LiteLLM, both for chat and for embedding, are asynchronous. Pathway lets you configure their behavior with three parameters:

  • capacity, which sets the number of concurrent operations allowed,
  • retry_strategy, which sets the strategy for handling retries in case of failures,
  • cache_strategy, which defines the cache mechanism.

These three parameters need to be set during the initialization of the wrapper. You can read more about them in the UDFs guide.
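To illustrate what a capacity limit means for asynchronous calls, here is a minimal sketch using an `asyncio.Semaphore`. It is not how Pathway implements capacity; `run_with_capacity` and `fake_call` are hypothetical names, and the short sleep stands in for a network request.

```python
import asyncio
from typing import List, Tuple


async def run_with_capacity(prompts: List[str], capacity: int) -> Tuple[List[str], int]:
    """Run one fake call per prompt, with at most `capacity` calls in flight."""
    semaphore = asyncio.Semaphore(capacity)
    in_flight = 0
    peak = 0

    async def fake_call(prompt: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:  # waits here while `capacity` calls are already running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for a network request
            in_flight -= 1
            return prompt.upper()

    # gather preserves the order of the inputs in its results.
    results = await asyncio.gather(*(fake_call(p) for p in prompts))
    return list(results), peak


results, peak = asyncio.run(run_with_capacity([f"q{i}" for i in range(10)], capacity=3))
print(results)
print("peak concurrent calls:", peak)
```

All ten calls are scheduled at once, but the semaphore keeps the number of simultaneous requests at or below the chosen capacity.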

import os

from pathway.xpacks.llm import llms

model = llms.OpenAIChat(
    # maximum concurrent operations is 10
    capacity=10,
    # in case of failure, retry 5 times, each time waiting twice as long before retrying
    retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy(max_retries=5, initial_delay=1000, backoff_factor=2),
    # if PATHWAY_PERSISTENT_STORAGE is set, then it is used to cache the calls
    cache_strategy=pw.udfs.DefaultCache(),
    # select the model
    model="gpt-4o-mini",
    # read the OpenAI API key from environment variables
    api_key=os.environ["OPENAI_API_KEY"],
)
responses = queries.select(result=model(llms.prompt_chat_single_qa(pw.this.questions)))
pw.debug.compute_and_print(responses)
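As a rough illustration of the retry settings used above, the following sketch computes the waiting time before each retry. It assumes that initial_delay is expressed in milliseconds and that the delay is multiplied by backoff_factor after every failed attempt; the real strategy may also add random jitter, which is ignored here. `backoff_delays_ms` is a hypothetical helper, not part of Pathway.

```python
from typing import List


def backoff_delays_ms(
    max_retries: int, initial_delay: int, backoff_factor: float
) -> List[float]:
    """Hypothetical helper: delay before each retry, without jitter."""
    # Retry number `attempt` (0-based) waits initial_delay * backoff_factor ** attempt.
    return [initial_delay * backoff_factor**attempt for attempt in range(max_retries)]


delays = backoff_delays_ms(max_retries=5, initial_delay=1000, backoff_factor=2)
print(delays)
print("total wait in the worst case (ms):", sum(delays))
```

Under these assumptions, each retry waits twice as long as the previous one, so the worst-case total wait grows quickly with max_retries.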