pw.io.python

class pw.io.python.ConnectorSubject

An abstract base class for creating custom Python connectors.

To create a custom Python connector, extend this class and implement the run() method, which is responsible for filling the buffer with data. The Pathway engine starts this method in a separate thread. When run() returns, the connector is considered finished and Pathway no longer waits for new messages from it.

Use the next() method to send a message.

If the subject never deletes records, set the class property deletions_enabled to False, as this may improve performance. The example below does this by overriding the _deletions_enabled property.

Example:

import pathway as pw
from pathway.io.python import ConnectorSubject

class MySchema(pw.Schema):
    a: int
    b: str


class MySubject(ConnectorSubject):
    def run(self) -> None:
        for i in range(4):
            self.next(a=i, b=f"x{i}")

    @property
    def _deletions_enabled(self) -> bool:
        return False


s = MySubject()

table = pw.io.python.read(s, schema=MySchema)
pw.debug.compute_and_print(table, include_id=False)
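
The lifecycle described above (run() executing in its own thread, next() filling a buffer, and a sentinel marking the end of the stream) can be pictured with a standard-library toy model. This is only a sketch of the idea, not Pathway's implementation: ToySubject, Counter, drain, and _SENTINEL are hypothetical names introduced for illustration, and the real engine's consumer is not shown.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical end-of-stream marker


class ToySubject:
    """Stdlib-only stand-in for a connector subject (not Pathway's code)."""

    def __init__(self) -> None:
        self._buffer: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._run_and_close)

    def run(self) -> None:
        raise NotImplementedError  # subclasses feed the buffer here

    def next(self, **kwargs) -> None:
        self._buffer.put(kwargs)  # one message per call

    def close(self) -> None:
        self._buffer.put(_SENTINEL)  # no more messages will follow

    def _run_and_close(self) -> None:
        try:
            self.run()
        finally:
            self.close()  # the connector is finished once run() returns

    def start(self) -> None:
        self._thread.start()

    def end(self) -> None:
        self._thread.join()


class Counter(ToySubject):
    def run(self) -> None:
        for i in range(4):
            self.next(a=i, b=f"x{i}")


def drain(subject: ToySubject) -> list:
    """Play the role of the engine: read until the sentinel arrives."""
    subject.start()
    rows = []
    while True:
        item = subject._buffer.get()
        if item is _SENTINEL:
            break
        rows.append(item)
    subject.end()
    return rows


rows = drain(Counter())
```

In this model the consumer stops waiting as soon as the sentinel is read, which mirrors the statement that Pathway does not wait for new messages once run() has terminated.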

close()

Sends a sentinel message.

Should be called to indicate that no new messages will be sent.

commit()

Sends a commit message.

end()

Joins the thread running run().

Should not be called directly.

next(**kwargs)

Sends a message to the engine.

The arguments should be compatible with the schema passed to read(). Values for all fields should be passed to this method unless they have a default value specified in the schema.

Example:

import pathway as pw
import pandas as pd

class InputSchema(pw.Schema):
    a: pw.DateTimeNaive
    b: bytes
    c: int

class InputSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next(a=pd.Timestamp("2021-03-21T18:34:12"), b="abc".encode(), c=3)
        self.next(a=pd.Timestamp("2022-04-01T11:12:12"), b="def".encode(), c=42)

t = pw.io.python.read(InputSubject(), schema=InputSchema)
pw.debug.compute_and_print(t, include_id=False)
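
The rule that every field needs a value unless the schema supplies a default can be sketched without Pathway. The helper below is a hypothetical illustration: fill_row, FIELDS, and _MISSING are not part of the library, and the check uses plain Python types rather than Pathway types.

```python
_MISSING = object()  # marks a field that has no default

# Hypothetical schema: field name -> (expected type, default or _MISSING)
FIELDS = {
    "a": (int, _MISSING),
    "b": (str, _MISSING),
    "c": (int, 0),
}


def fill_row(fields: dict, **kwargs) -> dict:
    """Build a row from kwargs, applying defaults and rejecting gaps."""
    unknown = set(kwargs) - set(fields)
    if unknown:
        raise TypeError(f"unexpected fields: {sorted(unknown)}")
    row = {}
    for name, (expected_type, default) in fields.items():
        value = kwargs.get(name, default)
        if value is _MISSING:
            raise TypeError(f"missing value for field {name!r}")
        if not isinstance(value, expected_type):
            raise TypeError(f"field {name!r} expects {expected_type.__name__}")
        row[name] = value
    return row


complete = fill_row(FIELDS, a=1, b="x", c=5)
defaulted = fill_row(FIELDS, a=2, b="y")  # c falls back to its default

try:
    fill_row(FIELDS, a=3)  # b has no default, so this is rejected
    error = None
except TypeError as exc:
    error = str(exc)
```

The sketch fails early on a missing field. How Pathway itself reports such a mismatch is not specified here.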

next_bytes(message)

Sends a message.

  • Parameters
    message (bytes) – a message represented as bytes.

next_json(message)

Sends a message.

  • Parameters
    message (dict) – a dict representing a JSON object.

next_str(message)

Sends a message.

  • Parameters
    message (str) – a message represented as a string.
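
One way to picture how next_bytes(), next_json(), and next_str() relate to next() is the recording stand-in below. It is a sketch under assumptions, not the library's code: RecordingSubject is a hypothetical class, and the sketch assumes that a dict is spread into one value per column while text and bytes land in a single "data" column, as described for the “raw” and “binary” formats under read().

```python
class RecordingSubject:
    """Hypothetical stand-in that records what each helper would send."""

    def __init__(self) -> None:
        self.sent = []

    def next(self, **kwargs) -> None:
        self.sent.append(kwargs)

    def next_json(self, message: dict) -> None:
        # Assumption: each key of the dict becomes one column value.
        self.next(**message)

    def next_str(self, message: str) -> None:
        # Assumption: text goes into a single "data" column.
        self.next(data=message)

    def next_bytes(self, message: bytes) -> None:
        # Assumption: raw bytes go into a single "data" column.
        self.next(data=message)


subject = RecordingSubject()
subject.next_json({"a": 1, "b": "x"})
subject.next_str("hello")
subject.next_bytes(b"\x00\x01")
```

Under these assumptions, a schema used with next_json() would declare the dict's keys as columns, while one used with next_str() or next_bytes() would declare a single data column.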

on_persisted_run()

Called by the Rust core to notify the subject that its state will be persisted in this run.

on_stop()

Called after the end of the run() function.

final seek(state)

Called by the Rust core on start to resume reading from the last stopping point.

start()

Starts a separate thread running the function that feeds data into the buffer.

Should not be called directly.

class pw.io.python.InteractiveCsvPlayer(csv_file='')

close()

Sends a sentinel message.

Should be called to indicate that no new messages will be sent.

commit()

Sends a commit message.

end()

Joins the thread running run().

Should not be called directly.

next(**kwargs)

Sends a message to the engine.

The arguments should be compatible with the schema passed to read(). Values for all fields should be passed to this method unless they have a default value specified in the schema.

Example:

import pathway as pw
import pandas as pd

class InputSchema(pw.Schema):
    a: pw.DateTimeNaive
    b: bytes
    c: int

class InputSubject(pw.io.python.ConnectorSubject):
    def run(self):
        self.next(a=pd.Timestamp("2021-03-21T18:34:12"), b="abc".encode(), c=3)
        self.next(a=pd.Timestamp("2022-04-01T11:12:12"), b="def".encode(), c=42)

t = pw.io.python.read(InputSubject(), schema=InputSchema)
pw.debug.compute_and_print(t, include_id=False)

next_bytes(message)

Sends a message.

  • Parameters
    message (bytes) – a message represented as bytes.

next_json(message)

Sends a message.

  • Parameters
    message (dict) – a dict representing a JSON object.

next_str(message)

Sends a message.

  • Parameters
    message (str) – a message represented as a string.

on_persisted_run()

Called by the Rust core to notify the subject that its state will be persisted in this run.

on_stop()

Called after the end of the run() function.

seek(state)

Called by the Rust core on start to resume reading from the last stopping point.

start()

Starts a separate thread running the function that feeds data into the buffer.

Should not be called directly.

pw.io.python.read(subject, *, schema=None, format=None, autocommit_duration_ms=1500, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None, persistent_id=None, name='python')

Reads a table from a ConnectorSubject.

  • Parameters
    • subject (ConnectorSubject) – An instance of a ConnectorSubject.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • format (str | None) – Deprecated; pass values of the proper types to the subject’s next() instead. Format of the data produced by a subject: “json”, “raw”, or “binary”. With the “raw” or “binary” format, a table with a single “data” column is produced.
    • debug_data – Static data replacing original one when debug mode is active.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • value_columns (list[str] | None) – Columns to extract for a table. [will be deprecated soon]
    • primary_key (list[str] | None) – In case the table should have a primary key generated according to a subset of its columns, the set of columns should be specified in this field. Otherwise, the primary key will be generated randomly. [will be deprecated soon]
    • types (dict[str, PathwayType] | None) – Dictionary containing the mapping between the columns and the data types (pw.Type) of the values of those columns. This parameter is optional, and if not provided the default type is pw.Type.ANY. [will be deprecated soon]
    • default_values (dict[str, Any] | None) – Dictionary containing default values for columns, used to replace blank entries. A column’s default value must be specified explicitly; otherwise the column has no default. [will be deprecated soon]
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
  • Returns
    Table – The table read.
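
The effect of autocommit_duration_ms can be illustrated with a simplified, deterministic model of time-based batching. This is a sketch, not how the engine schedules commits: batch_by_autocommit and the sample events are hypothetical, and the model assumes fixed windows counted from time zero, which the real connector may not use.

```python
from itertools import groupby


def batch_by_autocommit(events: list, autocommit_duration_ms: int) -> list:
    """Group (arrival_ms, row) pairs, sorted by arrival time, into batches.

    Rows arriving in the same window of autocommit_duration_ms
    milliseconds are committed together in this simplified model.
    """
    windows = groupby(events, key=lambda e: e[0] // autocommit_duration_ms)
    return [[row for _, row in group] for _, group in windows]


# Hypothetical arrival times in milliseconds, already sorted.
events = [(0, "a"), (400, "b"), (1600, "c"), (2900, "d"), (3100, "e")]

default_batches = batch_by_autocommit(events, 1500)
small_batches = batch_by_autocommit(events, 500)
```

In this model a shorter duration yields smaller batches that reach the computation graph sooner, at the cost of more commits.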