pw.udfs
Methods and classes for controlling the behavior of UDFs (User-Defined Functions) in Pathway.
Typical use:
import pathway as pw
import asyncio
import time

t = pw.debug.table_from_markdown(
    '''
    a | b
    1 | 2
    3 | 4
    5 | 6
    '''
)

@pw.udf(
    executor=pw.udfs.async_executor(
        capacity=2, retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy()
    )
)
async def long_running_async_function(a: int, b: int) -> int:
    await asyncio.sleep(0.1)
    return a * b

result_1 = t.select(res=long_running_async_function(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_1, include_id=False)

@pw.udf(executor=pw.udfs.async_executor())
def long_running_function(a: int, b: int) -> int:
    time.sleep(0.1)
    return a * b

result_2 = t.select(res=long_running_function(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_2, include_id=False)
class pw.udfs.AsyncRetryStrategy
Class representing a strategy of delays or backoffs for retries.
class pw.udfs.CacheStrategy
Base class used to represent a caching strategy.
class pw.udfs.DefaultCache(name=None, size_limit=1073741824)
The default caching strategy. The persistence layer will be used if enabled; otherwise, the cache will be disabled.
class pw.udfs.DiskCache(name=None, size_limit=1073741824)
On-disk cache.
class pw.udfs.ExponentialBackoffRetryStrategy(max_retries=3, initial_delay=1_000, backoff_factor=2, jitter_ms=300)
Retry strategy with exponential backoff, jitter, and a maximum number of retries.
class pw.udfs.FixedDelayRetryStrategy(max_retries=3, delay_ms=1000)
Retry strategy with a fixed delay and a maximum number of retries.
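A retry strategy is typically passed to an asynchronous executor. A brief sketch using the fixed-delay variant (the function and the numbers are illustrative):
import pathway as pw

# Retry a failing call up to 5 times, waiting 2 seconds between attempts.
strategy = pw.udfs.FixedDelayRetryStrategy(max_retries=5, delay_ms=2000)

@pw.udf(executor=pw.udfs.async_executor(retry_strategy=strategy))
async def unreliable_double(a: int) -> int:
    # Placeholder for a call that may fail transiently (e.g. a network request).
    return 2 * a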
class pw.udfs.InMemoryCache(max_size=None)
In-memory LRU cache. It is not persisted between runs.
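A cache strategy can be passed directly to a UDF. A minimal sketch using the in-memory cache (the function is illustrative):
import pathway as pw

# Results of repeated calls with the same arguments are served from the LRU cache.
@pw.udf(cache_strategy=pw.udfs.InMemoryCache(max_size=128))
def square(a: int) -> int:
    return a * a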
class pw.udfs.NoRetryStrategy
Retry strategy that performs no retries.
class pw.udfs.UDF(*, return_type=..., deterministic=False, propagate_none=False, executor=AutoExecutor(), cache_strategy=None)
Base class for Pathway UDFs (user-defined functions).
Please use the udf wrapper to create UDFs out of Python functions, or subclass this class to define UDFs as Python classes. When subclassing, implement the __wrapped__ method.
Example:
import pathway as pw

table = pw.debug.table_from_markdown(
    '''
    a | b
    1 | 2
    3 | 4
    5 | 6
    '''
)

class VerySophisticatedUDF(pw.UDF):
    exponent: float

    def __init__(self, exponent: float) -> None:
        super().__init__()
        self.exponent = exponent

    def __wrapped__(self, a: int, b: int) -> float:
        intermediate = (a * b) ** self.exponent
        return round(intermediate, 2)

func = VerySophisticatedUDF(1.5)
res = table.select(result=func(table.a, table.b))
pw.debug.compute_and_print(res, include_id=False)
pw.udfs.async_executor(*, capacity=None, timeout=None, retry_strategy=None)
Returns the asynchronous executor for Pathway UDFs.
Can be applied to a regular or an asynchronous function. If applied to a regular function, it is executed in the asyncio loop's run_in_executor.
Asynchronous UDFs are asynchronous within a single batch, where a batch is defined as all entries assigned equal processing times. The UDFs are started for all entries in the batch, and the execution of further batches is blocked until all UDFs for the given batch have finished.
- Parameters
  - capacity (int | None) – Maximum number of concurrent operations allowed. Defaults to None, indicating no specific limit.
  - timeout (float | None) – Maximum time (in seconds) to wait for the function result. When both timeout and retry_strategy are used, the timeout applies to a single retry. Defaults to None, indicating no time limit.
  - retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
Example:
import pathway as pw
import asyncio
import time

t = pw.debug.table_from_markdown(
    '''
    a | b
    1 | 2
    3 | 4
    5 | 6
    '''
)

@pw.udf(
    executor=pw.udfs.async_executor(
        capacity=2, retry_strategy=pw.udfs.ExponentialBackoffRetryStrategy()
    )
)
async def long_running_async_function(a: int, b: int) -> int:
    await asyncio.sleep(0.1)
    return a * b

result_1 = t.select(res=long_running_async_function(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_1, include_id=False)

@pw.udf(executor=pw.udfs.async_executor())
def long_running_function(a: int, b: int) -> int:
    time.sleep(0.1)
    return a * b

result_2 = t.select(res=long_running_function(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_2, include_id=False)
pw.udfs.async_options(capacity=None, timeout=None, retry_strategy=None, cache_strategy=None)
Decorator applying async options to the provided function. A regular function will be wrapped to run in the async executor.
- Parameters
  - capacity (int | None) – Maximum number of concurrent operations. Defaults to None, indicating no specific limit.
  - timeout (float | None) – Maximum time (in seconds) to wait for the function result. When both timeout and retry_strategy are used, the timeout applies to a single retry. Defaults to None, indicating no time limit.
  - retry_strategy (AsyncRetryStrategy | None) – Strategy for handling retries in case of failures. Defaults to None, meaning no retries.
  - cache_strategy (CacheStrategy | None) – Defines the caching mechanism. If set to None and persistence is enabled, operations will be cached using the persistence layer. Defaults to None.
- Returns
  Coroutine
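A sketch of how async_options might be combined with udf, assuming it is used as a decorator factory whose result can then be wrapped into a UDF (the function and its body are purely illustrative):
import asyncio
import pathway as pw

async def fetch_score(a: int, b: int) -> int:
    # Stand-in for an external call that benefits from limits and retries.
    await asyncio.sleep(0.1)
    return a * b

# Apply capacity, timeout, and retry settings to the coroutine.
guarded = pw.udfs.async_options(
    capacity=4,
    timeout=5.0,
    retry_strategy=pw.udfs.FixedDelayRetryStrategy(max_retries=2, delay_ms=500),
)(fetch_score)

# The wrapped coroutine can then be turned into a UDF.
fetch_score_udf = pw.udf(guarded)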
pw.udfs.auto_executor()
Returns the automatic executor for Pathway UDFs. It deduces whether the execution should be synchronous or asynchronous from the function signature: if the function is a coroutine, the execution is asynchronous; otherwise, it is synchronous.
Example:
import pathway as pw
import asyncio
import time

t = pw.debug.table_from_markdown(
    '''
    a | b
    1 | 2
    3 | 4
    5 | 6
    '''
)

@pw.udf(executor=pw.udfs.auto_executor())
def mul(a: int, b: int) -> int:
    return a * b

result_1 = t.select(res=mul(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_1, include_id=False)

@pw.udf(executor=pw.udfs.auto_executor())
async def long_running_async_function(a: int, b: int) -> int:
    await asyncio.sleep(0.1)
    return a * b

result_2 = t.select(res=long_running_async_function(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result_2, include_id=False)
pw.udfs.coerce_async(func)
Wraps a regular function to be executed in the async executor. It acts as a no-op if the provided function is already a coroutine.
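A minimal sketch, assuming the coerced function can be awaited directly outside of a Pathway pipeline:
import asyncio
import pathway as pw

def add_one(x: int) -> int:
    return x + 1

# coerce_async turns the regular function into a coroutine function.
add_one_async = pw.udfs.coerce_async(add_one)
print(asyncio.run(add_one_async(1)))  # expected output: 2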
pw.udfs.sync_executor()
Returns the synchronous executor for Pathway UDFs.
Example:
import pathway as pw

t = pw.debug.table_from_markdown(
    '''
    a | b
    1 | 2
    3 | 4
    5 | 6
    '''
)

@pw.udf(executor=pw.udfs.sync_executor())
def mul(a: int, b: int) -> int:
    return a * b

result = t.select(res=mul(pw.this.a, pw.this.b))
pw.debug.compute_and_print(result, include_id=False)
pw.udfs.udf(fun, /, *, return_type=Ellipsis, deterministic=False, propagate_none=False, executor=AutoExecutor(), cache_strategy=None)
Create a Python UDF (user-defined function) out of a callable.
The output column type is deduced from the function's type annotations. Can be applied to a regular or an asynchronous function.
- Parameters
  - return_type (Any) – The return type of the function. Can be passed here or as a return type annotation. Defaults to ..., meaning that the return type will be inferred from the type annotation.
  - deterministic (bool) – Whether the provided function is deterministic. In this context, it means that the function always returns the same value for the same arguments. If it is not deterministic, Pathway will memoize the results until the row deletion. If your function is deterministic, you're strongly encouraged to set it to True, as it will improve performance. Defaults to False, meaning that the function is treated as non-deterministic and its results will be kept.
  - executor (Executor) – Defines the executor of the UDF. It determines whether the execution is synchronous or asynchronous. Defaults to AutoExecutor(), meaning that the execution strategy will be inferred from the function annotation: if the function is a coroutine, it is executed asynchronously; otherwise it is executed synchronously.
  - cache_strategy (CacheStrategy | None) – Defines the caching mechanism. Defaults to None.
Example:
import pathway as pw
import asyncio

table = pw.debug.table_from_markdown(
    '''
    age | owner | pet
     10 | Alice | dog
      9 | Bob   | dog
        | Alice | cat
      7 | Bob   | dog
    '''
)

@pw.udf
def concat(left: str, right: str) -> str:
    return left + "-" + right

@pw.udf(propagate_none=True)
def increment(age: int) -> int:
    assert age is not None
    return age + 1

res1 = table.select(
    owner_with_pet=concat(table.owner, table.pet), new_age=increment(table.age)
)
pw.debug.compute_and_print(res1, include_id=False)

@pw.udf
async def sleeping_concat(left: str, right: str) -> str:
    await asyncio.sleep(0.1)
    return left + "-" + right

res2 = table.select(col=sleeping_concat(table.owner, table.pet))
pw.debug.compute_and_print(res2, include_id=False)
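The deterministic and cache_strategy options described above can also be set on the decorator. A short sketch (the function is illustrative):
import pathway as pw

@pw.udf(deterministic=True, cache_strategy=pw.udfs.InMemoryCache())
def multiply(a: int, b: int) -> int:
    # Pure function of its arguments, so it is safe to mark it deterministic;
    # Pathway then does not need to memoize its results until row deletion.
    return a * b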
pw.udfs.with_cache_strategy(func, cache_strategy)
Returns a function with the given cache strategy applied.
- Parameters
  cache_strategy (CacheStrategy) – Defines the caching mechanism.
- Returns
  Callable/Coroutine
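A brief sketch, assuming a regular function can be wrapped directly (the function is illustrative):
import pathway as pw

def expensive_square(x: int) -> int:
    return x * x

# Wrap the function so that its results are cached in memory.
cached_square = pw.udfs.with_cache_strategy(expensive_square, pw.udfs.InMemoryCache())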
pw.udfs.with_capacity(func, capacity)
Limits the number of simultaneous calls of the specified function. A regular function will be wrapped to run in the async executor.
- Parameters
  capacity (int) – Maximum number of concurrent operations.
- Returns
  Coroutine
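A brief sketch, assuming the wrapped coroutine can be awaited directly (the function is illustrative):
import asyncio
import pathway as pw

def slow_mul(a: int, b: int) -> int:
    return a * b

# At most 2 calls of slow_mul run concurrently.
limited_mul = pw.udfs.with_capacity(slow_mul, 2)
print(asyncio.run(limited_mul(3, 4)))  # expected output: 12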
pw.udfs.with_retry_strategy(func, retry_strategy)
Returns an asynchronous function with the given retry strategy applied. A regular function will be wrapped to run in the async executor.
- Parameters
  retry_strategy (AsyncRetryStrategy) – Defines how failures will be handled.
- Returns
  Coroutine
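A brief sketch (the flaky function simulates transient failures and is purely illustrative):
import random
import pathway as pw

def flaky_double(x: int) -> int:
    # Simulated transient failure.
    if random.random() < 0.5:
        raise RuntimeError("transient error")
    return 2 * x

# Each failed call is retried with exponentially growing delays before giving up.
retried_double = pw.udfs.with_retry_strategy(
    flaky_double, pw.udfs.ExponentialBackoffRetryStrategy(max_retries=5)
)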
pw.udfs.with_timeout(func, timeout)
Limits the time spent waiting for the result of the function. If the time limit is exceeded, the task is canceled and an error is raised. A regular function will be wrapped to run in the async executor.
- Parameters
  timeout (float) – Maximum time (in seconds) to wait for the function result.
- Returns
  Coroutine
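A brief sketch (the slow function is illustrative; awaiting the wrapped call is expected to raise an error once the limit is exceeded):
import asyncio
import pathway as pw

async def slow_answer() -> int:
    await asyncio.sleep(10)
    return 42

# Cancel the task and raise an error if no result arrives within 1 second.
guarded_answer = pw.udfs.with_timeout(slow_answer, 1.0)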