Wednesday, September 1, 2021

Building a serverless “Twitter data stack” for free with dbt and BigQuery (using Python, Google Cloud Functions, and Pub/Sub triggers)

➔ Overview

This project deploys a serverless Python app on Google Cloud Functions and uses a Pub/Sub trigger to pull tweets from the Twitter API, push them to BigQuery, process them with dbt, and then send tweets to people who have gone quieter than usual.

Inspiration

I was inspired recently by a tweet from Claire Carroll:

I've been looking for an excuse to dig into some technologies that are new to me, and decided to use this as an opportunity for a weekend project.

The technologies on my short list included Docker and Google Cloud Platform (GCP), especially BigQuery.  Using these services has the added advantage of making the project completely free (compared to Snowflake, which has no free tier, or AWS, which is well known to have a dangerous free tier that's impossible to understand).

➔ Project Architecture

I initially planned to build a containerized app using Docker to stream tweets, deploy on Cloud Run, and send data to BigQuery.  I built a rough version (and successfully deployed my first Docker containers) but quickly realized that streaming Twitter data: (1) involved more complexities than I wanted to deal with and (2) was overkill... for a side project that is already overkill.

So on the advice of my friend (and brilliant former Googler) Danny Hermes, I moved off the container structure and went straight to Google Cloud Functions (the Google equivalent of AWS Lambda).  This provided the added bonus of building this project as a serverless app, with data polling triggered by scheduled GCP Pub/Sub messages.

The overall project became:
"Twitter data stack" serverless diagram


I created a Twitter list called “Data Twitter” to assemble a (very small and random) subset of the data people I follow.  (I made the list quickly and I missed a lot of people—let me know who else I should add!)

The actual dbt logic is a simple DAG that looks at recent user tweets and our own recent “r u okay?” activity, then creates a set of actions that we want to trigger:

"Twitter data stack" DAG


A few notes about this project:

  • This is functional code and works for this little project, but it is hardly production-grade.  I have a smattering of error checks and tests (mostly to handle errors I encountered while building), but it’s well short of what I’d want for a commercial application.  I’m still a little surprised I actually gave this app access to post to my Twitter...
  • The naming conventions are a bit of a mess.  I use ‘status’ and ‘tweet’ interchangeably, and I treat ‘actions’ and ‘activity’ differently.  Sorry.  I built this project in stages as an experiment over a few evenings, so project clarity was not priority #1.
  • I chose Google Cloud because I wanted to learn more about it and it has a good free tier.  If I were building this for a production use case, I would almost certainly choose Snowflake and AWS.  Given the app’s simplicity, I’d probably deploy as much of this self-contained on Snowflake as possible.  See a great example from Census on pulling data from the Clearbit API using a Snowflake external function and an AWS Lambda.
  • Feedback welcome!  Feel free to DM or email me.

This post goes through the entire process end-to-end and shares the code I used to build this project.  If so inclined, you should be able to get up and running with just the steps below.  If you get stuck, let me know!

At the end I mention some potential additional features and my takeaways from building out this project.

Key Steps

  1. Apply for a Twitter Developer Account
  2. Set up your code environment
  3. Set up your Twitter Developer Account
  4. Set up your Google Cloud Platform environment
  5. Deploy and configure Google Cloud Functions and Pub/Subs
  6. Clone dbt GitHub repo and set up dbt Cloud
  7. Create and configure dbt project

Assets

  1. GitHub repo for the Python code (running on Cloud Functions)
  2. GitHub repo for the dbt code (running on BigQuery via dbt Cloud)

➔ Process

This section walks through each step to get the project deployed.

1. Apply for a Twitter Developer Account

The long pole in the tent when working with Twitter data is waiting for approval to use the Twitter API.  For me, this took 48–72 hours.

Fill out your details at the Twitter Developer Portal.

I wrote some basic details about a personal project using Twitter data to learn about the API.  I got a follow-up email asking for basically all the same information again:

Twitter info email


I replied to the email reiterating that I’m working on a personal non-commercial project. A day later I received the following:

Twitter account approved

2. Set up your code environment

Clone the code repo for Twitter Data Stack:
git clone https://github.com/zacharykimball/twitter-data-stack
"Twitter Data Stack" git repo

Go to the repo directory:
cd twitter-data-stack
Make sure you have Tweepy, the BigQuery client library, and the Google Functions Framework for Python installed:
python3 -m pip install tweepy
python3 -m pip install google-cloud-bigquery
python3 -m pip install functions-framework
Generate the appropriate requirements doc for later deployment to GCP:
pip-compile --generate-hashes --allow-unsafe --output-file=requirements.txt requirements.in
(Note: we have to pass --allow-unsafe in order to pin the setuptools package, which the bigquery package needs to operate correctly.)

Create your own ‘config’ file where you’ll load your own keys and details:
cp config.py.example config.py
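For reference, the config file ends up holding everything we collect over the next few sections: Twitter credentials, the GCP project number, the service account email, and the path to the service account key.  A rough sketch of its shape (the field names here are illustrative; follow whatever config.py.example actually defines):

# config.py -- illustrative sketch; use the field names from config.py.example
# Twitter credentials (filled in during steps 1 and 3)
CONSUMER_KEY = "your-api-key"
CONSUMER_SECRET = "your-api-secret-key"
ACCESS_TOKEN = "your-access-token"
ACCESS_TOKEN_SECRET = "your-access-token-secret"

# GCP details (filled in during step 4)
PROJECT_NUMBER = "123456789012"
SERVICE_ACCOUNT_EMAIL = "twitter-data-stack@your-project.iam.gserviceaccount.com"
KEY_FILE_PATH = "key.json"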

3. Set up Your Twitter Developer Account

Create the name of your app:

Twitter app name

Twitter will create an API key, API secret key, and Bearer Token for your access (the purple boxes are my censored private keys):
Twitter API keys

Edit the ‘config’ file (see prior section) to add your Twitter consumer key and consumer secret.

Then go to your project and select Keys and tokens:

Twitter keys and tokens

You’ll want to generate a new access token and secret for your app to use.  Click “Generate”:
Twitter generate access token and secret

Twitter creates the token and secret (the purple boxes are my censored private keys).
Twitter access token and secret

Edit the ‘config’ file to set the access token and token secret.
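For context, here is roughly how those four values get used with Tweepy (a sketch rather than the repo's exact code, and it assumes the illustrative config field names from the sketch above):

import tweepy
import config

# OAuth 1.0a: the consumer key/secret identify the app,
# the access token/secret identify the account that will post tweets
auth = tweepy.OAuthHandler(config.CONSUMER_KEY, config.CONSUMER_SECRET)
auth.set_access_token(config.ACCESS_TOKEN, config.ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)

# e.g., read a user's recent timeline (API reads feed the raw tweet data)
recent = api.user_timeline(screen_name="some_user", count=20)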

4. Set up your Google Cloud Platform environment


Configure the project.
Click your current project to open other project options (mine is my company Hardfin but yours will be different):
GCP current project


Choose “New Project” in the top right:
GCP new project

Name the project and click “Create”:

GCP project create


Choose “Select Project”:

GCP select project


Go back to the GCP dashboard and find the project number associated with your project (censored in purple here):

GCP project number


Edit your ‘config’ file to add the project number.


Configure a service account.

Choose “Credentials”:

GCP credentials


Click “Create Credentials” and choose “Service account”:

GCP service account


Give the service account a name and click “Create and Continue”:

GCP service account create


Now you have to select the permissions to give the service account.


Note: Google has about 1,000 role types, so I gave up on picking something with appropriately restricted permissions and instead chose “Owner” to give broad leeway so I wouldn’t run into permissions issues later.  DON'T DO THIS ON A PRODUCTION APPLICATION.


Click “Continue”.

GCP service account access


Grant your own user access to this service account, which will be important later.  Click “Done”.

GCP service account user


On the service accounts page, find the service account:

GCP service account list


Edit the ‘config’ file in your repo and add the service account email address from GCP.


Set up keys.

Choose “Manage keys”:

GCP service account manage keys


Create new key:

GCP create key


Create a JSON key:

GCP service account JSON key


The private key file should download automatically:

GCP service account key download


You will have a file like ‘twitter-data-stack-123456.json’ available.  Move it to your project directory and change the name to ‘key.json’.  (If you store or name the file differently, be sure to update your config file to reflect the correct file path.)
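As a quick sanity check that the key works, you can point the BigQuery Python client at it directly (a minimal sketch; the repo's own code may wire this up differently):

from google.cloud import bigquery
from google.oauth2 import service_account

# Load the service account key we just downloaded
credentials = service_account.Credentials.from_service_account_file("key.json")

# The project is read from the key file itself
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
print(list(client.list_datasets()))  # empty for now; we create datasets next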


Create datasets.

Go to GCP and navigate to BigQuery:

GCP BigQuery


Choose ‘create dataset’ in your current project to create a new dataset:

BQ create dataset


Give your dataset a name (such as ‘twitter’).  This will be the dataset where we land raw data for later analysis with dbt:

BQ create dataset tweets


While we’re here, we’re going to create another dataset to use for our analyzed data later on.  Give your dataset a name (such as ‘twitter_analysis’):

BQ create dataset analysis
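If you'd rather script this step than click through the console, the same two datasets can be created with the BigQuery client (a sketch, assuming the ‘twitter’ and ‘twitter_analysis’ names used here and the key.json setup from earlier):

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file("key.json")
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# 'twitter' lands the raw data; 'twitter_analysis' holds the dbt models
for name in ("twitter", "twitter_analysis"):
    client.create_dataset(name, exists_ok=True)  # no-op if it already exists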



Enable the BigQuery API.

Go to APIs & Services:

GCP APIs & services


Choose the BigQuery API:

BQ API


Ensure that the API is enabled:

BQ API enabled


Set up billing (to make everything work, not to actually pay anything).

We need to set up a billing account or GCP will start denying us access to services.  Go to GCP Billing:

GCP billing


Choose Link a Billing Account:

GCP billing account


If you have an existing billing account, you can link it here.  If not (I did not, as this was my first foray into GCP), you can choose Create Billing Account here.

GCP enable billing

Complete the verification flow to set up billing and link your billing account.


Set up budget.

We are building a tiny project, have the benefit of limited free services, and also have a credit for a 90-day free trial.  But things could somehow get away from us with an errant query or process—and this is supposed to be a free side project!  Let’s get ahead of the curve by setting up a budget and alerts:

GCP budgets & alerts


Create a budget:

GCP create budget


GCP budgets are defined by a "scope."  I set up two scopes — one for understanding rough implied spend (not including discounts and promotions) and one to have “hard” limits that I don’t want to cross (inclusive of discounts and promotions).


First, I’m monitoring all services (exclusive of discounts/promotions):

GCP scope monitor


I want alerts if I’m going over $30 per month ($1 per day).  This is a tiny pet project, I have no idea how GCP’s billing stacks up, and I want to understand immediately what my “burn rate” would be if I have to pay (and thus if I need to nuke the project).

GCP scope monitor amount


I threw in a bunch of thresholds to make sure I’d get tagged along the way, if needed.

GCP scope monitor actions

Click “Finish” to activate the monitoring.


I then created a second budget to try to institute more of a “hard limit.”  (Note that GCP doesn’t actually let you set a hard limit here, only alerts as you approach a predefined “budget.”  Hence the overkill on alerting to understand the cost factors in my first GCP project.)

GCP create budget button


This one is inclusive of discounts and promotions:

GCP scope hard


I decided if push comes to shove, I was okay forking over $5/month for this little game:

GCP scope hard amount


Here I’m using forecasted triggers to understand where actual spend might land:

GCP scope hard actions

Click “Finish” to activate the second budget monitor.


Set up alerting.

Finally, we turn on alerting. Set up a notification channel for this project, using your preferred notification method (I used email).

GCP notification channels


Then in error notifications, select your new notification channels:


Save and you're all set!

5. Deploy and configure Google Cloud Functions and Pub/Subs

Our functions are going to run in Google Cloud Functions, which is GCP's serverless offering.

Set up the Google SDK if you don’t have it already:
brew install --cask google-cloud-sdk
Be sure to set up zsh to use the gcloud CLI (replace ‘zsh’ with ‘bash’ in the filenames below if you’re on bash):
source "$(brew --prefix)/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/path.zsh.inc"
source "$(brew --prefix)/Caskroom/google-cloud-sdk/latest/google-cloud-sdk/completion.zsh.inc"
Depending on how recent your SDK installation is, you may need to run:

gcloud components update
Run gcloud init to open a browser window to log in to the Google Cloud SDK.

You will be given a choice of which project to select — choose the relevant one:
GCP select project dialog

Throughout this process, you may be prompted to enable certain additional GCP services in order for this to function correctly.  If so, just choose ‘y’ for yes.

In addition to authorizing your own account (which we did via browser), you’ll have to activate the service account:

gcloud auth activate-service-account <service-account-email>@twitter-data-stack.iam.gserviceaccount.com --key-file=key.json
Substitute your own service account name and JSON key file location (if applicable).

Deploy function to pull tweets.
Note: the code that actually triggers a tweet is not active to prevent people from accidentally tweeting by following this guide blindly.  If you want to trigger real tweets, you need to take two steps in main.py:
  • (1) uncomment line 247
  • (2) delete or comment out line 250
tweet code to uncomment
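The pattern in main.py is roughly the following: the real tweet call is commented out and a dry-run log line stands in for it.  This is a paraphrase, not the actual lines 247–250, and the variable name is illustrative:

# (1) uncomment the real call to actually post the tweet
# api.update_status(status=message)
# (2) delete or comment out the dry-run line once you go live
print(f"dry run: would have tweeted {message!r}")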

You can check that your code works locally by running the following in one Terminal window:

functions_framework --signature-type event --target pull_tweets

And calling the function in another window (this call just uses sample data to simulate a real Pub/Sub call):

curl localhost:8080 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
        "context": {
          "eventId":"1144231683168617",
          "timestamp":"2020-05-06T07:33:34.556Z",
          "eventType":"google.pubsub.topic.publish",
          "resource":{
            "service":"pubsub.googleapis.com",
            "name":"projects/sample-project/topics/gcf-test",
            "type":"type.googleapis.com/google.pubsub.v1.PubsubMessage"
          }
        },
        "data": {
          "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
          "attributes": {
             "attr1":"attr1-value"
          },
          "data": "d29ybGQ="
        }
      }'
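For reference, a Pub/Sub-triggered (background) Cloud Function in Python just receives the event payload and a context object, which is exactly what the curl call above fakes.  A minimal sketch of the entry-point shape (the real pull_tweets in main.py does the actual Twitter and BigQuery work):

import base64

def pull_tweets(event, context):
    """Background function triggered by a Pub/Sub message."""
    # The Pub/Sub message body arrives base64-encoded under 'data'
    message = base64.b64decode(event["data"]).decode("utf-8") if "data" in event else ""
    print(f"Triggered by event {context.event_id} with message {message!r}")
    # ...fetch timelines with Tweepy and load rows into BigQuery here...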

If the function is working correctly locally, deploy to GCP:
gcloud functions deploy pull_tweets --runtime python39 --trigger-topic pull_tweets --timeout 2m

(Note: I set the timeout to 2 minutes because the loop I am using to query user timelines can take 3–4 seconds per user, so the current Data Twitter list with 16 people can exceed Google's default timeout of 60 seconds.  I could definitely refactor the code to be more efficient, but this is easier and I'm nowhere near the 9-minute limit on Google Cloud Functions.)

At some point, you may be asked to select an appropriate region; pick something close to you:
GCP select region


This will create a new Pub/Sub topic ‘pull_tweets’ that triggers the pull_tweets function in twitter-data-stack.

You can run the cloud function by publishing a message on GCP:

gcloud pubsub topics publish pull_tweets --message 'test run'

You can check that the function ran correctly by looking at Cloud Function logs:
gcloud functions logs read pull_tweets --limit 10

Schedule function to pull tweets.
Next let’s set up a scheduled job to fire the Pub/Sub trigger and pull these tweets on a schedule.  Depending on how many accounts you’re tracking, you probably don’t need to pull more often than you plan to analyze the data.  But Cloud Functions is cheap and we’ll catch errors faster if we run it more often, so it’s ~costless to run every hour.

Note: Cloud Scheduler has a very low free tier — it allows 3 free scheduled jobs per month.  This process uses 2 of them.  If you have other jobs running, you may incur additional cost.  (Although note that Cloud Scheduler currently charges $0.10 per job per month so worst case you’ll incur $0.20 per month for this.)

First enable the Cloud Scheduler API on your project.

Then set up a scheduled job to run on the hour every hour:

gcloud scheduler jobs create pubsub pull_tweets_job --schedule "0 * * * *" --topic pull_tweets --message-body "hourly run"

Deploy function to take action on tweets.

We can now set up the “action” script that will trigger tweets when our dbt code suggests an action.  The process is very similar to the one for pulling tweets.

First check your code works locally by running the following in one Terminal window:

functions_framework --signature-type event --target action_tweets

And calling the function in another window (this call uses sample data to simulate a Pub/Sub call):

curl localhost:8080 \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
        "context": {
          "eventId":"1144231683168617",
          "timestamp":"2020-05-06T07:33:34.556Z",
          "eventType":"google.pubsub.topic.publish",
          "resource":{
            "service":"pubsub.googleapis.com",
            "name":"projects/sample-project/topics/gcf-test",
            "type":"type.googleapis.com/google.pubsub.v1.PubsubMessage"
          }
        },
        "data": {
          "@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
          "attributes": {
             "attr1":"attr1-value"
          },
          "data": "d29ybGQ="
        }
      }'

Because we have no actions defined yet, this code should exit gracefully with a note that the actions table doesn’t exist yet.  It will be created automatically by dbt when we load our data models.
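If you're curious how that graceful exit can work, the check is just a table lookup against BigQuery (a minimal sketch; the table name assumes the dataset and model names used in this walkthrough, and the repo's code may structure this differently):

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

client = bigquery.Client()  # uses application default credentials; locally you could load key.json as above

def actions_table_exists(table_id="your-project.twitter_analysis.trigger_actions"):
    """Return True once dbt has created the actions table."""
    try:
        client.get_table(table_id)  # raises NotFound until dbt builds the model
        return True
    except NotFound:
        print("Actions table not found yet; nothing to do until dbt runs.")
        return False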

If the function is working correctly locally, we deploy to GCP:

gcloud functions deploy action_tweets --runtime python39 --trigger-topic action_tweets
This will create a new Pub/Sub topic ‘action_tweets’ that triggers the action_tweets function in twitter-data-stack.

You can run the cloud function by publishing a message on GCP:

gcloud pubsub topics publish action_tweets --message 'test run'

You can check that the function ran correctly by looking at Cloud Function logs:

gcloud functions logs read action_tweets --limit 10

(Note: Because this action does not do anything initially, you may not see any log items. This is a good sign, because you’d see errors if there were a problem.)

Schedule function to take action on tweets.
Set up a scheduled job to run once a day.  I have it set to 3:05pm UTC, which is 8:05am Pacific Daylight Time and about one hour after the dbt job kicks off.  (I set it 5 minutes after the hour because the pull_tweets job kicks off on the hour and usually runs for about a minute.):

gcloud scheduler jobs create pubsub action_tweets_job --schedule "5 15 * * *" --topic action_tweets --message-body "daily action run"


6. Clone dbt GitHub repo and set up dbt Cloud

Having direct access to GitHub is very advantageous with dbt Cloud, because you can enable features like CI.


Clone or fork the twitter-data-stack-dbt repo on GitHub.


If you want to configure your own rules to trigger tweets, they are in models/marts/actions/trigger_actions.sql on lines 42–50:

dbt trigger rules


If you don’t have one, create a dbt Cloud account for this project.

Go to set up integrations in your dbt Cloud profile and connect to your GitHub repos.
dbt GitHub integration

7. Create and configure dbt Cloud project

In dbt Cloud, go to Account Settings in the hamburger menu and create a “New Project.”

Set up a connection to your new BigQuery database:
dbt Set Up Database



You can use your service account JSON key to make setup super easy:

dbt BigQuery settings


Test and continue.

For a full-blown app or data analytics project, I would recommend setting up your own GitHub or GitLab repo.  But for the purposes of this little exercise, it works fine to clone the GitHub repo I built directly into a dbt Cloud managed repo.
dbt Set Up Repo


Skip inviting users for now.

Go to “Environments” in the menu and set up a deployment environment for dbt to work in.  This should use the same dataset name you created earlier for analyzed data.  (Here we are still using ‘twitter_analysis’.)
dbt Environment settings


Go to Jobs to set up a job that runs every morning to process the latest Twitter data.

dbt Jobs page


I set the job to run at 2pm UTC, which is 7am Pacific Daylight Time (before I expect people on Pacific time to start tweeting again), but you could adjust if you’re emphasizing people in a different time zone.  Be sure to run source freshness and add a command for ‘dbt test’ to be sure our models pass.
Process Twitter Data dbt run


Be sure to include the call for ‘source freshness’ so that we only run our models if we have fresh data from Twitter.  Otherwise we risk creating actions based on missing data.  Note that I included the source freshness call as the first command (instead of ticking the “source freshness” box) — this ensures that the entire job fails if sources are not fresh, so we don’t accidentally generate “actions” based on stale data.


I also went ahead and set up an email notification for dbt if a production run fails:

dbt Email Notifications


If you run the job right away you’ll see the very simple DAG that this project creates.

"Twitter data stack" DAG


We source two primary tables:

  • (1) ‘tweets’ — the raw tweet data we’ve collected from Twitter (API reads)
  • (2) ‘activity’ — records of individual activity that we’ve triggered based on this app (API writes)

We look to ‘tweets’ to see when a user goes inactive (and thus needs a “u okay?” prompt).  We look to ‘activity’ to see when we’ve already prompted inactive users recently, so we hold off on harassing them every day!


Example tweets table
Example of ‘tweets’ table


Example activity table
Example of ‘activity’ table


Most of the rest of the DAG is basic staging, fact, and dimension tables (stg_tweets, stg_activity, fct_tweets, fct_activity, and dim_users).  The dim_users table is where we’ve done a bit of math to understand the relative frequency of each user’s tweeting.  This drives our decision about whether to “action” a user and send them a “r u okay?” prompt.
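The exact thresholds live in the dbt SQL (the trigger_actions.sql rules mentioned in step 6), but the shape of the decision is simple enough to restate here.  This is an illustrative Python sketch of the kind of rule, not the repo's actual logic, and the thresholds are made up:

from datetime import datetime, timedelta, timezone

def should_prompt(last_tweet_at, typical_gap_days, last_prompt_at, now=None):
    """Illustrative rule: prompt a user who has gone quiet relative to their
    own usual cadence, unless we have already prompted them recently."""
    now = now or datetime.now(timezone.utc)
    gone_quiet = now - last_tweet_at > timedelta(days=typical_gap_days * 3)
    recently_prompted = (last_prompt_at is not None
                         and now - last_prompt_at < timedelta(days=7))
    return gone_quiet and not recently_prompted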


Example dim_users table
Example of ‘dim_users’ table



Finally, we create a ‘trigger_actions’ table, which holds the actions that we want to prompt.  That is, these are the users who have gone inactive that we want to send tweets!


Example trigger_actions table
Example of ‘trigger_actions’ table


Is it overkill to use dbt to materialize this table instead of just hacking a complex query in Python and triggering tweets directly?  Yes.  But we’re doing it because it’s a nice example of using dbt to break down complex analysis into very simple constituent parts.  And this way it’s easier to tweak dbt models via our git repo—and solicit PRs or other input—without needing to rewrite and redeploy our serverless functions.



➔ Additional Features

This is the point in the process where I thought about adding a Slack integration: when an action is about to be triggered, Slack would send a message prompting yes/no on whether to actually send the “r u okay?” tweet.

I considered this Slack integration for about one minute, then decided “not this weekend.”  Would definitely be a great addition if someone wanted to add it!  Shouldn’t be super complicated, just a slightly different implementation.

I also considered whether there was a way to incorporate a reverse ETL tool (aka operational analytics) such as Census into the flow, as suggested in Claire’s original tweet.  I couldn’t think of anything that wasn’t completely redundant, given that we already have our desired actions in BigQuery as an output of our dbt model, and none of the popular reverse ETL tools have a Twitter integration.


➔ Takeaways

I went into this project fully expecting to be impressed by GCP, and BigQuery in particular.  I was not.

Google Cloud is certainly built “by engineers for engineers.” But not engineers who appreciate clarity or good UX.  Every single UI option seems to have a slide-in dropdown menu and 76 suboptions.  Even the error messages are useless!  I’m fairly technical (as “non-technical” people go) and some things were inscrutable to me.  I suspect this would be true for most people who are not very familiar with GCP.

My favorite error was:
No matching signature for operator > for argument types
What does that mean?  (After some Googling, it turned out I was trying to compare a DATETIME and a TIMESTAMP, which BigQuery refuses to convert and compare natively; you have to cast one side explicitly.)

Interestingly, the GCP docs are actually very comprehensive.  But there are so many of them, and they are so detailed, that I found it hard to root out what I really needed.  My experience with Snowflake has been that when you Google for an answer, you find one definitive source in the docs.  With GCP I would search something about “Cloud Functions” and I would get the “Cloud Functions” tutorial and the “Cloud Functions” section of the “Cloud Run” tutorial and the “Pub/Sub Tutorial” and a bunch of others.  It was so repetitive that I would get lost trying to find simple answers.

Most surprisingly, BigQuery is just so slow compared to Snowflake.  Both the web interface (console.cloud.google.com) and the database response itself felt interminable compared to using Snowflake, which is extremely responsive.  It felt like going back in time 10 years to call out from a local machine to a SQL Server instance running on a crappy server in a busy remote cluster.

There were also a handful of database aspects in BigQuery that surprised me.  Most notably:
  • No auto-increment column type
  • No “default” column values (e.g., to set a created_at timestamp automatically)
I would have thought these are table stakes for a sophisticated database implementation—critical for data work (and also very common for engineering applications).  I’m genuinely confused how BigQuery doesn’t offer these capabilities.

So all in all, I learned a ton about GCP and BigQuery.  I firmly decided that I would prefer Snowflake to BigQuery as a data warehouse for any commercial application (and the consensus was already clear on Snowflake vs Redshift).  I'm not personally familiar enough with AWS to rule out the rest of GCP more broadly as a competitive cloud platform.  Maybe that will be my next project!
