pw.xpacks.llm.llms

Pathway UDFs for calling LLMs

This momdule contains UDFs for calling LLMs chat services:

  1. wrappers over LLM APIs
  2. prompt building tools

class pw.xpacks.llm.llms.BaseChat(*args, **kwargs)

[source]

Base class for the LLM chat instances.

Subclasses need to implement __wrapped__ for use in Pathway.

Constructor arguments are passed to the UDF() constructor. Refer to UDF() for more information.

__call__(*args, **kwargs)

sourceCall self as a function.

class pw.xpacks.llm.llms.CohereChat(capacity=None, retry_strategy=None, cache_strategy=None, model='command', **cohere_kwargs)

[source]

Pathway wrapper for Cohere Chat services. Returns answer and cited docs as tuple[str, list[dict]]. Cited docs is empty list if there are no citations.

Model defaults to command.

The capacity, retry_strategy and cache_strategy need to be specified during object construction. All other arguments can be overridden during application.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations allowed. Defaults to None, indicating no specific limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. To enable caching, a valid CacheStrategy should be provided. See Cache strategy for more information. Defaults to None.
    • model (str | None) – name of the model to use. Check the available models for details.
import pathway as pw
from pathway.xpacks.llm import llms
from pathway.udfs import ExponentialBackoffRetryStrategy
chat = llms.CohereChat(retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6))
t = pw.debug.table_from_markdown('''
txt
Wazzup?
''')
r = t.select(ret=chat(llms.prompt_chat_single_qa(t.txt)))
parsed_table = r.select(response=pw.this.ret[0], citations=pw.this.ret[1])
parsed_table
docs = [{"text": "Pathway is a high-throughput, low-latency data processing framework that handles live data & streaming for you."},
{"text": "RAG stands for Retrieval Augmented Generation."}]
t_with_docs = t.select(*pw.this, docs=docs)
r = t_with_docs.select(ret=chat(llms.prompt_chat_single_qa("what is rag?"), documents=pw.this.docs))
parsed_table = r.select(response=pw.this.ret[0], citations=pw.this.ret[1])
parsed_table

__call__(messages, documents=None, **kwargs)

sourceSends messages to Cohere Chat and returns response.

  • Parameters
    • messages (ColumnExpression[list[dict] | pw.Json]) – Column with messages to send. to the LLM
    • documents (ColumnExpression[list[dict] | pw.Json | tuple] | None) – Column with context documents to be sent to Cohere Chat. This argument is optional.
    • **kwargs – override for defaults set in the constructor.

class pw.xpacks.llm.llms.HFPipelineChat(model='gpt2', call_kwargs={}, device='cpu', **pipeline_kwargs)

[source]

Pathway wrapper for HuggingFace Pipeline.

  • Parameters
    • model (str | None) – ID of the model to be used
    • call_kwargs (dict) – kwargs that will be passed to each call of HuggingFace Pipeline. These can be overridden during each application. For possible arguments check the HuggingFace documentation.
    • device (str) – defines which device will be used to run the Pipeline
    • pipeline_kwargs – kwargs accepted during initialization of HuggingFace Pipeline. For possible arguments check the HuggingFace documentation.

call_kwargs can be overridden during application, all other arguments need to be specified during class construction.

Example:

import pathway as pw
from pathway.xpacks.llm import llms
chat = llms.HFPipelineChat(model="gpt2")
t = pw.debug.table_from_markdown('''
txt
Wazzup?
''')
r = t.select(ret=chat(llms.prompt_chat_single_qa(t.txt)))
r

__call__(messages, **kwargs)

sourceSends messages to the LLM and returns response.

  • Parameters
    • messages (ColumnExpression[list[dict] | pw.Json]) – Column with messages to send to the LLM
    • **kwargs – override for defaults set in the constructor

class pw.xpacks.llm.llms.LiteLLMChat(capacity=None, retry_strategy=None, cache_strategy=None, model=None, **litellm_kwargs)

