pw.io.airbyte

pw.io.airbyte.read(config_file_path, streams, *, execution_type='local', mode='streaming', env_vars=None, service_user_credentials_file=None, gcp_region='europe-west1', gcp_job_name=None, refresh_interval_ms=60000)

Reads a table with an Airbyte connector that supports incremental mode.

  • Parameters
    • config_file_path (PathLike | str) – Path to the config file, created with the airbyte-serverless tool or via the Pathway CLI (which uses airbyte-serverless under the hood). The "source" section in this file must be properly configured in advance.
    • streams (Sequence[str]) – Airbyte stream names to be read.
    • execution_type (str) – denotes how the Airbyte connector is run. If "local" is specified, the connector is executed locally via Docker. If "remote" is used, the connector runs as a Google Cloud Run job.
    • mode (str) – denotes how the engine polls new data from the source. Currently "streaming" and "static" are supported. In "streaming" mode, the engine checks for updates every refresh_interval_ms milliseconds. "static" mode reads only the data available at startup and ingests all of it in one commit. The default value is "streaming".
    • env_vars (dict | None) – environment variables to be set in the Airbyte connector before its execution.
    • service_user_credentials_file (str | None) – path to the Google API service user JSON file. You can follow the instructions in the developer's guide to obtain it. The credentials are required for the "remote" execution type.
    • gcp_region (str) – Google region for the cloud job.
    • gcp_job_name (str | None) – the name of the GCP job if the "remote" execution type is chosen. If unspecified, the name is autogenerated.
    • refresh_interval_ms (int) – time in milliseconds between new data queries. Applicable if mode is set to "streaming".
    • autocommit_duration_ms – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.

Returns:

A table with a single column data, containing the data read from the connector as pw.Json. The format of this data corresponds to the one used in Airbyte.
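Outside of Pathway, each value in the data column can be pictured as a plain JSON object. The field names below are made up for illustration; the actual schema comes from the Airbyte stream:

```python
import json

# A hypothetical record for illustration only; real field names are defined
# by the Airbyte stream's schema, not by Pathway.
record = json.loads('{"id": 1, "name": "Alice"}')
print(record["name"])  # Alice
```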

Example:

The simplest way to test this connector is to use the Sample Data (Faker) data source provided by Airbyte.

To do that, you can use the Pathway CLI command airbyte create-source to create the Faker data source.
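A plausible invocation is sketched below; the exact subcommand syntax and the connector image name are assumptions, so check the CLI's help output for the form your version accepts:

```
pathway airbyte create-source simple --image "airbyte/source-faker"
```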

The config file is located in ./connections/simple.yaml. It contains the basic parameters of the test data source, such as the random seed and the number of records to be generated. You don't have to modify any of them to proceed with this testing.
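For reference, the file follows the airbyte-serverless layout; a rough sketch of its shape is below (the exact keys and values in your generated file may differ):

```yaml
source:
  docker_image: "airbyte/source-faker"  # connector image to run
  config:
    seed: -1       # random seed for the generated data
    count: 1000    # number of records to generate
```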

Now, you can just run the read from this configured source. It contains three streams: Users, Products, and Purchases. Let's use the Users stream, which leads us to the following code:

import pathway as pw
users_table = pw.io.airbyte.read(  
    "./connections/simple.yaml",
    streams=["Users"],
)
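Note that the read call only defines the source; to actually ingest the data, the pipeline also needs an output and a call to pw.run. A minimal sketch, here writing the stream to a JSON Lines file:

```python
import pathway as pw

users_table = pw.io.airbyte.read(
    "./connections/simple.yaml",
    streams=["Users"],
)

# Persist every update to a JSON Lines file and start the computation.
pw.io.jsonlines.write(users_table, "users.jsonl")
pw.run()
```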

Let’s proceed to a more complex example.

Suppose that you need to read a stream of commits in a GitHub repository. To do so, you can use the Airbyte GitHub connector.

abs create github --source "airbyte/source-github"

Then, you need to edit the created config file, located at ./connections/github.yaml.

To get started in the quickest way possible, you can remove the uncommented option_title, access_token, client_id, and client_secret fields in the config and uncomment the section "Another valid structure for credentials". It requires a Personal Access Token (PAT), which can be obtained on the Tokens page on GitHub – please note that you need to be logged in.

Then, you also need to set the repository name in the repositories field. For example, you can specify pathwaycom/pathway. Finally, remove the unused optional fields, and you're ready to go.
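After these edits, the relevant part of ./connections/github.yaml should look roughly like the sketch below; the key names follow the source-github connector's spec as an assumption, so verify them against your generated file:

```yaml
source:
  docker_image: "airbyte/source-github"
  config:
    credentials:
      personal_access_token: "<your PAT>"
    repositories:
      - "pathwaycom/pathway"
```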

Now, you can simply configure the Pathway connector and run:

import pathway as pw
commits_table = pw.io.airbyte.read(  
    "./connections/github.yaml",
    streams=["commits"],
)

The resulting table will contain JSON payloads with comprehensive information about the commits. If the mode is set to "streaming" (the default), new commits will be appended to this table as they are made.
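Since the payload lands in a single pw.Json column, downstream logic typically projects out the fields it needs. A sketch of such a projection – the field names sha and created_at are assumptions about the GitHub stream's schema:

```python
commit_info = commits_table.select(
    sha=pw.this.data["sha"],                # hypothetical field name
    created_at=pw.this.data["created_at"],  # hypothetical field name
)
```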

In some cases, polling for changes is unnecessary because the data is given in full at the beginning and is not updated afterwards. For instance, in the first example with the users_table table, you could also use the static mode of the connector:

users_table = pw.io.airbyte.read(  
    "./connections/simple.yaml",
    streams=["Users"],
    mode="static",
)

In the second example, you could use this mode to load the commits data at once and then terminate the connector:

commits_table = pw.io.airbyte.read(  
    "./connections/github.yaml",
    streams=["commits"],
    mode="static",
)

Please note that deploying code that runs with the "local" execution type may be challenging because the connector uses Docker under the hood. That may lead to a situation where you use Docker to deploy code which, in turn, uses a Docker image to run Airbyte's data extraction routines. This problem is widely known as Docker-in-Docker (DinD).

To avoid DinD, you can use the "remote" execution type. If chosen, the Airbyte data extraction part runs on Google Cloud, which also saves CPU and memory on your development machine or server. To enable it, you need to specify the corresponding execution type and provide a path to the service account credentials file. Suppose that the credentials are located in the file ./credentials.json. Then, running the second example with the "remote" execution type looks as follows:

commits_table = pw.io.airbyte.read(  
    "./connections/github.yaml",
    streams=["commits"],
    mode="static",
    execution_type="remote",
    service_user_credentials_file="./credentials.json",
)

Please keep in mind that Google Cloud Run jobs are billed based on vCPU time and memory time, measured in vCPU-seconds and GiB-seconds respectively. That said, small values of refresh_interval_ms are not advised for remote runs, as they result in more job runs and consequently more vCPU and memory time spent, leading to a bigger bill.
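To build some intuition for this, here is a back-of-the-envelope comparison of two polling intervals; the per-run vCPU-seconds figure is a hypothetical placeholder, not a quoted Cloud Run price or a measured duration:

```python
# Compare how many remote runs two refresh intervals trigger per day,
# and what that means for billed vCPU-seconds.
def runs_per_day(refresh_interval_ms: int) -> int:
    ms_per_day = 24 * 60 * 60 * 1000
    return ms_per_day // refresh_interval_ms

vcpu_seconds_per_run = 30  # hypothetical duration of one extraction job

default_runs = runs_per_day(60_000)   # the default refresh interval
frequent_runs = runs_per_day(5_000)   # polling every 5 seconds instead

print(default_runs, default_runs * vcpu_seconds_per_run)    # 1440 runs, 43200 vCPU-s
print(frequent_runs, frequent_runs * vcpu_seconds_per_run)  # 17280 runs, 518400 vCPU-s
```

Cutting the interval from 60 s to 5 s multiplies both the number of runs and the billed vCPU time twelvefold.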