If you are using Kafka to capture large quantities of events or transactional data, you are probably also looking for ways to enrich that data in real time.

Let’s say you are pushing e-commerce transactions that look like the following to a Kafka topic:

{
  "orderkey": 1,
  "linenumber": 1,
  "custkey": 7381,
  "partkey": 155190,
  "suppkey": 828,
  "orderdate": "1996-01-02",
  "orderpriority": "5-LOW",
  "shippriority": 0,
  "quantity": 17,
  "extendedprice": 21168,
  "ordtotalprice": 173665,
  "discount": 4,
  "revenue": 20321,
  "supplycost": 747,
  "tax": 2,
  "commitdate": "1996-02-12",
  "shipmode": "MAIL"
}

In order to find out things like:

  • what your top selling products are,
  • what your total revenue per product category is,
  • who your most successful supplier is, or
  • where your customers come from,

you are going to want to enrich that partkey with information about the product, that suppkey with information about the supplier, and that custkey with relevant information about the customer.

There are two ways to go about this with Tinybird; for both of them, we are going to need those additional dimensions within the database. So let’s first generate a lot of Customers, Suppliers and Parts and ingest them.

We will use a modified version of the Star Schema Benchmark dbgen tool to generate fake data quickly and in large quantities. Like so:

$ dbgen -s 10 -T c > customers.csv
$ head -3 customers.csv
custkey,name,address,city,nation,region,phone,mktsegment
1,"Customer#000000001","j5JsirBM9P","MOROCCO  0","MOROCCO","AFRICA","25-989-741-2988","BUILDING"
2,"Customer#000000002","487LW1dovn6Q4dMVym","JORDAN   1","JORDAN","MIDDLE EAST","23-768-687-3665","AUTOMOBILE"
$ cat customers.csv | wc -l
  300001

$ dbgen -s 10 -T p > parts.csv
$ head -3 parts.csv
partkey,name,mfgr,category,brand,color,type,size,container
1,"lace spring","MFGR#1","MFGR#11","MFGR#1121","goldenrod","PROMO BURNISHED COPPER",7,"JUMBO PKG"
2,"rosy metallic","MFGR#4","MFGR#43","MFGR#4318","blush","LARGE BRUSHED BRASS",1,"LG CASE"
$ cat parts.csv | wc -l
  800001

$ dbgen -s 10 -T s > suppliers.csv
$ head -3 suppliers.csv
suppkey,name,address,city,nation,region,phone
1,"Supplier#000000001","sdrGnXCDRcfriBvY0KL,i","PERU     0","PERU","AMERICA","27-989-741-2988"
2,"Supplier#000000002","TRMhVHz3XiFu","ETHIOPIA 1","ETHIOPIA","AFRICA","15-768-687-3665"
$ cat suppliers.csv | wc -l
   20001

Now we can just post these CSVs to Tinybird’s Datasources API and it will figure out the types and ingest the data automatically:

$ export TOKEN=your_tinybird_token
$ for ds in customers parts suppliers
> do
>   curl -X POST -H "Authorization: Bearer $TOKEN" \
>     "https://api.tinybird.co/v0/datasources?name=${ds}" -F csv=${ds}.csv
> done

Just like that, we have our dimension tables as data sources in Tinybird:

Our three dimensions datasources, now created in Tinybird

Lastly, we’ll need the actual line orders for every purchase - we will create an empty data source for that, as we will be pushing those records via Kafka.

# Lineorders Schema
# orderkey UInt32,
# linenumber UInt8,
# custkey UInt32,
# partkey UInt32,
# suppkey UInt32,
# orderdate Date,
# orderpriority String,
# shippriority UInt8,
# quantity UInt8,
# extendedprice UInt32,
# ordtotalprice UInt32,
# discount UInt8,
# revenue UInt32,
# supplycost UInt32,
# tax UInt8,
# commitdate Date,
# shipmode String

#create lineorders datasource with schema
$ curl -X POST -H "Authorization: Bearer $TOKEN" \
> "https://api.tinybird.co/v0/datasources?name=lineorders&mode=create&schema=orderkey%20UInt32%2Clinenumber%20UInt8%2Ccustkey%20UInt32%2Cpartkey%20UInt32%2Csuppkey%20UInt32%2Corderdate%20Date%2Corderpriority%20String%2Cshippriority%20UInt8%2Cquantity%20UInt8%2Cextendedprice%20UInt32%2Cordtotalprice%20UInt32%2Cdiscount%20UInt8%2Crevenue%20UInt32%2Csupplycost%20UInt32%2Ctax%20UInt8%2Ccommitdate%20Date%2Cshipmode%20String"

To push line orders to a Kafka topic, we will use a simple Kafka producer written in Python that reads the individual line orders we generate via dbgen. Here is what the producer looks like:

import click
import csv
import json
from kafka import KafkaProducer

@click.command()
@click.argument('input', type=click.File('r'))
@click.argument('topic')
def produce(input, topic):
    # Encode each row as a JSON message before sending it to Kafka
    serializer = lambda v: json.dumps(v).encode('utf-8')
    producer = KafkaProducer(value_serializer=serializer)
    reader = csv.reader(input)
    next(reader, None)  # skip the CSV header row
    for row in reader:
        producer.send(topic, row)
    producer.flush()  # make sure all buffered messages are delivered

if __name__ == '__main__':
    produce()

The producer receives the file name with the line orders and the Kafka topic name (‘orders’) to which it will push all of them one by one.

In the following snippet, we create a Python environment and start the producer (we assume you have Kafka and zookeeper running already):

$ python3 -m virtualenv .venv
$ . .venv/bin/activate
$ pip install click kafka-python requests
$ dbgen -s 10 -T l | python producer.py - orders

This is now pumping line orders into Kafka, but nobody is consuming them yet. We will use a consumer (source code) that reads those orders in chunks of 20,000 and sends them to Tinybird, so that they get ingested directly into the “lineorders” datasource we created earlier.
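The consumer itself can be sketched roughly like this (the linked source code is the actual implementation; the Tinybird append call and the CSV re-assembly here are assumptions):

```python
# Rough sketch of such a Kafka-to-Tinybird consumer (illustrative only).
# Requires kafka-python and requests, and a TOKEN environment variable.

def chunks(iterable, size):
    """Group an iterable into lists of at most `size` items."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def consume(topic, datasource, chunk_size=20000):
    # Third-party imports live here so the batching helper stays stdlib-only
    import os, json, datetime
    import requests
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, value_deserializer=json.loads)
    headers = {'Authorization': 'Bearer %s' % os.environ['TOKEN']}
    for batch in chunks(consumer, chunk_size):
        # Each message value is a list of fields; turn the chunk back into CSV
        body = '\n'.join(','.join(str(v) for v in m.value) for m in batch)
        r = requests.post(
            'https://api.tinybird.co/v0/datasources',
            headers=headers,
            params={'name': datasource, 'mode': 'append'},
            files={'csv': ('chunk.csv', body)},
        )
        print('[%s] status=%d datasource=%s, rows=%d'
              % (datetime.datetime.now(), r.status_code, datasource, len(batch)))

# usage: consume('orders', 'lineorders')
```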

When running the consumer, we specify what Kafka topic it needs to read from (again, ‘orders’) and what datasource in Tinybird it needs to populate, as well as the API endpoint. Like this:

$ python consumer.py orders lineorders
[2020-03-19 07:44:13.365284] status=200 datasource=lineorders, rows=20000
[2020-03-19 07:44:14.508733] status=200 datasource=lineorders, rows=20000
[2020-03-19 07:44:15.343863] status=200 datasource=lineorders, rows=20000
[2020-03-19 07:44:16.288563] status=200 datasource=lineorders, rows=20000
[2020-03-19 07:44:17.417385] status=200 datasource=lineorders, rows=20000

The consumer reads all those Kafka events at a rate of approximately 20K records per second, pushing them in chunks to Tinybird, and it will keep going as long as there are line orders to consume. Let’s look at how the data is shaping up via the Tinybird UI:

Our lineorders datasource, now created in Tinybird

Looking good!

Enriching the classic way

Now that we have the e-commerce transactions (line orders) coming in as well as all the required dimension tables (Customers, Parts and Suppliers) we can start enriching content with regular SQL joins.

Let’s say we wanted to extract how many parts of each category are sold per year for each country, and limit the results to years 1995 to 1997. We can create a Pipe and write an SQL query like this one:

Parts per category sold each year per country, with regular Joins
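A sketch of what that Pipe’s query might look like (column and table names here are illustrative; the screenshot above is the reference):

```sql
-- Classic enrichment with regular joins at query time
SELECT
    toYear(orderdate) AS year,
    customers.nation AS customer_country,
    parts.category AS part_category,
    sum(quantity) AS amount_sold
FROM lineorders
INNER JOIN customers ON customers.custkey = lineorders.custkey
INNER JOIN parts ON parts.partkey = lineorders.partkey
WHERE toYear(orderdate) BETWEEN 1995 AND 1997
GROUP BY year, customer_country, part_category
```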

In our entry-level account, with over 60M line orders in total, that query can take almost 6 seconds to run. That is fine if you only run it every once in a while, but if you want to expose multiple API endpoints and hit them with multiple requests per second from live applications, those seconds add up. And things would only get slower as data grows.

$ curl --compressed -H "Authorization: Bearer $TOKEN" \
  "https://api.tinybird.co/v0/pipes/lineorders_pipe.json"
{
  ...

  "data":
  [
    {
      "year": 1995,
      "customer_country": "ALGERIA",
      "part_category": "MFGR#32",
      "amount_sold": 380359
    },
    ...
    {
      "year": 1997,
      "customer_country": "VIETNAM",
      "part_category": "MFGR#43",
      "amount_sold": 367392
    }
  ],

  "rows": 1875,

  "statistics":
  {
    "elapsed": 5.979295137,
    "rows_read": 29728603,
    "bytes_read": 346861597
  }
}

We could obviously speed that up by moving you to a bigger Tinybird account, which would parallelize the query across many more CPU cores; however, we can also speed it up with a different approach.

Enriching at Ingestion Time

One of the best things about ClickHouse, the columnar database that powers Tinybird, is that it is extremely efficient at storing repetitive data. That means that, unlike in transactional databases, denormalizing data won’t have a huge impact on the amount of data you have to read when performing queries.

In Tinybird, you can create “Ingestion” pipes that materialize the result of a query into another datasource. This helps us enrich data as it comes into Tinybird; rather than performing joins every time you query the data, you perform those joins at ingestion time and the resulting data is available for you to query in a different datasource.

Here is an example of one of those Ingestion pipes through our UI.

This pipe materializes the result of several joins at ingestion time

What this Pipe is essentially doing is materializing the result of that query from the “lineorders” datasource to the “sales” datasource, and it happens every time new data gets ingested.

As you can see, it is adding every column from “lineorders” plus a number of other columns from the Parts, Customers and Suppliers dimensions, enabling us to have everything we need for one or more analytics use cases in a single place.

Our new sales datasource contains all the fields we require from the dimensions tables

It uses “joinGet”, a ClickHouse function that lets you extract data from a table as if you were looking it up in a dictionary. It is extremely fast, but it requires the tables you extract from to be created with a specific ClickHouse engine; that is why in the query you see those part_join_by_partkey or supplier_join_by_suppkey datasources - we create them automatically in these scenarios to enable fast joins at ingestion.
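To give a feel for it, a joinGet-based enrichment looks roughly like this (table and column names are assumptions based on the screenshots above):

```sql
-- joinGet('join_table', 'value_column', key) looks up a value by key,
-- as if the join table were a dictionary
SELECT
    lineorders.*,
    joinGet('part_join_by_partkey', 'category', partkey) AS part_category,
    joinGet('supplier_join_by_suppkey', 'nation', suppkey) AS supplier_nation,
    joinGet('customer_join_by_custkey', 'nation', custkey) AS customer_nation
FROM lineorders
```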

If we build a query to extract the same results as before but directly through the denormalized “sales” datasource, it would look like this:

A much simpler and faster query that produces the same result
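Sketched out, the denormalized version might look like this (the screenshot above is the reference; no joins are needed anymore):

```sql
-- Same aggregation, now over the flat, pre-enriched sales datasource
SELECT
    toYear(orderdate) AS year,
    customer_nation,
    part_category,
    sum(quantity) AS amount_sold
FROM sales
WHERE toYear(orderdate) BETWEEN 1995 AND 1997
GROUP BY year, customer_nation, part_category
```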

If we hit that endpoint again, we get the same results but now in 161ms (vs almost 6 seconds), which is about 37 times faster.

$ curl --compressed -H "Authorization: Bearer $TOKEN" "https://api.tinybird.co/v0/pipes/lineorders_pipe.json"
{
  ...
  "data":
  [
    {
      "year": 1995,
      "customer_nation": "ALGERIA",
      "part_category": "MFGR#32",
      "amount_sold": 380359
    },
    ...
    {
      "year": 1997,
      "customer_nation": "VIETNAM",
      "part_category": "MFGR#43",
      "amount_sold": 367392
    }
  ],
  "rows": 1875,
  "statistics":
  {
   "elapsed": 0.161279414,
   "rows_read": 27528169,
   "bytes_read": 965612644
  }
}

The beauty of this is that:

  • we can enrich data as soon as it hits Tinybird,
  • we can do it at a rate of hundreds of thousands of requests per second, whether this data comes through Kafka or any other means,
  • every time new data gets ingested, only the new rows need to be materialized,
  • while all that data is coming in, you can keep hitting your Tinybird real-time endpoints with abandon, and we ensure that results are always up to date, with all the data you require for analysis.

What about you? Do you use Kafka to capture events? Drop us a line or sign up for our waiting list for a guided onboarding session with us.