Kafka-ELK Docker for AVRO Streaming

17 February 2016, Andriy Kashcheyev

I've been doing a simulation of a possible data ingestion flow where incoming messages are funnelled through Kafka and stored in ELK for fast analytical and intelligent insights into data quality. Instead of working with raw messages, which require several stages of data transformation (format conversion, filtering, mapping to business attributes), I've taken post-processed files as the initial input. However, the files are in AVRO format, which is not an issue for Kafka but would be a real parsing disappointment for Logstash.

The Logstash Kafka plugin supports JSON out of the box, so it was an obvious choice for tackling the case. There is an avro-tools utility in all Hadoop distributions (as well as independently downloadable from https://avro.apache.org) for working with AVRO files, and it supports conversion to JSON format. However, I immediately faced an idiosyncrasy in how avro-tools renders avro messages in "pretty" format.
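The conversion itself is a one-liner (the jar name and version below are simply whatever happens to be installed, so treat the exact path as an illustration):

java -jar avro-tools-1.7.7.jar tojson --pretty part-0001.avro

For a single record it produces output like this: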

{
  "eventDate" : {
    "string" : "2015-12-15 15:59:30"
  },
  "eventTime" : {
    "string" : "16:31:33"
  },
  "dataProducer" : {
    "string" : "SYSTEM.2-14"
  },
  "userId" : null,
  "profileId" : {
    "string" : ""
  },
  "userZipCode" : {
    "string" : ""
  },
  "deviceZipCode" : null,
  "userType" : {
    "string" : ""
  },
  "userStatus" : {
    "string" : ""
  }
}

This additional nesting, which only carries the attribute's type reference, pollutes the Elastic index and makes Kibana unmanageable. Whatever parameters I threw at avro-tools, the output stayed in the same prolix format. Another thing I noticed is that the generated file as a whole was not parsable JSON, because the objects were not enclosed in an array but simply followed each other as independent JSON objects delimited by "\n". That is fine for the "non-pretty" format, which I could feed effortlessly to kafka-console-producer, but quite useless for the "pretty" format. Nevertheless, it was the "pretty" format that I used as input to a simple tool, coded relatively quickly, which transforms it into "lean" JSON messages like this:

[{"eventDate" : "2015-12-15 15:59:30","eventTime" : "16:31:33","dataProducer" : "SYSTEM.2-14","userId" : null,"profileId" : "","userZipCode" : "","deviceZipCode" : null,"userType" : "","userStatus" : ""}] 

The tool is unscrupulously called ajt (avro json transformer) and follows my disposition towards single-task, pipe-based stream processing on the command line. This is how an Avro file goes to a Kafka topic as a stream:

./ajt.sh part-0001.avro | kafka-console-producer --broker-list $KAFKA_URL --topic $KAFKA_TOPIC 

The code is a real hack of string matches and conditions crafted specifically for the prettified JSON output, so the tool is very single-purpose. However, it is only 110 lines of Scala code, imports a single library (scala.io.Source) and does not even deserve a repository: https://gist.github.com/syspulse8913/c1000098fd92b9145b32
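Purely for illustration, here is a rough sketch of that line-matching idea (this is not the gist code, and it assumes the pretty avro-tools output shown above is already available as a text file, which is presumably what ajt.sh prepares internally before piping onward):

// A rough sketch of the flattening idea (not the actual ajt code): read the
// pretty-printed avro-tools JSON line by line, drop the per-field type wrapper
// and print every record as a single lean JSON line.
import scala.io.Source

object AjtSketch {
  def main(args: Array[String]): Unit = {
    var depth = 0                                  // 1 = inside a record, 2 = inside a typed wrapper
    var wrapped = ""                               // name of the field whose value sits one level deeper
    val fields = scala.collection.mutable.ListBuffer[String]()

    for (raw <- Source.fromFile(args(0)).getLines(); line = raw.trim; if line.nonEmpty) {
      if (line == "{") {                           // a new record opens
        depth = 1; fields.clear()
      } else if (line.endsWith("{")) {             // "eventDate" : {   -> remember the field name
        wrapped = line.stripSuffix("{").trim.stripSuffix(":").trim
        depth = 2
      } else if (line.startsWith("}")) {           // either the type wrapper or the record closes
        depth -= 1
        if (depth == 0) println(fields.mkString("[{", ",", "}]"))
      } else if (depth == 2) {                     // "string" : "..."  -> keep only the value
        fields += wrapped + " : " + line.dropWhile(_ != ':').drop(1).trim.stripSuffix(",")
      } else {                                     // plain record-level field, e.g. "userId" : null,
        fields += line.stripSuffix(",")
      }
    }
  }
}

Each printed line is already in the lean format shown earlier, so it can go straight into kafka-console-producer exactly as in the pipeline command above.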

And what better way to show it off than to embed it in a demoable Docker image!

I have shamelessly reused the work of https://hub.docker.com/r/qnib/elk/ (thanks a lot, Christian) as a baseline for my Docker image and didn't even bother creating a Dockerfile: https://hub.docker.com/r/intropro/kelk/

Most of the Docker preparation effort went into tweaking the Logstash, Consul and Supervisord configurations, but the biggest pain point was keeping the image clean after multiple test runs had littered it with random logs and files. I even stumbled upon a bug in Docker (my version is 1.7.0): when I cleaned Elastic indexes and spurious Logstash ingest directories, Docker aufs would silently preserve them across the commit, and the next time the new image tag was started I dumbfoundedly contemplated messages in Kibana coming from a seemingly non-existent index.

There is an ajt script which periodically scans the mounted volume for avro files and starts streaming them into ELK (a hypothetical sketch of such a polling loop follows the config below). There is also one trick I employ to generate the Elastic timestamp value from two JSON fields which can be a mixture of date only or date + time: eventDate and eventTime are concatenated into a single eventTimestamp field, and the date filter tries both patterns because eventDate sometimes already carries the time. This is my Logstash 00_entry.conf:

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "bi"
        topic_id => "bi-ingest"
        reset_beginning => false
        consumer_threads => 1
        queue_size => 20
        rebalance_max_retries => 4
        rebalance_backoff_ms => 2000
        consumer_timeout_ms => -1
        consumer_restart_on_error => true
        consumer_restart_sleep_ms => 0
        fetch_message_max_bytes => 1000000
        codec => "json"
    }
}
filter {
  if [field] !~ /\.raw$/ {
    mutate {
      remove_field => [ "field" ]
    }
  }
  mutate {
    add_field => {
      "eventTimestamp" => "%{eventDate} %{eventTime}"
    }
  }
  date {
    match => [ "eventTimestamp", "YYYY-MM-dd HH:mm:ss", "YYYY-MM-dd HH:mm:ss HH:mm:ss" ]
  }
}
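And, as promised above, a hypothetical sketch of the kind of polling loop the ajt script implements (the /data/avro mount point and the .done marker files are my own assumptions, not necessarily what is inside the image):

# Watch the mounted volume and stream every avro file that has not been processed yet.
while true; do
  for f in /data/avro/*.avro; do
    [ -e "$f" ] || continue              # no avro files mounted yet
    [ -e "$f.done" ] && continue         # already streamed on a previous pass
    ./ajt.sh "$f" | kafka-console-producer --broker-list $KAFKA_URL --topic $KAFKA_TOPIC
    touch "$f.done"                      # mark the file as processed
  done
  sleep 60
done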

The final goal of this work is to quickly prototype Business Intelligence dashboards without waiting for IT to install the real solution with all the necessary subsystems (Kafka, ELK, etc.).