pw.io.python
class pw.io.python.ConnectorSubject
A custom Python connector can be created by extending this class and implementing the run() method, which is responsible for filling the buffer with data. The Pathway engine starts this method in a separate thread.
To send a message, use the next() method.
If the subject never deletes records, override the _deletions_enabled property to return False (as in the example below), as this may improve performance.
Example:
import pathway as pw
from pathway.io.python import ConnectorSubject

class MySchema(pw.Schema):
    a: int
    b: str

class MySubject(ConnectorSubject):
    def run(self) -> None:
        for i in range(4):
            self.next(a=i, b=f"x{i}")

    @property
    def _deletions_enabled(self) -> bool:
        return False

s = MySubject()
table = pw.io.python.read(s, schema=MySchema)
pw.debug.compute_and_print(table, include_id=False)
close()
Sends a sentinel message.
Should be called to indicate that no new messages will be sent.
commit()
Sends a commit message.
end()
Joins the thread running run().
Should not be called directly.
next(**kwargs)
Sends a message to the engine.
The arguments should be compatible with the schema passed to read().
Values for all fields should be passed to this method unless they have a default value specified in the schema.
Example:
import pathway as pw
import pandas as pd

class InputSchema(pw.Schema):
    a: pw.DateTimeNaive
    b: bytes
    c: int

class InputSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next(a=pd.Timestamp("2021-03-21T18:34:12"), b="abc".encode(), c=3)
        self.next(a=pd.Timestamp("2022-04-01T11:12:12"), b="def".encode(), c=42)

t = pw.io.python.read(InputSubject(), schema=InputSchema)
pw.debug.compute_and_print(t, include_id=False)
next_bytes(message)
Sends a message.
- Parameters
message (bytes) – a message represented as bytes.
next_json(message)
Sends a message.
- Parameters
message (dict) – Dict representing json.
next_str(message)
Sends a message.
- Parameters
message (str) – a message represented as a string.
on_stop()
Called after the end of the run() function.
start()
Runs a separate thread with the function feeding data into the buffer.
Should not be called directly.
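The lifecycle described by these methods (start() launching run() in a thread, next() filling a buffer, close() sending a sentinel, end() joining the thread) can be modeled with the standard library alone. The sketch below is a toy stand-in, not Pathway's implementation: ToySubject, _SENTINEL and drain() are hypothetical names, and calling on_stop() before close() is an assumption.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical marker meaning "no more messages"


class ToySubject:
    """Stdlib-only model of the subject lifecycle (not Pathway code)."""

    def __init__(self) -> None:
        self._buffer: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._target, daemon=True)
        self.stopped = False

    def run(self) -> None:
        raise NotImplementedError  # subclasses produce data here

    def on_stop(self) -> None:
        self.stopped = True  # hook invoked once run() has finished

    def next(self, **kwargs) -> None:
        self._buffer.put(kwargs)  # one message per call

    def close(self) -> None:
        self._buffer.put(_SENTINEL)  # signal that no new messages follow

    def start(self) -> None:
        self._thread.start()  # run() executes in a separate thread

    def end(self) -> None:
        self._thread.join()  # wait for the producer thread to finish

    def _target(self) -> None:
        try:
            self.run()
        finally:
            self.on_stop()  # assumed order: hook first, then sentinel
            self.close()

    def drain(self) -> list:
        """Consumer side: read messages until the sentinel arrives."""
        rows = []
        while (item := self._buffer.get()) is not _SENTINEL:
            rows.append(item)
        return rows


class Counter(ToySubject):
    def run(self) -> None:
        for i in range(3):
            self.next(a=i, b=f"x{i}")


subject = Counter()
subject.start()
rows = subject.drain()
subject.end()
```

In this model the consumer, not user code, calls start() and end(), which mirrors the note that those methods should not be called directly.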
class pw.io.python.InteractiveCsvPlayer(csv_file='')
Exposes the same methods as ConnectorSubject, documented above: close(), commit(), end(), next(**kwargs), next_bytes(message), next_json(message), next_str(message), on_stop() and start().
pw.io.python.read(subject, *, schema=None, format=None, autocommit_duration_ms=1500, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None, persistent_id=None, name='python')
Reads a table from a ConnectorSubject.
- Parameters
  - subject (ConnectorSubject) – An instance of a ConnectorSubject.
  - schema (type[Schema] | None) – Schema of the resulting table.
  - format (str | None) – Deprecated. Pass values of proper types to subject’s next instead. Format of the data produced by a subject: “json”, “raw” or “binary”. With the “raw” or “binary” format, a table with a single “data” column is produced.
  - debug_data – Static data replacing the original data when debug mode is active.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - value_columns (list[str] | None) – Columns to extract for a table. [will be deprecated soon]
  - primary_key (list[str] | None) – If the table should have a primary key generated from a subset of its columns, specify those columns in this field. Otherwise, the primary key is generated randomly. [will be deprecated soon]
  - types (dict[str, PathwayType] | None) – Dictionary mapping column names to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY. [will be deprecated soon]
  - default_values (dict[str, Any] | None) – Dictionary containing default values for columns, replacing blank entries. A column has a default value only if one is specified explicitly. [will be deprecated soon]
  - persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
- Returns
  Table – The table read.