Switching from Kafka to Redpanda
Not a fan of the JVM and ZooKeeper? In this article, you will learn how to switch from Kafka to Redpanda and how to adapt your Pathway project to Redpanda. The change is easier than you might think: your Pathway code remains the same!
When Kafka was first released in 2011, it was a game-changer for real-time data processing. Its innovative distributed streaming platform brought powerful features that allowed organizations to build scalable and fault-tolerant data pipelines, process streams of data in real time, and integrate with various data sources and sinks.
However, times have changed, and the world of data processing and streaming has evolved. Some of Kafka's once cutting-edge features are now seen as hindrances by some developers. For instance, Kafka can be complex to set up and manage, relies on ZooKeeper, and runs on the JVM.
This is where Redpanda comes in. Redpanda is "a Kafka" rebuilt in C++: a high-performance, Kafka-compatible streaming platform designed for modern applications. It offers improved performance and lower latency, and uses resources much more efficiently than Kafka. One of the significant benefits of Redpanda is its simplicity, which makes it easier to deploy and manage, saving time and resources.
Redpanda claims to be fully Kafka-compatible, but how easy is it really to switch from Kafka to Redpanda? Specifically, how much of a lift is it to modify our Pathway Kafka integration?
If you already have your Pathway project deployed and want to switch to an existing Redpanda instance, you can jump directly to the Redpanda section. For both Kafka and Redpanda, we provide the sources of the respective projects.
You can also create a new project from our project template. The creator will ask you to choose between Kafka and Redpanda and will set everything up for you.
Best-rated movies problem
You have just been hired by a trendy online VOD platform. As a new team member, your first task is to identify the most popular movies in the catalog. Specifically, you want to find the K movies with the highest average scores and how many ratings those movies have received.
For example, this is what the expected table for K=3 could be:
MovieID | Average | RatingNumber
---|---|---
218 | 4.9 | 7510
45 | 4.8 | 9123
7456 | 4.8 | 1240
The ratings are received as a data stream through a Kafka instance, and you output the table to a CSV file.
Solution with Kafka
With Kafka, we need the following four components:
- ZooKeeper
- Kafka
- Pathway
- a stream producer
Each component will be hosted in a different Docker container.
Ratings will be sent by the stream producer to Kafka on the topic ratings.
Pathway listens to the topic, processes the stream, and outputs the ranking to a CSV file, best_ratings.csv.
Pathway and the stream producer will each have their own Dockerfile to install the required dependencies.
The stream will be created by streaming the lines of a static dataset, dataset.csv.
Our project will have the following structure:
.
├── pathway-src/
│ ├── Dockerfile
│ └── process-stream.py
├── producer-src/
│ ├── create-stream.py
│ ├── dataset.csv
│ └── Dockerfile
├── docker-compose.yml
└── Makefile
Kafka and ZooKeeper
Kafka and ZooKeeper are configured in the docker-compose.yml file.
To keep things simple, no security mechanisms are used.
version: "3.7"
name: tuto-switch-to-redpanda
networks:
  tutorial_network:
    driver: bridge
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - tutorial_network
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_AUTO_CREATE_TOPICS: true
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      CONFLUENT_SUPPORT_METRICS_ENABLE: false
    ports:
      - 9092:9092
    command: sh -c "((sleep 15 && kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic ratings)&) && /etc/confluent/docker/run "
    networks:
      - tutorial_network
Here we are sending the messages to a topic called ratings, created in the command setting.
Generating the stream
To generate the stream, we start with a CSV dataset with the following columns: userId (int), movieId (int), rating (float), and timestamp (int).
This is the schema chosen by GroupLens for their MovieLens25M dataset. We provide a toy dataset as an example, but the project will work with the whole MovieLens25M dataset.
To generate the stream, you can use a simple Python script that reads the CSV file line by line and sends each rating to Kafka using the kafka-python package.
import csv
import json
import time
from kafka import KafkaProducer

topic = "ratings"

# We wait for Kafka and ZooKeeper to be ready
time.sleep(30)

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)

with open("./dataset.csv", newline="") as csvfile:
    dataset_reader = csv.reader(csvfile, delimiter=",")
    first_line = True
    for row in dataset_reader:
        # We skip the header
        if first_line:
            first_line = False
            continue
        message_json = {
            "userId": int(row[0]),
            "movieId": int(row[1]),
            "rating": float(row[2]),
            "timestamp": int(row[3]),
        }
        producer.send(topic, (json.dumps(message_json)).encode("utf-8"))
        time.sleep(0.1)

producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
producer.close()
Note that we connect to kafka:9092 and not localhost.
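The encoding step of the script can also be looked at in isolation. The sketch below is not part of the project sources: it uses only the standard library, reads a two-line in-memory sample instead of dataset.csv, and the helper name row_to_message is hypothetical. It relies on csv.DictReader, which consumes the header line itself, so no first_line flag is needed.

```python
import csv
import io
import json

# In-memory sample with the same columns as dataset.csv.
SAMPLE = """userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
"""


def row_to_message(row: dict) -> bytes:
    """Encode one CSV row as the UTF-8 JSON payload sent to the topic."""
    message = {
        "userId": int(row["userId"]),
        "movieId": int(row["movieId"]),
        "rating": float(row["rating"]),
        "timestamp": int(row["timestamp"]),
    }
    return json.dumps(message).encode("utf-8")


# DictReader uses the first line as field names, so the header is never a data row.
messages = [row_to_message(row) for row in csv.DictReader(io.StringIO(SAMPLE))]

for payload in messages:
    print(payload)
```

Each element of messages is a bytes object that could be passed as the second argument of producer.send in the script above.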
This script will have its own container:
  stream-producer:
    build:
      context: .
      dockerfile: ./producer-src/Dockerfile
    depends_on: [kafka]
    networks:
      - tutorial_network
You only need to use a Python image and install the associated package:
FROM python:3.10
RUN pip install kafka-python
COPY ./producer-src/create-stream.py create-stream.py
COPY ./producer-src/dataset.csv dataset.csv
CMD ["python", "-u", "create-stream.py"]
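The fixed 30-second sleep in create-stream.py is simple, but it either waits too long or not long enough depending on the machine. One possible alternative, sketched here with the standard library only, is to poll the broker port until it accepts a TCP connection. The function name wait_for_port and its parameters are hypothetical, and an open port only shows that the broker process is listening, not that it is fully ready to serve topics.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True as soon as a TCP connection to host:port succeeds,
    or False if none succeeds before roughly `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError on refusal, timeout, or DNS failure.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

In the producer, a call such as wait_for_port("kafka", 9092, timeout=60.0) could then replace time.sleep(30), with the script exiting if it returns False.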
Pathway
Now you have a stream generated from the dataset and sent to Kafka, so at this point you simply need to connect Pathway to Kafka and process the data. To connect to Kafka, configure the connection:
rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}
If you want to establish a more secure connection using SASL-SSL authentication with the SCRAM-SHA-256 mechanism, you can do it as follows:
rdkafka_settings = {
    "bootstrap.servers": "server:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********"
}
Let's connect to the ratings topic using the pw.io.kafka.read connector:
class InputSchema(pw.Schema):
    movieId: int
    rating: float


t_ratings = pw.io.kafka.read(
    rdkafka_settings,
    topic="ratings",
    format="json",
    schema=InputSchema,
    autocommit_duration_ms=100,
)
You are only interested in the movieId and rating columns, so there is no need to include the others.
You can now define a function to find the best-rated movies:
def compute_best(t_ratings, K):
    # Sum and count of the ratings of each movie
    t_best_ratings = t_ratings.groupby(pw.this.movieId).reduce(
        pw.this.movieId,
        sum_ratings=pw.reducers.sum(pw.this.rating),
        number_ratings=pw.reducers.count(pw.this.rating),
    )
    # Average rating of each movie
    t_best_ratings = t_best_ratings.select(
        pw.this.movieId,
        pw.this.number_ratings,
        average_rating=pw.apply(
            lambda x, y: (x / y) if y != 0 else 0,
            pw.this.sum_ratings,
            pw.this.number_ratings,
        ),
    )
    # One tuple (average, count, movieId) per movie, so tuples sort by average first
    t_best_ratings = t_best_ratings.select(
        movie_tuple=pw.apply(
            lambda x, y, z: (x, y, z),
            pw.this.average_rating,
            pw.this.number_ratings,
            pw.this.movieId,
        )
    )
    # All the tuples gathered in a single sorted tuple
    t_best_ratings = t_best_ratings.reduce(
        total_tuple=pw.reducers.sorted_tuple(pw.this.movie_tuple)
    )
    # The K last entries are the K best-rated movies
    t_best_ratings = t_best_ratings.select(
        K_best=pw.apply(lambda my_tuple: (list(my_tuple))[-K:], pw.this.total_tuple)
    )
    # Back to one row per movie
    t_best_ratings = t_best_ratings.flatten(pw.this.K_best).select(
        pw.this.K_best
    )
    t_best_ratings = t_best_ratings.select(
        movieId=pw.apply(lambda rating_tuple: rating_tuple[2], pw.this.K_best),
        average_rating=pw.apply(lambda rating_tuple: rating_tuple[0], pw.this.K_best),
        views=pw.apply(lambda rating_tuple: rating_tuple[1], pw.this.K_best),
    )
    return t_best_ratings
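To check what compute_best is expected to return once the whole stream has been received, it can help to have a plain-Python batch equivalent. The sketch below is only a reference for testing, not a replacement for the Pathway pipeline: batch_best is a hypothetical helper, it processes a finite list instead of a stream, and it returns the best movie first. The input is the toy dataset listed in the Results section, reduced to its movieId and rating columns.

```python
from collections import defaultdict


def batch_best(ratings, k):
    """Return the k best (movieId, average_rating, views) rows, best first.

    `ratings` is an iterable of (movieId, rating) pairs.
    """
    if k <= 0:
        return []
    totals = defaultdict(lambda: [0.0, 0])  # movieId -> [sum of ratings, count]
    for movie_id, rating in ratings:
        totals[movie_id][0] += rating
        totals[movie_id][1] += 1

    # Same ordering key as compute_best: (average, count, movieId), ascending.
    ranked = sorted(
        (total / count, count, movie_id) for movie_id, (total, count) in totals.items()
    )
    # The last k tuples are the best ones; reverse them to list the best first.
    return [(movie_id, average, count) for average, count, movie_id in reversed(ranked[-k:])]


# (movieId, rating) pairs of the toy dataset used in this article.
toy_ratings = [
    (296, 5.0), (306, 3.5), (307, 5.0), (665, 5.0), (899, 3.5), (1088, 4.0),
    (296, 4.0), (306, 2.5), (307, 3.0), (665, 2.0), (899, 4.5), (1088, 2.0),
    (296, 1.0), (306, 2.5), (307, 4.0), (665, 2.0), (899, 1.5), (1088, 5.0),
]

for row in batch_best(toy_ratings, 3):
    print(row)
```

On the toy dataset, the three movies returned are 307, 1088, and 296, each with three ratings.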
Using the function, your final file will look like this:
import pathway as pw
import time

rdkafka_settings = {
    "bootstrap.servers": "kafka:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}


class InputSchema(pw.Schema):
    movieId: int
    rating: float


t_ratings = pw.io.kafka.read(
    rdkafka_settings,
    topic="ratings",
    format="json",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

t_best_ratings = compute_best(t_ratings, 3)

# We output the results in a dedicated CSV file
pw.io.csv.write(t_best_ratings, "./best_ratings.csv")

# We wait for Kafka and ZooKeeper to be ready
time.sleep(20)

# We launch the computation
pw.run()
You can set up a dedicated container:
  pathway:
    build:
      context: .
      dockerfile: ./pathway-src/Dockerfile
    depends_on: [kafka]
    networks:
      - tutorial_network
FROM python:3.10
RUN pip install -U pathway
COPY ./pathway-src/process-stream.py process-stream.py
CMD ["python", "-u", "process-stream.py"]
Results
We provide the sources of the Kafka project.
Let's use the following toy dataset:
userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
2,296,4.0,1147880044
2,306,2.5,1147868817
2,307,3.0,1147868828
2,665,2.0,1147878820
2,899,4.5,1147868510
2,1088,2.0,1147868495
3,296,1.0,1147880044
3,306,2.5,1147868817
3,307,4.0,1147868828
3,665,2.0,1147878820
3,899,1.5,1147868510
3,1088,5.0,1147868495
You obtain the following results:
movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1
As expected, the top 3 get updated whenever the ranking changes due to a new rating.
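The time and diff columns describe changes rather than a final table: a row with diff 1 is an insertion and a row with diff -1 is a removal. To recover the current top K from such a file, you can replay the changes. The sketch below uses only the standard library; replay is a hypothetical helper, and in the sample log only the first two rows come from the output above, while the last two are an illustrative continuation (movie 296 receiving a second rating of 4.0), not actual output.

```python
import csv
import io
from collections import Counter


def replay(csv_text: str) -> set:
    """Apply the diff column of the CSV output and return the rows still present."""
    counts = Counter()
    for row in csv.DictReader(io.StringIO(csv_text)):
        key = (int(row["movieId"]), float(row["average_rating"]), int(row["views"]))
        # Summing the diffs makes the result independent of the order of
        # insertions and removals that share the same timestamp.
        counts[key] += int(row["diff"])
    return {key for key, count in counts.items() if count > 0}


# First two rows: taken from the article. Last two rows: illustrative only.
LOG = """movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
296,5,1,1680008702767,-1
296,4.5,2,1680008702767,1
"""

print(sorted(replay(LOG)))
```

After the replay, the old row for movie 296 is gone and only its updated row and the row for movie 306 remain.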
Switching to Redpanda
Congratulations! You can now find the K best-rated movies on your VOD platform. However, your team has discovered a new alternative to Kafka: Redpanda. It is Kafka-compatible, does not rely on ZooKeeper, is much easier to manage, is presented as more durable (it does not rely on the page cache, to avoid data loss), and, according to its published benchmarks, is faster than Kafka! Your team is excited about this and proposes your next task: switch from Kafka to Redpanda.
With Redpanda, the project is simpler. All you need is:
- Redpanda
- Pathway
- the stream producer
Let's see how to deploy Redpanda in Docker and how it impacts your project.
Docker
(You can skip this section if you already have an existing Redpanda instance)
First, remove the two services kafka and zookeeper, and replace them with a redpanda service:
services:
  redpanda:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      - --rpc-addr redpanda:33145
      - --advertise-rpc-addr redpanda:33145
      - --smp 1
      - --memory 1G
      - --mode dev-container
      - --default-log-level=debug
      - --set redpanda.enable_transactions=true
      - --set redpanda.enable_idempotence=true
      - --set redpanda.auto_create_topics_enabled=true
    image: docker.redpanda.com/redpandadata/redpanda:v23.1.2
    container_name: redpanda
    volumes:
      - redpanda:/var/lib/redpanda/data
    networks:
      - tutorial_network
You must now connect Pathway and the stream producer to redpanda instead of kafka.
This could have been avoided by naming the Kafka container differently or by naming the Redpanda container kafka.
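Another way to avoid touching the code when the broker changes is to read its address from the environment. The sketch below is one possible approach, not something the project sources do: the variable name BOOTSTRAP_SERVERS and the helper rdkafka_settings_from_env are hypothetical, and the other settings are the plaintext ones used throughout this article.

```python
import os


def rdkafka_settings_from_env(env=os.environ) -> dict:
    """Build the connection settings, reading the broker address from the environment.

    BOOTSTRAP_SERVERS is a hypothetical variable name. When it is not set,
    the address of the redpanda service from docker-compose.yml is used.
    """
    return {
        "bootstrap.servers": env.get("BOOTSTRAP_SERVERS", "redpanda:9092"),
        "security.protocol": "plaintext",
        "group.id": "0",
        "session.timeout.ms": "6000",
    }


# A plain dict stands in for os.environ here to show both cases.
print(rdkafka_settings_from_env({"BOOTSTRAP_SERVERS": "kafka:9092"})["bootstrap.servers"])
print(rdkafka_settings_from_env({})["bootstrap.servers"])
```

With this in place, switching brokers would only require changing an environment entry of the pathway service in docker-compose.yml.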
Pathway
As previously mentioned, we need to update the server's address in the settings:
rdkafka_settings = {
    "bootstrap.servers": "redpanda:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}
That's it! The settings work exactly the same as with Kafka. However, for consistency, there are also dedicated Redpanda connectors. There is no difference between the Kafka and Redpanda connectors, as the same connector is used under the hood.
With the Redpanda connector pw.io.redpanda.read, here is what your ./pathway-src/process-stream.py file looks like:
import pathway as pw
import time

rdkafka_settings = {
    "bootstrap.servers": "redpanda:9092",
    "security.protocol": "plaintext",
    "group.id": "0",
    "session.timeout.ms": "6000",
}


class InputSchema(pw.Schema):
    movieId: int
    rating: float


t_ratings = pw.io.redpanda.read(
    rdkafka_settings,
    topic="ratings",
    format="json",
    schema=InputSchema,
    autocommit_duration_ms=100,
)

t_best_ratings = compute_best(t_ratings, 3)

# We output the results in a dedicated CSV file
pw.io.csv.write(t_best_ratings, "./best_ratings.csv")

# We wait for Redpanda to be ready
time.sleep(20)

# We launch the computation
pw.run()
If you don't care about the names of the connector and server, you don't have to change the file at all.
The stream generator
As with Pathway, you need to update the server name (if required) for the stream generator:
producer = KafkaProducer(
    bootstrap_servers=["redpanda:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)
Small problem: if you look at the results now, best_ratings.csv is empty.
This comes from the creation of the ratings topic.
With Kafka, the topic already existed when the first message arrived; with Redpanda, it is created on reception of the first message.
While the topic is being created, Redpanda discards incoming messages.
Sending a message at the beginning of the computation should solve this:
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
Note that this difference comes from Redpanda and not Pathway. Pathway connects to Redpanda and Kafka totally transparently. Pathway will receive and process the data the same way whether Kafka or Redpanda is used. Kafka and Redpanda are responsible for handling messages: Redpanda discards the incoming messages while the topic is created.
Despite the fix, the final file is very similar to the one for the Kafka version:
from kafka import KafkaProducer
import csv
import time
import json

topic = "ratings"

# We wait for Redpanda to be ready
time.sleep(30)

producer = KafkaProducer(
    bootstrap_servers=["redpanda:9092"],
    security_protocol="PLAINTEXT",
    api_version=(0, 10, 2),
)

# First message: it triggers the creation of the topic
producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)

with open("./dataset.csv", newline="") as csvfile:
    dataset_reader = csv.reader(csvfile, delimiter=",")
    first_line = True
    for row in dataset_reader:
        # We skip the header
        if first_line:
            first_line = False
            continue
        message_json = {
            "userId": int(row[0]),
            "movieId": int(row[1]),
            "rating": float(row[2]),
            "timestamp": int(row[3]),
        }
        producer.send(topic, (json.dumps(message_json)).encode("utf-8"))
        time.sleep(0.1)

producer.send(topic, "*COMMIT*".encode("utf-8"))
time.sleep(2)
producer.close()
You can also take a look at the sources of the Redpanda project.
With this, the results are the same as with Kafka:
movieId,average_rating,views,time,diff
296,5,1,1680008702067,1
306,3.5,1,1680008702167,1
[...]
296,3.3333333333333335,3,1680008703767,-1
1088,3.6666666666666665,3,1680008703767,1
296,3.3333333333333335,3,1680008703767,1
899,3.1666666666666665,3,1680008703767,-1
Bonus: sending your results to Redpanda
You've successfully computed the K best-rated movies using Redpanda, and your ranking is automatically updated thanks to Pathway. However, your team still isn't satisfied with the outcome. After taking a closer look, you realize the issue: you've been writing your results to a CSV file, which isn't the most suitable format for handling a data stream. Not only that, but the file stays on your local computer, preventing others in the organization from accessing the data in real time.
Your team suggests sending the results back to Redpanda into a best_ratings topic instead.
Redpanda is designed for handling data streams, which makes it a better fit than a CSV file for continuously updated results.
By doing this, you can ensure the data is accessible to everyone who needs it in real time.
Connecting to Redpanda with Pathway is as easy as connecting to Kafka.
You need to use the Redpanda connector pw.io.redpanda.write, which is exactly the same as the Kafka connector:
rdkafka_settings = {
    "bootstrap.servers": "redpanda:9092",
    "security.protocol": "plaintext",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
}

pw.io.redpanda.write(
    t_best_ratings,
    rdkafka_settings,
    topic_name="best_ratings",
    format="json"
)
As previously mentioned, you can also establish a more secure connection using SASL-SSL authentication with the SCRAM-SHA-256 mechanism (e.g., when connecting to Redpanda Cloud) as follows:
rdkafka_settings = {
    "bootstrap.servers": "redpanda:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}
Conclusions
Congratulations! 🎉 You have built a K-best-rated application in Pathway and made it work with Redpanda. Being fully Kafka API-compatible, Redpanda just works: if the server name remains the same, you have nothing to change in your Pathway code! For consistency, you can use Pathway's Redpanda connectors, which work exactly the same as the Kafka connectors.