pw.io.python
class pw.io.python.ConnectorSubject
An abstract class for creating custom Python connectors.
A custom Python connector is created by extending this class and implementing the run() method, which is responsible for filling the buffer with data.
The Pathway engine starts this method in a separate thread.
To send a message, use the next() method.
If the subject won’t delete records, set the class property deletions_enabled to False, as this may improve performance (the example below overrides it as _deletions_enabled).
Example:
import pathway as pw
from pathway.io.python import ConnectorSubject

class MySchema(pw.Schema):
    a: int
    b: str

class MySubject(ConnectorSubject):
    def run(self) -> None:
        for i in range(4):
            self.next(a=i, b=f"x{i}")

    @property
    def _deletions_enabled(self) -> bool:
        return False

s = MySubject()
table = pw.io.python.read(s, schema=MySchema)
pw.debug.compute_and_print(table, include_id=False)
close()
Sends a sentinel message.
Should be called to indicate that no new messages will be sent.
commit()
Sends a commit message.
end()
Joins the thread running run().
Should not be called directly.
next(**kwargs)
Sends a message to the engine.
The arguments should be compatible with the schema passed to read().
Values for all fields should be passed to this method unless they have a default value specified in the schema.
Example:
import pathway as pw
import pandas as pd

class InputSchema(pw.Schema):
    a: pw.DateTimeNaive
    b: bytes
    c: int

class InputSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next(a=pd.Timestamp("2021-03-21T18:34:12"), b="abc".encode(), c=3)
        self.next(a=pd.Timestamp("2022-04-01T11:12:12"), b="def".encode(), c=42)

t = pw.io.python.read(InputSubject(), schema=InputSchema)
pw.debug.compute_and_print(t, include_id=False)
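The rule that every field needs a value unless the schema provides a default can be pictured with a small stand-in written in plain Python. This sketch does not use Pathway: `FIELDS`, `DEFAULTS`, and `fill_row` are hypothetical names chosen for illustration, and the real check happens inside the library.

```python
# Hypothetical stand-in for a schema: field names plus optional defaults.
FIELDS = ("a", "b", "c")
DEFAULTS = {"c": 0}  # assumption: only "c" declares a default value


def fill_row(**kwargs):
    """Return a complete row, or raise if a field without a default is missing."""
    unknown = set(kwargs) - set(FIELDS)
    if unknown:
        raise ValueError(f"unexpected fields: {sorted(unknown)}")
    row = {}
    for name in FIELDS:
        if name in kwargs:
            row[name] = kwargs[name]      # value passed explicitly
        elif name in DEFAULTS:
            row[name] = DEFAULTS[name]    # fall back to the declared default
        else:
            raise ValueError(f"missing value for field {name!r}")
    return row


# "c" is omitted, so it falls back to its default.
complete = fill_row(a=1, b="x")
```

Omitting `b` as well would raise, because `b` has no default in this toy schema.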
next_bytes(message)
Sends a message.
- Parameters
  message (bytes) – A message represented as bytes.
next_json(message)
Sends a message.
- Parameters
  message (dict) – A dict representing JSON.
next_str(message)
Sends a message.
- Parameters
  message (str) – A message represented as a string.
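According to the parameter types listed, these three helpers differ in the Python type of the message they accept. The sketch below prepares one made-up record in each of those forms using only the standard library. It does not call Pathway, and how each helper maps the message onto table columns is not shown here.

```python
import json

record = {"a": 1, "b": "x1"}  # made-up record for illustration

as_dict = record                             # dict: the type next_json() accepts
as_str = json.dumps(record, sort_keys=True)  # str: the type next_str() accepts
as_bytes = as_str.encode("utf-8")            # bytes: the type next_bytes() accepts
```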
on_persisted_run()
This method is called by the Rust core to notify that the state will be persisted in this run.
on_stop()
Called after the end of the run() function.
final seek(state)
Called by the Rust core on start to resume reading from the last stopping point.
start()
Runs a separate thread with the function that feeds data into the buffer.
Should not be called directly.
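The way start(), run(), next(), close(), and end() fit together can be sketched with a simplified stand-in built on the standard library. This is not Pathway's implementation: `ToySubject`, `drain`, and the in-memory queue are hypothetical, and calling close() automatically once run() returns is an assumption of the sketch.

```python
import queue
import threading

_SENTINEL = object()  # marks "no new messages will be sent"


class ToySubject:
    """Simplified stand-in for the start/run/next/close/end lifecycle."""

    def __init__(self):
        self._buffer = queue.Queue()
        self._thread = None

    def run(self):
        # User code: fill the buffer with data.
        for i in range(3):
            self.next(a=i)

    def next(self, **kwargs):
        self._buffer.put(kwargs)

    def close(self):
        self._buffer.put(_SENTINEL)

    def start(self):
        def target():
            try:
                self.run()
            finally:
                self.close()  # assumption: sentinel sent once run() returns

        self._thread = threading.Thread(target=target, daemon=True)
        self._thread.start()

    def end(self):
        self._thread.join()


def drain(subject):
    """Play the role of the engine: start the subject, read until the sentinel."""
    subject.start()
    rows = []
    while True:
        item = subject._buffer.get()
        if item is _SENTINEL:
            break
        rows.append(item)
    subject.end()
    return rows


rows = drain(ToySubject())
```

In this model only `drain` calls start() and end(), which mirrors the note that user code should not call them directly.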
class pw.io.python.InteractiveCsvPlayer(csv_file='')
This class lists the same methods as ConnectorSubject, with the same descriptions: close(), commit(), end(), next(**kwargs), next_bytes(message), next_json(message), next_str(message), on_persisted_run(), on_stop(), seek(state), and start(). See the entries under ConnectorSubject above.
pw.io.python.read(subject, *, schema=None, format=None, autocommit_duration_ms=1500, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None, persistent_id=None, name='python')
Reads a table from a ConnectorSubject.
- Parameters
  - subject (ConnectorSubject) – An instance of a ConnectorSubject.
  - schema (type[Schema] | None) – Schema of the resulting table.
  - format (str | None) – Deprecated. Pass values of proper types to subject’s next instead. Format of the data produced by a subject: “json”, “raw” or “binary”. With the “raw” or “binary” format, a table with a single “data” column is produced.
  - debug_data – Static data replacing the original data when debug mode is active.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - value_columns (list[str] | None) – Columns to extract for a table. [will be deprecated soon]
  - primary_key (list[str] | None) – If the table should have a primary key generated from a subset of its columns, specify that set of columns in this field. Otherwise, the primary key will be generated randomly. [will be deprecated soon]
  - types (dict[str, PathwayType] | None) – Dictionary mapping columns to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY. [will be deprecated soon]
  - default_values (dict[str, Any] | None) – Dictionary containing default values for columns, replacing blank entries. The default value of a column must be specified explicitly; otherwise there will be no default value. [will be deprecated soon]
  - persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This makes it possible to start computations from the point where they were last terminated.
- Returns
  Table – The table read.
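The effect of autocommit_duration_ms can be pictured as grouping incoming rows into commit windows. The sketch below is a plain-Python model and not Pathway code: `commit_batches` and the arrival times are hypothetical, and it assumes fixed windows counted from time zero, whereas the actual timing is decided by the engine.

```python
from itertools import groupby


def commit_batches(arrivals_ms, autocommit_duration_ms=1500):
    """Group (arrival_time_ms, row) pairs by the commit window they fall into."""

    def window(item):
        # Index of the fixed-length window containing the arrival time.
        return item[0] // autocommit_duration_ms

    ordered = sorted(arrivals_ms, key=lambda item: item[0])
    return [[row for _, row in group] for _, group in groupby(ordered, key=window)]


# Made-up arrival times in milliseconds, paired with row labels.
arrivals = [(0, "r0"), (400, "r1"), (1600, "r2"), (2900, "r3"), (3100, "r4")]
batches = commit_batches(arrivals)
```

With the default of 1500 ms, rows arriving at 0 and 400 ms share a batch, as do those at 1600 and 2900 ms, while the row at 3100 ms starts a new one.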