pw.xpacks.llm.question_answering

class pw.xpacks.llm.question_answering.AdaptiveRAGQuestionAnswerer(llm, indexer, *, default_llm_name=None, short_prompt_template=prompts.prompt_short_qa, long_prompt_template=prompts.prompt_qa, summarize_template=prompts.prompt_summarize, n_starting_documents=2, factor=2, max_iterations=4, strict_prompt=False)

[source]

Builds the logic and the API for adaptive RAG application.

It allows to build a RAG app with Pathway vector store and Pathway components. Gives the freedom to choose between two question answering strategies, short (concise), and long (detailed) response, that can be set during the post request. Allows for LLM agnosticity with freedom to choose from proprietary or open-source LLMs.

It differs from BaseRAGQuestionAnswerer in adaptive choosing the number of chunks used as a context of a question. First, only n_starting_documents chunks are used, and then the number is increased until an answer is found.

  • Parameters
    • llm (BaseChat) – LLM instance for question answering. See https://pathway.com/developers/api-docs/pathway-xpacks-llm/llms for available models.
    • indexer (VectorStoreServer | DocumentStore) – Indexing object for search & retrieval to be used for context augmentation.
    • default_llm_name (str | None) – Default LLM model to be used in queries, only used if model parameter in post request is not specified. Omitting or setting this to None will default to the model name set during LLM’s initialization.
    • short_prompt_template (UDF) – Template for document question answering with short response. A pw.udf function is expected. Defaults to pathway.xpacks.llm.prompts.prompt_short_qa.
    • long_prompt_template (UDF) – Template for document question answering with long response. A pw.udf function is expected. Defaults to pathway.xpacks.llm.prompts.prompt_qa.
    • summarize_template (UDF) – Template for text summarization. Defaults to pathway.xpacks.llm.prompts.prompt_summarize.
    • n_starting_documents (int) – Number of documents embedded in the first query.
    • factor (int) – Factor by which a number of documents increases in each next query, if an answer is not found.
    • max_iterations (int) – Number of times to ask a question, with the increasing number of documents.
    • strict_prompt (bool) – If LLM should be instructed strictly to return json. Increases performance in small open source models, not needed in OpenAI GPT models.

Example:

import pathway as pw  
from pathway.xpacks.llm import embedders, splitters, llms, parsers  
from pathway.xpacks.llm.vector_store import VectorStoreServer  
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy  
from pathway.xpacks.llm.question_answering import AdaptiveRAGQuestionAnswerer  
my_folder = pw.io.fs.read(
    path="/PATH/TO/MY/DATA/*",  # replace with your folder
    format="binary",
    with_metadata=True)  
sources = [my_folder]  
app_host = "0.0.0.0"  
app_port = 8000  
parser = parsers.ParseUnstructured()  
text_splitter = splitters.TokenCountSplitter(max_tokens=400)  
embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())  
vector_server = VectorStoreServer(  
    *sources,
    embedder=embedder,
    splitter=text_splitter,
    parser=parser,
)
chat = llms.OpenAIChat(  
    model=DEFAULT_GPT_MODEL,
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
    cache_strategy=DiskCache(),
    temperature=0.05,
)
app = AdaptiveRAGQuestionAnswerer(  
    llm=chat,
    indexer=vector_server,
)
app.build_server(host=app_host, port=app_port)  
app.run_server()

answer_query(pw_ai_queries)

sourceCreate RAG response with adaptive retrieval.

build_server(host, port, **rest_kwargs)

sourceAdds HTTP connectors to input tables, connects them with table transformers.

serve_callable(route, schema=None, retry_strategy=None, cache_strategy=None, **additional_endpoint_kwargs)

sourceServe additional endpoints by wrapping callables. Expects an endpoint route. Schema is optional, adding schema type will enforce the

webserver to check arguments.

Beware that if Schema is not set, incorrect types may cause runtime error.

Example:

@rag_app.serve_callable(route="/agent")  
async def some_func(user_query: str) -> str:
    # define your agent, or custom RAG using any framework or plain Python
    # ...
    messages = [{"role": "user", "content": user_query}]
    result = agent.invoke(messages)
    return result

summarize_query(summarize_queries)

sourceFunction for summarizing given texts.

class pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer(llm, indexer, *, default_llm_name=None, short_prompt_template=prompts.prompt_short_qa, long_prompt_template=prompts.prompt_qa, summarize_template=prompts.prompt_summarize, search_topk=6)

[source]

Builds the logic and the API for basic RAG application.

