pw.io.s3

class pw.io.s3.AwsS3Settings(*, bucket_name=None, access_key=None, secret_access_key=None, with_path_style=False, region=None, endpoint=None, session_token=None)

Stores Amazon S3 connection settings. You may also use this class to store configuration settings for any custom S3 installation; in that case you need to specify the region and the endpoint.

  • Parameters
    • bucket_name – Name of S3 bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • with_path_style – Whether to use path-style requests.
    • region – Region of the bucket.
    • endpoint – Custom endpoint in case of self-hosted storage.
    • session_token – Session token, an alternative way to authenticate to S3.

classmethod new_from_path(s3_path)

Constructs settings from an S3 path. The engine looks for the credentials in environment variables and in local AWS profiles. It also detects the region of the bucket automatically.

This method may fail if there are no credentials or they are incorrect. It may also fail if the bucket does not exist.

  • Parameters
    • s3_path (str) – Full path to the object in the form s3://<bucket_name>/<path>.
  • Returns
    Configuration object.
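
As a rough illustration of the s3://<bucket_name>/<path> form, the sketch below splits such a path into the bucket name and the object path using only the Python standard library. The helper name split_s3_path is hypothetical and is not part of pw.io.s3; this is not necessarily how the engine parses paths, it only shows which part of the string is which.

```python
from urllib.parse import urlparse


def split_s3_path(s3_path: str) -> tuple[str, str]:
    """Split 's3://<bucket_name>/<path>' into (bucket_name, path).

    Hypothetical helper for illustration only; not part of pw.io.s3.
    """
    parsed = urlparse(s3_path)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"expected s3://<bucket_name>/<path>, got {s3_path!r}")
    # The bucket is the "host" part; the object path follows the first slash.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_path("s3://datasets/animals/cats.json")
print(bucket, key)
```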

class pw.io.s3.DigitalOceanS3Settings(bucket_name, *, access_key=None, secret_access_key=None, region=None)

Stores Digital Ocean S3 connection settings.

  • Parameters
    • bucket_name – Name of Digital Ocean S3 bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • region – Region of the bucket.

class pw.io.s3.WasabiS3Settings(bucket_name, *, access_key=None, secret_access_key=None, region=None)

Stores Wasabi S3 connection settings.

  • Parameters
    • bucket_name – Name of Wasabi S3 bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • region – Region of the bucket.

pw.io.s3.read(path, format, *, aws_s3_settings=None, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in an Amazon S3 bucket in the given format.

If the path is a prefix and several objects lie under it, they are ordered by modification time: objects modified earlier are passed to the engine first.
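
The ordering rule can be pictured with a small standard-library sketch. The listing below is made-up data and read_order is a hypothetical helper; it only mirrors the documented "oldest first" behaviour and is not the engine's implementation.

```python
from datetime import datetime, timezone

# Hypothetical listing of objects under a prefix: (object key, last-modified time).
listing = [
    ("animals/c.json", datetime(2024, 1, 3, tzinfo=timezone.utc)),
    ("animals/a.json", datetime(2024, 1, 2, tzinfo=timezone.utc)),
    ("animals/b.json", datetime(2024, 1, 1, tzinfo=timezone.utc)),
]


def read_order(objects):
    """Return object keys sorted by modification time, oldest first."""
    return [key for key, _modified in sorted(objects, key=lambda item: item[1])]


print(read_order(listing))
# ['animals/b.json', 'animals/a.json', 'animals/c.json']
```

Note that the order follows the timestamps, not the alphabetical order of the keys.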

  • Parameters
    • path (str) – Path to an object or to a folder of objects in Amazon S3 bucket.
    • aws_s3_settings (AwsS3Settings | None) – Connection parameters for the S3 account and the bucket.
    • format (str) – Format of data to be read. Currently csv, json, plaintext, plaintext_by_object and binary formats are supported. The difference between plaintext and plaintext_by_object is how the input is tokenized: with plaintext, the input is split by newlines; with plaintext_by_object, each object is read in full and corresponds to one row. With the binary format, the data is read as raw bytes without UTF-8 parsing.
    • schema (type[Schema] | None) – Schema of the resulting table. Not required for plaintext_by_object and binary formats: if they are chosen, the contents of the read objects are stored in the column data.
    • mode (str) – If set to streaming, the engine waits for new objects under the given path prefix. If set to static, it only considers the data already available and ingests all of it. Default value is streaming.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only in case the specified format is csv.
    • json_field_paths (dict[str, str] | None) – If the format is json, this field allows mapping field names to paths in the read JSON object. For each field that requires such a mapping, it should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
    • downloader_threads_count (int | None) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
    • debug_data (Any) – Static data replacing original one when debug mode is active.
  • Returns
    Table – The table read.
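
To make the json_field_paths mapping more concrete, here is a minimal sketch of how a JSON Pointer (RFC 6901) selects a nested value. The resolve_pointer helper and the sample record are hypothetical and written with the standard library only; the connector performs its own resolution internally, so treat this as an illustration of the pointer syntax rather than of the connector's code.

```python
import json


def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against parsed JSON data."""
    if pointer == "":
        return document
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    current = document
    for token in pointer[1:].split("/"):
        # RFC 6901 escapes: '~1' stands for '/', '~0' stands for '~'.
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]
        else:
            current = current[token]
    return current


record = json.loads('{"owner": {"name": "Alice"}, "pets": [{"kind": "dog"}]}')

# Hypothetical mapping in the <field_name>: <path to be mapped> form.
json_field_paths = {"owner": "/owner/name", "pet": "/pets/0/kind"}

row = {field: resolve_pointer(record, path) for field, path in json_field_paths.items()}
print(row)
# {'owner': 'Alice', 'pet': 'dog'}
```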

Example:

Let’s consider an object store hosted in Amazon S3. The data is stored in the bucket datasets, which is located in the region eu-west-3. The goal is to read the dataset located under the path animals/ in this bucket.

Let’s suppose that the dataset is stored in the JSON Lines format, with one JSON record per line.

Then, the code may look as follows:

import os

import pathway as pw


# Schema of the records to read: one column per JSON field.
class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
        access_key=os.environ["S3_ACCESS_KEY"],
        secret_access_key=os.environ["S3_SECRET_ACCESS_KEY"],
    ),
    format="json",
    schema=InputSchema,
)
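
For reference, an object under animals/ that fits InputSchema could look like the sample in the sketch below. The records are made up for illustration, and the parsing uses only the standard library to show the one-record-per-line layout; it is not the connector's parser.

```python
import json

# Hypothetical contents of one object under animals/: one JSON record per line.
jsonlines_object = """\
{"owner": "Alice", "pet": "dog"}
{"owner": "Bob", "pet": "cat"}
"""

# Each non-empty line is an independent JSON document.
rows = [json.loads(line) for line in jsonlines_object.splitlines() if line.strip()]

for row in rows:
    print(row["owner"], row["pet"])
```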

If you are dealing with a public bucket, the parameters access_key and secret_access_key can be omitted. In this case, the read call looks as follows:

t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
    ),
    format="json",
    schema=InputSchema,
)

You don’t have to choose one of the available input tokenizations. You can also read the objects in full, creating a pw.Table where each row corresponds to a single object. To do that, specify binary as the format:

t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
    ),
    format="binary",
)

Similarly, you can enable UTF-8 parsing of the objects read, which results in a table where each row holds the text of one object:

t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=pw.io.s3.AwsS3Settings(
        bucket_name="datasets",
        region="eu-west-3",
    ),
    format="plaintext_by_object",
)
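
The difference between the plaintext, plaintext_by_object and binary formats can be sketched with plain Python. The objects dictionary and the rows_for helper below are hypothetical; details such as the handling of trailing newlines are assumptions and may differ in the actual connector.

```python
# Hypothetical raw contents of two objects under the same prefix.
objects = {
    "animals/a.txt": b"cat\ndog\n",
    "animals/b.txt": b"parrot\n",
}


def rows_for(data_format: str) -> list:
    """Return the rows a given format would roughly produce for `objects`."""
    rows = []
    for key in sorted(objects):
        raw = objects[key]
        if data_format == "binary":
            rows.append(raw)  # one row per object, raw bytes
        elif data_format == "plaintext_by_object":
            rows.append(raw.decode("utf-8"))  # one row per object, decoded text
        elif data_format == "plaintext":
            rows.extend(raw.decode("utf-8").splitlines())  # one row per line
        else:
            raise ValueError(f"unsupported format: {data_format}")
    return rows


for name in ("plaintext", "plaintext_by_object", "binary"):
    print(name, rows_for(name))
```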

Note that it’s also possible to infer the bucket name and credentials from the path, if it’s given in full form with the s3:// prefix. For instance, in the example above you can also connect as follows:

t = pw.io.s3.read("s3://datasets/animals", format="binary")

Note that the credentials auto-detection only works if valid AWS credentials are available on the machine, for example in environment variables or in a local AWS profile.
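
A quick way to check whether credentials are likely to be found is sketched below. It assumes the conventional AWS locations, namely the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and the ~/.aws/credentials file; whether the engine consults exactly these sources is an assumption, and credentials_look_available is a hypothetical helper, not part of Pathway.

```python
import os
from pathlib import Path


def credentials_look_available(env=None, home=None) -> bool:
    """Heuristic check for AWS credentials in the conventional places.

    Hypothetical helper: it does not validate the credentials, it only
    checks that something is configured.
    """
    env = os.environ if env is None else env
    home = Path.home() if home is None else Path(home)
    has_env_keys = bool(env.get("AWS_ACCESS_KEY_ID")) and bool(
        env.get("AWS_SECRET_ACCESS_KEY")
    )
    has_profile_file = (home / ".aws" / "credentials").is_file()
    return has_env_keys or has_profile_file


print(credentials_look_available())
```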

Finally, you can also read data from self-hosted S3 buckets, or more generally from buckets whose endpoint differs from the standard AWS endpoints. To do that, use the endpoint field of the pw.io.s3.AwsS3Settings class. A natural example is a min.io S3 bucket: one way to connect to it via this connector is to first create a settings object with the custom endpoint and path style:

custom_settings = pw.io.s3.AwsS3Settings(
    endpoint="avv749.stackhero-network.com",
    bucket_name="datasets",
    access_key=os.environ["MINIO_S3_ACCESS_KEY"],
    secret_access_key=os.environ["MINIO_S3_SECRET_ACCESS_KEY"],
    with_path_style=True,
    region="eu-west-3",
)

Then you can connect using this custom settings object:

t = pw.io.s3.read(
    "animals/",
    aws_s3_settings=custom_settings,
    format="binary",
)

Please note that the min.io connection via the generic S3 connector is given only as an example: you may instead use the pw.io.minio.read connector, which wouldn’t require you to create this custom settings object.
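
For context on with_path_style: S3-compatible services address objects either in path style, with the bucket in the URL path, or in virtual-hosted style, with the bucket as a subdomain of the endpoint. The sketch below builds both URL shapes; object_url is a hypothetical helper and storage.example.com is a placeholder endpoint, so this illustrates the two addressing schemes rather than the requests the connector actually issues.

```python
def object_url(endpoint: str, bucket: str, key: str, with_path_style: bool) -> str:
    """Build an object URL in path style or virtual-hosted style."""
    if with_path_style:
        # Path style: the bucket is the first segment of the path.
        return f"https://{endpoint}/{bucket}/{key}"
    # Virtual-hosted style: the bucket is a subdomain of the endpoint.
    return f"https://{bucket}.{endpoint}/{key}"


print(object_url("storage.example.com", "datasets", "animals/a.json", True))
print(object_url("storage.example.com", "datasets", "animals/a.json", False))
```

Self-hosted installations often lack per-bucket DNS entries, which is one common reason to enable path-style requests.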

pw.io.s3.read_from_digital_ocean(path, do_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a Digital Ocean S3 bucket.

If the path is a prefix and several objects lie under it, they are ordered by modification time: objects modified earlier are passed to the engine first.

  • Parameters
    • path (str) – Path to an object or to a folder of objects in S3 bucket.
    • do_s3_settings (DigitalOceanS3Settings) – Connection parameters for the account and the bucket.
    • format (str) – Format of data to be read. Currently csv, json, plaintext, plaintext_by_object and binary formats are supported. The difference between plaintext and plaintext_by_object is how the input is tokenized: with plaintext, the input is split by newlines; with plaintext_by_object, each object is read in full and corresponds to one row. With the binary format, the data is read as raw bytes without UTF-8 parsing.
    • schema (type[Schema] | None) – Schema of the resulting table. Not required for plaintext_by_object and binary formats: if they are chosen, the contents of the read objects are stored in the column data.
    • mode (str) – If set to streaming, the engine waits for new objects under the given path prefix. If set to static, it only considers the data already available and ingests all of it. Default value is streaming.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only in case the specified format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this field allows mapping field names to paths in the read JSON object. For each field that requires such a mapping, it should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
    • downloader_threads_count (int | None) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
    • debug_data (Any) – Static data replacing original one when debug mode is active.
  • Returns
    Table – The table read.
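
The two values of mode can be pictured with a toy model. In the sketch below, snapshots is a hypothetical sequence of bucket listings taken over time and ingest is a made-up helper: static stops after the first listing, while streaming keeps picking up keys it has not seen yet. A real streaming run does not terminate on its own; the finite list here only keeps the example runnable.

```python
def ingest(snapshots, mode="streaming"):
    """Yield object keys in the order they would be picked up.

    `snapshots` is a hypothetical sequence of bucket listings over time.
    """
    seen = set()
    for listing in snapshots:
        for key in listing:
            if key not in seen:
                seen.add(key)
                yield key
        if mode == "static":
            # Static mode only considers the data available at the first listing.
            break


snapshots = [
    ["animals/a.csv"],
    ["animals/a.csv", "animals/b.csv"],
]
print(list(ingest(snapshots, mode="static")))
print(list(ingest(snapshots, mode="streaming")))
```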

Example:

Let’s consider an object store hosted in Digital Ocean S3. The store contains CSV datasets in the bucket datasets, which is located in the region ams3. The goal is to read the dataset located under the path animals/ in this bucket.

Then, the code may look as follows:

import os

import pathway as pw


# Schema of the records to read: one column per CSV field.
class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read_from_digital_ocean(
    "animals/",
    do_s3_settings=pw.io.s3.DigitalOceanS3Settings(
        bucket_name="datasets",
        region="ams3",
        access_key=os.environ["DO_S3_ACCESS_KEY"],
        secret_access_key=os.environ["DO_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)

Please note that this connector is interoperable with the AWS S3 connector, therefore all examples concerning different data formats in pw.io.s3.read also work with the Digital Ocean version.
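
For reference, a CSV object that fits the InputSchema used in the example could look like the sample below. The rows are made up, and the standard csv module is used only to show how header names line up with the schema columns; the connector uses its own CSV parser, configured through csv_settings.

```python
import csv
import io

# Hypothetical contents of one CSV object under animals/.
csv_object = "owner,pet\nAlice,dog\nBob,cat\n"

# The header row provides the column names expected by the schema.
rows = list(csv.DictReader(io.StringIO(csv_object)))

for row in rows:
    print(row["owner"], row["pet"])
```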

pw.io.s3.read_from_wasabi(path, wasabi_s3_settings, format, *, schema=None, mode='streaming', csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a Wasabi S3 bucket.

If the path is a prefix and several objects lie under it, they are ordered by modification time: objects modified earlier are passed to the engine first.

  • Parameters
    • path (str) – Path to an object or to a folder of objects in S3 bucket.
    • wasabi_s3_settings (WasabiS3Settings) – Connection parameters for the account and the bucket.
    • format (str) – Format of data to be read. Currently csv, json, plaintext, plaintext_by_object and binary formats are supported. The difference between plaintext and plaintext_by_object is how the input is tokenized: with plaintext, the input is split by newlines; with plaintext_by_object, each object is read in full and corresponds to one row. With the binary format, the data is read as raw bytes without UTF-8 parsing.
    • schema (type[Schema] | None) – Schema of the resulting table. Not required for plaintext_by_object and binary formats: if they are chosen, the contents of the read objects are stored in the column data.
    • mode (str) – If set to streaming, the engine waits for new objects under the given path prefix. If set to static, it only considers the data already available and ingests all of it. Default value is streaming.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only in case the specified format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this field allows mapping field names to paths in the read JSON object. For each field that requires such a mapping, it should be given in the format <field_name>: <path to be mapped>, where the path to be mapped needs to be a JSON Pointer (RFC 6901).
    • downloader_threads_count (int | None) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files.
    • persistent_id (str | None) – (unstable) An identifier, under which the state of the table will be persisted or None, if there is no need to persist the state of this table. When a program restarts, it restores the state for all input tables according to what was saved for their persistent_id. This way it’s possible to configure the start of computations from the moment they were terminated last time.
    • debug_data (Any) – Static data replacing original one when debug mode is active.
  • Returns
    Table – The table read.
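
The effect of downloader_threads_count can be sketched with a thread pool. Everything below is illustrative: download is a stand-in that fabricates contents instead of doing network I/O, and download_all is a hypothetical helper that mirrors the documented default of one thread per available core. With many small objects, per-request latency tends to dominate, which is why a larger pool can help.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def download(key: str) -> bytes:
    """Stand-in for fetching one object; a real download would do network I/O."""
    return f"contents of {key}".encode("utf-8")


def download_all(keys, downloader_threads_count=None):
    """Fetch all keys concurrently and return their contents in input order."""
    # Mirror the documented default: one thread per available core.
    workers = downloader_threads_count or os.cpu_count() or 1
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map() preserves the order of the input keys.
        return list(pool.map(download, keys))


keys = [f"animals/part-{i}.csv" for i in range(4)]
print(download_all(keys, downloader_threads_count=8))
```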

Example:

Let’s consider an object store hosted in Wasabi S3. The store contains CSV datasets in the bucket datasets, which is located in the region us-west-1. The goal is to read the dataset located under the path animals/ in this bucket.

Then, the code may look as follows:

import os

import pathway as pw


# Schema of the records to read: one column per CSV field.
class InputSchema(pw.Schema):
    owner: str
    pet: str


t = pw.io.s3.read_from_wasabi(
    "animals/",
    wasabi_s3_settings=pw.io.s3.WasabiS3Settings(
        bucket_name="datasets",
        region="us-west-1",
        access_key=os.environ["WASABI_S3_ACCESS_KEY"],
        secret_access_key=os.environ["WASABI_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)

Please note that this connector is interoperable with the AWS S3 connector, therefore all examples concerning different data formats in pw.io.s3.read also work with the Wasabi version.