Nat Taylor: Observations & more

The Joy of Snowflake

I’ve been in AdTech for about a decade now, and data analysis used to be a chore. Then along came Snowflake, whose speed and simplicity make it joyful. Below, I present a use case, explain why I think Snowflake excels at it and take a brief foray into the underlying data engineering.

Each day, billions of OpenRTB Bid Requests are exchanged among hundreds of AdTech vendors.

With so many participants, the transactions can include any version of the OpenRTB specification, any number of extensions, varied-size array data and other differences – all of which make it extremely challenging to structure and flatten into a schema for database storage.

The Bid Requests are complex JSON objects that resemble the following example, but each one is a little different in structure and the spec keeps evolving, so representing them with a fixed structure (a database schema) requires considerable maintenance.

{
    "id": "7979d0c78074638bbdf739ffdf285c7e1c74a691",
    "at": 2,
    "tmax": 143,
    "imp": [{
        "id": "1",
        "tagid": "76334",
        "iframebuster": ["ALL"],
        "banner": {
            "w": 300,
            "h": 250,
            "pos": 1,
            "battr": [9, 1, 14014, 3, 13, 10, 8, 14],
            "api": [3, 1000],
            "topframe": 1
        }
    }],
    "app": {
        "id": "20625",
        "cat": ["IAB1"],
        "name": "com.cheezburger.icanhas",
        "domain": "http://cheezburger.com",
        "privacypolicy": 1,
        "publisher": {
            "id": "8428"
        },
        "ext": {
            "storerating": 1,
            "appstoreid": "457637357"
        }
    },
    "device": {
        "make": "Samsung",
        "model": "SCH-I535",
        "os": "Android",
        "osv": "4.3",
        "ua": "Mozilla/5.0 (Linux; U; Android 4.3; en-us; SCH-I535 Build/JSS15J) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30",
        "ip": "192.168.1.1",
        "language": "en",
        "devicetype": 1,
        "js": 1,
        "connectiontype": 3,
        "dpidsha1": "F099E6D1C485756C45D1EEACB33C73B55C4BC499",
        "carrier": "Verizon Wireless",
        "geo": {
            "country": "USA",
            "region": "PA",
            "type": 3,
            "ext": {
                "latlonconsent": 1
            }
        }
    },
    "user": {
        "id": "bd5adc55dcbab4bf090604df4f543d90b09f0c88",
        "ext": {
            "sessiondepth": 207
        }
    }
}
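To make "always a little bit different" concrete, here is a small Python sketch that lists the leaf paths of two hypothetical, trimmed-down bid requests (one from an app, one from a site) and shows how little they share. The field names follow the OpenRTB style, but the records and values are made up for illustration.

```python
import json

def leaf_paths(obj, prefix=""):
    """Yield the dotted path of every scalar leaf in a parsed JSON object.

    Array elements are collapsed to "[]" so arrays of different lengths
    produce the same paths.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            yield from leaf_paths(value, f"{prefix}.{key}" if prefix else key)
    elif isinstance(obj, list):
        for item in obj:
            yield from leaf_paths(item, f"{prefix}[]")
    else:
        yield prefix

# Two hypothetical bid requests: an app banner and a site video request.
app_request = json.loads(
    '{"id": "a1", "imp": [{"id": "1", "banner": {"w": 300, "h": 250}}],'
    ' "app": {"id": "20625", "bundle": "com.example.app"}}'
)
site_request = json.loads(
    '{"id": "b2", "imp": [{"id": "1", "video": {"w": 640}}],'
    ' "site": {"domain": "example.com"}, "regs": {"ext": {"gdpr": 1}}}'
)

app_paths = set(leaf_paths(app_request))
site_paths = set(leaf_paths(site_request))

print("shared:", sorted(app_paths & site_paths))
print("only in app request:", sorted(app_paths - site_paths))
print("only in site request:", sorted(site_paths - app_paths))
```

Only the top-level id and the impression id are shared; every other column a fixed schema would need is present in one record and absent in the other.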

Still, the ability to quickly analyze the dataset often unblocks data scientists working on algorithms, customer success managers working on campaigns, BizDev folks working on supply deals, product managers looking at trends and more.

The OLAP workload is an excellent fit for partitioned columnar storage, as long as complex nested data types with optional fields are supported. The compute requirements for processing such a dataset are also significant, so there is a huge benefit from an efficient distributed SQL query engine that avoids Volcano-style (tuple-at-a-time iterator) execution and implements vectorization.

Luckily for thrifty organizations, it is possible to cobble together a solution that meets those conditions (columnar storage and efficient distributed SQL query engine) from open source software. Parquet and SparkSQL make a great combination.

However, for organizations that don’t want to cobble things together and manage them, or that want more functionality, paid solutions exist, such as Vertica, Netezza and, more recently, Snowflake.

In my experience, every solution eventually falls short. Snowflake hasn’t yet.

Getting started with Snowflake can be as simple as uploading data to S3, configuring your bucket as a data source (an external stage), creating a schemaless table (a VARIANT column holds the raw JSON), loading the data and then, the truly joyful part, querying with flexibility and blazing speed.

Snowflake is a fully featured cloud data warehouse with a wide range of related features, but to me the joyful part is the query speed and flexibility, especially if you have been suffering painfully slow (or schema-bound) querying on another platform.

Here’s an example: we want to see the top sites by volume, so we’d run the following query.

select
    get_json_object(bid_request_json, '$.site.domain') domain,
    count(*) volume
from bid_requests
where concat_ws('-', year, month, day) = '2019-02-23'
group by rollup(domain)
order by volume desc
limit 10;
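A small Python sketch of what the query computes, useful for reasoning about its output. One detail of GROUP BY ROLLUP(domain) is easy to miss: it adds a grand-total row with a NULL domain, which will sort to the top, and requests with no domain at all also group under NULL (SQL’s GROUPING() function can tell the two apart). The sample data is hypothetical.

```python
from collections import Counter

def top_domains(domains, limit=10):
    """Mimic GROUP BY ROLLUP(domain) ... ORDER BY volume DESC LIMIT n.

    `domains` holds one entry per bid request; None stands in for a
    missing domain, which SQL would group under NULL.
    """
    rows = list(Counter(domains).items())
    # ROLLUP over a single column adds one grand-total row keyed by NULL.
    rows.append((None, len(domains)))
    rows.sort(key=lambda row: row[1], reverse=True)
    return rows[:limit]

sample = ["a.com", "b.com", "a.com", None, "c.com", "a.com"]
print(top_domains(sample, limit=3))
```

So a "top 10" from this query is really the total plus the top 9 domains.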

Let’s contemplate doing this without Snowflake.

If we just had this table’s 58B rows of JSON sitting on a disk, with each record taking about 1,500 bytes, there would be about 87TB to scan. By simply partitioning by day, we could scan more like 300GB; with compression, more like 100GB. But then we’d be somewhat stuck, since we really don’t want to add an ETL step to extract the domain (for many reasons, including that app records don’t have a site.domain, so we’d have to be careful to coalesce with app.bundle, and we don’t want to think about that).
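The coalesce itself is small. The pain lies in maintaining it as an ETL step. Here is a minimal Python sketch of the logic, assuming the OpenRTB convention that web requests carry site.domain and app requests carry app.bundle. The helper names get_path and inventory_key are hypothetical.

```python
import json
from typing import Any, Optional

def get_path(record: dict, path: str) -> Optional[Any]:
    """Walk a dotted path through nested dicts; return None if any step is missing."""
    node: Any = record
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def inventory_key(record: dict) -> Optional[str]:
    """Hypothetical helper: site.domain for web traffic, else app.bundle for apps."""
    domain = get_path(record, "site.domain")
    return domain if domain is not None else get_path(record, "app.bundle")

site_request = json.loads('{"site": {"domain": "example.com"}}')
app_request = json.loads('{"app": {"bundle": "com.example.app", "name": "Example"}}')
neither = json.loads('{"device": {"os": "Android"}}')

print([inventory_key(r) for r in (site_request, app_request, neither)])
```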

So, we could turn to Parquet [0] and define a schema with a single nested column for the JSON (still partitioned by day). Parquet would do its thing, shredding the nested records into a separate column for each leaf path, including one for $.site.domain, and cleverly applying dictionary encoding. This sounds great!
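A toy Python sketch of the shredding idea: each leaf path becomes its own column, and missing fields become nulls. This is a deliberate simplification of the Dremel-style shredding Parquet uses, which also stores repetition and definition levels so that arrays and nested nulls round-trip exactly. The records are hypothetical.

```python
from typing import Any, Dict, List

def shred(records: List[dict]) -> Dict[str, List[Any]]:
    """Split nested records into one list ("column") per leaf path.

    Missing fields become None so every column stays aligned with the
    records. Arrays are kept as opaque values in this simplification.
    """
    def leaves(obj: dict, prefix: str = ""):
        for key, value in obj.items():
            path = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                yield from leaves(value, path)
            else:
                yield path, value

    flattened = [dict(leaves(r)) for r in records]
    paths = sorted({p for row in flattened for p in row})
    return {p: [row.get(p) for row in flattened] for p in paths}

records = [
    {"id": "r1", "site": {"domain": "example.com"}, "device": {"os": "iOS"}},
    {"id": "r2", "app": {"bundle": "com.example.app"}, "device": {"os": "Android"}},
    {"id": "r3", "site": {"domain": "example.com"}},
]
columns = shred(records)
# A query on site.domain only needs to touch this one column, not whole records.
print(columns["site.domain"])
```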

Columnar storage of nested data is amazing. By automatically maintaining columns within the nested column we defined in our schema, a few things happen:

  1. Parquet can use the best encoding for the data (e.g. RLE for integers, dictionary for strings, delta for timestamps, etc.). $.site.domain is highly repetitive, so we can build a dictionary mapping sites to numeric IDs, store the numeric IDs in place of the domain strings, and apply RLE on top of that!
  2. Parquet can keep metadata (min, max, distinct count, Bloom filter, etc.) about each path individually. Thus, if our query later includes a filter (e.g. $.site.domain = 'example.com'), a good query engine could use nested predicate pushdown for pruning.
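A minimal Python sketch of the first point: dictionary encoding followed by run-length encoding of the resulting IDs. Parquet’s actual format packs dictionary indices with a hybrid RLE/bit-packing scheme, so this only shows the idea. The domains are made up.

```python
from itertools import groupby
from typing import Dict, List, Tuple

def dictionary_encode(values: List[str]) -> Tuple[Dict[str, int], List[int]]:
    """Map each distinct string to a small integer ID, in order of first appearance."""
    dictionary: Dict[str, int] = {}
    # len(dictionary) is evaluated before insertion, so new strings get 0, 1, 2, ...
    ids = [dictionary.setdefault(v, len(dictionary)) for v in values]
    return dictionary, ids

def run_length_encode(ids: List[int]) -> List[Tuple[int, int]]:
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, len(list(run))) for value, run in groupby(ids)]

domains = ["example.com"] * 4 + ["news.example"] * 3 + ["example.com"] * 2
dictionary, ids = dictionary_encode(domains)
runs = run_length_encode(ids)
print(dictionary)
print(runs)
```

Nine strings become a two-entry dictionary plus three (ID, run length) pairs. The more repetitive the column, the bigger the win.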

Now we need to query it, so we’ll turn to SparkSQL [1]. We’ll have to spin up a cluster, configure it to read our Parquet, and finally execute our query. Luckily, SparkSQL is pretty amazing. Spark 2.0 rewrote the query engine to avoid Volcano iterators and leverage vectorization, and Spark 2.4 supports nested schema pruning, so we can read just the automatically columnarized chunk for $.site.domain. Rejoice!

There is a lot going on here. Most importantly:

  1. SparkSQL uses Hive’s SQL dialect [3], so we get lots of SQL power
  2. After nested schema pruning, it is very difficult to further reduce IO, so CPU becomes the bottleneck.
  3. Iterator-model (Volcano) execution would result in lots of function calls and returns and repeated writes and reads to memory, and it can’t leverage techniques like pipelining, cache locality and vectorization. That makes Spark’s Tungsten engine, with whole-stage code generation and vectorized in-memory columnar data, crucial.
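The contrast in the third point can be sketched in Python. In the first version, which follows the Volcano/iterator model, the aggregate pulls one row at a time through a chain of next() calls. The second processes a whole batch of column values per call, which is the shape vectorized engines exploit. Python won’t show the speedup, so this only illustrates the control flow. The operator classes are hypothetical and the data is made up.

```python
from typing import List, Optional

_END = object()  # sentinel marking end of stream (rows themselves may be None)

class Scan:
    """Leaf operator: hands out one value per next() call."""
    def __init__(self, values):
        self._values = iter(values)

    def next(self):
        return next(self._values, _END)

class Filter:
    """Pulls from its child until a row satisfies the predicate."""
    def __init__(self, child, predicate):
        self.child, self.predicate = child, predicate

    def next(self):
        while (row := self.child.next()) is not _END:
            if self.predicate(row):
                return row
        return _END

def count_volcano(root) -> int:
    """The aggregate pulls rows one at a time: a full call chain per row."""
    total = 0
    while root.next() is not _END:
        total += 1
    return total

def count_vectorized(values: List[Optional[str]], batch_size: int = 1024) -> int:
    """Batch-at-a-time: each step handles a whole slice of the column per call."""
    total = 0
    for start in range(0, len(values), batch_size):
        batch = values[start:start + batch_size]
        mask = [v is not None for v in batch]  # the filter yields a selection mask
        total += sum(mask)                     # the aggregate consumes the whole mask
    return total

domains = ["example.com", None, "news.example", None, "example.com"] * 1000
volcano = count_volcano(Filter(Scan(domains), lambda d: d is not None))
vectorized = count_vectorized(domains)
print(volcano, vectorized)
```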

Now we “just” run the query via spark-shell or a JDBC client, and voilà! (Well, probably voilà, but I don’t have this spun up right now and can’t remember exactly the nuances of HiveQL and such.)

Now let’s run the same query in Snowflake:

select
    bid_request:site.domain domain,
    count(*) volume
from bid_requests
where event_date = '2019-02-23'
group by rollup(domain)
order by volume desc
limit 10;

The first thing to notice is the simple syntax for working with semi-structured data, which I much prefer to lots of get_path() calls or similar. Second, we didn’t have to fuss with turning the partition folder structure into a concat_ws() (though this may be easier now, since it’s been a while since I used Hive).

Anyway, the important bit: the query should be at least as fast as our homemade solution (sorry, no benchmark!), but without the chore of maintaining all that code and infrastructure.

With only minimal management, Snowflake (internally) probably did something very similar, though we don’t actually know.

We can glean a lot from the query profile, but Snowflake is closed source, so we can only make educated guesses about what’s going on based on what they describe in their SIGMOD 2016 paper [4].

Let’s look at the query profile (which I’ve included as text below, but which is presented visually in the browser-based Snowflake console).

1.38s Compilation Time
19.748s Total Execution Time
12% Processing
15% Local Disk IO
73% Remote Disk IO
0% Initialization
IO: 12.50GB Bytes scanned
IO: 0.00% Percentage scanned from cache
IO: 0.07MB Bytes written
Network: 12.17MB Bytes sent over the network
Pruning: 4,564 Partitions scanned
Pruning: 4,299,796 Partitions total

Partition pruning eliminated 99.9% of the partitions (only 4,564 of 4,299,796 were scanned) and was 99.98% effective: we needed 83,287,837 of the 83,305,203 scanned records. After nested schema pruning, we “only” scanned 12.5GB of data.
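The SIGMOD paper describes keeping min/max metadata for each column of each micro-partition and skipping partitions whose range can’t satisfy the filter. Here is a minimal Python sketch of that idea (zone-map pruning) with hypothetical partitions. A partition that straddles the day boundary must still be scanned, which is one way to end up reading slightly more records than needed. The last two lines recompute the ratios from the profile.

```python
from dataclasses import dataclass
from datetime import date
from typing import List

@dataclass
class PartitionStats:
    """Hypothetical per-partition metadata: min/max of a column, like a zone map."""
    name: str
    min_date: date
    max_date: date

def prune(partitions: List[PartitionStats], target: date) -> List[PartitionStats]:
    """Keep only partitions whose [min, max] range could contain the target date."""
    return [p for p in partitions if p.min_date <= target <= p.max_date]

partitions = [
    PartitionStats("p1", date(2019, 2, 22), date(2019, 2, 22)),
    PartitionStats("p2", date(2019, 2, 22), date(2019, 2, 23)),  # straddles midnight
    PartitionStats("p3", date(2019, 2, 23), date(2019, 2, 23)),
    PartitionStats("p4", date(2019, 2, 24), date(2019, 2, 24)),
]
kept = prune(partitions, date(2019, 2, 23))
print([p.name for p in kept])

# Ratios from the query profile above.
print(f"{4_564 / 4_299_796:.2%} of partitions scanned")
print(f"{83_287_837 / 83_305_203:.2%} of scanned records needed")
```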

Once we had that data, we processed records at about 35M/sec (83,287,837 / (19.75s * 12% processing)), so we can infer that query execution probably did not use Volcano-style iteration and instead operated on in-memory columnar data that was well laid out for cache utilization and vectorization.
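The same throughput arithmetic in Python, using the figures from the profile. Like the estimate above, it treats the 12% processing share as the time spent working through the records.

```python
records = 83_287_837
total_seconds = 19.748   # Total Execution Time from the profile
processing_share = 0.12  # "12% Processing"

processing_seconds = total_seconds * processing_share
rate = records / processing_seconds
print(f"{processing_seconds:.2f}s of processing -> {rate / 1e6:.1f}M records/sec")
```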

Running this query cost us about $0.04 (20s * 4 credits/hour * $2/credit). Afterward, the warehouse can be suspended automatically, leaving us to pay just for storage ($40/TB/mo).
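The cost arithmetic as a small Python function, using the post’s 4 credits/hour and $2/credit. It assumes, per Snowflake’s billing documentation, per-second billing with a 60-second minimum each time a warehouse starts or resumes. So if the warehouse resumed just for this one query, the bill would be closer to the second figure. Storage is not included.

```python
def query_cost(seconds: float, credits_per_hour: float, dollars_per_credit: float,
               minimum_seconds: float = 0.0) -> float:
    """Warehouse cost of one query billed per second, with an optional billing minimum."""
    billed = max(seconds, minimum_seconds)
    return billed / 3600 * credits_per_hour * dollars_per_credit

warm = query_cost(20, credits_per_hour=4, dollars_per_credit=2)
cold = query_cost(20, credits_per_hour=4, dollars_per_credit=2, minimum_seconds=60)
print(f"${warm:.3f} on a running warehouse, ${cold:.3f} if it resumed just for this query")
```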

All we had to write was SQL (CREATE DATABASE, COPY INTO, SELECT).

Snowflake does much, much, much more too, which will be the topic of another blog post.

P.S. It later occurred to me to think through the scenario of using S3 + EC2 + GNU utilities: something like ls chunk_*.gz | xargs -n 1 -P 8 zgrep -Eo '"domain": *"[^"]*"' | sort | uniq -c | sort -nr (a rough approximation, since a regex over the raw JSON would also match app.domain). I understand that S3-to-EC2 throughput now tops out at 3,125 MB/s, that gzip -d runs at something like 200MB/s per core and that grep manages around 100MB/s, which translates to about 90 seconds.
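For comparison, here is a Python sketch of the same pipeline. Instead of grepping, it parses each line as JSON, so it can apply the site.domain/app.bundle coalesce properly. It assumes hypothetical gzipped JSON-lines chunks named chunk_*.gz, with one bid request per line. A thread pool keeps the sketch short, though a process pool would suit CPU-bound JSON parsing better. It makes no attempt to match the throughput figures above.

```python
import glob
import gzip
import json
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

def inventory_key(record: dict) -> Optional[str]:
    """site.domain for web traffic, else app.bundle (None if neither is present)."""
    site, app = record.get("site") or {}, record.get("app") or {}
    return site.get("domain") or app.get("bundle")

def count_chunk(path: str) -> Counter:
    """Decompress one gzipped JSON-lines chunk and count requests per domain."""
    counts: Counter = Counter()
    with gzip.open(path, "rt", encoding="utf-8") as lines:
        for line in lines:
            if line.strip():
                counts[inventory_key(json.loads(line))] += 1
    return counts

def top_domains(pattern: str = "chunk_*.gz", workers: int = 8, limit: int = 10):
    """Roughly: xargs -P 8 zgrep ... | sort | uniq -c | sort -nr | head."""
    total: Counter = Counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for counts in pool.map(count_chunk, sorted(glob.glob(pattern))):
            total.update(counts)
    return total.most_common(limit)

if __name__ == "__main__":
    for domain, volume in top_domains():
        print(volume, domain)
```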

References