Base class to build RAG app with Pathway vector store and Pathway components. Gives the freedom to choose between two question answering strategies, short (concise), and long (detailed) response, that can be set during the post request. Allows for LLM agnosticity with freedom to choose from proprietary or open-source LLMs.

  • Parameters
    • llm (BaseChat) – LLM instance for question answering. See https://pathway.com/developers/api-docs/pathway-xpacks-llm/llms for available models.
    • indexer (VectorStoreServer | DocumentStore) – Indexing object for search & retrieval to be used for context augmentation.
    • default_llm_name (str | None) – Default LLM model to be used in queries, only used if model parameter in post request is not specified. Omitting or setting this to None will default to the model name set during LLM’s initialization.
    • short_prompt_template (UDF) – Template for document question answering with short response. A pw.udf function is expected. Defaults to pathway.xpacks.llm.prompts.prompt_short_qa.
    • long_prompt_template (UDF) – Template for document question answering with long response. A pw.udf function is expected. Defaults to pathway.xpacks.llm.prompts.prompt_qa.
    • summarize_template (UDF) – Template for text summarization. Defaults to pathway.xpacks.llm.prompts.prompt_summarize.
    • search_topk (int) – Top k parameter for the retrieval. Adjusts number of chunks in the context.

Example:

import pathway as pw  
from pathway.xpacks.llm import embedders, splitters, llms, parsers  
from pathway.xpacks.llm.vector_store import VectorStoreServer  
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy  
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer  
from pathway.xpacks.llm.servers import QASummaryRestServer 
my_folder = pw.io.fs.read(
    path="/PATH/TO/MY/DATA/*",  # replace with your folder
    format="binary",
    with_metadata=True)  
sources = [my_folder]  
app_host = "0.0.0.0"  
app_port = 8000  
parser = parsers.ParseUnstructured()  
text_splitter = splitters.TokenCountSplitter(max_tokens=400)  
embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())  
vector_server = VectorStoreServer(  
    *sources,
    embedder=embedder,
    splitter=text_splitter,
    parser=parser,
)
chat = llms.OpenAIChat(  
    model=DEFAULT_GPT_MODEL,
    retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
    cache_strategy=DiskCache(),
    temperature=0.05,
)
rag = BaseRAGQuestionAnswerer(  
    llm=chat,
    indexer=vector_server,
)
app = QASummaryRestServer(app_host, app_port, rag)  
app.run_server()

answer_query(pw_ai_queries)

sourceMain function for RAG applications that answer questions based on available information.

build_server(host, port, **rest_kwargs)

sourceAdds HTTP connectors to input tables, connects them with table transformers.

serve_callable(route, schema=None, retry_strategy=None, cache_strategy=None, **additional_endpoint_kwargs)

sourceServe additional endpoints by wrapping callables. Expects an endpoint route. Schema is optional, adding schema type will enforce the

webserver to check arguments.

Beware that if Schema is not set, incorrect types may cause runtime error.

Example:

@rag_app.serve_callable(route="/agent")  
async def some_func(user_query: str) -> str:
    # define your agent, or custom RAG using any framework or plain Python
    # ...
    messages = [{"role": "user", "content": user_query}]
    result = agent.invoke(messages)
    return result

summarize_query(summarize_queries)

sourceFunction for summarizing given texts.

class pw.xpacks.llm.question_answering.DeckRetriever(llm, indexer, *, default_llm_name=None, short_prompt_template=prompts.prompt_short_qa, long_prompt_template=prompts.prompt_qa, summarize_template=prompts.prompt_summarize, search_topk=6)

[source]

Class for slides search.

answer_query(pw_ai_queries)

sourceReturn similar docs from the index.

build_server(host, port, **rest_kwargs)

sourceAdds HTTP connectors to input tables, connects them with table transformers.

serve_callable(route, schema=None, retry_strategy=None, cache_strategy=None, **additional_endpoint_kwargs)

sourceServe additional endpoints by wrapping callables. Expects an endpoint route. Schema is optional, adding schema type will enforce the

webserver to check arguments.

Beware that if Schema is not set, incorrect types may cause runtime error.

Example:

@rag_app.serve_callable(route="/agent")  
async def some_func(user_query: str) -> str:
    # define your agent, or custom RAG using any framework or plain Python
    # ...
    messages = [{"role": "user", "content": user_query}]
    result = agent.invoke(messages)
    return result

summarize_query(summarize_queries)

sourceFunction for summarizing given texts.

class pw.xpacks.llm.question_answering.RAGClient(host=None, port=None, url=None, timeout=90, additional_headers=None)

[source]

Connector for interacting with the Pathway RAG applications. Either (host and port) or url must be set.

  • Parameters
    • host (-) – The host of the RAG service.
    • port (-) – The port of the RAG service.
    • url (-) – The URL of the RAG service.
    • timeout (-) – Timeout for requests in seconds. Defaults to 90.
    • additional_headers (-) – Additional headers for the requests.

