Linear regression on a Kafka stream
Start doing ML on a streaming data source with Pathway.
In this article, we are going to see how to do a simple linear regression on streaming data from Kafka. This article can be seen as an extension of our realtime sum using CSV connectors.
We have a data stream of data points $(x_i, y_i)$, and we want to compute a simple linear regression on those points: we want to compute the two parameters $(a, b)$ so that, for each point $(x_i, y_i)$, $y_i$ can be approximated by $a + b \times x_i$.
We are not going to explain the mathematical details here, but you can find all the details in the Wikipedia article.
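For reference, the model $y \approx a + b x$ has a standard closed-form least-squares solution, summarized below from textbook material rather than from anything Pathway-specific. It only involves the number of points $n$ and four sums, which is what makes it easy to maintain on a stream. These are the formulas evaluated by `compute_a` and `compute_b` later in this article:

$$
b = \frac{n \sum_i x_i y_i - \sum_i x_i \sum_i y_i}{n \sum_i x_i^2 - \left(\sum_i x_i\right)^2},
\qquad
a = \frac{\sum_i y_i \sum_i x_i^2 - \sum_i x_i \sum_i x_i y_i}{n \sum_i x_i^2 - \left(\sum_i x_i\right)^2}
$$

The denominator equals $n \sum_i (x_i - \bar{x})^2$. It is therefore zero when all the $x_i$ received so far are equal, in particular when only one point has arrived, and the code returns 0 in that case.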
Connectors
First, we need a connector to connect to our input data streams and receive values on which the regression will be computed. In this article, we will set up a Kafka connector.
To be able to reproduce this example, you may want to use Upstash, which provides a free Kafka instance.
To use a Kafka connector, we need to set all the Kafka parameters by using a dictionary, following the format of librdkafka.
We also need to choose the Kafka topic to connect to: we will go with `"linear-regression"`.
Here is an example of settings to connect to Kafka using SASL-SSL authentication over the SCRAM-SHA-256 mechanism:
```python
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}
```
You need, of course, to replace the server address and the associated credentials.
With this, setting up the connector is straightforward: you just need to specify the topic and the schema of your table:
```python
import pathway as pw


class InputSchema(pw.Schema):
    x: float
    y: float


t = pw.io.kafka.read(
    rdkafka_settings,
    topic="linear-regression",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)
```
We used the `csv` format, but there are two other ways to read from Kafka: `raw`, which reads a table with only one column `data` in which the whole message is dumped, and `json`, which reads JSON messages. You can learn more about this connector in its dedicated tutorial. In our case, we expect CSV messages.
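To make the difference between the formats concrete, here is a sketch of what the bytes of a single message could look like in the `csv` and `json` formats. The helper names `csv_payload` and `json_payload` are hypothetical. The JSON layout assumes that the connector matches object keys to the column names of `InputSchema`.

```python
import json


# Hypothetical helpers: build the bytes of one Kafka message carrying a point (x, y).
def csv_payload(x: float, y: float) -> bytes:
    # One CSV line; the column names are sent once, in a separate header message.
    return f"{x},{y}".encode("utf-8")


def json_payload(x: float, y: float) -> bytes:
    # One JSON object whose keys are assumed to match InputSchema's column names.
    return json.dumps({"x": x, "y": y}).encode("utf-8")


print(csv_payload(1.0, 1.05))
print(json_payload(1.0, 1.05))
```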
💡 If you only want to test the linear regression, without generating a data stream through Kafka, you can directly use our stream generator:
```python
t = pw.demo.noisy_linear_stream()
```
For the output, we use a CSV connector, which is set up as follows:
```python
pw.io.csv.write(t, "regression_output_stream.csv")
```
For more details on how this connector works, check out our example or the tutorial about it.
Doing a linear regression
To do the regression, we need to compute the sum of the $x_i$, of the $y_i$, of the $x_i \times y_i$ and of the $x_i^2$, as well as the total number of data points received so far. This is done as follows:
```python
t = t.select(
    *pw.this,
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)
```
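These aggregates suit a stream because each of them can be updated in constant time when a point arrives, or when one is retracted. Nothing has to be recomputed from scratch. The plain-Python sketch below illustrates the idea. It is only a model, not how Pathway implements its reducers internally, and `RunningStats` is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Plain-Python model of the five aggregates computed by the reduce above."""
    count: int = 0
    sum_x: float = 0.0
    sum_y: float = 0.0
    sum_x_y: float = 0.0
    sum_x_square: float = 0.0

    def update(self, x: float, y: float, diff: int = 1) -> None:
        # diff=+1 inserts a point, diff=-1 retracts a previously inserted point.
        self.count += diff
        self.sum_x += diff * x
        self.sum_y += diff * y
        self.sum_x_y += diff * x * y
        self.sum_x_square += diff * x * x


stats = RunningStats()
for x, y in [(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)]:
    stats.update(x, y)
stats.update(2.0, 2.0, diff=-1)  # the point (2, 2) is removed again
print(stats)
```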
Then we can compute the estimates of $a$ and $b$:
```python
def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    # Intercept a; returns 0 while the x values seen so far are all equal.
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d


def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    # Slope b; same degenerate case as compute_a.
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d


results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)
```
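As a sanity check that does not depend on Pathway, we can compare these sum-based formulas with the textbook centered form $b = \operatorname{cov}(x, y) / \operatorname{var}(x)$, $a = \bar{y} - b\bar{x}$. The test uses points lying exactly on $y = 2 + 3x$. Both `fit_from_sums` and `fit_centered` are hypothetical helpers written for this sketch.

```python
def fit_from_sums(count, sum_x, sum_y, sum_x_y, sum_x_square):
    # Same formulas as compute_a / compute_b above, returning (a, b).
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0.0, 0.0
    a = (sum_y * sum_x_square - sum_x * sum_x_y) / d
    b = (count * sum_x_y - sum_x * sum_y) / d
    return a, b


def fit_centered(xs, ys):
    # Textbook form: b = cov(x, y) / var(x), a = mean(y) - b * mean(x).
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    var = sum((x - mx) ** 2 for x in xs)
    b = cov / var
    return my - b * mx, b


xs = [0.0, 1.0, 2.0, 3.0]
ys = [2.0 + 3.0 * x for x in xs]  # points exactly on y = 2 + 3x
sums = (
    len(xs),
    sum(xs),
    sum(ys),
    sum(x * y for x, y in zip(xs, ys)),
    sum(x * x for x in xs),
)
print(fit_from_sums(*sums))   # (2.0, 3.0)
print(fit_centered(xs, ys))   # (2.0, 3.0)
```

The sum-based form keeps the streaming version cheap, since each point only updates five numbers. The centered form needs the means first, and therefore two passes over the data. The trade-off is that the sum-based form can lose precision through cancellation when the $x_i$ are large and close to each other.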
Creating the input stream
You can skip this section if you use our stream generator `pw.demo.noisy_linear_stream()`.
To use the Kafka connector, we have to follow one rule: the Kafka connector expects the first message to contain the names of the columns. The connector will not work properly without this message. However, it must be sent only once: if it is sent twice, the second message will be treated like a normal row.
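The rule above can be modeled with a few lines of plain Python. This is a simplified, hypothetical model: `parse_messages` is not part of Pathway. It mimics a CSV consumer that treats the first message as the header and parses every later message as a data row. A repeated `x,y` header therefore becomes a row whose values cannot be converted to `float`. How the real connector reports such a row is not covered here.

```python
import csv


def parse_messages(messages):
    """Simplified model (not Pathway's code): the first message is the header,
    every later message is parsed as a data row."""
    header = next(csv.reader([messages[0]]))
    rows = []
    for message in messages[1:]:
        values = next(csv.reader([message]))
        # Each value must convert to float to match InputSchema.
        rows.append({name: float(value) for name, value in zip(header, values)})
    return rows


print(parse_messages(["x,y", "0,0", "1,1"]))

# A repeated header is just another row, and "x" is not a valid float.
try:
    parse_messages(["x,y", "x,y", "0,0"])
except ValueError as err:
    print("rejected:", err)
```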
We can use the `KafkaProducer` API of the kafka-python library to send messages from Python:
```python
from kafka import KafkaProducer

topic = "linear-regression"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# Header with the column names, sent exactly once.
producer.send(topic, "x,y".encode("utf-8"), partition=0)
# Two data points.
producer.send(topic, "0,0".encode("utf-8"), partition=0)
producer.send(topic, "1,1".encode("utf-8"), partition=0)
producer.close()
```
This code sample sends $(0,0)$ and $(1,1)$ and then closes the Kafka producer. For our example, we are going to send more messages containing different pairs $(x_i, y_i)$ sampled from the line $y = x$. However, to keep the example from being too simple, we add a small random error to each $y_i$.
Note that, depending on your version of Kafka, you may need to specify the API version for this code to work, by passing `api_version=(0,10,2)` to `KafkaProducer`.
Gathering everything into one piece
The final version of our project contains two files: `realtime_regression.py`, which processes the stream using Pathway, and `generating_kafka_stream.py`, which generates the stream.
Here is `realtime_regression.py`:
```python
import pathway as pw

# Kafka connection settings (replace the address and credentials with yours).
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}


class InputSchema(pw.Schema):
    x: float
    y: float


# Read (x, y) points from the "linear-regression" topic as CSV messages.
t = pw.io.kafka.read(
    rdkafka_settings,
    topic="linear-regression",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)
# Keep a log of every update received from Kafka.
pw.io.csv.write(t, "regression_input.csv")

# Add the products needed by the regression.
t = t.select(
    *pw.this,
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)

# Aggregate the whole stream into a single row of running statistics.
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)


def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d


def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d


results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)

pw.io.csv.write(results_table, "regression_output_stream.csv")

# Start the computation; it runs until the process is killed.
pw.run()
```
Don't forget to call `pw.run()`; otherwise, no computation will be done! Once `pw.run()` is called, the computation runs forever until it is killed.
And here is `generating_kafka_stream.py`:
```python
from kafka import KafkaProducer
import time
import random

topic = "linear-regression"

random.seed(0)


def get_value(i):
    # Point on the line y = x plus a uniform error in [-0.1, 0.1).
    return i + (2 * random.random() - 1) / 10


producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# Header with the column names, sent exactly once.
producer.send(topic, "x,y".encode("utf-8"), partition=0)
time.sleep(5)
# Ten noisy samples, one per second.
for i in range(10):
    time.sleep(1)
    producer.send(
        topic, (str(i) + "," + str(get_value(i))).encode("utf-8"), partition=0
    )
producer.close()
```
Output
There are two outputs in this project: the CSV file `regression_input.csv`, which keeps all the updates received from Kafka, and the CSV file `regression_output_stream.csv`, in which all the successive updates of the regression parameters $a$ and $b$ are written.
As in our previous example, the outputs are tables of changes. Each new Kafka message triggers a new computation, and the new values are written to the CSV files!
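In these change tables, each row carries a `time` column and a `diff` column. `diff` is `1` when the row is added and `-1` when it is retracted, so an update of the regression appears as the retraction of the old values plus the insertion of the new ones. Summing the diffs per row recovers the current state. The sketch below does this in plain Python on the first rows of the regression output from this section. `current_rows` is a hypothetical helper, and keying rows on their values is a simplification.

```python
import csv
import io

# First rows of the regression output (columns a, b, time, diff).
CHANGES = """a,b,time,diff
0,0,0,1
0,0,1,-1
0.06888437030500971,0.9827065102830508,1,1
0.06888437030500971,0.9827065102830508,2,-1
0.07724821608916699,0.9576149729305795,2,1
"""


def current_rows(changes_csv):
    """Sum the diffs of each (a, b) row and keep rows still present."""
    counts = {}
    for rec in csv.DictReader(io.StringIO(changes_csv)):
        key = (rec["a"], rec["b"])
        counts[key] = counts.get(key, 0) + int(rec["diff"])
    return [key for key, n in counts.items() if n > 0]


print(current_rows(CHANGES))
```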
First, we can check that the generated values are correct:
```csv
x,y,time,diff
"0","0.06888437030500963",0,1
"1","1.0515908805880605",1,1
"2","1.984114316166169",2,1
"3","2.9517833500585926",3,1
"4","4.002254944273722",4,1
"5","4.980986827490083",5,1
"6","6.056759717806955",6,1
"7","6.9606625452157855",7,1
"8","7.995319390830471",8,1
"9","9.016676407891007",9,1
```
We obtain ten values sampled around the line $y = x$. Let's check the regression we obtain:
```csv
a,b,time,diff
0,0,0,1
0,0,1,-1
0.06888437030500971,0.9827065102830508,1,1
0.06888437030500971,0.9827065102830508,2,-1
0.07724821608916699,0.9576149729305795,2,1
0.0769101730536299,0.9581220374838857,3,1
0.07724821608916699,0.9576149729305795,3,-1
0.05833884879671927,0.9766933617407955,4,1
0.0769101730536299,0.9581220374838857,4,-1
0.05087576879874134,0.9822906717392795,5,1
0.05833884879671927,0.9766933617407955,5,-1
0.03085078333935821,0.9943056630149089,6,1
0.05087576879874134,0.9822906717392795,6,-1
0.03085078333935821,0.9943056630149089,7,-1
0.03590542987734715,0.9917783397459139,7,1
0.03198741430177742,0.9934574892783012,8,1
0.03590542987734715,0.9917783397459139,8,-1
0.025649728471303895,0.9958341214647295,9,1
0.03198741430177742,0.9934574892783012,9,-1
```
We obtain values close to what we expect ($a = 0$ and $b = 1$). You can play with the values (number of samples, error, linear function to approximate, etc.) to see how the algorithm reacts.
To go further
Congrats, you are now able to use Pathway with Kafka and do some non-trivial computation!
Why not try some more advanced computation, such as linear regression with several explanatory variables? Or perhaps some classification?