[source]

Pathway wrapper for LiteLLM Chat services.

Model has to be specified either in constructor call or in each application, no default is provided. The capacity, retry_strategy and cache_strategy need to be specified during object construction. All other arguments can be overridden during application.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations allowed. Defaults to None, indicating no specific limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. To enable caching, a valid CacheStrategy should be provided. See Cache strategy for more information. Defaults to None.
    • model (str | None) – ID of the model to use. Check the providers supported by LiteLLM for details on which models work with the LiteLLM API.
    • api_base – API endpoint to be used for the call.
    • api_version – API version to be used for the call. Only for Azure models.
    • num_retries – The number of retries if the API call fails.
    • context_window_fallback_dict – Mapping of fallback models to be used in case of context window error
    • fallbacks – List of fallback models to be used if the initial call fails
    • metadata – Additional data to be logged when the call is made.

For more information on provider specific arguments check the LiteLLM documentation.

Any arguments can be provided either to the constructor or in the UDF call. To specify the model in the UDF call, set it to None in the constructor.

Example:

import pathway as pw
from pathway.xpacks.llm import llms
from pathway.udfs import ExponentialBackoffRetryStrategy
chat = llms.LiteLLMChat(model=None, retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6))
t = pw.debug.table_from_markdown('''
txt     | model
Wazzup? | gpt-3.5-turbo
''')
r = t.select(ret=chat(llms.prompt_chat_single_qa(t.txt), model=t.model))
r

__call__(messages, **kwargs)

sourceSends messages to the LLM and returns response.

  • Parameters
    • messages (ColumnExpression[list[dict] | pw.Json]) – Column with messages to send to the LLM
    • **kwargs – override for defaults set in the constructor

class pw.xpacks.llm.llms.OpenAIChat(capacity=None, retry_strategy=None, cache_strategy=None, model='gpt-3.5-turbo', **openai_kwargs)

[source]

Pathway wrapper for OpenAI Chat services.

