Twitter Live Stream with Kafka

Caesario Kisty
11 min read · Dec 5, 2022

If your business needs insights and decisions that must be made immediately, your data has to travel from source to destination almost instantly. As an example, imagine you are required to analyze tweets during the 2022 World Cup. Building a streaming data pipeline is one of the skills that Data Engineers should possess. I will demonstrate how to build a streaming data pipeline from the Twitter API to Kafka, then store the data in BigQuery and visualize it using Looker Studio. This article includes the following sections:

  1. Setup Twitter API
  2. Create Google Compute Engine
  3. Installing Apache Kafka
  4. Setup Google BigQuery
  5. Create Kafka Producer and Kafka Consumer
  6. Create Service Account for Kafka Consumer
  7. Visualize Data with Looker Studio

Setup Twitter API

It is necessary to have a Twitter account before we can use the Twitter API. I already have one, so I headed over to Twitter’s Developer Platform and signed in with my account. After signing in successfully, I created a project and an app as prompted. Then voila! I obtained my API Key, API Secret Key, and Bearer Token for later access to the Twitter API. For more details, you can check this site.
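Before wiring anything into Kafka, it is worth checking that the credentials actually work. Below is only a minimal sketch using the tweepy library (we install it with pip later in this article); the environment variable name TWITTER_BEARER_TOKEN is my own convention, not something required by Twitter.

import os
import tweepy

# Read the Bearer Token from an environment variable (the name is my own choice)
bearer_token = os.environ["TWITTER_BEARER_TOKEN"]

# tweepy.Client wraps the Twitter API v2 endpoints
client = tweepy.Client(bearer_token=bearer_token)

# A tiny recent-search request just to confirm the token is accepted
response = client.search_recent_tweets(query="world cup -is:retweet", max_results=10)
for tweet in response.data or []:
    print(tweet.id, tweet.text[:80])

If the token is wrong or the app has no access, this raises an authorization error right away, which is much easier to debug here than inside a streaming pipeline.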

Create a Google Compute Engine

To get started with Google Compute Engine, you must first log in to your Google Cloud Console and create a new project. After creating a project, navigate to the Compute Engine section of the Cloud Console to create a virtual machine (VM). Creating a VM instance requires specifying a few details, including the machine type to use, the operating system to install, and the amount of CPU and memory to allocate to the instance. In my case, I set up an instance with the name “VM-1” and chose the asia-east1 region, zone b, for my virtual machine. In Google Compute Engine we can also choose the machine series, and I use an e2-medium machine type with 2 vCPUs and 4 GB of memory for my virtual machine. For this use case, I think that is enough.

Next, I use Ubuntu 20.04 LTS with a 10 GB default disk.

During the configuration of this instance, billing for usage should be kept in mind. Pricing is estimated based on the configuration of the VM. There are several variables to consider, such as region, CPU technology, the operating system, etc (Compute Engine Pricing). As a result of that configuration, I obtained my monthly pricing estimate.

Now that my instance is created, I can start using it.

Installing Apache Kafka

On this site, you can find the quick start instructions for installing Apache Kafka, and I followed those steps to install my Kafka cluster on my Compute Engine instance. For simplicity, I downloaded the latest Kafka release and then extracted it. After that, configure the Zookeeper properties (config/zookeeper.properties) and the server properties (config/server.properties). Finally, run both the Zookeeper and Kafka servers. In my case, they run in the background as systemd services.

Create a systemd unit file for Zookeeper with the following command:

sudo vim /etc/systemd/system/zookeeper.service

Next, add the following content to zookeeper.service:

[Unit]
Description=Zookeeper Service
Requires=network.target
After=network.target

[Service]
Type=simple
User=YOUR_USERNAME
WorkingDirectory=/path_to_your_kafka_folder/config/
ExecStart=/path_to_your_kafka_folder/bin/zookeeper-server-start.sh zookeeper.properties
ExecStop=/path_to_your_kafka_folder/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Then, reload systemd so that it picks up the new unit file, and start the Zookeeper service:

sudo systemctl daemon-reload
sudo service zookeeper start
sudo service zookeeper status

While the Zookeeper Service is running, I create a unit file for Kafka Service with this command:

sudo vim /etc/systemd/system/kafka.service

Next, add the following content to the kafka.service:

[Unit]
Description=Kafka Service
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=YOUR_USERNAME
WorkingDirectory=/path_to_your_kafka_folder/config/
ExecStart=/path_to_your_kafka_folder/bin/kafka-server-start.sh server.properties
ExecStop=/path_to_your_kafka_folder/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Finally, reload systemd again and start the Kafka service:

sudo systemctl daemon-reload
sudo service kafka start
sudo service kafka status

Setup Google BigQuery

Google BigQuery is a cloud-based data warehouse and analytics platform that allows us to easily explore large datasets using SQL queries. It is designed to handle massive amounts of data, making it a popular choice for businesses and organizations that need to process large amounts of data quickly and efficiently. With BigQuery, we can analyze data in real time, which allows for faster decision-making and more accurate insights. That is the reason why I am using it to handle this streaming data pipeline. In addition, BigQuery integrates seamlessly with other Google Cloud products, making it easy for us to integrate our data with other tools and services. In the last part, I will demonstrate how to integrate BigQuery with Looker Studio.

The first thing to do to set up BigQuery is to create a dataset in my GCP project. A dataset in BigQuery can be considered analogous to a database in a traditional RDBMS: in both cases, the dataset or database serves as a container for storing and organizing data. It is quite easy to create a dataset in BigQuery.

  1. Open the BigQuery web UI.
  2. Select the project in which I want to create the dataset.
  3. Click on the “Create dataset” button in the top-right corner of the page.
  4. In the “Create dataset” dialog box, I enter the name “live_stream_tweet” and select a location for the dataset.
  5. Click on the “Create” button to create the dataset.

Following a concept similar to a table in an RDBMS, we can set up a table within the dataset. Choose the dataset you want to create the table in and click the “Create table” button. In the dialog box, enter the name “worldcup_table” and select “Empty table” as the source. Afterward, I design a schema for the table based on the data format I receive from the Kafka Consumer.

username:STRING,
name:STRING,
verified:BOOLEAN,
profile_image:STRING,
text:STRING,
retweet_count:INTEGER,
reply_count:INTEGER,
like_count:INTEGER,
quote_count:INTEGER,
tweet_source:STRING,
created_at:DATETIME

Then, click the “Create Table” button.
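If you prefer code over the web UI, the same dataset and table can also be created with the google-cloud-bigquery library (installed later in this article). This is only a sketch under my assumptions: your credentials are already configured, and you replace the project ID and location with your own.

from google.cloud import bigquery

client = bigquery.Client(project="PUT_YOUR_GCP_PROJECT_ID")

# Create the dataset (analogous to a database in an RDBMS)
dataset = bigquery.Dataset(f"{client.project}.live_stream_tweet")
dataset.location = "US"  # use the location you would otherwise pick in the UI
client.create_dataset(dataset, exists_ok=True)

# The same schema as listed above
schema = [
    bigquery.SchemaField("username", "STRING"),
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("verified", "BOOLEAN"),
    bigquery.SchemaField("profile_image", "STRING"),
    bigquery.SchemaField("text", "STRING"),
    bigquery.SchemaField("retweet_count", "INTEGER"),
    bigquery.SchemaField("reply_count", "INTEGER"),
    bigquery.SchemaField("like_count", "INTEGER"),
    bigquery.SchemaField("quote_count", "INTEGER"),
    bigquery.SchemaField("tweet_source", "STRING"),
    bigquery.SchemaField("created_at", "DATETIME"),
]
table = bigquery.Table(f"{client.project}.live_stream_tweet.worldcup_table", schema=schema)
client.create_table(table, exists_ok=True)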

Create Kafka Producer and Kafka Consumer

In this part, I create a couple of Python programs as a Producer and a Consumer for Kafka. To deal with this, I use the kafka-python library. kafka-python provides a convenient and easy-to-use API for working with Kafka in Python. It allows Python developers to create Kafka topics, produce to and consume from them, and manage and monitor Kafka clusters. I use pip to install kafka-python by running the following command.

pip install kafka-python
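One small detail before writing the producer: it will publish to a topic named “tweet-topic-worldcup”. If your broker is not configured to auto-create topics, the topic has to exist first. Here is a minimal sketch using kafka-python’s admin client, assuming a single local broker (so a replication factor of 1 is enough):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["localhost:9092"])

# One partition and replication factor 1 are fine for a single-broker setup
topic = NewTopic(name="tweet-topic-worldcup", num_partitions=1, replication_factor=1)
admin.create_topics([topic])  # raises TopicAlreadyExistsError if it already exists
admin.close()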

First, the Kafka Producer. It is just a simple program: I ingest the data from the Twitter API, transform it as needed, and send it to the Kafka topic. To deal with the Twitter API, I use the tweepy library. With tweepy, developers can easily access and interact with the Twitter API using Python. It includes support for searching, posting, and streaming tweets, as well as managing and monitoring Twitter accounts. It is also very simple to install tweepy; just run the following command on the Compute Engine instance.

pip install tweepy

You can find the source code for the simple Kafka Producer program below.

import tweepy
from json import dumps, loads
from kafka import KafkaProducer
from datetime import datetime, timedelta

# Producer that serializes each message as a UTF-8 encoded JSON string
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))


class MyStreamer(tweepy.StreamingClient):

    def on_data(self, data):
        # The raw payload from the Twitter API arrives as bytes
        get_data = loads(data.decode("utf-8"))

        # Keep only the fields needed for the BigQuery table
        new_data = {}
        new_data['username'] = get_data['includes']['users'][0]['username']
        new_data['name'] = get_data['includes']['users'][0]['name']
        new_data['verified'] = get_data['includes']['users'][0]['verified']
        new_data['profile_image'] = get_data['includes']['users'][0]['profile_image_url']
        new_data['text'] = get_data['data']['text']
        new_data['retweet_count'] = get_data['data']['public_metrics']['retweet_count']
        new_data['reply_count'] = get_data['data']['public_metrics']['reply_count']
        new_data['like_count'] = get_data['data']['public_metrics']['like_count']
        new_data['quote_count'] = get_data['data']['public_metrics']['quote_count']
        new_data['tweet_source'] = get_data['data']['source']

        # Convert the UTC timestamp to UTC+7 (local time)
        temp = datetime.strptime(get_data['data']['created_at'], "%Y-%m-%dT%H:%M:%S.%fZ")
        new_data['created_at'] = str(temp + timedelta(hours=7))

        # Publish the transformed tweet to the Kafka topic
        producer.send("tweet-topic-worldcup", new_data)


streamer = MyStreamer("PUT_YOUR_BEARER_TOKEN_HERE")

# Remove any stream rules left over from previous runs
previousRules = streamer.get_rules().data
print(previousRules)
if previousRules:
    streamer.delete_rules(previousRules)

# Stream only World Cup related tweets and exclude retweets
streamer.add_rules(tweepy.StreamRule("(World Cup OR WorldCup OR Qatar OR Football OR #Qatar2022) -is:retweet"))

streamer.filter(
    tweet_fields=["created_at", "lang", "public_metrics", "source"],
    expansions=["author_id"],
    user_fields=["username", "name", "location", "profile_image_url",
                 "public_metrics", "url", "verified"],
)

I will give a brief explanation of the source code above. I import a few libraries as needed, such as tweepy and kafka. Then I prepare the Kafka Producer using the KafkaProducer class, assigning a few parameters such as bootstrap_servers and value_serializer. The bootstrap_servers value is localhost because the code runs on the same host as the Kafka cluster. The value_serializer dumps each message into a JSON string and encodes it, as required by the Kafka topic. The data will later be sent to the topic named “tweet-topic-worldcup”.

For streaming the tweets, I use the StreamingClient class in tweepy. I set up the bearer token in the class that I build, as required by StreamingClient. In the on_data method, I receive the data stream from Twitter and load it as JSON. From every tweet, I keep only the fields that I need, such as the username, display name, tweet text, creation time, and so on. The transformed data is stored in a Python dictionary and then sent to the Kafka topic. Additionally, I use add_rules to specify keywords for the Twitter stream based on my use case.

Next, the Kafka Consumer. The scenario consists of consuming the data and then sending it to BigQuery. Sounds simple, right? To work with BigQuery in my code, I install the google-cloud-bigquery library. This library provides a Python API for working with BigQuery, allowing us to easily create, query, and manage BigQuery datasets and tables from our Python code. To install the library, just run the following command.

pip install google-cloud-bigquery

Here is the Kafka Consumer source code.

from google.cloud import bigquery
from kafka import KafkaConsumer
from json import loads
import os

# Path to the service account JSON key (explained in the next section)
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "PATH_TO_YOUR_SERVICE_ACCOUNT_JSON_KEY"

# Consume JSON messages from the topic, starting from the latest offset
consumer = KafkaConsumer(
    'tweet-topic-worldcup',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

PROJECT_ID = "PUT_YOUR_GCP_PROJECT_ID"
DATASET_ID = "live_stream_tweet"
TABLE_ID = "worldcup_table"

client = bigquery.Client()

# Stream each consumed tweet into BigQuery as a new row
for event in consumer:
    data = event.value
    errors = client.insert_rows_json(
        f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}", [data]
    )
    if errors:
        print("Encountered errors while inserting rows:", errors)

I know, maybe you are confused by the Service Account in the code above. No worries, I will explain it later; just keep in mind that it has to be specified in your code. To let my code consume the data from the topic, I use the KafkaConsumer class with a few parameters assigned. I set auto_offset_reset to “latest” so that it only retrieves the newest tweets when the Kafka Consumer is run. Next, I set PROJECT_ID, DATASET_ID, and TABLE_ID according to the BigQuery settings I defined previously. The data is sent to BigQuery using the insert_rows_json function, and the returned errors list is checked so that failed inserts are not silently dropped.

Then look there! The data is successfully stored in BigQuery.
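As an extra sanity check outside the consumer, you can peek at a few rows directly from Python. This is just a small sketch, assuming the same GOOGLE_APPLICATION_CREDENTIALS setup as the consumer and the project, dataset, and table IDs used above:

from google.cloud import bigquery

client = bigquery.Client()

# Read a handful of rows to confirm tweets are arriving in the table
rows = client.list_rows("PUT_YOUR_GCP_PROJECT_ID.live_stream_tweet.worldcup_table", max_results=5)
for row in rows:
    print(row["username"], "-", row["text"][:60])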

Create Service Account for Kafka Consumer

In keeping with my promise, I will explain Service Accounts to you in detail. Service Accounts are used to authenticate applications and services that are running on GCP, allowing them to access other Google Cloud services. In my case, Service Account is used to authenticate my Compute Engine when connecting to BigQuery. Service Accounts are an important part of the GCP security model, more specifically related to both the zero trust architecture and the principle of least privilege.

Service Accounts in GCP can help to implement the zero trust architecture by providing a way to grant access to GCP resources and APIs based on the identity of the service or application trying to access them. This allows us to grant access to GCP resources only to specific applications and services that have been authenticated, helping to prevent unauthorized access to our GCP resources.

Service Accounts in GCP also can help to implement the principle of least privilege by allowing you to grant applications and services only the specific permissions they need to access GCP resources. For example, if we have an application that only needs read-only access to a Cloud Storage bucket (I will explain about this in another article later), we can create a service account and grant it read-only access to the bucket. This will prevent the application from being able to write to the bucket or access other GCP resources, helping to ensure that it only has the minimum permissions it needs to perform its tasks.

Then how to create a service account JSON key? Follow these steps:

  1. On my Google Cloud Console, I headed over to the “IAM & Admin” section and clicked on the “Service accounts” page.
  2. Click on the “Create service account” button.
  3. I named the service account “kafka-to-bigquery” and selected a role for it. I assigned the “BigQuery Data Editor” role to this JSON key.
  4. Click on the “Create” button to create the service account.
  5. Click on the “Create key” button in the “Keys” section of the service account page.
  6. In the “Create key” dialog box, select the JSON key type, and then click on the “Create” button.
  7. My service account JSON key will be downloaded to my computer. Upload it to the Compute Engine instance so that it can be used by my Kafka Consumer code; a quick connection check is shown below.
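To confirm the key works before starting the consumer, a short check with the BigQuery client is enough. This is only a sketch under my assumptions: the JSON key has already been uploaded to the VM and the paths and IDs below are replaced with yours.

import os
from google.cloud import bigquery

# Point the client at the uploaded service account key
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "PATH_TO_YOUR_SERVICE_ACCOUNT_JSON_KEY"

client = bigquery.Client()

# Fetching the table metadata fails fast if the key or the IAM role is wrong
table = client.get_table("PUT_YOUR_GCP_PROJECT_ID.live_stream_tweet.worldcup_table")
print("Table found:", table.full_table_id)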

Visualize Data with Looker Studio

Woohooo… We have reached the last part of this article. I will visualize the data in BigQuery using Looker Studio. It can be used to create interactive dashboards, reports, and visualizations that provide insights into our data.

How do we build a visualization with it? Take a look at this site and click on the “Connect to Data” tab. We will be shown the many data sources that can be connected. Obviously, I chose BigQuery, then selected the project, dataset, and table that point to my data in BigQuery.

If Looker Studio is able to connect to the table in BigQuery, it will redirect the page to a canvas that can be used to design our dashboard or visualization.

On that canvas, we can build a visualization depending on our scenario. Select the type of visualization we want to create from the list of available options. This could be a bar chart, line chart, pie chart, scatter plot, or any other type of visualization supported by Looker Studio. I recommend you visit this site to learn more about how to choose the right data visualization.

Here are a few visualizations that I built for this use case.

The most active Twitter user
Tweet sources as a percentage
The trend of how many tweets were sent every minute

Well, thanks for reading this article. This is just one example of how to build a streaming data pipeline. Next, have you ever thought about building stream processing from a database source? Is it possible? See you in my next article.

Thank you :)
