pw.io.deltalake
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
pw.io.deltalake.read(uri, schema, *, mode='streaming', s3_connection_settings=None, autocommit_duration_ms=1500, persistent_id=None, debug_data=None)
Reads a table from Delta Lake. Currently, local and S3 lakes are supported. The table doesn’t have to be append-only; however, deletion vectors are not supported yet.
Note that the connector requires primary key fields to be specified in the schema.
You can specify the fields to be used in the primary key with the pw.column_definition function.
- Parameters
  - uri (str | PathLike) – URI of the Delta Lake source that must be read.
  - schema (type[Schema]) – Schema of the resulting table.
  - mode (str) – Denotes how the engine polls the new data from the source. Currently "streaming" and "static" are supported. If set to "streaming", the engine will wait for the updates in the specified lake. It will track new row additions and reflect these events in the state. On the other hand, the "static" mode will only consider the available data and ingest all of it in one commit. The default value is "streaming".
  - s3_connection_settings (AwsS3Settings | MinIOSettings | WasabiS3Settings | DigitalOceanS3Settings | None) – Configuration for S3 credentials when using S3 storage. In addition to the access key and secret access key, you can specify a custom endpoint, which is necessary for buckets hosted outside of Amazon AWS. If the custom endpoint is left blank, the authorized user’s credentials for S3 will be used.
  - persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - debug_data (Any) – Static data replacing the original one when debug mode is active.
Examples:
Consider an example with a stream of changes on a simple key-value table, streamed by
another Pathway program with the pw.io.deltalake.write method.
Let’s start writing Pathway code. First, the schema of the table needs to be created:
import pathway as pw

class KVSchema(pw.Schema):
    key: str = pw.column_definition(primary_key=True)
    value: str
Then, this table must be written into a Delta Lake storage. In the example, it can
be created from static data with the pw.debug.table_from_markdown method and
saved into a local lake:
output_table = pw.debug.table_from_markdown("key value \n one Hello \n two World")
lake_path = "./local-lake"
pw.io.deltalake.write(output_table, lake_path)
Now the producer code can be run with a simple pw.run:
pw.run(monitoring_level=pw.MonitoringLevel.NONE)
After that, you can read this table with Pathway as well. It requires the specification
of the URI and the schema that was created above. In addition, you can use the "static"
mode, so that the program finishes after the data is read:
input_table = pw.io.deltalake.read(lake_path, KVSchema, mode="static")
Please note that the table doesn’t necessarily have to be created by Pathway: an append-only Delta Table created in any other way will also be processed correctly.
Finally, you can check that the resulting table contains the same set of rows by
displaying it with pw.debug.compute_and_print:
pw.debug.compute_and_print(input_table, include_id=False)
Please note that you can use the same communication approach if S3 is used as
data storage. To do this, specify an S3 path starting with s3:// or s3a://, and
provide the credentials object as a parameter. If no credentials are provided but
the path starts with s3:// or s3a://, Pathway will use the credentials of the
currently authenticated user.
pw.io.deltalake.write(table, uri, *, s3_connection_settings=None, min_commit_frequency=60000)
Writes the stream of changes from table into Delta Lake (https://delta.io/) data
storage at the location specified by uri. Supported storage types are S3 and the
local filesystem.
The storage type is determined by the URI: paths starting with s3:// or s3a://
are for S3 storage, while all other paths use the filesystem.
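The URI rule above can be sketched in a few lines. The helper name storage_kind is hypothetical and is not part of Pathway; it only mirrors the prefix check described in the text and makes no claim about how the connector implements it internally.

```python
import os


def storage_kind(uri) -> str:
    """Hypothetical helper: classify a lake URI the way the text describes."""
    # Accept both str and PathLike, as the connector's uri parameter does.
    path = os.fspath(uri)
    # Paths starting with s3:// or s3a:// are treated as S3 storage.
    if path.startswith(("s3://", "s3a://")):
        return "s3"
    # Every other path is treated as a location on the filesystem.
    return "filesystem"


print(storage_kind("s3://logs/access-log/"))
print(storage_kind("./logs/access-log"))
```

A check like this can be handy before calling the connector, for example to decide whether an s3_connection_settings object needs to be built at all.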
If the specified storage location doesn’t exist, it will be created. The schema of
the new table is inferred from the table’s schema. The output table must include
two additional integer columns: time, representing the computation minibatch,
and diff, indicating the type of change (1 for row addition and -1 for row deletion).
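To illustrate how the time and diff columns can be interpreted, here is a minimal sketch in plain Python that folds a list of change rows into the current set of rows. The function apply_changes, the sample rows, and the minibatch numbers are all hypothetical; the sketch only assumes the column meaning described above and does not use Pathway or Delta Lake.

```python
from collections import Counter


def apply_changes(rows, up_to_time=None):
    """Fold (key, value, time, diff) change rows into the resulting set of rows."""
    state = Counter()
    for key, value, time, diff in rows:
        # Optionally ignore minibatches later than the requested point in time.
        if up_to_time is not None and time > up_to_time:
            continue
        # diff is 1 for a row addition and -1 for a row deletion.
        state[(key, value)] += diff
    # A row is present if its additions outnumber its deletions.
    return sorted(kv for kv, count in state.items() if count > 0)


# Hypothetical change stream: the value for "two" is replaced in minibatch 4.
changes = [
    ("one", "Hello", 2, 1),
    ("two", "World", 2, 1),
    ("two", "World", 4, -1),
    ("two", "Pathway", 4, 1),
]

print(apply_changes(changes))                # state after all minibatches
print(apply_changes(changes, up_to_time=2))  # state as of minibatch 2
```

An update therefore shows up as a deletion of the old row and an addition of the new one within the same minibatch.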
- Parameters
  - table (Table) – Table to be written.
  - uri (str | PathLike) – URI of the target Delta Lake.
  - s3_connection_settings (AwsS3Settings | MinIOSettings | WasabiS3Settings | DigitalOceanS3Settings | None) – Configuration for S3 credentials when using S3 storage. In addition to the access key and secret access key, you can specify a custom endpoint, which is necessary for buckets hosted outside of Amazon AWS. If the custom endpoint is left blank, the authorized user’s credentials for S3 will be used.
  - min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Delta Lake creates a new file and writes an entry in the transaction log. Therefore, it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table. Note that to further optimize performance and reduce the number of chunks in the table, you can use vacuum or optimize operations afterwards.
- Returns
None
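The effect of min_commit_frequency on the number of commits can be pictured with a simplified model. The function plan_commits below is hypothetical and is not the connector's actual scheduling logic: it assumes that a commit happens when a minibatch is finalized and at least min_commit_frequency milliseconds have passed since the previous commit, and that buffered minibatches are flushed at the end.

```python
def plan_commits(finalized_at_ms, min_commit_frequency=60000):
    """Group minibatch finalization times (in ms) into commits. Illustrative model only."""
    commits = []
    pending = []
    last_commit = None
    for t in sorted(finalized_at_ms):
        pending.append(t)
        # With None, every finalized minibatch is committed right away.
        # Otherwise, commit only once enough time has passed since the last commit.
        if (
            min_commit_frequency is None
            or last_commit is None
            or t - last_commit >= min_commit_frequency
        ):
            commits.append(pending)
            pending = []
            last_commit = t
    if pending:
        # Anything still buffered goes out in a final commit.
        commits.append(pending)
    return commits


finalized = [0, 10_000, 30_000, 70_000, 80_000]
print(len(plan_commits(finalized, min_commit_frequency=60000)))
print(len(plan_commits(finalized, min_commit_frequency=None)))
```

Under this model, the same five minibatches produce fewer commits, and therefore fewer files and transaction log entries, when a minimum interval is set.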
Example:
Consider a table access_log that needs to be output to a Delta Lake storage
located locally at the folder ./logs/access-log. It can be done as follows:
pw.io.deltalake.write(access_log, "./logs/access-log")
Please note that if there is no filesystem object at this path, the corresponding folder will be created. However, if you run this code twice, the new data will be appended to the storage created during the first run.
It is also possible to save the table to S3 storage. To save the table to the
access-log path within the logs bucket in the eu-west-3 region, modify the code
as follows:
import os

pw.io.deltalake.write(
    access_log,
    "s3://logs/access-log/",
    s3_connection_settings=pw.io.s3.AwsS3Settings(
        bucket_name="logs",
        region="eu-west-3",
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
)
Note that it is not necessary to specify the credentials explicitly if you are logged into S3. Pathway can deduce them for you. For an authorized user, the code can be simplified as follows:
pw.io.deltalake.write(access_log, "s3://logs/access-log/")