pw_ai_answer(prompt, filters=None, model=None)

sourceReturn RAG answer based on a given prompt and optional filter.

  • Parameters
    • prompt (-) – Question to be asked.
    • filters (-) – Optional metadata filter for the documents. Defaults to None, which means there will be no filter.
    • model (-) – Optional LLM model. If None, app default will be used by the server.

pw_ai_summary(text_list, model=None)

sourceSummarize a list of texts.

  • Parameters
    • text_list (-) – List of texts to summarize.
    • model (-) – Optional LLM model. If None, app default will be used by the server.

pw_list_documents(filters=None, keys=['path'])

sourceList indexed documents from the vector store with optional filtering.

  • Parameters
    • filters (-) – Optional metadata filter for the documents.
    • keys (-) – List of metadata keys to be included in the response. Defaults to \["path"\]. Setting to None will retrieve all available metadata.

retrieve(query, k=3, metadata_filter=None, filepath_globpattern=None)

sourceRetrieve closest documents from the vector store based on a query.

  • Parameters
    • query (-) – The query string.
    • k (-) – The number of results to retrieve.
    • metadata_filter (-) – Optional metadata filter for the documents. Defaults to None, which means there will be no filter.
    • filepath_globpattern (-) – Glob pattern for file paths.

statistics()

sourceRetrieve stats from the vector store.

pw.xpacks.llm.question_answering.answer_with_geometric_rag_strategy(questions, documents, llm_chat_model, n_starting_documents, factor, max_iterations, strict_prompt=False)

sourceFunction for querying LLM chat while providing increasing number of documents until an answer is found. Documents are taken from documents argument. Initially first n_starting_documents documents are embedded in the query. If the LLM chat fails to find an answer, the number of documents is multiplied by factor and the question is asked again.

  • Parameters
    • questions (ColumnReference[str]) – Column with questions to be asked to the LLM chat.
    • documents (ColumnReference[list[str]]) – Column with documents to be provided along with a question to the LLM chat.
    • llm_chat_model (BaseChat) – Chat model which will be queried for answers
    • n_starting_documents (int) – Number of documents embedded in the first query.
    • factor (int) – Factor by which a number of documents increases in each next query, if an answer is not found.
    • max_iterations (int) – Number of times to ask a question, with the increasing number of documents.
    • strict_prompt (bool) – If LLM should be instructed strictly to return json. Increases performance in small open source models, not needed in OpenAI GPT models.
  • Returns
    A column with answers to the question. If answer is not found, then None is returned.

Example:

import pandas as pd
import pathway as pw
from pathway.xpacks.llm.llms import OpenAIChat
from pathway.xpacks.llm.question_answering import answer_with_geometric_rag_strategy
chat = OpenAIChat()
df = pd.DataFrame(
    {
        "question": ["How do you connect to Kafka from Pathway?"],
        "documents": [
            [
                "`pw.io.csv.read reads a table from one or several files with delimiter-separated values.",
                "`pw.io.kafka.read` is a seneralized method to read the data from the given topic in Kafka.",
            ]
        ],
    }
)
t = pw.debug.table_from_pandas(df)
answers = answer_with_geometric_rag_strategy(t.question, t.documents, chat, 1, 2, 2)

pw.xpacks.llm.question_answering.answer_with_geometric_rag_strategy_from_index(questions, index, documents_column, llm_chat_model, n_starting_documents, factor, max_iterations, metadata_filter=None, strict_prompt=False)

sourceFunction for querying LLM chat while providing increasing number of documents until an answer is found. Documents are taken from index. Initially first n_starting_documents documents are embedded in the query. If the LLM chat fails to find an answer, the number of documents is multiplied by factor and the question is asked again.

  • Parameters
    • questions (ColumnReference[str]) – Column with questions to be asked to the LLM chat.
    • index (DataIndex) – Index from which closest documents are obtained.
    • documents_column (str | ColumnReference) – name of the column in table passed to index, which contains documents.
    • llm_chat_model (BaseChat) – Chat model which will be queried for answers
    • n_starting_documents (int) – Number of documents embedded in the first query.
    • factor (int) – Factor by which a number of documents increases in each next query, if an answer is not found.
    • max_iterations (int) – Number of times to ask a question, with the increasing number of documents.
    • strict_prompt (bool) – If LLM should be instructed strictly to return json. Increases performance in small open source models, not needed in OpenAI GPT models.
  • Returns
    A column with answers to the question. If answer is not found, then None is returned.