pw.io.minio

class pw.io.minio.MinIOSettings(endpoint, bucket_name, access_key, secret_access_key, *, with_path_style=True, region=None)

Stores MinIO bucket connection settings.

  • Parameters
    • endpoint – Endpoint for the bucket.
    • bucket_name – Name of a bucket.
    • access_key – Access key for the bucket.
    • secret_access_key – Secret access key for the bucket.
    • region – Region of the bucket.
    • with_path_style – Whether to use path-style addresses for bucket access. It defaults to True as this is the most widespread way to access MinIO, but can be overridden in case of a custom configuration.
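To make the with_path_style option more concrete, the sketch below shows the two common S3 addressing conventions. It illustrates the general convention only, not how Pathway builds its requests. The helper object_url, the host minio.example.com and the use of HTTPS are assumptions made for this example.

```python
def object_url(endpoint: str, bucket_name: str, key: str, with_path_style: bool = True) -> str:
    """Build a URL for an object under either S3 addressing style (illustrative only)."""
    key = key.lstrip("/")
    if with_path_style:
        # Path-style: the bucket name is the first segment of the URL path.
        return f"https://{endpoint}/{bucket_name}/{key}"
    # Virtual-hosted-style: the bucket name is a subdomain of the endpoint.
    return f"https://{bucket_name}.{endpoint}/{key}"


# Hypothetical endpoint, bucket and object key.
print(object_url("minio.example.com", "datasets", "animals/pets.csv"))
print(object_url("minio.example.com", "datasets", "animals/pets.csv", with_path_style=False))
```

Path-style addressing needs no per-bucket DNS records, which is one likely reason it is the usual choice for self-hosted MinIO deployments.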

pw.io.minio.read(path, minio_settings, format, *, schema=None, mode='streaming', with_metadata=False, csv_settings=None, json_field_paths=None, downloader_threads_count=None, persistent_id=None, autocommit_duration_ms=1500, debug_data=None)

Reads a table from one or several objects in a MinIO S3 bucket.

If the path is a prefix and several objects lie under it, they are processed in order of their modification times: the earlier an object was modified, the earlier it is passed to the engine.
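The ordering rule above can be sketched in plain Python. This is a simplified model, not the connector's implementation: the object listing and the helper processing_order are hypothetical.

```python
from datetime import datetime, timezone

# Hypothetical listing of objects under a prefix: (object key, last-modified time).
objects = [
    ("animals/c.csv", datetime(2024, 1, 3, tzinfo=timezone.utc)),
    ("animals/a.csv", datetime(2024, 1, 2, tzinfo=timezone.utc)),
    ("animals/b.csv", datetime(2024, 1, 1, tzinfo=timezone.utc)),
]


def processing_order(listing):
    """Return object keys sorted by ascending modification time."""
    return [key for key, _modified_at in sorted(listing, key=lambda item: item[1])]


print(processing_order(objects))
# ['animals/b.csv', 'animals/a.csv', 'animals/c.csv']
```

Note that the object names play no role here: b.csv comes first only because it was modified first.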

  • Parameters
    • path (str) – Path to an object or to a folder of objects in the MinIO S3 bucket.
    • minio_settings (MinIOSettings) – Connection parameters for the MinIO account and the bucket.
    • format (str) – Format of the data to be read. Currently the csv, json, plaintext, plaintext_by_object and binary formats are supported. The plaintext and plaintext_by_object formats differ in how the input is split into rows: with plaintext, the input is split by newlines; with plaintext_by_object, each object is read in full and corresponds to one row. With the binary format, the data is read as raw bytes without UTF-8 parsing.
    • schema (type[Schema] | None) – Schema of the resulting table. Not required for plaintext_by_object and binary formats: if they are chosen, the contents of the read objects are stored in the column data.
    • mode (str) – If set to streaming, the engine waits for new objects under the given path prefix. If set to static, it only considers the available data and ingests all of it. The default value is streaming.
    • with_metadata (bool) – When set to True, the connector adds a column named _metadata to the table. This column is a JSON field containing an optional field modified_at, an optional field owner with the ID of the object owner, and a field path with the full path, within the bucket, of the object from which the row was read.
    • csv_settings (CsvParserSettings | None) – Settings for the CSV parser. This parameter is used only in case the specified format is “csv”.
    • json_field_paths (dict[str, str] | None) – If the format is “json”, this parameter maps field names to paths in the read JSON object. For each field that requires such a mapping, provide an entry of the form <field_name>: <path to be mapped>, where the path to be mapped is a JSON Pointer (RFC 6901).
    • downloader_threads_count (int | None) – The number of threads created to download the contents of the bucket under the given path. It defaults to the number of cores available on the machine. It is recommended to increase the number of threads if your bucket contains many small files.
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table will be persisted, or None if there is no need to persist the state of this table. When a program restarts, it restores the state of all input tables according to what was saved for their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
    • debug_data (Any) – Static data that replaces the original data when debug mode is active.
  • Returns
    Table – The table read.
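To illustrate what a json_field_paths mapping expresses, here is a minimal sketch of JSON Pointer (RFC 6901) resolution in plain Python. It is not the connector's parser: the helper resolve_pointer, the sample record and the field names pet_name and first_tag are assumptions made for this example.

```python
def resolve_pointer(document, pointer: str):
    """Resolve an RFC 6901 JSON Pointer against a parsed JSON value."""
    if pointer == "":
        return document  # The empty pointer refers to the whole document.
    if not pointer.startswith("/"):
        raise ValueError("a non-empty JSON Pointer must start with '/'")
    current = document
    for token in pointer[1:].split("/"):
        # Unescape per RFC 6901: "~1" becomes "/" first, then "~0" becomes "~".
        token = token.replace("~1", "/").replace("~0", "~")
        if isinstance(current, list):
            current = current[int(token)]  # Array elements are addressed by index.
        else:
            current = current[token]  # Object members are addressed by name.
    return current


# Hypothetical input record and field mapping.
record = {"pet": {"name": "Rex", "tags": ["dog", "brown"]}, "owner": "Alice"}
json_field_paths = {"pet_name": "/pet/name", "first_tag": "/pet/tags/0"}

row = {field: resolve_pointer(record, path) for field, path in json_field_paths.items()}
print(row)
# {'pet_name': 'Rex', 'first_tag': 'dog'}
```

In this model, fields absent from the mapping, such as owner, would simply be looked up by name at the top level of the object.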

Example:

Suppose a table is stored in CSV format in a MinIO S3 bucket. You can use this method to connect to the bucket and read the table's contents.

It may look as follows:

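import os

import pathway as pw


# Columns expected in the CSV objects.
class InputSchema(pw.Schema):
    owner: str
    pet: str


# Read every object under the "animals/" prefix of the "datasets" bucket.
# Credentials are taken from environment variables rather than hard-coded.
t = pw.io.minio.read(
    "animals/",
    minio_settings=pw.io.minio.MinIOSettings(
        bucket_name="datasets",
        endpoint="avv749.stackhero-network.com",
        access_key=os.environ["MINIO_S3_ACCESS_KEY"],
        secret_access_key=os.environ["MINIO_S3_SECRET_ACCESS_KEY"],
    ),
    format="csv",
    schema=InputSchema,
)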

Note that this connector is interoperable with the AWS S3 connector, so all the examples for different data formats in pw.io.s3.read also work with MinIO input.