Kafka Python Tutorial for Fast Data Architecture

(Last Updated On: September 22, 2018)

In this Kafka python tutorial we will create a python application that will publish data to a Kafka topic and another app that will consume the messages.

Fast Data Series Articles

  1. Installing Apache Mesos 1.6.0 on Ubuntu 18.04
  2. Kafka Tutorial for Fast Data Architecture
  3. Kafka Python Tutorial for Fast Data Architecture
  4. Apache Spark Tutorial | Fast Data Architecture Series

This is the third article in my Fast Data Architecture series that walks you through implementing Big Data using a SMACK Stack.  This article builds on the others, so if you have not read through those, I highly suggest you do so that you have the infrastructure you need to follow along in this tutorial.

This article will walk you through pulling web site metrics from Clicky.com.  I have another article where we pull metrics from Google Analytics and publish them to Apache Kafka: Kafka Python and Google Analytics.

Example Application Architecture

In order to demonstrate how to analyze your big data we will be configuring a big data pipeline that will pull site metrics from Clicky.com and push those metrics to a Kafka topic on our Kafka Cluster.

This is just one pipeline that you might want to implement in your Big Data implementation.  Web site statistics can be a valuable part of your data because they tell you about web site visitors, pages visited, and so on.  Combine this data with other data, like social media shares, when you perform your data analytics and you can make some pretty neat business decisions, such as the best time to post site updates to social media in order to attract the most visitors.  That is the main benefit of implementing big data: not necessarily the raw data itself, but the business knowledge you can extract from it to make more informed business decisions.

[Figure: site stats intake diagram]

In this example, we will pull the ‘pages’ statistics from the Clicky.com API and push them to the admintome-pages Kafka topic.  This will give us JSON data of AdminTome’s top pages.

Clicky Web Analytics

To follow along fully in this article you will need to have a website linked to Clicky.com.  It’s free, so why not?  Register your site at clicky.com.  I personally use it because it has better metrics reporting for blogs (like abandon rate) than Google Analytics gives.  You will need to add some code to your page so that Clicky can start collecting metrics.

After your page is sending metrics to Clicky you will need to get some values in order to use the Clicky API and pull metrics from our Python application.  Go to preferences for your site and you will see two numbers that we will need:

  • Site ID
  • Site key

Don’t publish these anywhere because they could give anyone access to your website data.  We will need these numbers later when we connect to the API and pull our site statistics.

Preparing Kafka

First, we need to prepare our Kafka cluster by adding the topic that we will send messages to.  As you can see from the diagram above, our topic in Kafka is going to be admintome-pages.

Log in to the Mesos master you ran kafka-mesos from.  If you followed the previous article, the master I used was mesos1.admintome.lab.  Next, we will create the topic using the kafka-mesos.sh script:

$ cd kafka/
$ ./kafka-mesos.sh topic add admintome-pages --broker=0 --api=http://mslave2.admintome.lab:7000

Notice that the API parameter points to the Kafka scheduler we created using kafka-mesos in the last article.  You can verify that you now have the correct topics:

$ ./kafka-mesos.sh topic list --api=http://mslave2.admintome.lab:7000
topics:
name: __consumer_offsets
partitions: 0:[0], 1:[0], 2:[0], 3:[0], 4:[0], 5:[0], 6:[0], 7:[0], 8:[0], 9:[0], 10:[0], 11:[0], 12:[0], 13:[0], 14:[0], 15:[0], 16:[0], 17:[0], 18:[0], 19:[0], 20:[0], 21:[0], 22:[0], 23:[0], 24:[0], 25:[0], 26:[0], 27:[0], 28:[0], 29:[0], 30:[0], 31:[0], 32:[0], 33:[0], 34:[0], 35:[0], 36:[0], 37:[0], 38:[0], 39:[0], 40:[0], 41:[0], 42:[0], 43:[0], 44:[0], 45:[0], 46:[0], 47:[0], 48:[0], 49:[0]
options: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

name: admintome
partitions: 0:[0]

name: admintome-pages
partitions: 0:[0]

And there is our new topic ready to go!  Now it’s time to get to the fun stuff and start developing our Python application.

Kafka Python Tutorial

Now that we have Kafka ready to go we will start to develop our Kafka producer.  The producer will get page metrics from the Clicky API and push those metrics in JSON form to our topic that we created earlier.

I assume that you have Python 3 installed on your system and virtualenv installed as well.

To get started we will need to setup our environment.

$ mkdir -p ~/Development/python/venvs
$ mkdir -p ~/Development/python/site-stats-intake
$ cd ~/Development/python/site-stats-intake
$ virtualenv ../venvs/intake
$ source ../venvs/intake/bin/activate
(intake) $ pip install kafka-python requests
(intake) $ pip freeze > requirements.txt

Next we need to create our classes.

Clicky Class

We will create a new python class called Clicky that we will use to interact with the Clicky API.  Create a new file called clicky.py and add the following contents:

import requests
import json


class Clicky(object):

    def __init__(self, site_id, sitekey):
        self.site_id = site_id
        self.sitekey = sitekey
        self.output = "json"

    def get_data(self, data_type):
        click_api_url = "https://api.clicky.com/api/stats/4"
        payload = {"site_id": self.site_id,
                   "sitekey": self.sitekey,
                   "type": data_type,
                   "output": self.output}
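        response = requests.get(click_api_url, params=payload)
        # Fail loudly on an HTTP error instead of parsing an error page as JSON.
        response.raise_for_status()
        raw_stats = response.text
        return raw_stats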

    def get_pages_data(self):
        data = self.get_data("pages")
        return json.loads(data)

Save the file and exit.

In order to get our metrics we need to send an HTTP GET request to the Clicky API URL which is

https://api.clicky.com/api/stats/4

We also need to include several parameters:

  • site_id: This is the Site ID number that we got earlier
  • sitekey: This is the Site key that we also got earlier
  • type: To get our top pages we set the type to ‘pages’
  • output: We set this to “json” so that the API will return JSON data
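To see what these parameters look like on the wire, here is a minimal sketch that assembles the query string with the standard library; requests builds an equivalent URL from the params argument.  The helper name build_clicky_url and the credentials are made up for illustration and are not part of the application code.

```python
from urllib.parse import urlencode

CLICKY_API_URL = "https://api.clicky.com/api/stats/4"


def build_clicky_url(site_id, sitekey, data_type, output="json"):
    """Return the full GET URL for one Clicky stats request."""
    params = {
        "site_id": site_id,
        "sitekey": sitekey,
        "type": data_type,
        "output": output,
    }
    # urlencode joins the pairs as key=value&key=value and escapes them.
    return CLICKY_API_URL + "?" + urlencode(params)


# Placeholder credentials, not real values.
print(build_clicky_url("12345", "abcdef", "pages"))
```

Printing the URL like this is handy for checking a request in a browser, but remember that it contains your sitekey.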

Finally we call the requests python module to perform an HTTP GET to our API URL with the parameters we specified.  In the get_pages_data method we return a dict that represents our JSON data.  Next, we will code our Kafka class implementation.

MyKafka Class

This class will interact with our Kafka cluster and push web site metrics to our topic for us.  Create a new file called mykafka.py and add the following contents:

from kafka import KafkaProducer
import json


class MyKafka(object):

    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )

    def send_page_data(self, json_data):
        self.producer.send('admintome-pages', json_data)

First, we import the kafka-python library, specifically the KafkaProducer class, which lets us code a Kafka producer and publish messages to our Kafka topic.

from kafka import KafkaProducer

We now define our MyKafka class and create the constructor for it:

class MyKafka(object):
    def __init__(self, kafka_brokers):

This takes an argument that represents the Kafka brokers that will be used to connect to our Kafka cluster.  This is an array of strings in the form of:

[ "host:port", "host:port" ]

We will use only one broker, the one we created in the last article:

[ "mslave1.admintome.lab:31000" ]

We next instantiate a new KafkaProducer object named producer.  Since we will be sending data to Kafka as JSON, we use the value_serializer parameter to tell the KafkaProducer to serialize each value with json.dumps and encode the resulting string as UTF-8 bytes.  We also tell it to use our brokers with the bootstrap_servers parameter.

self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )
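To make the serializer concrete, here is a small sketch that runs the same lambda logic outside of Kafka, together with the matching decode step a consumer would need.  The function names serialize and deserialize and the sample value are mine, chosen only for illustration.

```python
import json


def serialize(value):
    """Same logic as the value_serializer lambda: object -> JSON text -> bytes."""
    return json.dumps(value).encode('utf-8')


def deserialize(raw):
    """The reverse step: bytes -> JSON text -> Python object."""
    return json.loads(raw.decode('utf-8'))


sample = [{"type": "pages", "dates": []}]
raw = serialize(sample)
print(raw)                         # bytes, which is what Kafka stores
print(deserialize(raw) == sample)  # the round trip preserves the data
```

Kafka itself only ever sees the bytes, so whatever reads the topic has to apply the reverse step to get a dict or list back.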

Finally, we create a new method that we will use to send the messages to our admintome-pages topic:

def send_page_data(self, json_data):
    self.producer.send('admintome-pages', json_data)
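One detail worth knowing: KafkaProducer.send() is asynchronous.  It queues the message and returns right away, and the actual delivery happens in the background.  Our application runs forever, so queued messages do get sent, but a short-lived script should call flush() before exiting.  The sketch below shows the idea with a hypothetical FakeProducer standing in for KafkaProducer so that it runs without a cluster; passing the producer into MyKafka is also my variation, not how the class above is written.

```python
import json


class FakeProducer(object):
    """Hypothetical stand-in for KafkaProducer that records messages in memory."""

    def __init__(self, value_serializer):
        self.value_serializer = value_serializer
        self.pending = []
        self.delivered = []

    def send(self, topic, value):
        # Like the real producer, send() only queues the message.
        self.pending.append((topic, self.value_serializer(value)))

    def flush(self):
        # Hand off everything that has been queued so far.
        self.delivered.extend(self.pending)
        self.pending = []


class MyKafka(object):

    def __init__(self, producer):
        self.producer = producer

    def send_page_data(self, json_data):
        self.producer.send('admintome-pages', json_data)
        # Make sure the message has left the queue before returning.
        self.producer.flush()


producer = FakeProducer(lambda v: json.dumps(v).encode('utf-8'))
MyKafka(producer).send_page_data({"type": "pages"})
print(producer.delivered)
```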

That’s all there is to it.  Now we will write our Main class that will control everything.

Main Class

Create a new file called main.py and add the following contents:

from clicky import Clicky
from mykafka import MyKafka
import logging
import time
import os
from logging.config import dictConfig


class Main(object):

    def __init__(self):
        if 'KAFKA_BROKERS' in os.environ:
            kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        else:
            raise ValueError('KAFKA_BROKERS environment variable not set')

        if 'SITE_ID' in os.environ:
            self.site_id = os.environ['SITE_ID']
        else:
            raise ValueError('SITE_ID environment variable not set')

        if 'SITEKEY' in os.environ:
            self.sitekey = os.environ['SITEKEY']
        else:
            raise ValueError('SITEKEY environment variable not set')

        logging_config = dict(
            version=1,
            formatters={
                'f': {'format':
                      '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}
            },
            handlers={
                'h': {'class': 'logging.StreamHandler',
                      'formatter': 'f',
                      'level': logging.DEBUG}
            },
            root={
                'handlers': ['h'],
                'level': logging.DEBUG,
            },
        )
        self.logger = logging.getLogger()

        dictConfig(logging_config)
        self.logger.info("Initializing Kafka Producer")
        self.logger.info("KAFKA_BROKERS={0}".format(kafka_brokers))
        self.mykafka = MyKafka(kafka_brokers)

    def init_clicky(self):
        self.clicky = Clicky(self.site_id, self.sitekey)
        self.logger.info("Clicky Stats Polling Initialized")

    def run(self):
        self.init_clicky()
        starttime = time.time()
        while True:
            data = self.clicky.get_pages_data()
            self.logger.info("Successfully polled Clicky pages data")
            self.mykafka.send_page_data(data)
            self.logger.info("Published page data to Kafka")
            time.sleep(300.0 - ((time.time() - starttime) % 300.0))


if __name__ == "__main__":
    logging.info("Initializing Clicky Stats Polling")
    main = Main()
    main.run()

The end state of this example is to build a Docker container that we will then run on Marathon.  With that in mind, we don’t want to hard code some of our sensitive information (like our Clicky site id and sitekey) in our code.  We want to be able to pull those from environment variables.  If they are not set then we raise an exception and exit.

        if 'KAFKA_BROKERS' in os.environ:
            kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        else:
            raise ValueError('KAFKA_BROKERS environment variable not set')

        if 'SITE_ID' in os.environ:
            self.site_id = os.environ['SITE_ID']
        else:
            raise ValueError('SITE_ID environment variable not set')

        if 'SITEKEY' in os.environ:
            self.sitekey = os.environ['SITEKEY']
        else:
            raise ValueError('SITEKEY environment variable not set')
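The three checks follow the same pattern, so they could be folded into one helper.  This is only a sketch of that idea: require_env is a name I made up, and a plain dict with placeholder values stands in for os.environ so the example runs anywhere.

```python
import os


def require_env(name, environ=os.environ):
    """Return the value of an environment variable or raise ValueError."""
    if name not in environ:
        raise ValueError('{0} environment variable not set'.format(name))
    return environ[name]


# A plain dict stands in for os.environ; the values are placeholders.
fake_env = {'KAFKA_BROKERS': 'broker1:9092,broker2:9092', 'SITE_ID': '12345'}

# Several brokers can be given as one comma-separated string.
kafka_brokers = require_env('KAFKA_BROKERS', fake_env).split(',')
print(kafka_brokers)

try:
    require_env('SITEKEY', fake_env)
except ValueError as err:
    print(err)
```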

We also configure logging so that we can see what is going on with our application.  I have coded an infinite loop in our code that will poll clicky and push the metrics to our Kafka topic every five minutes.

    def run(self):
        self.init_clicky()
        starttime = time.time()
        while True:
            data = self.clicky.get_pages_data()
            self.logger.info("Successfully polled Clicky pages data")
            self.mykafka.send_page_data(data)
            self.logger.info("Published page data to Kafka")
            time.sleep(300.0 - ((time.time() - starttime) % 300.0))

Save the file and exit.
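The sleep expression in the loop deserves a closer look.  Instead of always sleeping 300 seconds, it subtracts the time that has passed since the last scheduled poll, so the polls stay on a fixed five-minute grid that starts at starttime no matter how long the API call takes.  Here is the same arithmetic pulled out into a function (the name seconds_until_next_poll is mine) with fixed timestamps:

```python
def seconds_until_next_poll(starttime, now, interval=300.0):
    """Seconds to sleep so the next poll lands on the interval grid."""
    return interval - ((now - starttime) % interval)


# The poll took half a second, so we sleep slightly less than five minutes.
print(seconds_until_next_poll(1000.0, 1000.5))  # 299.5

# 302 seconds after the start we are 2 seconds into the second interval.
print(seconds_until_next_poll(1000.0, 1302.0))  # 298.0
```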

Running our application

To test that everything works you can try running the application after you set your environment variables:

(intake) $ export KAFKA_BROKERS="mslave1.admintome.lab:31000"
(intake) $ export SITE_ID="{your site id}"
(intake) $ export SITEKEY="{your sitekey}"
(intake) $ python main.py
2018-06-25 15:34:32,259 root INFO Initializing Kafka Producer
2018-06-25 15:34:32,259 root INFO KAFKA_BROKERS=['mslave1.admintome.lab:31000']
2018-06-25 15:34:32,374 root INFO Clicky Stats Polling Initialized
2018-06-25 15:34:32,754 root INFO Successfully polled Clicky pages data
2018-06-25 15:34:32,755 root INFO Published page data to Kafka

We are now sending messages to our Kafka Topic!   We will build our Docker container next and deploy it to Marathon.  Finally, we will wrap up by writing a test consumer that will get our messages from our topic.

I have created a GitHub repository for all the code used in this article: https://github.com/admintome/clicky-state-intake

Create a Docker container

Now that we have our application code written, we can create a docker container so that we can deploy it to Marathon.  Create a Dockerfile file in your application directory with the following contents:

FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./main.py" ]

Build the container

$ docker build -t {your docker hub username}/site-stats-intake .

After the build completes you will want to push the image to a Docker repository that your Mesos slaves have access to.  For me this is Docker Hub:

$ docker push admintome/site-stats-intake

Then log in to each of your Mesos slaves and pull the image down:

$ docker pull admintome/site-stats-intake

We are now ready to create a Marathon application deployment for our application.

Deploying to Marathon

Go to your Marathon GUI.

http://mesos1.admintome.lab:8080

Click on the Create Application Button. Then click the JSON mode button:

Paste in the following JSON code:

{
  "id": "site-stats-intake",
  "cmd": null,
  "cpus": 1,
  "mem": 128,
  "disk": 0,
  "instances": 1,
  "container": {
    "docker": {
      "image": "admintome/site-stats-intake"
    },
    "type": "DOCKER"
  },
  "networks": [
    {
      "mode": "host"
    }
  ],
  "env": {
    "KAFKA_BROKERS": "192.168.1.x:port",
    "SITE_ID": "{your site_id}",
    "SITEKEY": "{your sitekey}"
  }
}

Be sure to substitute the correct values for KAFKA_BROKERS, SITE_ID, and SITEKEY in the env section for your environment.
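If you would rather generate this JSON than edit it by hand, a few lines of Python will do it and guarantee the result is valid JSON.  This is just a sketch: the marathon_app function is my own helper, and the values passed in are the same placeholders used above.

```python
import json


def marathon_app(image, kafka_brokers, site_id, sitekey):
    """Build the Marathon app definition shown above as a Python dict."""
    return {
        "id": "site-stats-intake",
        "cmd": None,  # written out as null
        "cpus": 1,
        "mem": 128,
        "disk": 0,
        "instances": 1,
        "container": {
            "docker": {"image": image},
            "type": "DOCKER",
        },
        "networks": [{"mode": "host"}],
        "env": {
            # Main splits this string on commas, so join the brokers the same way.
            "KAFKA_BROKERS": ",".join(kafka_brokers),
            "SITE_ID": site_id,
            "SITEKEY": sitekey,
        },
    }


app = marathon_app("admintome/site-stats-intake",
                   ["mslave1.admintome.lab:31000"],
                   "{your site_id}", "{your sitekey}")
print(json.dumps(app, indent=2))
```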

Finally, click on the Create Application button to deploy the application.  After a few seconds you should see the application is Running.

To see the logs click on the site-stats-intake application then click on the stderr link to download a text file containing the logs.

Now that we have our application deployed to Marathon we will write a short consumer that we will run on our development system to show us what messages have been received.

Write a Python Kafka Consumer

This will be a simple Kafka consumer that will check our topic and display all messages on the topic.  Not really useful at this point, but it lets us know that our little polling application is working correctly.

Create a new file called consumer.py and add the following contents:

import sys
from kafka import KafkaConsumer

consumer = KafkaConsumer('admintome-pages', bootstrap_servers="mslave1.admintome.lab:31000",
                         auto_offset_reset='earliest')

try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()

Save and exit the file.  This has the Kafka broker hardcoded because we are simply using it to test everything.  Make sure to update the bootstrap_servers parameter with your broker name and port.

Now run the consumer and you should see a ton of JSON that represents your most visited pages:

(intake) $ python consumer.py
b'[{"type": "pages", "dates": [{"date": "2018-06-25", "items": [{"value": "145", "value_percent": "43.2", "title": "Kafka Tutorial for Fast Data Architecture - AdminTome Blog", "stats_url": "http://clicky.com/stats/visitors?site_id=101045340&date=2018-06-25&href=%2Fblog%2Fkafka-tutorial-for-fast-data-architecture%2F", "url": "http://www.admintome.lab/blog/kafka-tutorial-for-fast-data-architecture/"},...
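Notice the b'...' prefix: message.value is raw bytes, because the consumer does not decode anything by default.  Here is a sketch of turning one message back into something readable.  It assumes every message has the same shape as the output above, the function name top_pages is mine, and the sample is a trimmed copy of that output with a placeholder url.

```python
import json


def top_pages(raw_message):
    """Return (title, views) pairs from one raw admintome-pages message."""
    stats = json.loads(raw_message.decode('utf-8'))
    pages = []
    for report in stats:
        for day in report.get("dates", []):
            for item in day.get("items", []):
                # Clicky sends the counts as strings, so convert them.
                pages.append((item["title"], int(item["value"])))
    return pages


# Trimmed copy of the output above; the url is a placeholder.
sample = json.dumps([{
    "type": "pages",
    "dates": [{"date": "2018-06-25", "items": [
        {"value": "145", "value_percent": "43.2",
         "title": "Kafka Tutorial for Fast Data Architecture - AdminTome Blog",
         "url": "..."},
    ]}],
}]).encode('utf-8')

print(top_pages(sample))
```

In consumer.py you could call top_pages(message.value) in place of the print, or pass a decoding function to KafkaConsumer through its value_deserializer parameter.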

What’s Next?

We now have a data pipeline that has some data that we can use.  The next step will be to use that data and analyze it.  In the next article, we will install and configure the next part of our SMACK stack, which is Apache Spark.  We will also configure it to analyze our data and give us something meaningful.
