Uncovering hidden user relationships in crypto exchanges with Fuzzy Join on streaming data
Przemek Uznański · January 9, 2023
In this article, we analyze a stream of transactions on a crypto exchange. We want to find all pairs of users A and B such that A sells ETH to B and, in a separate transaction, buys BTC from B.
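To pin down exactly which pattern we are looking for, here is a minimal plain-Python sketch (not Pathway code) that checks it on an in-memory list of transactions. The dictionary keys `sender`, `receiver` and `currency` mirror the column names used in the Pathway code below. The function name `find_swapping_pairs` and the sample data are hypothetical.

```python
from typing import Iterable


def find_swapping_pairs(transactions: Iterable[dict]) -> set[tuple[str, str]]:
    """Return pairs (A, B) such that A sent ETH to B and B sent BTC to A."""
    eth_edges = set()  # (sender, receiver) pairs of ETH transfers
    btc_edges = set()  # (sender, receiver) pairs of BTC transfers
    for tx in transactions:
        edge = (tx["sender"], tx["receiver"])
        if tx["currency"] == "ETH":
            eth_edges.add(edge)
        elif tx["currency"] == "BTC":
            btc_edges.add(edge)
    # A -> B in ETH must be paired with B -> A in BTC, so look up the flipped edge.
    return {(a, b) for (a, b) in eth_edges if (b, a) in btc_edges}


sample = [
    {"sender": "alice", "receiver": "bob", "currency": "ETH"},
    {"sender": "bob", "receiver": "alice", "currency": "BTC"},
    {"sender": "carol", "receiver": "dave", "currency": "ETH"},
]
print(find_swapping_pairs(sample))  # {('alice', 'bob')}
```

Here carol is not reported because dave never sends BTC back to her. The streaming version below has to produce the same kind of pairs, but incrementally and with a confidence score.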
First, we import Pathway, read the transaction stream from Kafka, and split it into ETH and BTC transactions.
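```python
import pathway as pw

# Assumed message schema: column names come from the code below, types are a guess.
class TransactionSchema(pw.Schema):
    sender: str
    receiver: str
    currency: str
    usd_estimate: float

transactions = pw.io.kafka.read(
    rdkafka_settings={
        "group.id": "$GROUP_NAME",
        "bootstrap.servers": "clean-panther-8776-eu1-kafka.upstash.io:9092",
        "session.timeout.ms": "6000",
    },
    topic="eth_transactions",
    format="json",  # parse each message as JSON according to the schema above
    schema=TransactionSchema,
)

eth_transactions = transactions.filter(pw.this.currency == "ETH")
btc_transactions = transactions.filter(pw.this.currency == "BTC")
```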
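Next, for each currency we sum the estimated USD value transferred between every (sender, receiver) pair. The BTC totals are oriented the other way round (user_A is the receiver), so that in both tables user_A is the user who sells ETH and buys BTC, and user_B is the counterparty. We then use Pathway's fuzzy_match_tables to extract the matching pairs. The projections put the user_A columns of both tables into bucket C1 and the user_B columns into bucket C2, so user_A values are compared only with user_A values and user_B values only with user_B values.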
```python
# ETH: user_A sends ETH to user_B; sum the USD value per pair.
eth_movement_totals = eth_transactions.groupby(pw.this.sender, pw.this.receiver).reduce(
    user_A=pw.this.sender,
    user_B=pw.this.receiver,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)
# BTC: flipped orientation, user_B sends BTC to user_A.
btc_movement_totals = btc_transactions.groupby(pw.this.sender, pw.this.receiver).reduce(
    user_A=pw.this.receiver,
    user_B=pw.this.sender,
    usd_total_estimate=pw.reducers.sum(pw.this.usd_estimate),
)

# Bucket C1 holds user_A columns, bucket C2 holds user_B columns.
left_projection = {"user_A": "C1", "user_B": "C2"}
right_projection = {"user_A": "C1", "user_B": "C2"}
matches = pw.ml.smart_table_ops.fuzzy_match_tables(
    eth_movement_totals,
    btc_movement_totals,
    left_projection=left_projection,
    right_projection=right_projection,
)

# `left` and `right` point to the matched rows; `weight` is the match confidence.
matched_users = matches.select(
    btc_sender=btc_movement_totals.ix(matches.right).user_B,
    btc_receiver=btc_movement_totals.ix(matches.right).user_A,
    eth_sender=eth_movement_totals.ix(matches.left).user_A,
    eth_receiver=eth_movement_totals.ix(matches.left).user_B,
    confidence=matches.weight,
)
```
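For intuition, the aggregation and orientation steps can be mirrored in plain Python. Because the BTC totals are keyed as (receiver, sender), an ETH row and a BTC row describe the same pair of users exactly when their (user_A, user_B) keys coincide, so an exact equi-join on those keys already finds the clean matches. This sketch is only that exact baseline: it is not how fuzzy_match_tables works internally and it produces no confidence weight. The function names `movement_totals` and `exact_matches` and the sample values are hypothetical.

```python
from collections import defaultdict


def movement_totals(transactions, currency, flip=False):
    """Sum usd_estimate per (user_A, user_B); flip=True keys by (receiver, sender)."""
    totals = defaultdict(float)
    for tx in transactions:
        if tx["currency"] != currency:
            continue
        key = (tx["sender"], tx["receiver"])
        if flip:
            key = key[::-1]
        totals[key] += tx["usd_estimate"]
    return dict(totals)


def exact_matches(transactions):
    """Equi-join ETH totals (A sends to B) with BTC totals (B sends to A)."""
    eth = movement_totals(transactions, "ETH")
    btc = movement_totals(transactions, "BTC", flip=True)
    return {
        pair: {"eth_usd": eth[pair], "btc_usd": btc[pair]}
        for pair in eth.keys() & btc.keys()
    }


txs = [
    {"sender": "alice", "receiver": "bob", "currency": "ETH", "usd_estimate": 100.0},
    {"sender": "alice", "receiver": "bob", "currency": "ETH", "usd_estimate": 50.0},
    {"sender": "bob", "receiver": "alice", "currency": "BTC", "usd_estimate": 140.0},
    {"sender": "alice", "receiver": "bob", "currency": "BTC", "usd_estimate": 10.0},
]
print(exact_matches(txs))
# {('alice', 'bob'): {'eth_usd': 150.0, 'btc_usd': 140.0}}
```

The last transaction (alice sending BTC to bob) goes in the wrong direction, so after flipping it lands under the key (bob, alice) and does not join.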
We can now store the resulting table in PostgreSQL, or in any other database supported by Pathway.
```python
pw.io.postgres.write(
    matched_users,
    postgres_settings={
        "host": "localhost",
        "port": "5432",
        "dbname": "transactions",
        "user": "pathway",
        "password": "my_password",
    },
    table_name="matched_users_btc_eth_swapping",
)

# Start the streaming computation; nothing is read or written before this call.
pw.run()
```
Would you like to find users that match within a given time window? Take a look at our recipes on group-by with a tumbling window.
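As a rough illustration of the tumbling-window idea (plain Python, not Pathway's windowing API): an event with timestamp t falls into the window starting at t - (t mod duration), and grouping by that window start together with the user pair restricts matches to the same window. The `timestamp` field, the one-hour window length and the function names are hypothetical; the code above does not read any time column.

```python
from collections import defaultdict


def tumbling_window_start(t: int, duration: int) -> int:
    """Start of the fixed-size, non-overlapping window that contains t."""
    return t - t % duration


def swaps_per_window(transactions, duration):
    """Pairs (A, B) with A->B ETH and B->A BTC inside the same tumbling window."""
    eth = defaultdict(set)  # window start -> {(user_A, user_B)}
    btc = defaultdict(set)
    for tx in transactions:
        start = tumbling_window_start(tx["timestamp"], duration)
        edge = (tx["sender"], tx["receiver"])
        if tx["currency"] == "ETH":
            eth[start].add(edge)
        elif tx["currency"] == "BTC":
            btc[start].add(edge[::-1])  # flip so both sets are keyed (A, B)
    return {
        start: eth[start] & btc[start]
        for start in eth.keys() & btc.keys()
        if eth[start] & btc[start]
    }


HOUR = 3600
txs = [
    {"timestamp": 100, "sender": "alice", "receiver": "bob", "currency": "ETH"},
    {"timestamp": 200, "sender": "bob", "receiver": "alice", "currency": "BTC"},
    {"timestamp": 3700, "sender": "carol", "receiver": "dave", "currency": "ETH"},
    {"timestamp": 7300, "sender": "dave", "receiver": "carol", "currency": "BTC"},
]
print(swaps_per_window(txs, HOUR))  # {0: {('alice', 'bob')}}
```

The carol/dave swap is not reported because its two legs fall into different one-hour windows (starting at 3600 and 7200). A known trade-off of tumbling windows is that a swap straddling a window boundary is missed, however close in time its two legs are.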