Data Pipeline: Send logs from Kafka to Cassandra

(Last Updated On: September 22, 2018)

In this post, I will outline how I created a big data pipeline for my web server logs using Apache Kafka, Python, and Apache Cassandra.

In past articles I described how to install and configure Apache Kafka and Apache Cassandra.  I assume that you already have a Kafka broker running with a topic of www_logs and a production-ready Cassandra cluster.  If you don’t, please work through those articles first so that you can follow along with this tutorial.

In this post, we will tie them together to create a big data pipeline that will take web server logs and push them to an Apache Cassandra based data sink.

This will give us the opportunity to query our logs using CQL statements, along with possible further benefits such as applying machine learning to predict whether there is an issue with our site.

Here is the basic diagram of what we are going to configure:

[Figure: data pipeline example using Kafka, Python, and Cassandra]

Let’s see how we start the pipeline by pushing log data to our Kafka topic.

Pushing logs to our data pipeline

The Apache web server writes its logs to /var/log/apache2.  For this tutorial, we will work with the Apache access logs which show requests to the web server.  Here is an example:

108.162.245.143 - - [08/Aug/2018:17:44:40 +0000] "GET /blog/terraform-taint-tip/ HTTP/1.0" 200 31281 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

Log files are simply text files where each line is an entry in the log file.

In order to easily read our logs from a Python application that we will write later, we will want to convert these log lines into JSON data and add a few more fields.

Here is what our JSON will look like:

{
  "log": {
    "source": "",
    "type": "",
    "datetime": "",
    "log": ""
  }
}

The source field is going to be the hostname of our web server.  The type field is going to let us know what type of logs we are sending.  In this case it will be ‘www_access’ since we are going to send Apache access logs.  The datetime field will hold the timestamp value of when the log was created.  Finally, the log field will contain the entire line of text representing the log entry.
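To make the format concrete, here is a minimal sketch that fills in the envelope for the sample access log line shown earlier.  The build_message() helper and the default hostname of www2 are assumptions made for illustration only; the real forwarder reads the hostname from the machine it runs on.

```python
import json
from datetime import datetime

SAMPLE_LINE = (
    '108.162.245.143 - - [08/Aug/2018:17:44:40 +0000] '
    '"GET /blog/terraform-taint-tip/ HTTP/1.0" 200 31281 "-" '
    '"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"'
)


def build_message(line, hostname="www2"):
    """Hypothetical helper: wrap one access log line in the JSON envelope."""
    # The fourth space-separated field looks like "[08/Aug/2018:17:44:40".
    raw_timestamp = line.split(' ')[3].lstrip('[')
    created = datetime.strptime(raw_timestamp, "%d/%b/%Y:%H:%M:%S")
    return {
        "log": {
            "source": hostname,
            "type": "www_access",
            "datetime": created.strftime("%Y-%m-%d %H:%M"),
            "log": line.rstrip(),
        }
    }


message = build_message(SAMPLE_LINE)
print(json.dumps(message, indent=2))
```

The datetime field in this sketch is truncated to the minute, which matches the format string used in forwarder.py.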

I created a sample Python application that takes these logs and forwards them to Kafka.  You can find it on GitHub at admintome/logs2kafka.  Let’s look at the forwarder.py file in more detail:

import time
import datetime
import socket
import json
from mykafka import MyKafka


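def parse_log_line(line):
    # Build the entry dict for one access log line.
    # Returns None when the line has no parsable timestamp.
    hostname = socket.gethostname()
    try:
        # Fourth field looks like "[08/Aug/2018:17:44:40"; drop the "["
        timestamp = line.split(' ')[3][1:]
        parsed = datetime.datetime.strptime(timestamp, "%d/%b/%Y:%H:%M:%S")
    except (IndexError, ValueError):
        return None
    entry = {}
    entry['datetime'] = parsed.strftime("%Y-%m-%d %H:%M")
    entry['source'] = hostname
    entry['type'] = "www_access"
    entry['log'] = "'{}'".format(line.rstrip())
    return entry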


def show_entry(entry):
    # Wrap the entry under a top-level "log" key and serialize it to JSON
    log_entry = {'log': entry}
    temp = json.dumps(log_entry)
    print("{}".format(temp))
    return temp


def follow(syslog_file):
    syslog_file.seek(0, 2)
    pubsub = MyKafka(["mslave2.admintome.lab:31000"])
    while True:
        line = syslog_file.readline()
        if not line:
            time.sleep(0.1)
            continue
        else:
            entry = parse_log_line(line)
            if not entry:
                continue
            json_entry = show_entry(entry)
            pubsub.send_page_data(json_entry, 'www_logs')


with open("/var/log/apache2/access.log", "rt") as f:
    follow(f)

The first thing we do is open the log file /var/log/apache2/access.log for reading.  We then pass that file to our follow() function where our application will follow the log file much like tail -f /var/log/apache2/access.log would.

If the follow() function detects a new line in the log, it builds a dictionary from it using the parse_log_line() function and converts that dictionary to JSON with show_entry().  It then uses the send_page_data() function of MyKafka to push the JSON message to the www_logs topic.
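The tail-like behavior can be tried out without a Kafka broker.  The following is a minimal sketch, not code from logs2kafka: follow_lines() and its max_idle_polls parameter are hypothetical names added so that the loop can stop after a number of empty reads instead of polling forever.

```python
import time


def follow_lines(log_file, poll_interval=0.1, max_idle_polls=None):
    """Return an iterator over lines appended to log_file from now on.

    Hypothetical helper: max_idle_polls exists only for illustration.
    None means poll forever, like the loop in forwarder.py.
    """
    # Jump to the end of the file so existing lines are skipped.
    log_file.seek(0, 2)

    def _lines():
        idle_polls = 0
        while max_idle_polls is None or idle_polls < max_idle_polls:
            line = log_file.readline()
            if not line:
                # Nothing new yet: wait a little and try again.
                idle_polls += 1
                time.sleep(poll_interval)
                continue
            idle_polls = 0
            yield line

    return _lines()
```

Each yielded line could then be passed to parse_log_line() in the same way the follow() function does.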

Here is the mykafka.py Python file:

from kafka import KafkaProducer
import json


class MyKafka(object):

    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )

    def send_page_data(self, json_data, topic):
        result = self.producer.send(topic, key=b'log', value=json_data)
        print("kafka send result: {}".format(result.get()))

This simply calls KafkaProducer to send our JSON as a key/value pair where the key is the string ‘log’ and the value is our JSON.  Calling result.get() blocks until the broker has acknowledged the message.

Now that we have our log data being pushed to Kafka we need to write a consumer in python to pull messages off the topic and save them as a row in a Cassandra table.

But first we should prepare Cassandra by creating a Keyspace and a table to hold our log data.

Preparing Cassandra

In order to save our data to Cassandra we need to first create a Keyspace in our Cassandra cluster.  Remember that a keyspace is how we tell Cassandra a replication strategy for any tables attached to our keyspace.

Let’s start up CQLSH.

$ bin/cqlsh cass1.admintome.lab
Connected to AdminTome Cluster at cass1.admintome.lab:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> 

Now run the following query to create our keyspace.

CREATE KEYSPACE admintome WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}  AND durable_writes = true;

Now run this query to create our logs table.

CREATE TABLE admintome.logs (
    log_source text,
    log_type text,
    log_id timeuuid,
    log text,
    log_datetime text,
    PRIMARY KEY ((log_source, log_type), log_id)
) WITH CLUSTERING ORDER BY (log_id DESC);

Essentially, we are storing time series data which represents our log file information.

You can see that we have a column for source, type, datetime, and log that match our JSON from the previous section.

We also have another column called log_id that is of the type timeuuid.  We fill it with a unique UUID generated from the current timestamp whenever we insert a record into this table.

Cassandra groups rows into partitions.  A partition is identified by the partition key, which is the first part of the PRIMARY KEY.  In this example we use a composite partition key made up of both the log_source and the log_type values.

So for our example, we are going to create a single partition in Cassandra with the partition key (‘www2’, ‘www_access’).  The hostname of my web server is www2 so that is what log_source is set to.

We also set the clustering key to log_id.  These are guaranteed unique keys so we will be able to have multiple rows in our partition, and the CLUSTERING ORDER BY clause keeps those rows sorted newest first.

If I lost you there don’t worry, it took me a couple of days and many headaches to understand it fully.  I will be writing another article soon detailing why the data is modeled in this fashion for Cassandra.
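One rough way to picture this layout is a dictionary keyed by (log_source, log_type), where each value is a list of rows kept in descending log_id order.  The sketch below is only a toy in-memory model and says nothing about how Cassandra stores data on disk.  The integer ids stand in for timeuuid values, and the www_error type is made up for illustration.

```python
from collections import defaultdict

# partition key (log_source, log_type) -> rows as (log_id, log), newest first
partitions = defaultdict(list)


def insert(log_source, log_type, log_id, log):
    """Toy insert: pick the partition, then keep rows ordered by log_id DESC."""
    rows = partitions[(log_source, log_type)]
    rows.append((log_id, log))
    rows.sort(key=lambda row: row[0], reverse=True)


insert("www2", "www_access", 1, "GET /a")
insert("www2", "www_access", 3, "GET /c")
insert("www2", "www_access", 2, "GET /b")
insert("www2", "www_error", 1, "File not found")  # hypothetical second log type

print(len(partitions))  # 2
print([log_id for log_id, _ in partitions[("www2", "www_access")]])  # [3, 2, 1]
```

All access logs from www2 land in one partition, and a query that names both log_source and log_type only has to look at that single partition.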

Now that we have our Cassandra keyspace and table ready to go, we need to write our Python consumer to pull the JSON data from our Kafka topic and insert that data into our table as a new row.

Python Consumer Application

I have posted the source code to the kafka2cassandra python application on GitHub at admintome/kafka2cassandra.

We use the same Kafka Python module that we used in our producer code above, but this time we will use KafkaConsumer to pull messages off of our topic.  We then will use the Python Cassandra Driver module from DataStax to insert a row into our table.

Here is the code for the poller.py file:

import sys
from kafka import KafkaConsumer
import json
from cassandra.cluster import Cluster

consumer = KafkaConsumer(
    'www_logs', bootstrap_servers="mslave2.admintome.lab:31000")

cluster = Cluster(['192.168.1.47'])
session = cluster.connect('admintome')

# start the loop
try:
    for message in consumer:
        entry = json.loads(json.loads(message.value))['log']
        print("Entry: {} Source: {} Type: {}".format(
            entry['datetime'],
            entry['source'],
            entry['type']))
        print("Log: {}".format(entry['log']))
        print("--------------------------------------------------")
        session.execute(
            """
INSERT INTO logs (log_source, log_type, log_datetime, log_id, log)
VALUES (%s, %s, %s, now(), %s)
""",
            (entry['source'],
             entry['type'],
             entry['datetime'],
             entry['log']))
except KeyboardInterrupt:
    sys.exit()

This is a simple loop where we use KafkaConsumer to pull a message off the Kafka topic.  Notice that json.loads() is called twice.  That is because the producer encodes each message twice: show_entry() in forwarder.py already calls json.dumps(), and the value_serializer in MyKafka runs json.dumps() on the resulting string again.  The first json.loads() therefore returns a string, and only the second one returns a proper Python dictionary.
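The need for two json.loads() calls can be reproduced without Kafka.  On the producer side the message passes through json.dumps() twice, once in show_entry() and once in the value_serializer of MyKafka, so the consumer has to undo both steps.  The sketch below uses only the standard json module and a shortened, made-up entry.

```python
import json

# Shortened, made-up entry in the same shape as the real messages
entry = {"log": {"source": "www2", "type": "www_access"}}

# forwarder.py: show_entry() turns the dict into a JSON string
json_entry = json.dumps(entry)

# mykafka.py: the value_serializer calls json.dumps() again, this time on a str
wire_bytes = json.dumps(json_entry).encode("utf-8")

# poller.py: the first loads() only undoes the serializer, the second yields the dict
first = json.loads(wire_bytes)
second = json.loads(first)

print(type(first).__name__)   # str
print(type(second).__name__)  # dict
```

Dropping one of the two json.dumps() calls on the producer side would make a single json.loads() sufficient, although messages already on the topic would still be double-encoded.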

We also create a connection to our Cassandra cluster and connect to our admintome keyspace with these two lines:

cluster = Cluster(['192.168.1.47'])
session = cluster.connect('admintome')

We then insert our JSON data (which is now stored in the entry dict) to our logs table in Cassandra.

"""
INSERT INTO logs (log_source, log_type, log_datetime, log_id, log)
VALUES (%s, %s, %s, now(), %s)
""",
            (entry['source'],
             entry['type'],
             entry['datetime'],
             entry['log']))

With Cassandra, an insert always has to supply a value for every primary key column, which here means log_source, log_type, and log_id.  In the values section notice that we are using the now() CQL function to create a timeuuid value from the current timestamp.
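A timeuuid is a version 1 UUID, which embeds the time at which it was generated.  The pipeline lets Cassandra generate it with now(), but the idea can be illustrated on the client side with Python’s uuid module.  This is only a sketch: timeuuid_to_datetime() is a hypothetical helper that does roughly what the dateOf() CQL function does.

```python
import uuid
from datetime import datetime, timezone

# 100-nanosecond intervals between the UUID epoch (1582-10-15) and the Unix epoch
UUID_EPOCH_OFFSET = 0x01B21DD213814000


def timeuuid_to_datetime(value):
    """Hypothetical helper: recover the creation time from a version 1 UUID."""
    unix_seconds = (value.time - UUID_EPOCH_OFFSET) / 1e7
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc)


log_id = uuid.uuid1()  # version 1 UUID built from the current time
print(log_id.version)  # 1
print(timeuuid_to_datetime(log_id))
```

Because the timestamp is part of the id, sorting rows by log_id also sorts them by insertion time, which is what the clustering order on the logs table relies on.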

Deploying our consumer to Kubernetes

We want this consumer to always be running so we are going to use Kubernetes to deploy a docker container that runs this script for us.

You don’t have to complete this section to continue.  We already have a fully running pipeline.

We can use this Dockerfile to build our docker container.

FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "-u", "./poller.py" ]

Notice in the last line we tell it to run python -u?  That will tell python to display the text unbuffered.  This will allow us to see the output of our python application running in the docker container correctly.

Build the docker container and push it to your docker registry that your Kubernetes cluster is using.

Now create a kafka2cassandra.yaml file on your Kubernetes management system (the system you have kubectl installed on to manage your Kubernetes cluster) and add these contents:

apiVersion: v1
kind: Pod
metadata:
  name: kafka2cassandra
spec:
  containers:
    - name: kafka2cassandra
      image: admintome/kafka2cassandra
      stdin: true
      tty: true

Make sure to update the image parameter with the actual image location that you pushed your docker container to.

Also notice that we set stdin and tty to true in our Pod definition.  This is so we can see the text logging from our python script from Kubernetes correctly.

Now deploy the pod with

$ kubectl create -f kafka2cassandra.yaml

You should see the pod successfully start, and if you check the logs you will see that it is successfully pulling messages off of our Kafka topic and pushing the data to our Cassandra table.

Now it’s time to query Cassandra for our log data.

Cassandra Queries

Now that we have our data being sent to Cassandra we can run some queries on the data.

Start up CQLSH.

$ bin/cqlsh cass1.admintome.lab
Connected to AdminTome Cluster at cass1.admintome.lab:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>

Run the following query to count the number of rows we have so far.  Keep in mind this is assuming you have some web requests that have been processed already.

select count(*) from admintome.logs where log_source = 'www2' and log_type = 'www_access';

We should get a response back:

@ Row 1
-------+----
 count | 23

You can view the logs with this query:

select dateOf(log_id), log from admintome.logs where log_source = 'www2' and log_type = 'www_access' limit 5;

@ Row 1
-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 system.dateof(log_id) | 2018-08-08 04:35:20.493000+0000
 log                   | '172.69.70.168 - - [08/Aug/2018:04:35:20 +0000] "GET /blog/installing-puppet-enterprise-2017-3-agents/ HTTP/1.0" 200 35250 "http://www.admintome.lab/blog/tag/puppet/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"'

@ Row 2

Keep in mind that this query is fine when you don’t have many rows, but due to the way Cassandra stores data it can cause serious performance issues if you try to run it on a large data set.

A better way is to limit your query to a set time period like this query:

cqlsh> select dateOf(log_id), log from admintome.logs where log_source = 'www2' and log_type = 'www_access' and log_id >= maxTimeuuid('2018-08-08 04:30+0000') and log_id < minTimeuuid('2018-08-08 04:40+0000') limit 5;

@ Row 1
-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 system.dateof(log_id) | 2018-08-08 04:35:20.493000+0000
 log                   | '172.69.70.168 - - [08/Aug/2018:04:35:20 +0000] "GET /blog/installing-puppet-enterprise-2017-3-agents/ HTTP/1.0" 200 35250 "http://www.admintome.lab/blog/tag/puppet/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"'
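If you run time-window queries like this often, the statement can be generated instead of typed by hand.  The sketch below is one possible way to do that: build_window_query() is a hypothetical helper, it expects a timezone-aware start time, and it only produces text to paste into CQLSH.  In application code you would normally pass values as bound parameters rather than formatting them into the string.

```python
from datetime import datetime, timedelta, timezone


def build_window_query(log_source, log_type, start, minutes, limit=5):
    """Hypothetical helper: build the time-bounded select for CQLSH."""
    # Normalize to UTC so the +0000 suffix in the query is accurate.
    start = start.astimezone(timezone.utc)
    end = start + timedelta(minutes=minutes)
    fmt = "%Y-%m-%d %H:%M+0000"
    return (
        "select dateOf(log_id), log from admintome.logs "
        "where log_source = '{}' and log_type = '{}' "
        "and log_id >= maxTimeuuid('{}') and log_id < minTimeuuid('{}') "
        "limit {};"
    ).format(log_source, log_type, start.strftime(fmt), end.strftime(fmt), limit)


window_start = datetime(2018, 8, 8, 4, 30, tzinfo=timezone.utc)
print(build_window_query("www2", "www_access", window_start, minutes=10))
```

With these arguments the helper prints the same ten minute query that was run above.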

Conclusion

We now have a complete data pipeline: a forwarder takes Apache access logs and pushes them in JSON form to a Kafka topic, and a Python application consumes the messages and inserts the data into Cassandra for long term storage and data analysis.

There is much more that can be done with this pipeline to make it more robust.  For example, once we start to add in more logs like the Apache Error log or other logs, we will need to create a Kafka consumer group and run more than one consumer container and split our Kafka topic into partitions.

I hope you have enjoyed this post.

If you did then please share it on social media and comment below, I would love to hear from you.

 
