pw.io.iceberg
pw.io.iceberg.read(catalog_uri, namespace, table_name, schema, *, mode='streaming', warehouse=None, autocommit_duration_ms=1500, persistent_id=None, debug_data=None)
Reads a table from Apache Iceberg. If run in streaming mode, the connector tracks new row additions and old row deletions and reflects them in the table read.
Note that the connector requires primary key fields to be specified in the schema.
You can specify the fields to be used in the primary key with the pw.column_definition function.
- Parameters
  - catalog_uri (str) – URI of the Iceberg REST catalog.
  - namespace (list[str]) – The name of the namespace containing the table to be read.
  - table_name (str) – The name of the table to be read.
  - schema (type[Schema]) – Schema of the resulting table.
  - mode (str) – Denotes how the engine polls the new data from the source. Currently "streaming" and "static" are supported. If set to "streaming", the engine will wait for updates in the specified lake: it will track new row additions and reflect these events in the state. The "static" mode, on the other hand, will only consider the data available at the start and ingest all of it in one commit. The default value is "streaming".
  - warehouse (str | None) – Optional, path to the Iceberg storage warehouse.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved under their persistent_id. This makes it possible to resume computations from the moment they were last terminated.
  - debug_data (Any) – Static data replacing the original one when debug mode is active.
- Returns
Table – Table read from the Iceberg source.
Example:
Consider a users data table stored in the Iceberg storage. The table is located in the app namespace and is named users. The catalog URI is http://localhost:8181.
Below is an example of how to read this table into Pathway.
First, the schema of the table needs to be created. The schema doesn’t have to contain all the columns of the table; you can specify only the ones that are needed for the computation:
import pathway as pw
class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str
Then, the table can be read from the Iceberg storage:
input_table = pw.io.iceberg.read(
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="static",
)
Don’t forget to run your program with pw.run once you define all the necessary computations. Note that you can also change the mode to "streaming" if you want the changes in the table to be reflected in your computational pipeline.
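For instance, a streaming variant of the same read could look as follows. This is a minimal sketch under the assumptions of the example above (a local REST catalog at http://localhost:8181 and a users table in the app namespace); the autocommit_duration_ms value is purely illustrative.

import pathway as pw

class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

# Track row additions and deletions in the Iceberg table; the updates
# received by the connector are committed and pushed into the
# computation graph every 500 ms.
input_table = pw.io.iceberg.read(
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="streaming",
    autocommit_duration_ms=500,  # illustrative value
)

pw.run()  # launches the streaming pipeline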
pw.io.iceberg.write(table, catalog_uri, namespace, table_name, *, warehouse=None, min_commit_frequency=60000)
Writes the stream of changes from table into Iceberg data storage. The data storage must be defined with the REST catalog URI, the namespace, and the table name.
If the namespace or the table doesn’t exist, they will be created by the connector. The schema of the new table is inferred from the table’s schema. The output table must include two additional integer columns: time, representing the computation minibatch, and diff, indicating the type of change (1 for row addition and -1 for row deletion).
- Parameters
  - table (Table) – Table to be written.
  - catalog_uri (str) – URI of the Iceberg REST catalog.
  - namespace (list[str]) – The name of the namespace containing the target table. If the namespace doesn’t exist, it will be created by the connector.
  - table_name (str) – The name of the table to be written. If a table with this name doesn’t exist, it will be created by the connector.
  - warehouse (str | None) – Optional, path to the Iceberg storage warehouse.
  - min_commit_frequency (int | None) – Specifies the minimum time interval between two data commits in storage, measured in milliseconds. If set to None, finalized minibatches will be committed as soon as possible. Keep in mind that each commit in Iceberg creates a new Parquet file and writes an entry in the transaction log, so it is advisable to limit the frequency of commits to reduce the overhead of processing the resulting table; see the sketch after the Returns block below.
- Returns
None
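For instance, with a streaming input you might cap the commit rate as follows. This is a minimal sketch under the same catalog assumptions as above; the streaming CSV path and the five-minute interval are purely illustrative.

import pathway as pw

class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

# A hypothetical streaming CSV source; the path is an assumption.
users = pw.io.csv.read("./users/", schema=InputSchema, mode="streaming")

pw.io.iceberg.write(
    users,
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
    min_commit_frequency=5 * 60 * 1000,  # at most one commit per 5 minutes
)
pw.run()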
Example:
Consider a users data table stored locally in a file called users.txt in CSV format. The Iceberg output connector can place this table into Iceberg storage, defined by the catalog with URI http://localhost:8181. The target table is users, located in the app namespace.
First, the table must be read. To do this, you need to define the schema. For simplicity, consider that it consists of two fields: the user ID and the name.
The schema definition may look as follows:
import pathway as pw
class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str
Using this schema, you can read the table from the input file with the pw.io.csv.read connector. Here, you can use the static mode since the text file with the users doesn’t change dynamically.
users = pw.io.csv.read("./users.txt", schema=InputSchema, mode="static")
Once the table is read, you can use pw.io.iceberg.write to save this table into Iceberg storage.
pw.io.iceberg.write(
    users,
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
)
Don’t forget to run your program with pw.run once you define all the necessary computations. After execution, you will be able to see the users’ data in the Iceberg storage.
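As a quick check, one possible way to verify the result is to read the freshly written table back in static mode and print it. This is a sketch under the same assumptions as above (a local REST catalog at http://localhost:8181).

import pathway as pw

class InputSchema(pw.Schema):
    user_id: int = pw.column_definition(primary_key=True)
    name: str

# Read the just-written table back and print its contents.
verification = pw.io.iceberg.read(
    catalog_uri="http://localhost:8181/",
    namespace=["app"],
    table_name="users",
    schema=InputSchema,
    mode="static",
)
pw.debug.compute_and_print(verification)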