The capacity, retry_strategy and cache_strategy need to be specified during object construction. All other arguments can be overridden during application.

  • Parameters
    • capacity (int | None) – Maximum number of concurrent operations allowed. Defaults to None, indicating no specific limit.
    • retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
    • cache_strategy (CacheStrategy | None) – Defines the caching mechanism. To enable caching, a valid CacheStrategy should be provided. See Cache strategy for more information. Defaults to None.
    • model (str | None) – ID of the model to use. See the model endpoint compatibility table for details on which models work with the Chat API.
    • frequency_penalty – Number between -2.0 and 2.0. Positive values penalize new tokens based on their existing frequency in the text so far, decreasing the model’s likelihood to repeat the same line verbatim.
      See more information about frequency and presence penalties.
    • function_call – Deprecated in favor of tool_choice.
      Controls which (if any) function is called by the model. none means the model will not call a function and instead generates a message. auto means the model can pick between generating a message or calling a function. Specifying a particular function via {“name”: “my_function”} forces the model to call that function.
      none is the default when no functions are present. auto is the default if functions are present.
    • functions – Deprecated in favor of tools.
      A list of functions the model may generate JSON inputs for.
    • logit_bias – Modify the likelihood of specified tokens appearing in the completion.
      Accepts a JSON object that maps tokens (specified by their token ID in the tokenizer) to an associated bias value from -100 to 100. Mathematically, the bias is added to the logits generated by the model prior to sampling. The exact effect will vary per model, but values between -1 and 1 should decrease or increase likelihood of selection; values like -100 or 100 should result in a ban or exclusive selection of the relevant token.
    • logprobs – Whether to return log probabilities of the output tokens or not. If true, returns the log probabilities of each output token returned in the content of message. This option is currently not available on the gpt-4-vision-preview model.
    • max_tokens – The maximum number of [tokens](/tokenizer) that can be generated in the chat completion.
      The total length of input tokens and generated tokens is limited by the model’s context length. Example Python code for counting tokens.
    • n – How many chat completion choices to generate for each input message. Note that you will be charged based on the number of generated tokens across all of the choices. Keep n as 1 to minimize costs.
    • presence_penalty – Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the text so far, increasing the model’s likelihood to talk about new topics.
      See more information about frequency and presence penalties.
    • response_format – An object specifying the format that the model must output. Compatible with gpt-4-1106-preview and gpt-3.5-turbo-1106.
      Setting to { “type”: “json_object” } enables JSON mode, which guarantees the message the model generates is valid JSON.
      Important: when using JSON mode, you must also instruct the model to produce JSON yourself via a system or user message. Without this, the model may generate an unending stream of whitespace until the generation reaches the token limit, resulting in a long-running and seemingly “stuck” request. Also note that the message content may be partially cut off if finish_reason=”length”, which indicates the generation exceeded max_tokens or the conversation exceeded the max context length.
    • seed – This feature is in Beta. If specified, our system will make a best effort to sample deterministically, such that repeated requests with the same seed and parameters should return the same result. Determinism is not guaranteed, and you should refer to the system_fingerprint response parameter to monitor changes in the backend.
    • stop – Up to 4 sequences where the API will stop generating further tokens.
    • stream – If set, partial message deltas will be sent, like in ChatGPT. Tokens will be sent as data-only server-sent events as they become available, with the stream terminated by a data: [DONE] message. Example Python code.
    • temperature – What sampling temperature to use, between 0 and 2. Higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more focused and deterministic.
      We generally recommend altering this or top_p but not both.
    • tool_choice – Controls which (if any) function is called by the model. none means the model will not call a function and instead generates a message. auto means the model can pick between generating a message or calling a function. Specifying a particular function via {“type: “function”, “function”: {“name”: “my_function”}} forces the model to call that function.
      none is the default when no functions are present. auto is the default if functions are present.
    • tools – A list of tools the model may call. Currently, only functions are supported as a tool. Use this to provide a list of functions the model may generate JSON inputs for.
    • top_logprobs – An integer between 0 and 5 specifying the number of most likely tokens to return at each token position, each with an associated log probability. logprobs must be set to true if this parameter is used.
    • top_p – An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens comprising the top 10% probability mass are considered.
      We generally recommend altering this or temperature but not both.
    • user – A unique identifier representing your end-user, which can help OpenAI to monitor and detect abuse. Learn more.
    • extra_headers – Send extra headers
    • extra_query – Add additional query parameters to the request
    • extra_body – Add additional JSON properties to the request
    • timeout – Override the client-level default timeout for this request, in seconds

Any arguments can be provided either to the constructor or in the UDF call. To specify the model in the UDF call, set it to None in the constructor.

Example:

import pathway as pw
from pathway.xpacks.llm import llms
from pathway.udfs import ExponentialBackoffRetryStrategy
chat = llms.OpenAIChat(model=None, retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6))
t = pw.debug.table_from_markdown('''
txt     | model
Wazzup? | gpt-3.5-turbo
''')
r = t.select(ret=chat(llms.prompt_chat_single_qa(t.txt), model=t.model))
r

__call__(messages, **kwargs)

sourceSends messages to OpenAI Chat and returns response.

  • Parameters
    • messages (ColumnExpression[list[dict] | pw.Json]) – Column with messages to send to OpenAIChat
    • **kwargs – override for defaults set in the constructor

pw.xpacks.llm.llms.prompt_chat_single_qa(question)

sourceCreate chat prompt messages for single question answering. A string with a question is converted into one-element list with a dictionary with keys role and content.

  • Parameters
    question (ColumnExpression[str]) – a column with questions to be transformed into prompts

Example:

import pathway as pw
from pathway.xpacks.llm import llms
t = pw.debug.table_from_markdown('''
txt
Wazzup?
''')
r = t.select(prompt=llms.prompt_chat_single_qa(t.txt))
pw.debug.compute_and_print(r, include_id=False)