BigQuery + Python for Production Data Science

by Emma Tebbe

July 2020

Originally posted on Medium


For the past year and a half, we at Brandfolder have been moving toward using BigQuery as a datalake. It began with storing a copy of all our raw data there, and we’ve steadily moved into storing feature extraction tables as well as data produced by machine learning models and accessed directly by our product. With our data housed in BigQuery, we find it much easier to access, both in terms of speed and flexibility.

For those unfamiliar with Google BigQuery, it is a serverless cloud data warehouse (think database) with built-in, automated distributed computing. We initially used BigQuery as a copy of our production database, and found ourselves leaning on it more and more as our data size grew. Queries against our async replica database would take upwards of half an hour, while the same queries in BigQuery would complete within a minute.

Additionally, with our datalake living primarily as parquet files in Google Cloud Storage, we were limited in several ways. We needed Pyspark to access it, which meant getting Google Dataproc involved with clusters, and it often meant pulling much bigger datasets than we needed and whittling them down after reading the files. Using Pyspark to read and manipulate the files would also often take quite a long time, which negatively impacted our ETL process. As some of our tables began to grow by 30M+ rows a day, our ETL process began taking longer than we could afford. Surveying our infrastructure, we realized we could spend far less effort managing our data pipelines with our datalake in BigQuery than by continuing to manage Spark clusters against ever-growing datasets with complex joins.

It’s important to note that BigQuery charges based on the amount of data (in GB) each query scans. If the tables you’re querying are large and you’re running the same query multiple times a day, it can become expensive. On the other hand, if you lean on BigQuery for data wrangling, you won’t have to pay for processing power and compute time the way you might when using Spark.
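
To get a feel for what a given query will cost before running it, you can ask BigQuery for a dry run through the google-cloud-bigquery client library discussed below. A minimal sketch (the query here is just a stand-in for your own):


from google.cloud import bigquery

bqclient = bigquery.Client()

# dry_run asks BigQuery to plan the query and report how much data it
# would scan, without actually executing it or incurring query charges
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = bqclient.query(
    "SELECT id, created_at FROM `datalake.dataset.attachments`",
    job_config=job_config,
)
print(f"This query would scan ~{job.total_bytes_processed / 1e9:.2f} GB")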

Our data infrastructure is almost entirely in Python, from ETL through to ML model development. To make the most of a Python + BigQuery infrastructure, we access BigQuery in three ways: Pandas, Pyspark, and Python’s built-in os module paired with the bq command-line tool. For each of these methods, I’ll cover the package we use, an example, use cases, and notes.



1. Pandas + BigQuery

This combination is often used to pre-aggregate data and pull smaller datasets for tasks like reporting or visualizations. With a clearly defined in-line query, this is probably the most intuitive and easiest method to use, but it is limited by data size, since the results have to fit in memory as a Pandas DataFrame.

Package: google-cloud-bigquery

Example:


import pandas as pd
from google.cloud import bigquery

# authenticates via application default credentials
bqclient = bigquery.Client()

# share of attachments created on a given day, plus the total attachment count
query = """
    SELECT CASE
               WHEN count(distinct id) = 0 THEN 0
               ELSE count(distinct CASE
                            WHEN atts.state = 'created' THEN id
                        END) / count(distinct id)
           END AS created_percent,
           count(distinct id) AS num_attachments
    FROM `datalake.dataset.attachments` AS atts
    WHERE CAST(atts.created_at AS DATE) = '2020-07-07'
"""

# run the query in BigQuery and pull the result back as a Pandas DataFrame
result = bqclient.query(query).to_dataframe()
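
As a small follow-up, the hard-coded date above can also be bound as a query parameter, which keeps the SQL reusable from run to run. A sketch of the same idea (the @run_date parameter name is just illustrative):


import datetime

from google.cloud import bigquery

bqclient = bigquery.Client()

# same report, but the date is passed as a bound parameter instead of
# being spliced into the SQL string
query = """
    SELECT count(distinct id) AS num_attachments
    FROM `datalake.dataset.attachments`
    WHERE CAST(created_at AS DATE) = @run_date
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("run_date", "DATE", datetime.date(2020, 7, 7)),
    ]
)
result = bqclient.query(query, job_config=job_config).to_dataframe()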

Use cases:

Notes:


2. Pyspark + BigQuery

The Spark/BigQuery connector comes in where the Pandas approach is limited by data size. This connector lets you pull in BigQuery tables with a simple Spark read command. However, with the BigQuery/Spark connector you can’t write queries directly in your Python code. The way we’ve found around that is to create views in BigQuery that contain the necessary query, which Spark can then read.

Package: spark-bigquery-connector

Example: The following example shows how we pull data from a view. You can also pull from a table, in which case you wouldn’t need the viewsEnabled option.


# read a BigQuery view; viewsEnabled lets the connector query views
# (depending on connector version, this may also require a
# materializationDataset option for staging the view's results)
results = spark.read.format('bigquery') \
    .option('table', 'datalake.dataset.table') \
    .option('viewsEnabled', 'true') \
    .load()

## insert any data manipulations here ##

# write the result back to BigQuery; the indirect write path typically
# also needs a temporaryGcsBucket option for staging
results.write.format('bigquery') \
    .option('table', 'datalake.dataset.output_table') \
    .save()

Below is an example of a view the Spark connector might read from.


SELECT o.key AS org_key,
       att.attachment_key,
       t.name AS tag
FROM (SELECT name,
             asset_id,
             organization_id
      FROM `datalake.dataset.tags`) t
INNER JOIN
    (SELECT id,
            asset_key
     FROM `datalake.dataset.assets`
     GROUP BY 1,2) a
  ON t.asset_id = a.id
INNER JOIN
    (SELECT asset_id,
            attachment_key
     FROM `datalake.dataset.attachments`
     GROUP BY 1,2) att
  ON a.id = att.asset_id
INNER JOIN
    (SELECT id,
            key
     FROM `bi-database.boulder.organizations`
     GROUP BY 1,2) o
  ON t.organization_id = o.id
GROUP BY 1,2,3
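
The view itself only needs to be created once. One way to do that (a sketch, with a hypothetical view name and a trimmed-down version of the query above) is to issue the DDL through the same Python client from the first section:


from google.cloud import bigquery

bqclient = bigquery.Client()

# one-time setup: wrap the query in a view so the Spark connector can
# read it like a table
ddl = """
    CREATE OR REPLACE VIEW `datalake.dataset.org_attachment_tags` AS
    SELECT o.key AS org_key,
           att.attachment_key,
           t.name AS tag
    FROM `datalake.dataset.tags` t
    INNER JOIN `datalake.dataset.assets` a ON t.asset_id = a.id
    INNER JOIN `datalake.dataset.attachments` att ON a.id = att.asset_id
    INNER JOIN `bi-database.boulder.organizations` o ON t.organization_id = o.id
    GROUP BY 1, 2, 3
"""
bqclient.query(ddl).result()  # result() blocks until the DDL has finished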

Use cases:

Notes:

3. OS + BigQuery

This is another method that lets you build and run queries from within Python. The benefit of this approach is that the data never gets pulled into Python’s memory; everything runs inside BigQuery. We use this method when a script needs to depend on other jobs but requires no data manipulation in Python.

Packages: Python’s built-in os module and the bq command-line tool

Example: This example shows how to replace a BigQuery table with the results of a query. We also use this approach to load BigQuery tables from parquet, delete rows from BigQuery tables, and copy BigQuery tables (a couple of those variations are sketched after the example).


import os

# asset-level rollup of which assets were viewed, downloaded, or shared
query = """
    SELECT group_id,
           a.id AS asset_id,
           organization_id,
           brandfolder_id
    FROM `datalake.dataset.events` e
    INNER JOIN `datalake.dataset.assets` a
        ON a.asset_key = e.resource_key
    INNER JOIN `datalake.dataset.brandfolders` b
        ON a.brandfolder_id = b.id
    WHERE action_name IN ("viewed", "downloaded", "shared")
    GROUP BY 1, 2, 3, 4
"""

# shell out to the bq CLI; --replace overwrites the destination table with
# the query results, and nothing is pulled back into Python memory
os.system("""
        bq query \
        --destination_table project:dataset.asset_sessions \
        --replace \
        --use_legacy_sql=false \
        '""" + query + """'
         """)

Use cases:

Notes:




One of the risks of leaning on BigQuery is cost, since each query is billed by the amount of data it scans. It’s worth noting that in making this transition our BigQuery costs have not increased noticeably, and there are several ways to keep those costs down:

It’s also worth mentioning that we evaluated BigQuery’s Storage API about a year ago. Unfortunately, it was fairly immature at the time and challenging to access with Spark. Furthermore, it exposes the underlying data in BigQuery that we would still need to wrangle ourselves, as opposed to leveraging BigQuery for the wrangling as described above.



Between Pandas, Pyspark, and the os module, we’ve been able to seamlessly transition to using BigQuery as our primary datalake. We’ve benefited from quicker processing times, improved reliability, and simpler code. With those benefits in mind, I highly recommend dipping a toe into the world of Python + BigQuery.