Iterative Computations
Pathway allows you to perform iterative computations on tables. The tables can be updated over time but the data is still split into batches properly after performing an iteration.
The iteration is performed until convergence or until the limit of steps is reached (by default there is no limit but you can specify it). The iteration is incremental - the next iteration step only gets elements that have been updated in the previous step. It allows Pathway to reduce the amount of computations performed and effectively have a different number of iteration steps for different elements.
You could replace the iteration by unrolling the computations using, e.g. a for
loop, but then you would get the same amount of steps for each element and possibly a lot more computations performed.
In the sections below you can find the example applications of iteration in Pathway with a more detailed explanation of how this mechanism works.
Collatz conjecture
There is a hypothesis in mathematics that the function presented below applied iteratively to any positive integer eventually transforms it to 1. The function is
The sequence is , for and the conjecture says that for every positive integer there exists such that . You can read more about it on Wikipedia.
You will now learn how to use Pathway to test the Collatz conjecture. Let's import Pathway first and define a function that performs a single step of the iterative computation. It can return a single table, a tuple or a dictionary of tables.
import pathway as pw
def collatz_step(t: pw.Table) -> pw.Table:
t = t.with_columns(
a=pw.if_else(
pw.this.a == 1,
1,
pw.if_else(pw.this.a % 2 == 0, pw.this.a // 2, 3 * pw.this.a + 1),
)
)
# t.debug("collatz")
return t
Let's define a table with initial numbers to be checked and apply collatz_step
iteratively on them.
table = pw.debug.table_from_markdown(
"""
n
3
13
42
"""
).with_columns(a=pw.this.n)
res = pw.iterate(collatz_step, t=table)
pw.debug.compute_and_print(res)
| n | a
^X1MXHYY... | 3 | 1
^YYY4HAB... | 13 | 1
^Z3QWT29... | 42 | 1
The main part of the computation is the pw.iterate
function. It takes a function as an argument - the collatz_step
in this case. You need to pass all tables required by collatz_step
as keyword arguments of pw.iterate
. In general, pw.iterate
can return a single table, a tuple or a named tuple of tables. It depends on what is returned from the user-defined function passed to pw.iterate
. In this case, the collatz_step
returns a single table and that's why pw.iterate
also returns a single table.
As for the mechanism, inside the iteration records of the returned table t
that have changed their values are passed to the beginning of the function and flow through the dataflow in collatz_step
again as new records of t
. The iteration_limit
argument of pw.iterate
was not set. That's why the computation continues until records stop updating. If the Collatz conjecture was not true for these numbers, the computation would not finish. Fortunately, it has finished, and this way we checked that it is true at least for these three numbers. You can check more numbers if you wish but don't expect anything fancy as all numbers up to have been checked already.
If you uncomment the t.debug("collatz")
line, you can analyze the flow of the data. You can see that the maximal number of steps was (for ) and the minimal number of steps was (for ). While counting steps, also a final step that transforms 1 to 1 was included. It is a necessary step that allows Pathway to say that nothing has changed. The pw.iterate
can only stop if there is no change in some iteration. If there was no change in one iteration, there won't be changes in any further iterations (functions passed to iterate
should be deterministic to avoid problems).
Newton's method
In this example, you are going to use a Newton method to find roots of a cubic polynomial. Let's consider polynomials of the form . Note that you can convert any third-degree polynomial for the purpose of root finding to this form by dividing by the coefficient of . The derivative of the polynomial is . Now you can write the formula for the solution update: for where is an initial guess (in the example you'll provide it by yourself). With this knowledge, let's write a code for the iteration step. In the code, a Horner's scheme is used for better speed and numerical stability.
def newton(table: pw.Table) -> pw.Table:
f = ((pw.this.x + pw.this.a) * pw.this.x + pw.this.b) * pw.this.x + pw.this.c
f_prime = (3 * pw.this.x + 2 * pw.this.a) * pw.this.x + pw.this.b
table = table.with_columns(x=pw.this.x - f / f_prime)
# table.debug("newton")
return table
Let's define a table with polynomials' coefficients and starting points. There are three different polynomials in total.
table = pw.debug.table_from_markdown(
"""
a | b | c | x0
-2 | 1 | 1 | 0
-2 | 1 | 1 | 2
-1 | -2 | 1 | 0
-1 | -2 | 1 | 1
1 | -3 | -1 | -1
1 | -3 | -1 | 0
1 | -3 | -1 | 1
"""
).with_columns(x=pw.cast(float, pw.this.x0))
The table and iteration step are defined, so you can set up a computation and print the results.
result = pw.iterate(newton, table=table)
pw.debug.compute_and_print(result)
| a | b | c | x0 | x
^X1MXHYY... | -2 | 1 | 1 | 0 | -0.4655712318767681
^YYY4HAB... | -2 | 1 | 1 | 2 | -0.4655712318767681
^Z3QWT29... | -1 | -2 | 1 | 0 | 0.4450418679126288
^3CZ78B4... | -1 | -2 | 1 | 1 | 0.4450418679126288
^3HN31E1... | 1 | -3 | -1 | -1 | -0.31110781746598193
^3S2X6B2... | 1 | -3 | -1 | 0 | -0.31110781746598193
^A984WV0... | 1 | -3 | -1 | 1 | 1.4811943040920157
By using the fact that the floating point numbers have finite precision, you don't need to specify the iteration_limit
. If you were running some computations with slower convergence, specifying it could have been useful. Note that also a Newton method may sometimes diverge and hence never finish if you choose a wrong starting point.
You can analyze the example by uncommenting the table.debug("newton")
line. You will then see that the maximal number of steps was . This number of steps was required by for . It was not a good starting point. As a result, the root estimate was changing a lot. All other computations have finished in at most steps.
Connected components in a graph
In all cases you've seen so far, the rows were independent from each other. Those cases can be solved easily without pw.iterate
- for example by writing a Python UDF that would get values from a single row and process them until convergence. For longer computations, a UDF would be slower as pw.iterate
is implemented natively in Rust. Nevertheless, it'd be possible to do it that way.
Now you'll see how to find connected components in a graph using pw.iterate
. Doing this is not possible with a Python UDF when each row represents a vertex or an edge in a graph. One way of solving the problem is to find for each vertex the lowest vertex label among vertices it has a path to. Let's call this vertex with the lowest label a representative. If the vertices' labels are different, each vertex in a single connected component has the same representative, and vertices from different connected components have different representatives. Then it is easy to check if the vertices belong to the same connected component.
Let's create an example graph - a chain of six vertices and a single vertex. A repr
column is also created. It contains a representative of a connected component, initially set to the vertex itself. The labels of vertices are different and hence can be used to create IDs of rows in a table by using with_id_from
. It is needed later to have a deterministic way of assigning IDs to records.
vertices = pw.debug.table_from_markdown(
"""
a
1
2
3
4
5
6
7
""",
id_from=["a"],
).with_columns(repr=pw.this.a)
edges = pw.debug.table_from_markdown(
"""
a | b
2 | 3
3 | 4
4 | 5
5 | 6
6 | 1
"""
)
The edges are undirected but it is easier to use them in a directed fashion. For this purpose, reversed edges are added to the graph. Also self-loops are useful.
edges = pw.Table.concat_reindex(
edges,
edges.select(a=pw.this.b, b=pw.this.a),
vertices.select(a=pw.this.a, b=pw.this.a),
)
Now you can define an iteration step. In each step for each vertex, the representatives of neighbors are checked and the lowest label among them is be chosen as a new representative. Self-loops are needed to include the current representative in the computation. It'd be possible to preserve it differently, but this way it is cleaner. The process ends when there are no new updates - which means that all neighbors have the same representative assigned.
def cc(vertices: pw.Table, edges: pw.Table) -> pw.Table:
edges_with_repr = edges.join(vertices, edges.b == vertices.a).select(
edges.a, vertices.repr
)
vertices_updated = edges_with_repr.groupby(pw.this.a).reduce(
pw.this.a, repr=pw.reducers.min(pw.this.repr)
)
# vertices_updated.debug("vertices")
vertices_updated = vertices_updated.with_universe_of(vertices)
return vertices_updated
In an iteration step, the edges
table is joined with the vertices
table to get the representatives of neighbors in the graph. Then groupby
is performed on edges_with_repr
to get a minimal representative for each vertex. A new ID is assigned based on column a
- vertex label. It is assigned in exactly the same way it is done above when creating a table. It allows you to have the same set of keys in the vertices_updated
table as in the vertices
table. However, Pathway is not that clever to deduce that the keys are exactly the same in these two tables. That's why it has to be additionally told they are the same, by using with_universe_of
.
Preserving the set of keys is important in iterate
. The iteration can stop only stop if there are no updates in any of the records. The records correspondence between iterations is determined using their IDs. If a record with one ID disappears and a record with a new ID appears, Pathway decides that something is still changing and the computation has to continue (even if the contents of the two rows are the same). It is possible to change the set of keys used in iterate
but in the end the set of keys has to stop changing anyway. You can see that in the next example on computing shortest distances in a graph.
You can now start the computations. Note that you pass two tables to pw.iterate
but only one is updated (and hence returned from pw.iterate
). The edges
table is an auxiliary table that only helps to perform the computations and pw.iterate
doesn't change the table's content. However, the edges
table can also change its content (when it is created by a streaming source). Then the updates to edges
result in updates inside iteration (you can see an example later in this tutorial).
result = pw.iterate(cc, vertices=vertices, edges=edges)
pw.debug.compute_and_print(result)
| a | repr
^YYY4HAB... | 1 | 1
^Z3QWT29... | 2 | 1
^3CZ78B4... | 3 | 1
^3HN31E1... | 4 | 1
^3S2X6B2... | 5 | 1
^A984WV0... | 6 | 1
^6A0QZMJ... | 7 | 7
Vertices from to got as their representative and hence they belong to the same connected component. Vertex has as a representative and is a one-vertex connected component.
As in the previous examples, you can uncomment the vertices_updated.debug("vertices")
line to see how the computations were performed. You can also see the updates on a diagram below. Note that not all vertices update at once and that the vertex that was not updated for a few iterations may start being updated (it applies for example to vertex ).
Now let's consider a graph that is updated over time and compute its connected components. The graph is presented below with deletions marked using red color and insertions using green color. Each event has a time associated with it. The initial vertices (at time ) are black-colored.
You can simulate a stream by adding __time__
and __diff__
columns to table_from_markdown
. The __time__
column simulates the time the record arrives to Pathway and the __diff__
determines whether the record is inserted (+1
, the default) or deleted (-1
).
vertices = pw.debug.table_from_markdown(
"""
a | __time__
1 | 2
2 | 2
3 | 2
4 | 2
5 | 2
6 | 2
7 | 2
8 | 2
9 | 6
10 | 8
""",
id_from=["a"],
).with_columns(repr=pw.this.a)
edges = pw.debug.table_from_markdown(
"""
a | b | __time__ | __diff__
1 | 2 | 2 | 1
2 | 3 | 2 | 1
2 | 8 | 2 | 1
8 | 4 | 2 | 1
8 | 5 | 2 | 1
8 | 6 | 2 | 1
4 | 7 | 2 | 1
2 | 8 | 4 | -1
9 | 10 | 8 | 1
3 | 9 | 10 | 1
2 | 3 | 12 | -1
""",
id_from=["a", "b"],
)
edges = pw.Table.concat_reindex(
edges,
edges.select(a=pw.this.b, b=pw.this.a),
vertices.select(a=pw.this.a, b=pw.this.a),
)
You can now run the computations. To see the updates over time pw.debug.compute_and_print_update_stream
was used. Apart from ordinary columns, it returns __time__
and __diff__
columns which say respectively, when the record was produced and whether it is an insertion or a deletion.
result = pw.iterate(cc, vertices=pw.iterate_universe(vertices), edges=edges)
pw.debug.compute_and_print_update_stream(result)
| a | repr | __time__ | __diff__
^YYY4HAB... | 1 | 1 | 2 | 1
^Z3QWT29... | 2 | 1 | 2 | 1
^3CZ78B4... | 3 | 1 | 2 | 1
^3HN31E1... | 4 | 1 | 2 | 1
^3S2X6B2... | 5 | 1 | 2 | 1
^A984WV0... | 6 | 1 | 2 | 1
^6A0QZMJ... | 7 | 1 | 2 | 1
^SN0FH7F... | 8 | 1 | 2 | 1
^3HN31E1... | 4 | 1 | 4 | -1
^3S2X6B2... | 5 | 1 | 4 | -1
^A984WV0... | 6 | 1 | 4 | -1
^6A0QZMJ... | 7 | 1 | 4 | -1
^SN0FH7F... | 8 | 1 | 4 | -1
^3HN31E1... | 4 | 4 | 4 | 1
^3S2X6B2... | 5 | 4 | 4 | 1
^A984WV0... | 6 | 4 | 4 | 1
^6A0QZMJ... | 7 | 4 | 4 | 1
^SN0FH7F... | 8 | 4 | 4 | 1
^9KM937R... | 9 | 9 | 6 | 1
^QECDZJF... | 10 | 9 | 8 | 1
^9KM937R... | 9 | 9 | 10 | -1
^QECDZJF... | 10 | 9 | 10 | -1
^9KM937R... | 9 | 1 | 10 | 1
^QECDZJF... | 10 | 1 | 10 | 1
^3CZ78B4... | 3 | 1 | 12 | -1
^9KM937R... | 9 | 1 | 12 | -1
^QECDZJF... | 10 | 1 | 12 | -1
^3CZ78B4... | 3 | 3 | 12 | 1
^9KM937R... | 9 | 3 | 12 | 1
^QECDZJF... | 10 | 3 | 12 | 1
Initially (at time ), all vertices have the same representative () and hence belong to a single connected component. At time the edge is deleted and vertices from to get a new representative. At time a new node is added. It is a new connected component. At time a new node is added along with an edge . As a result, gets as a representative. At time a new edge is added. In effect, is propagated as a representative of and . They are in the same connected component as . However, at time the edge is removed. The connected component represented by is split into two components - one with as a representative and one with as a representative.
Shortest paths in a graph
Another interesting use of pw.iterate
is computing shortest paths in a graph. In this example, you are going to find the shortest paths from one vertex to all accessible vertices in a graph. To demonstrate the ability of pw.iterate
to work on tables with changing sets of rows, the initial answer only contains one vertex - a starting point. It grows when paths to new vertices are found. The graph is finite so the set of vertices finally stops growing and the distances finally stop updating so the iteration can finish.
To find the shortest paths, the Bellman-Ford algorithm is used. The example graph does not contain negative cycles. If it did, the iteration would not stop. To solve the problem, you could limit the number of iterations to #vertices-1
and check if one more step does any updates to the distances. If it does, it means that there is a negative cycle in the graph.
Let's define the graph. In contrast to the connected components example, this time the edges are directed. They are directed from a
to b
.
vertices = pw.debug.table_from_markdown(
"""
a | dist
1 | 0
""",
id_from=["a"],
)
edges = pw.debug.table_from_markdown(
"""
a | b | weight
1 | 2 | 2
2 | 3 | 3
3 | 2 | 2
1 | 3 | 10
2 | 5 | 5
3 | 4 | 3
4 | 5 | -2
5 | 3 | 1
4 | 8 | 4
4 | 1 | -4
6 | 7 | 2
7 | 6 | 3
"""
)
edges = pw.Table.concat_reindex(
edges, vertices.select(a=pw.this.a, b=pw.this.a, weight=0)
)
To make the updates easier, self-loops with weight zero are added. Thanks to them, there is no need to compare distances computed in the previous and the current iteration (the only vertex that would need them is the starting vertex).
In a single iteration step, for each vertex, the length of paths via each of its neighbors is computed and the smallest one is chosen. Computing path length from the starting point via each neighbor is done using a join
and taking the minimal length is done by a groupby
, reduce
pair.
def bellman_ford(vertices: pw.Table, edges: pw.Table) -> pw.Table:
edges_with_dist = edges.join(vertices, edges.a == vertices.a).select(
a=edges.b, dist=edges.weight + vertices.dist
)
vertices_updated = edges_with_dist.groupby(pw.this.a).reduce(
pw.this.a, dist=pw.reducers.min(pw.this.dist)
)
# vertices_updated.debug("vertices")
return vertices_updated
Now you can use the bellman_ford
in iterate. Note the pw.iterate_universe
wrapper for the vertices
table. It is needed to tell pw.iterate
that the vertices table changes its set of IDs.
res = pw.iterate(bellman_ford, vertices=pw.iterate_universe(vertices), edges=edges)
pw.debug.compute_and_print(res)
| a | dist
^YYY4HAB... | 1 | 0
^Z3QWT29... | 2 | 2
^3CZ78B4... | 3 | 5
^3HN31E1... | 4 | 8
^3S2X6B2... | 5 | 6
^SN0FH7F... | 8 | 12
As in the previous examples, you can uncomment vertices_updated.debug("vertices")
to see the flow of data inside iteration. Also, note that the final result does not contain distances to vertices and . It is caused by the fact that the initial set only contains the starting vertex and only the accessible (in the same connected component) vertices are added to it. Vertices and form a separate connected component.
If you wish, you can try simulating changes in the graph over time by adding __time__
and __diff__
columns to pw.debug.table_from_markdown
. Then you can see the changes of the result over time by using pw.debug.compute_and_print_update_stream(res)
instead of pw.debug.compute_and_print(res)
. If you have problems, see how it was implemented for the connected components above.
Summary
In this tutorial, you have learned how the iteration works in Pathway and what are its applications. It can be useful in problems you encounter as well!