When a product grows it’s easy to gather millions of rows per day from different sources: CDN logs, database records, user and app-generated events…

You usually don’t pay much attention at the beginning; you just store them somewhere, hoping that you can extract valuable information in the future.

And there is always a use case where that data is useful:

  • Product management decisions
  • Internal real-time dashboards
  • User facing dashboard stats
  • Analytics API
  • Measuring product usage

The billion rows problem

If you have 10 million events a day, it takes a little over 3 months to have 1 billion rows in your storage. Postgres, Mongo or any other database in your product tech stack is able to deal with 4 billion rows; the problem is serving them fast and to a lot of clients.
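
The arithmetic behind those figures is easy to check. A quick sketch that uses only the 10 million events a day mentioned above:

```python
EVENTS_PER_DAY = 10_000_000  # ingestion rate from the text

# Days needed to accumulate one billion rows.
days_to_billion = 1_000_000_000 / EVENTS_PER_DAY

# Rows accumulated over a full (non-leap) year.
rows_per_year = EVENTS_PER_DAY * 365

print(f"{days_to_billion:.0f} days to reach 1B rows")
print(f"{rows_per_year / 1e9:.2f}B rows per year")
```

That is 100 days for the first billion and roughly 3.65 billion rows a year, which is where the "4B rows a year" order of magnitude comes from.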

You open a database console, run a query and you notice a simple aggregation takes 20-30 seconds. Someone analyzing the data could wait a little bit for the results but if you want to expose real-time analytics in your application, that’s just too much time.

Now the problem is bigger if you have a medium load, let’s say 100 QPS, with dynamic filters over the full data range (4B rows a year) and some JOINs to augment the data with, for example, information about your users from your CRM.

How do we approach this?

This is the workflow we use at Tinybird when we approach a use case like that:

    1. Understand the data and the use cases
    2. Design the data schema to be as good as possible for the use cases while staying compatible with incoming data. This includes denormalization, columns, precalculation, indices, data order…
    3. Build queries for the worst-case scenario, taking usage statistics into account
    4. Run benchmarks based on the use cases
    5. Evaluate bottlenecks (CPU, memory, network, disk)
    6. [Partitioning and sharding]
    7. Iterate guided by benchmark results
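
The benchmarking step can be sketched with a small timing harness. This is only an illustration, not the tooling used at Tinybird: `benchmark` and `fake_query` are hypothetical names, and `fake_query` is a stand-in for a real call to your database or API.

```python
import statistics
import time
from typing import Callable, Dict


def benchmark(run_query: Callable[[], object], iterations: int = 50) -> Dict[str, float]:
    """Time repeated calls to run_query and report latency figures in ms."""
    latencies_ms = []
    for _ in range(iterations):
        start = time.perf_counter()
        run_query()
        latencies_ms.append((time.perf_counter() - start) * 1000)
    # quantiles(n=100) returns the 99 cut points p1..p99.
    cuts = statistics.quantiles(latencies_ms, n=100)
    return {
        "p50_ms": statistics.median(latencies_ms),
        "p95_ms": cuts[94],
        "max_ms": max(latencies_ms),
    }


def fake_query() -> int:
    # Stand-in workload; replace with a real query against your system.
    return sum(range(10_000))


stats = benchmark(fake_query)
print(stats)
```

Tracking percentiles rather than averages makes it easier to see whether a schema or query change helps the slow cases, which are the ones users notice.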

Tinybird Analytics enables us to do these iterations pretty fast; you will see how below.

For a real-time analytics API at scale (and in general for any kind of application) we aim to:

  • Query as little data as possible. That means using indices as much as possible, sorting data to be as compact as possible and using the right algorithm:

    • Use faster operations first. For instance, filtering is faster than grouping or joining, so the filtering comes first
    • Statistics are always our friend and ClickHouse, the engine we use under the hood, has plenty of functions that use statistical approximations to speed up queries
  • Build pre-aggregations in real-time. Pre-aggregations are usually expensive because they need to be fully recalculated often. ClickHouse allows having pre-aggregated tables that can be recalculated incrementally as new data arrives.
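
To make the idea of incremental pre-aggregation concrete, here is a minimal pure-Python sketch of a mergeable aggregation state. It only illustrates the concept and is not how ClickHouse implements it: `RollupState` is a hypothetical name, and it keeps an exact set of users where an engine would typically use a compact approximate structure.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class RollupState:
    """Partial aggregation state for one (app_id, time bucket) group."""
    count: int = 0
    users: Set[str] = field(default_factory=set)

    def add(self, user_id: str) -> None:
        # Fold one new row into the state; no old rows are re-read.
        self.count += 1
        self.users.add(user_id)

    def merge(self, other: "RollupState") -> "RollupState":
        # Combine two partial states, e.g. two adjacent time buckets.
        return RollupState(self.count + other.count, self.users | other.users)


first = RollupState()
for user in ["u1", "u2", "u1"]:
    first.add(user)

second = RollupState()
for user in ["u2", "u3"]:
    second.add(user)

total = first.merge(second)
print(total.count, len(total.users))  # 5 rows, 3 distinct users
```

Because states can be both updated and merged, new data only touches the bucket it belongs to, and coarser results are obtained by merging buckets instead of rescanning the raw rows.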

For small datasets (under 1M rows a day) it’s usually enough to denormalize and be careful with the indices; for larger datasets with a high load, a mix of join tables, denormalization, pre-aggregation and scaling is needed. Luckily Tinybird deals with the harder parts for us.

How to do this with Tinybird

Let’s take this use case from Algolia. They explain how they provide real-time analytics for their clients. Basically, they have a bunch of periodic tasks that create aggregations every 5 minutes and every day and then, with some SQL trickery, they query those aggregations to get the final results.

How would that work on Tinybird?

Tinybird has an API and a user interface, but it also has a CLI client that allows creating data projects using text files (so they can be stored in a repo and therefore be part of the CI cycle). The data definition format is pretty close to a Dockerfile.

Let’s create the landing dataset

-- query_log.datasource file

SCHEMA >

    timestamp DateTime,
    app_id String,
    user_id String,
    query String

Let’s define a view of the data by 5 minutes

-- rollup.pipe file

NODE rollups
SQL >
    SELECT toStartOfFiveMinutes(timestamp) minute,
        app_id,
        countState() count,
        uniqState(user_id) user_count,
        topKState(10)(query) top_queries
    FROM query_log
    GROUP BY app_id, minute

--  we tell Tinybird that this is a view that is updated whenever the query_log table is updated
TYPE aggregating

-- let's create a new node that queries the previous node and exposes an API
NODE api
SQL >
    SELECT minute,
        countMerge(count) count,
        uniqMerge(user_count) user_count,
        topKMerge(top_queries)
    FROM rollups
    WHERE minute between {{DateTime(start_date)}} and {{DateTime(end_date)}}
    group by minute

That gives us the total number of queries, the unique user count and the top queries every five minutes. We could do exactly the same thing for daily rollups with “toStartOfDay(timestamp)” in a rollup_1day.pipe file.
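
For readers less familiar with this kind of rollup, the following sketch mimics in plain Python what the pipe computes: rows are bucketed into five-minute windows per app and each bucket keeps a row count, the set of users and the query frequencies. The sample rows and the helper `start_of_five_minutes` are made up for illustration, and the counts here are exact, whereas functions such as topK are approximate in ClickHouse.

```python
from collections import Counter, defaultdict
from datetime import datetime


def start_of_five_minutes(ts: datetime) -> datetime:
    """Round a timestamp down to the start of its 5-minute bucket."""
    return ts.replace(minute=ts.minute - ts.minute % 5, second=0, microsecond=0)


# Hypothetical sample rows: (timestamp, app_id, user_id, query)
rows = [
    (datetime(2019, 1, 1, 0, 1, 10), "app1", "u1", "shoes"),
    (datetime(2019, 1, 1, 0, 3, 45), "app1", "u2", "shoes"),
    (datetime(2019, 1, 1, 0, 4, 59), "app1", "u1", "hats"),
    (datetime(2019, 1, 1, 0, 6, 0), "app1", "u3", "shoes"),
]

# One state per (app_id, bucket): row count, distinct users, query frequencies.
rollups = defaultdict(lambda: {"count": 0, "users": set(), "queries": Counter()})
for ts, app_id, user_id, query in rows:
    state = rollups[(app_id, start_of_five_minutes(ts))]
    state["count"] += 1
    state["users"].add(user_id)
    state["queries"][query] += 1

for (app_id, minute), state in sorted(rollups.items()):
    top = [q for q, _ in state["queries"].most_common(10)]
    print(minute, app_id, state["count"], len(state["users"]), top)
```

The first three rows land in the 00:00 bucket and the last one in the 00:05 bucket, so four raw rows collapse into two rollup rows.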

Now let’s push those pipelines to Tinybird; it will process every dependency and create the needed pipes and datasources.

$ tinyb push rollup.pipe

** processing rollup.pipe
** processing query_log.datasource
** building dependencies
** Running query_log.datasource (dev)
** Running rollup.pipe (dev)

$ tinyb push rollup_1day.pipe
...

Now let’s generate a fake random dataset of a few million rows (10M) with a long-tail-like probability distribution and push it to the datasource we previously created. There are many ways to push data into Tinybird; in this case, we are streaming the data directly from the local machine.

python -c 'import uuid, random, datetime;[print("%s,%s,%s,%s" % (datetime.datetime(2019,1,1,0,0,0) + datetime.timedelta(seconds=x + random.randint(0, 20)), int(2*random.gammavariate(1, 2)), int(10000*random.gammavariate(1, 2)), int(100000*random.gammavariate(1, 2)))) for x in range(1, 10000000)]' | \
gzip | \
curl -H "Authorization: Bearer $TOKEN" -H 'Content-encoding: gzip' \
     -F csv=@- "https://api.tinybird.co/v0/datasources?name=query_log&mode=append"
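
The one-liner is compact but hard to read. Below is a sketch of the same generator as a small function, in case you want to tweak the distributions. The name `fake_rows` and the fixed seed are additions for illustration (the original command is unseeded), and this version yields exactly `n` rows.

```python
import random
from datetime import datetime, timedelta
from typing import Iterator


def fake_rows(n: int, seed: int = 42) -> Iterator[str]:
    """Yield n CSV lines: timestamp, app_id, user_id, query."""
    rng = random.Random(seed)
    start = datetime(2019, 1, 1)
    for x in range(1, n + 1):
        # Roughly one event per second, with up to 20 s of jitter.
        ts = start + timedelta(seconds=x + rng.randint(0, 20))
        # Gamma-distributed ids: low values are far more frequent than high ones.
        app_id = int(2 * rng.gammavariate(1, 2))
        user_id = int(10_000 * rng.gammavariate(1, 2))
        query = int(100_000 * rng.gammavariate(1, 2))
        yield f"{ts},{app_id},{user_id},{query}"


# Print a small sample; raise the count to generate a full dataset.
for line in fake_rows(5):
    print(line)
```

Saved as a script, its output can be piped through gzip and curl exactly like the one-liner above.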

Now, we query it for the first seven days of January

curl -s "https://api.tinybird.co/v0/pipes/rollup.json?start_date=2019-01-01&end_date=2019-01-07&token=$TOKEN" \
    | jq .data | head -n 10
[
  {
    "minute": "2019-01-01 00:00:00",
    "count": 155,
    "user_count": 155
  },
  {
    "minute": "2019-01-01 00:05:00",
    "count": 290,
    "user_count": 290
  },
  ...
]
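
If you call the endpoint from application code rather than from curl, it helps to build the URL with proper encoding. A minimal sketch, assuming the host, path and parameter names shown in the commands above; `pipe_url` is a hypothetical helper and the token is a placeholder.

```python
from urllib.parse import urlencode

API_HOST = "https://api.tinybird.co"  # host taken from the commands above


def pipe_url(pipe: str, token: str, **params: str) -> str:
    """Build a query URL for a pipe endpoint, URL-encoding every parameter."""
    query = urlencode({**params, "token": token})
    return f"{API_HOST}/v0/pipes/{pipe}.json?{query}"


url = pipe_url(
    "rollup",
    token="YOUR_TOKEN",  # placeholder, not a real token
    start_date="2019-01-01",
    end_date="2019-01-07",
)
print(url)
```

The resulting URL can then be fetched with any HTTP client, for example `urllib.request.urlopen(url)`, and the body parsed as JSON.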

You can still query the query_log table if there is a use case not covered by the materialized pipe, but let’s check the difference in number of rows between them:

  • full table - 10M rows
  • rollups 5 minutes - 616k rows - about 1/16 of original size
  • rollups 1 day - 5k rows - 1/2000 of original size

That’s about 16x fewer rows for five minutes and 2000x fewer for 1 day. If query time scales with the rows read, a query that takes 1 second on the raw table will take around 60ms on the five-minute rollups. There is another important point here: the rollup tables are really small, so they likely fit in memory and, even without SSD, the performance is going to be pretty high.
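
These ratios follow directly from the row counts. A small sketch, assuming (as a simplification) that query time is proportional to the number of rows scanned, with the 1-second raw-table query as the baseline:

```python
RAW_ROWS = 10_000_000
ROLLUP_ROWS = {"5 minutes": 616_000, "1 day": 5_000}  # row counts from the list above
RAW_QUERY_MS = 1_000  # the 1-second raw-table query used as a baseline

for name, rows in ROLLUP_ROWS.items():
    reduction = RAW_ROWS / rows
    # Assumes query time is proportional to rows scanned (a simplification).
    estimated_ms = RAW_QUERY_MS / reduction
    print(f"{name}: {reduction:.0f}x fewer rows, ~{estimated_ms:.1f} ms")
```

With these counts, the exact reductions come out at about 16x for the five-minute rollups and 2000x for the daily ones.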

The same example with 500M rows (~6M rows per day) yields the same size ratios and these query times (on a 4-core machine, using all cores per query):

  • raw table query log - 2900ms
  • rollups 5 minutes - 250ms
  • rollups 1 day - 150ms
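
Expressed as speedups over the raw table, the measured times look like this (a quick calculation on the figures listed above, nothing more):

```python
# Query times in milliseconds, as reported above for the 500M-row dataset.
QUERY_MS = {"raw table": 2900, "rollups 5 minutes": 250, "rollups 1 day": 150}

baseline = QUERY_MS["raw table"]
speedups = {
    name: baseline / ms for name, ms in QUERY_MS.items() if name != "raw table"
}

for name, speedup in speedups.items():
    print(f"{name}: {speedup:.1f}x faster than the raw table")
```

The measured gains (roughly 12x and 19x) are smaller than the reduction in row count, which suggests that fixed per-query costs likely start to dominate once the tables get this small.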

And, in general, this is how we are able to achieve very high QPS with low latency and easily scale up to billions of rows. All in one place and without costly or inflexible pre-aggregations and ETLs.

Want to talk about similar use cases or do a quick proof of concept? Don’t hesitate to email us!!