This post introduces the log management framework at Capillary that handles the collection, aggregation, alerting and reporting of application and system logs. At Capillary, about 1 billion log events are generated every day by about 35 different types of services and components, so an efficient way to process and access these logs is a key tool for operational excellence. There are several open-source frameworks and SaaS offerings in this space, such as Splunk and Loggly, and frameworks like Flume, Kafka and Scribe provide collection and aggregation capabilities that many open-source and proprietary log-processing stacks have adopted. Our log framework is built on Apache Flume, MongoDB, Hive, Hadoop and Amazon S3. At a broad level, it parses the log lines generated by each module, extracts a set of relevant features, aggregates them and stores them in three types of stores: 1) MongoDB for real-time querying, 2) HDFS/S3 for Map-Reduce operations, and 3) MySQL for dashboards.

Access and Query pattern requirements

Below are a few sample log lines from MySQL’s slow query log. A set of log lines combines into a single log event; the log event extracted from this sample includes the actual query as well as the properties present in the preceding log lines.
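
For reference, this is what such an event looks like in the standard MySQL slow-query-log format (the host, user, query and values below are illustrative, not actual Capillary data):

    # Time: 140321 11:09:45
    # User@Host: report_user[report_user] @ app-server-7 [10.0.1.23]
    # Query_time: 2.513071  Lock_time: 0.000120 Rows_sent: 142  Rows_examined: 581327
    SET timestamp=1395400185;
    SELECT id, points FROM loyalty_points WHERE customer_id = 182734;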

The expectations from this log framework were to provide:

  1. Feature extraction (near real-time) – The framework must be able to parse semi-structured log formats and make them queryable. In the above example, the relevant information extracted from the log event includes Time, Query_time, Lock_time and the actual query.

  2. Log consolidation (near real-time) – In Capillary’s SOA, every transaction goes through several services. A log trail across these services, ordered by time and keyed on a request-id/customer-id/transaction-id, helps in debugging production issues. Another example is time-based correlation: suppose API response times were detected to be high between 11:00 AM and 11:15 AM; a log query for the slow queries on any of the databases during the same window (obtained from the MySQL slow query logs) helps narrow down the problem much faster.

  3. Reports and ad-hoc queries (hourly/nightly) – Monitor the performance of the various services on an hourly or daily basis by computing percentile distributions, average response times (over a time window), error rates, etc. This removes the overhead of capturing these performance metrics inside every service. For example, a log query for all queries with a very high Rows_examined/Rows_sent ratio points to the MySQL queries that are poorly written or lack proper indexes.

  4. Dashboards – Answer questions like: post-release, did the number of slow queries per day increase this week compared to last week? Did any of my clusters respond more slowly than the other clusters last Tuesday? Such questions require access to historical data and the ability to drill down.

  5. Alerts (near real-time) – Events such as response times crossing a certain threshold or FATAL errors in the application should trigger notifications and also be forwarded to tools like Nagios. For example, a simple alert on Query_time exceeding a particular threshold helps fix issues proactively.

Architecture

[Architecture diagram: log framework]

In this framework, we use Apache Flume to tail the logs, extract the features, collect the events and store them in multiple stores. Flume follows a store-and-forward pattern in which both agents and collectors are built as a three-stage pipeline: source, channel and sink. The channel is a buffer between the sources and the sinks, which may be running at different throughput rates.
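
As a rough illustration of this pipeline, a minimal Flume 1.x agent configuration for tailing a slow-query log might look like the following. File paths, the collector hostname and the port are placeholders; in our setup the agent’s sink is the custom feature-extraction sink described below, not the plain Avro sink shown here.

    # agent: tail a log file -> file channel -> Avro RPC to the collector
    agent.sources  = tailSrc
    agent.channels = fileCh
    agent.sinks    = toCollector

    # Source: each new line appended to the slow-query log becomes an event
    agent.sources.tailSrc.type = exec
    agent.sources.tailSrc.command = tail -F /var/log/mysql/mysql-slow.log
    agent.sources.tailSrc.channels = fileCh

    # Channel: file-backed buffer so events survive agent restarts
    agent.channels.fileCh.type = file
    agent.channels.fileCh.checkpointDir = /var/flume/checkpoint
    agent.channels.fileCh.dataDirs = /var/flume/data

    # Sink: forward events to the collector over Avro RPC
    agent.sinks.toCollector.type = avro
    agent.sinks.toCollector.hostname = collector.internal
    agent.sinks.toCollector.port = 4545
    agent.sinks.toCollector.channel = fileCh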

Agent

  1. Source – The source of an agent is each new log line obtained from tailing the log files.

  2. Channel – The log events are buffered in a file-based channel that provides persistence and allows replay in case of a crash.

  3. Sink – Feature extraction – This is a custom sink into which we have plugged our feature extractor; it takes a parsing specification, extracts the features based on that specification and then sends the events to the collector. Some of its features are:

    1. Multi-line aggregation. In the above example, # Time is the marker pattern: every log line of interest gets buffered into the current event until the next # Time line arrives.

    2. The parsing specification is hierarchical and uses java.text.MessageFormat patterns. We use fixed fields (for common attributes like request-id and execution time) and flex fields (a map of key/value pairs). The key/value pairs form the sparse metrics in the data stores. A minimal sketch of this parsing step follows the list.

    3. The extracted features, along with the original body, are sent to the collector as a BSON-serialized object over Avro RPC.
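
The sketch below shows how a java.text.MessageFormat pattern can pull the fixed fields out of the statistics line of a slow-query event. It is illustrative only: the class and field names are not the actual Capillary extractor, and the real sink drives this from the YAML specification described next.

    import java.text.MessageFormat;
    import java.text.ParseException;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative parsing step only -- not the actual Capillary sink code.
    public class SlowQueryLineParser {

        // The pattern must mirror the literal spacing of the line it parses.
        private static final MessageFormat STATS_LINE = new MessageFormat(
                "# Query_time: {0}  Lock_time: {1} Rows_sent: {2}  Rows_examined: {3}");

        // Extracts the fixed fields from one "# Query_time: ..." line.
        public static Map<String, String> parseStatsLine(String line) throws ParseException {
            Object[] parts = STATS_LINE.parse(line.trim());
            Map<String, String> features = new HashMap<String, String>();
            features.put("Query_time", parts[0].toString().trim());
            features.put("Lock_time", parts[1].toString().trim());
            features.put("Rows_sent", parts[2].toString().trim());
            features.put("Rows_examined", parts[3].toString().trim());
            return features;
        }

        public static void main(String[] args) throws ParseException {
            String line = "# Query_time: 2.513071  Lock_time: 0.000120 Rows_sent: 142  Rows_examined: 581327";
            // Prints something like {Query_time=2.513071, Lock_time=0.000120, ...}
            System.out.println(parseStatsLine(line));
        }
    }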

The specifications to parse the log lines are defined in a YAML file. An illustrative specification for MySQL slow-query log lines is shown below:
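
Since the original sample is specific to our code base, the snippet below is only a hypothetical sketch of such a specification; the key names (module, event_start_pattern, fixed_fields, flex_fields) are assumptions, not the actual schema:

    # Hypothetical parsing specification -- key names are illustrative only.
    module: mysql-slow-query
    event_start_pattern: "# Time:"        # buffer lines until the next "# Time:" line
    fixed_fields:                         # common attributes, parsed via java.text.MessageFormat
      - pattern: "# Query_time: {0}  Lock_time: {1} Rows_sent: {2}  Rows_examined: {3}"
        fields: [Query_time, Lock_time, Rows_sent, Rows_examined]
    flex_fields:                          # free-form key/value pairs stored as a sparse map
      - pattern: "# User@Host: {0} @ {1}"
        fields: [user, host]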

Collector

  1. Source – The source of the collector is the stream of BSON-serialized events received from the different agents.

  2. Channels – One file channel per sink, because the write throughput of these sinks may vary.

  3. Sinks

    1. MongoSink – Batches the BSONs and stores them in a table, one table per day. When a new table is created, indexes on the key/value fields are added based on the required access patterns (see the first sketch after this list).

    2. HDFS sink – Streams the JSON documents into an HDFS folder structure laid out by date/module/host/hour using the HDFS sink. Alternatively, an S3 sink can be used.

    3. Real-time-alerts sink – This is a custom sink which doesn’t store any data but sends real-time alerts through email (see the second sketch after this list):

      1. It uses a rule engine to read the alert specifications. If a log event matches any of the rules, an alert is sent via email.

      2. Time-window based aggregation – e.g. send an alert only if there are 10 warnings of a particular type in the last 5 minutes.

      3. Throttling of alerts – e.g. don’t send more than 5 emails in 30 minutes for the same alert.
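
First, a minimal sketch of the MongoSink write path, using the MongoDB Java driver. The daily table name, field names and index choices below are assumptions for illustration, not the actual Capillary schema.

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Indexes;
    import org.bson.Document;

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;
    import java.util.List;

    // Illustrative MongoSink write path -- not the actual Capillary sink code.
    public class MongoSinkSketch {

        private final MongoClient client = MongoClients.create("mongodb://mongo-logs:27017");

        // One table (collection) per day, e.g. logs_20140321.
        private MongoCollection<Document> dayTable() {
            String name = "logs_" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
            MongoCollection<Document> table = client.getDatabase("logs").getCollection(name);
            // Index the keys needed by the expected access patterns (assumed fields);
            // createIndex is idempotent, so repeating it for an existing table is harmless.
            table.createIndex(Indexes.ascending("request_id"));
            table.createIndex(Indexes.ascending("customer_id", "ts"));
            return table;
        }

        // Batch a set of decoded BSON events and store them in today's table.
        public void store(List<Document> batch) {
            dayTable().insertMany(batch);
        }
    }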

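Second, a simplified sketch of the windowing and throttling logic in the alerts sink. Throttling is reduced here to at most one email per throttle period; the class name and the rule-engine wiring are assumptions.

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Illustrative alert rule -- not the actual Capillary rule engine.
    public class WindowedAlertRule {

        private final int threshold;        // e.g. 10 warnings of a type ...
        private final long windowMillis;    // ... within a 5-minute window
        private final long throttleMillis;  // at most one email per throttle period
        private final Deque<Long> matchTimes = new ArrayDeque<Long>();
        private long lastAlertAt = -1;

        public WindowedAlertRule(int threshold, long windowMillis, long throttleMillis) {
            this.threshold = threshold;
            this.windowMillis = windowMillis;
            this.throttleMillis = throttleMillis;
        }

        // Returns true when an alert email should actually be sent for this event.
        public synchronized boolean onMatchingEvent(long nowMillis) {
            matchTimes.addLast(nowMillis);
            // Evict matches that have fallen out of the time window.
            while (!matchTimes.isEmpty() && nowMillis - matchTimes.peekFirst() > windowMillis) {
                matchTimes.removeFirst();
            }
            boolean windowBreached = matchTimes.size() >= threshold;
            boolean throttled = lastAlertAt >= 0 && nowMillis - lastAlertAt < throttleMillis;
            if (windowBreached && !throttled) {
                lastAlertAt = nowMillis;
                return true;
            }
            return false;
        }
    }

A rule constructed as new WindowedAlertRule(10, 5 * 60 * 1000L, 30 * 60 * 1000L) corresponds to the “10 warnings in 5 minutes” example above, with a 30-minute mail throttle.
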
Query Language

  1. Real-time query for log search – The logs are indexed by request-ids, which originate from an API call. The first module that processes a request logs the request-id along with other attributes such as event type, customer_id and txn_id; these flow through the log framework and get indexed. All subsequent modules also log the same request-id along with any other attributes they want to record. For example, to fetch all the logs of a customer for a particular day, we look up the relevant request-ids from the indexed tables, fire one query per request-id to fetch logs across all the modules, and then sort on the timestamp to build the whole trail (a minimal query sketch appears after this list).

  2. Hive query on Map-Reduce – To get different statistics, ad-hoc queries are fired on Hive tables, each of which is a view over MongoDB, HDFS folders or an S3 bucket. These tables are partitioned by date, module, host and hour so that partitions can be pruned at query time where possible. The reports generally run nightly and summarize the findings; a set of pre-defined Hive queries builds cubes on dimensions like cluster, server, tenant and date ranges and pushes them to MySQL for the dashboard. For example, a report of slow-query counts per host and MySQL user looks like:

    hostname             mysqluser       slowcount
    campaign-shard-db    dsl             66
    campaign-shard-db    campaign        10
    dbintouchslave1      dsl             2174
    dbintouchslave1      capreader       287
    dbintouchslave1      capillaryro     4
    dbintouchslave1      capillary       1
    dbmaster             intouch         2542
    dbmaster             event_manager   299
    dbmaster             emf             108
    dbmaster             capillaryro     45
  3. Dashboard queries – The dashboard queries MySQL directly to show trends and also provides the ability to drill down into a particular dimension. A small query language is defined to fetch the data from MySQL.
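
As a rough sketch of the log-trail lookup from point 1 above, using the MongoDB Java driver (the table name logs_20140321 and the field names request_id/ts are the same illustrative assumptions used earlier):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.Sorts;
    import org.bson.Document;

    // Illustrative log-trail lookup -- not the actual Capillary query layer.
    public class LogTrailQuery {
        public static void main(String[] args) {
            try (MongoClient client = MongoClients.create("mongodb://mongo-logs:27017")) {
                MongoCollection<Document> dayTable =
                        client.getDatabase("logs").getCollection("logs_20140321");
                // One query per request-id: all events across every module, ordered by time.
                for (Document event : dayTable.find(Filters.eq("request_id", "req-abc-123"))
                                              .sort(Sorts.ascending("ts"))) {
                    System.out.println(event.toJson());
                }
            }
        }
    }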

A few of the reports that can be viewed on our dashboard can be found here.

Some fine points

  1. The feature extractor is a custom sink that can run on the collector as well. We decided to keep it on the agents in order to push the parsing computation down to them, which reduces the CPU requirements at the collector.
  2. It is possible to remove the collector altogether and push the data-store sinks to the agents directly.
  3. We compress the logs in LZO format for efficient storage and IO.
  4. If the collector crashes, or any data store slows down or goes down, the channels in the preceding stage keep buffering the events. Each module discovers the others through a dynamic registry, so as soon as a replacement service comes up, it receives the events that could not be delivered earlier.
  5. When we create a log trail across services for a request-id, the timestamp is used to order the trail. Since these log timestamps are collected from different machines, clock drift across hosts has to be managed well.