Pathway in Minutes: Quick ETL Examples

In this article, you will learn how to start your journey with Pathway and build a simple ETL pipeline.

First, make sure Pathway is installed. You can install it with a single pip command

pip install pathway

on a Python 3.10+ installation, and you are ready to roll!

A simple sum example

Let's start with a simple sum over positive values that are stored in CSV files, with the result written to a JSON Lines file:

[Figure: Pathway code example]
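A minimal sketch of such a pipeline might look like the following. The directory name, the output file name, the single integer column called value, and the helper name build_sum_pipeline are all assumptions made for illustration; adapt them to your data.

```python
def build_sum_pipeline(input_dir="./sum_input_data/", output_path="sum_output.jsonl"):
    """Declare (but do not run) a pipeline that sums the positive input values."""
    # Imported inside the function so that defining it does not require Pathway;
    # calling it does (pip install pathway).
    import pathway as pw

    # Assumed schema: each CSV file has a single integer column named "value".
    class InputSchema(pw.Schema):
        value: int

    # Read every CSV file found in input_dir.
    input_table = pw.io.csv.read(input_dir, schema=InputSchema)
    # Keep only the strictly positive values.
    positive_table = input_table.filter(pw.this.value > 0)
    # Reduce the whole table to a single row holding the sum.
    result_table = positive_table.reduce(sum_value=pw.reducers.sum(pw.this.value))
    # Write the result, and every later change to it, as JSON Lines.
    pw.io.jsonlines.write(result_table, output_path)
    return result_table
```

Calling build_sum_pipeline() only declares the pipeline. Nothing is computed until pw.run() is called afterwards.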

Join and filter ETL example

This example takes two data sources:

  1. live measurements from a Kafka topic
  2. thresholds from local CSV files.

The aim is to combine the data from those two sources and find the live measurements that exceed their threshold.

Source Code

This is how you can do the whole pipeline in Pathway:

import pathway as pw

# Declare the schema of your tables using pw.Schema.
# There are two input tables: (1) measurements, which is a live stream,
# and (2) thresholds, which is a CSV that might be modified over time.
# Both have two columns: a name (str) and a float.
class MeasurementSchema(pw.Schema):
    name: str
    value: float


class ThresholdSchema(pw.Schema):
    name: str
    threshold: float


# Define Kafka configuration to connect to your Kafka instance
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# Accessing the measurements using the Kafka Connector
measurements_table = pw.io.kafka.read(
    rdkafka_settings,
    topic="topic",
    schema=MeasurementSchema,
    format="json",
    autocommit_duration_ms=1000
)

# Accessing the threshold data stored in CSV files
thresholds_table = pw.io.csv.read(
    './threshold-data/',
    schema=ThresholdSchema,
)

# Joining tables on the column name
joined_table = (
    # The left table is measurements_table (referred to as pw.left)
    measurements_table
    .join(
        # The right table is thresholds_table (referred to as pw.right)
        thresholds_table,
        # The join is done on the name column of each table
        pw.left.name == pw.right.name,
    )
    # The columns of the joined table are chosen using select
    .select(
        # All the columns of measurements are kept
        *pw.left,
        # The threshold column of the threshold table is kept
        pw.right.threshold
    )
)

alerts_table = (
    joined_table
    # Filtering value strictly higher than the threshold.
    .filter(pw.this.value > pw.this.threshold)
    # Only name and value fields are kept
    .select(pw.this.name, pw.this.value)
)

# Sending the results to another Kafka topic, on the same Kafka instance
pw.io.kafka.write(
    alerts_table, rdkafka_settings, topic_name="alerts_topic", format="json"
)

# Launching Pathway computation.
pw.run()

[Figure: Pathway ETL example pipeline]

More details

This example connects to two data sources using input connectors. Each connector connects to a data source and represents the incoming data stream as a table, here measurements_table and thresholds_table. You specify the schema of the data using pw.Schema, and the same schema can be reused for several tables.

Now that you have data, you can process it as you want! Joins, temporal windows, filtering... You can have a glimpse of the available operations in our basic operations guide.

With pw.run(), the computation is launched. Each update in the input data streams automatically triggers an update of the whole pipeline: when new input is received in either measurements_table or thresholds_table, the tables joined_table and alerts_table are updated and the changes are forwarded to the Kafka topic by the Kafka output connector. Pathway keeps polling for new updates, so the computation runs until the process is terminated. This is the normal behavior of Pathway.

If you want to test your pipeline on static and finite data, Pathway also provides a static mode and a demo module.
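As a rough sketch of what a static test can look like, the join and filter logic can be tried on small in-memory tables built with pw.debug.table_from_markdown and printed with pw.debug.compute_and_print. The sample rows and the helper name run_static_alerts are assumptions chosen for illustration.

```python
# Small, finite inputs written as markdown-style tables (illustrative values).
MEASUREMENTS_MD = """
name | value
A    | 8
B    | 10
"""

THRESHOLDS_MD = """
name | threshold
A    | 9
B    | 9
"""


def run_static_alerts():
    """Compute the alerts once on the finite inputs above and print them."""
    # Imported inside the function; calling it requires `pip install pathway`.
    import pathway as pw

    measurements = pw.debug.table_from_markdown(MEASUREMENTS_MD)
    thresholds = pw.debug.table_from_markdown(THRESHOLDS_MD)

    alerts = (
        measurements.join(thresholds, pw.left.name == pw.right.name)
        .select(*pw.left, pw.right.threshold)
        # Keep values strictly higher than their threshold.
        .filter(pw.this.value > pw.this.threshold)
        .select(pw.this.name, pw.this.value)
    )
    # Runs the computation on the static data and prints the resulting table.
    pw.debug.compute_and_print(alerts, include_id=False)
```

Because the inputs are finite, the call returns once the result has been printed instead of waiting for new data.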

You can learn more about the concepts of Pathway in our dedicated article.

Understanding the output

Suppose that the following input data has been received on the Kafka topic:

{"name": "A", "value":8}
{"name": "B", "value":10}

And the threshold values are:

name, threshold
"A", 9
"B", 9

Then the output is:

{"name": "B", "value":10, "time":1, "diff":1}

The output contains two additional fields, time and diff:

  • time represents the time at which the update happened. In practice, the time is a regular timestamp.
  • diff indicates whether the row is an insertion (diff = 1) or a deletion (diff = -1). An update is represented by two rows: one to remove the old value, one to add the new value. Those two rows have the same time to ensure the atomicity of the operation.

In this case, we assume the first values were computed at time 1. The diff is equal to 1 because the row is an insertion.
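To make the meaning of time and diff concrete, here is a small plain-Python illustration (it does not use Pathway) that replays such a log and returns the rows currently present. The function name apply_changes is hypothetical.

```python
from collections import Counter


def apply_changes(rows):
    """Replay rows annotated with time and diff; return the rows still present."""
    counts = Counter()
    # Process the log in time order.
    for row in sorted(rows, key=lambda r: r["time"]):
        # The payload is everything except the bookkeeping fields.
        payload = tuple(
            sorted((k, v) for k, v in row.items() if k not in ("time", "diff"))
        )
        # diff = 1 adds one copy of the row, diff = -1 removes one.
        counts[payload] += row["diff"]
    # Keep the payloads whose net count is positive.
    return [dict(payload) for payload, count in counts.items() if count > 0]


log = [{"name": "B", "value": 10, "time": 1, "diff": 1}]
state = apply_changes(log)  # [{"name": "B", "value": 10}]
```

Appending a row with the same payload and diff = -1 to the log would make the result empty again.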

Suppose now that the thresholds have changed: the file was updated and now looks like this:

name, threshold
"A", 7
"B", 11

The connector automatically detects any new files or modifications within ./threshold-data/ and updates the tables accordingly.

This triggers the re-execution of the join and the filter, giving this output:

{"name": "B", "value":10, "time":1, "diff":1}
{"name": "B", "value":10, "time":2, "diff":-1}
{"name": "A", "value":8, "time":2, "diff":1}

There are two more lines: the old alert is removed (diff=-1) and a new one is inserted for the other measurement (diff=1). Old rows are kept because the output is a log of insertions and deletions, which gives exhaustive information about what happened to the data.
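The log above can be reproduced with a few lines of plain Python. This is only an illustration of the semantics: it recomputes the alerts from scratch and compares the two results, which is not how Pathway works internally, and the helper names are hypothetical.

```python
def compute_alerts(measurements, thresholds):
    """Return the set of (name, value) pairs whose value exceeds its threshold."""
    return {
        (name, value)
        for name, value in measurements
        if name in thresholds and value > thresholds[name]
    }


def diff_rows(old_alerts, new_alerts, time):
    """Describe the move from old_alerts to new_alerts as deletions and insertions."""
    removed = [
        {"name": n, "value": v, "time": time, "diff": -1}
        for n, v in sorted(old_alerts - new_alerts)
    ]
    added = [
        {"name": n, "value": v, "time": time, "diff": 1}
        for n, v in sorted(new_alerts - old_alerts)
    ]
    return removed + added


measurements = [("A", 8), ("B", 10)]
before = compute_alerts(measurements, {"A": 9, "B": 9})   # {("B", 10)}
after = compute_alerts(measurements, {"A": 7, "B": 11})   # {("A", 8)}

# First computation at time 1, threshold update at time 2.
log = diff_rows(set(), before, time=1) + diff_rows(before, after, time=2)
```

The resulting log holds the same three rows as the output shown above, in the same order.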

Keep in mind that some output connectors to external data storage systems might take these -1 and +1 rows, link them by time, and represent them as a single update operation (as in the case of a SQL database).
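The idea of linking rows by time can be sketched in plain Python as follows. This is an assumption-laden illustration, not the logic of any actual Pathway connector: it treats name as the key, pairs a deletion and an insertion that share the same time and key, and uses a hypothetical third row in which the value of B is corrected to 12.

```python
from collections import defaultdict


def to_operations(rows, key="name"):
    """Group change rows by (time, key) and label each group as one operation."""
    groups = defaultdict(dict)
    for row in rows:
        groups[(row["time"], row[key])][row["diff"]] = row

    operations = []
    for (_time, key_value), by_diff in sorted(groups.items()):
        if 1 in by_diff and -1 in by_diff:
            # A deletion and an insertion at the same time: one update.
            operations.append(("UPDATE", key_value, by_diff[1]["value"]))
        elif 1 in by_diff:
            operations.append(("INSERT", key_value, by_diff[1]["value"]))
        else:
            operations.append(("DELETE", key_value, None))
    return operations


rows = [
    {"name": "B", "value": 10, "time": 1, "diff": 1},
    {"name": "B", "value": 10, "time": 2, "diff": -1},
    {"name": "B", "value": 12, "time": 2, "diff": 1},  # hypothetical corrected value
]
operations = to_operations(rows)
# [("INSERT", "B", 10), ("UPDATE", "B", 12)]
```

Applied to the log of the threshold example, where the deletion concerns B and the insertion concerns A, the same function yields one insertion for B at time 1, then one insertion for A and one deletion for B at time 2, since the two rows at time 2 have different keys.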

Other examples

Looking for more? Check out our showcase catalog.