pw.io.python
class ConnectorObserver
An abstract class for creating custom Python writers.
At least the on_change method must be implemented.
Use with write().
abstract on_change(key, row, time, is_addition)
Called on every change in the table. Changes are delivered in order of increasing processing time. Entries with the same processing time (the same batch) may be delivered in any order. The method must accept:
- Parameters
  - key (Pointer) – the key of the changed row;
  - row (dict[str, Any]) – the changed row as a dict mapping field names to values;
  - time (int) – the processing time of the modification, also referred to as the minibatch ID of the change;
  - is_addition (bool) – True if the row is inserted into the table, False otherwise. Note that an update consists of two operations, the deletion of the old value and the insertion of the new value, which happen within a single batch.
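The update semantics described for is_addition can be illustrated with a small observer that rebuilds the current table contents from the change stream. This is only a minimal sketch: SnapshotObserver is a hypothetical, plain-Python stand-in that mirrors the on_change signature, and the string keys and sample rows are made up for illustration. In real use the class would subclass pw.io.python.ConnectorObserver and be passed to write().

```python
from typing import Any


class SnapshotObserver:
    """Hypothetical stand-in; in real use, subclass pw.io.python.ConnectorObserver."""

    def __init__(self) -> None:
        # key -> latest row known for that key
        self.state: dict[Any, dict[str, Any]] = {}

    def on_change(self, key: Any, row: dict[str, Any], time: int, is_addition: bool) -> None:
        if is_addition:
            self.state[key] = row
        elif self.state.get(key) == row:
            # Remove only if the stored row is the one being retracted, so an
            # insertion that arrived first within the same batch is not lost.
            del self.state[key]


observer = SnapshotObserver()
observer.on_change("k1", {"pet": "dog", "age": 10}, time=0, is_addition=True)
# An update at time 2, delivered insertion-first: the order of entries
# within one batch is not guaranteed.
observer.on_change("k1", {"pet": "dog", "age": 11}, time=2, is_addition=True)
observer.on_change("k1", {"pet": "dog", "age": 10}, time=2, is_addition=False)
print(observer.state)
```

Comparing the retracted row with the stored one is a design choice of this sketch; it keeps the result independent of the delivery order inside a batch.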
on_end()
Called when the stream of changes ends.
on_time_end(time)
Called when a processing time is closed.
- Parameters
  - time (int) – The finished processing time.
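The three callbacks can be combined to process changes one batch at a time. The sketch below assumes that on_time_end(time) arrives after every on_change call for that time, which is how "closed" is read here. BatchingObserver is a hypothetical plain-Python stand-in driven by hand; in real use it would subclass pw.io.python.ConnectorObserver.

```python
from collections import defaultdict
from typing import Any


class BatchingObserver:
    """Hypothetical stand-in that mirrors the ConnectorObserver method names."""

    def __init__(self) -> None:
        self.pending: dict[int, list] = defaultdict(list)  # time -> changes seen so far
        self.closed: list[tuple[int, list]] = []            # (time, changes) per closed time
        self.finished = False

    def on_change(self, key: Any, row: dict, time: int, is_addition: bool) -> None:
        self.pending[time].append((key, row, is_addition))

    def on_time_end(self, time: int) -> None:
        # Assumption: no further changes arrive for this time, so the batch is complete.
        self.closed.append((time, self.pending.pop(time, [])))

    def on_end(self) -> None:
        self.finished = True


# Hand-driven call sequence, standing in for what the engine would issue.
obs = BatchingObserver()
obs.on_change("k1", {"a": 1}, 0, True)
obs.on_change("k2", {"a": 2}, 0, True)
obs.on_time_end(0)
obs.on_change("k1", {"a": 1}, 2, False)
obs.on_time_end(2)
obs.on_end()
print([(time, len(changes)) for time, changes in obs.closed])
```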
class ConnectorSubject(datasource_name='python')
An abstract class for creating custom Python input connectors. Use with read().
A custom Python connector is created by extending this class and implementing the run() method, which is responsible for filling the buffer with data.
The Pathway engine starts run() in a separate thread. When run() terminates, the connector is considered finished and Pathway won’t wait for new messages from it.
To send a message, use the next() method.
If the subject won’t delete records, set the class property deletions_enabled to False, as this may improve performance (the read() example below does this by overriding the _deletions_enabled property).
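The lifecycle described above (run() filling a buffer from its own thread, with a sentinel marking the end) can be pictured with a toy model. Everything in this sketch is an assumption made for illustration: ToySubject, Counter and drain are hypothetical names, and the queue-based buffer is not Pathway's actual implementation. Only the method names mirror the documented ones.

```python
import queue
import threading

_SENTINEL = object()  # marks "no more messages"


class ToySubject:
    """Toy model of the lifecycle only; not the real ConnectorSubject."""

    def __init__(self) -> None:
        self._buffer: queue.Queue = queue.Queue()
        self._thread = None

    def run(self) -> None:
        raise NotImplementedError

    def next(self, **kwargs) -> None:
        self._buffer.put(kwargs)  # one message per call

    def close(self) -> None:
        self._buffer.put(_SENTINEL)

    def start(self) -> None:
        def target() -> None:
            try:
                self.run()
            finally:
                self.close()  # once run() returns, the subject is finished

        self._thread = threading.Thread(target=target, daemon=True)
        self._thread.start()

    def end(self) -> None:
        self._thread.join()


class Counter(ToySubject):
    def run(self) -> None:
        for i in range(3):
            self.next(a=i)


def drain(subject: ToySubject) -> list:
    """Plays the role of the engine: start the subject and read until the sentinel."""
    subject.start()
    rows = []
    while (message := subject._buffer.get()) is not _SENTINEL:
        rows.append(message)
    subject.end()
    return rows


rows = drain(Counter())
print(rows)
```

With the real class, only run() needs to be written; starting and joining the thread is left to the engine, which is why start() and end() are marked as not to be called directly.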
close()
Sends a sentinel message.
Should be called to indicate that no new messages will be sent.
commit()
Sends a commit message.
end()
Joins the thread running run().
Should not be called directly.
next(**kwargs)
Sends a message to the engine.
The arguments should be compatible with the schema passed to read().
Values for all fields should be passed to this method unless they have a default value specified in the schema.
Example:
import pathway as pw
import pandas as pd

# Schema of the table produced by the connector.
class InputSchema(pw.Schema):
    a: pw.DateTimeNaive
    b: bytes
    c: int

class InputSubject(pw.io.python.ConnectorSubject):
    def run(self):
        # Each next() call sends one row; keyword names match the schema fields.
        self.next(a=pd.Timestamp("2021-03-21T18:34:12"), b="abc".encode(), c=3)
        self.next(a=pd.Timestamp("2022-04-01T11:12:12"), b="def".encode(), c=42)

t = pw.io.python.read(InputSubject(), schema=InputSchema)
pw.debug.compute_and_print(t, include_id=False)
next_bytes(message)
Sends a message.
- Parameters
  - message (bytes) – a message represented as bytes.
next_json(message)
Sends a message.
- Parameters
  - message (dict) – a dict representing JSON.
next_str(message)
Sends a message.
- Parameters
  - message (str) – a message represented as a string.
on_persisted_run()
Called by the Rust core to notify the subject that the state will be persisted in this run.
on_stop()
Called after the run() function ends.
final seek(state)
Called by the Rust core on start to resume reading from the last stopping point.
start()
Runs a separate thread with the function feeding data into the buffer.
Should not be called directly.
class InteractiveCsvPlayer(csv_file='')
Exposes the same methods as ConnectorSubject, with identical documentation: close(), commit(), end(), next(**kwargs), next_bytes(message), next_json(message), next_str(message), on_persisted_run(), on_stop(), seek(state), and start(). See the ConnectorSubject entries above.
read(subject, *, schema=None, format=None, autocommit_duration_ms=1500, debug_data=None, name=None, **kwargs)
Reads a table from a ConnectorSubject.
- Parameters
  - subject (ConnectorSubject) – An instance of a ConnectorSubject.
  - schema (type[Schema] | None) – Schema of the resulting table.
  - format (str | None) – Deprecated. Pass values of proper types to the subject’s next instead. Format of the data produced by a subject: “json”, “raw” or “binary”. With the “raw” or “binary” format, a table with a single “data” column is produced.
  - debug_data – Static data replacing the original data when debug mode is active.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the connector’s progress.
- Returns
  Table – The table read.
Example:
import pathway as pw
from pathway.io.python import ConnectorSubject

class MySchema(pw.Schema):
    a: int
    b: str

class MySubject(ConnectorSubject):
    def run(self) -> None:
        # Emit four rows, then return; the connector finishes when run() ends.
        for i in range(4):
            self.next(a=i, b=f"x{i}")

    # This subject never deletes records.
    @property
    def _deletions_enabled(self) -> bool:
        return False

s = MySubject()
table = pw.io.python.read(s, schema=MySchema)
pw.debug.compute_and_print(table, include_id=False)
write(table, observer, *, name=None, sort_by=None)
Writes a stream of changes from a table to a Python observer.
- Parameters
  - table (Table) – The table to write.
  - observer (ConnectorObserver) – An instance of a ConnectorObserver.
  - name (str | None) – A unique name for the writer. If provided, this name will be used in logs and monitoring dashboards. Additionally, if persistence is enabled, it will be used as the name for the snapshot that stores the writer’s progress.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
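The ordering described for sort_by can be pictured in plain Python. This is only an illustrative sketch of the rule (ascending order within each minibatch, tuples compared lexicographically), not the engine's code. The helper sort_within_minibatches and the sample changes are hypothetical, and columns are named by strings here, whereas write() takes column references.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical changes; "time" plays the role of the minibatch ID.
changes = [
    {"time": 0, "owner": "Bob", "age": 7},
    {"time": 0, "owner": "Alice", "age": 10},
    {"time": 0, "owner": "Alice", "age": 8},
    {"time": 2, "owner": "Carol", "age": 3},
    {"time": 2, "owner": "Alice", "age": 9},
]


def sort_within_minibatches(changes, columns):
    """Order minibatches by time, then sort each one by the tuple of column values."""

    def sort_key(change):
        # Python compares tuples lexicographically: first column, then second, ...
        return tuple(change[column] for column in columns)

    by_time = sorted(changes, key=itemgetter("time"))
    ordered = []
    for _, batch in groupby(by_time, key=itemgetter("time")):
        ordered.extend(sorted(batch, key=sort_key))
    return ordered


ordered = sort_within_minibatches(changes, ["owner", "age"])
for change in ordered:
    print(change["time"], change["owner"], change["age"])
```

Sorting never moves a change across minibatches: the Alice row at time 2 still comes after the Bob row at time 0.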
Example:
import pathway as pw

# __time__ is the processing time of each change; __diff__ is 1 for an
# insertion and -1 for a deletion.
table = pw.debug.table_from_markdown('''
      | pet | owner | age | __time__ | __diff__
    1 | dog | Alice |  10 |        0 |        1
    2 | cat | Alice |   8 |        2 |        1
    3 | dog | Bob   |   7 |        4 |        1
    2 | cat | Alice |   8 |        6 |       -1
''')

class Observer(pw.io.python.ConnectorObserver):
    def on_change(self, key: pw.Pointer, row: dict, time: int, is_addition: bool):
        print(f"{row}, {time}, {is_addition}")

    def on_end(self):
        print("End of stream.")

pw.io.python.write(table, Observer())
pw.run(monitoring_level=pw.MonitoringLevel.NONE)