Damian Hagge

Helping companies with Software and Technical Strategy

Histograms with Elasticsearch

by Damian Hagge on August 10, 2017

Elasticsearch has been gaining popularity very quickly over the past few years, and for good reason. It features fast and efficient text search backed by Lucene, but it also has excellent real-time analytics. I’ve personally used elasticsearch extensively and generally found it to be better for search-based analytics than other options such as Hadoop.

One of the analytics use-cases I’ve found elasticsearch to excel at is real-time histograms, especially when they need to be real-time ad-hoc queries. In this article we’ll work through an example of how to build a histogram query with elasticsearch.

One thing to keep in mind is that elasticsearch works at scale! Everything you see here could also be done in SQL, but there’s a point at which relational databases don’t handle the data volume well and a NoSQL store becomes the only realistic way to query very large datasets.

Histograms

Firstly, what is a histogram? In general, a histogram groups data into buckets and plots one value per bucket. In this article the buckets are slices of time, so our histograms are graphical representations of a time-sliced series of data. Let’s say, for example, that you have a dataset of purchases which contains the fields purchase, date and price. A histogram would be a graph of that dataset over time, for example, the average price of purchases for each month of 2016:

Average Sales By Month Histogram
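
To make the idea concrete, here is a minimal sketch in plain Java (no elasticsearch involved) of the bucketing behind such a graph: purchases are grouped by calendar month and the price is averaged within each group. The Purchase class and the sample values are made up for illustration.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class MonthlyAverageSketch {

    /** Hypothetical purchase record holding only the fields the histogram needs. */
    static final class Purchase {
        final LocalDate date;
        final double price;

        Purchase(LocalDate date, double price) {
            this.date = date;
            this.price = price;
        }
    }

    /** Groups purchases into one bucket per calendar month and averages the price in each bucket. */
    static Map<YearMonth, Double> averagePriceByMonth(List<Purchase> purchases) {
        return purchases.stream().collect(Collectors.groupingBy(
                p -> YearMonth.from(p.date),               // bucket key: the month of the purchase
                TreeMap::new,                              // keep the buckets in date order
                Collectors.averagingDouble(p -> p.price))); // value plotted for each bucket
    }

    public static void main(String[] args) {
        List<Purchase> purchases = Arrays.asList(
                new Purchase(LocalDate.of(2016, 1, 5), 100.0),
                new Purchase(LocalDate.of(2016, 1, 20), 300.0),
                new Purchase(LocalDate.of(2016, 2, 11), 50.0));
        averagePriceByMonth(purchases)
                .forEach((month, avg) -> System.out.println(month + " -> " + avg));
    }
}
```

Each map entry corresponds to one bar of the histogram. Elasticsearch does the same grouping for us, but on the server and across much larger datasets.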

Elasticsearch Histogram

Now let’s build our own histogram using elasticsearch.

Install and start elasticsearch

First let’s install elasticsearch.

Once it’s installed we can start the instance:

cd elasticsearch-5.0.2/bin/
./elasticsearch

To verify that the instance started correctly, go to http://localhost:9200?pretty and you should see a response in the browser that looks something like the following (the name, cluster_uuid and version values will reflect your own installation):

{
    "name": "Piper",
    "cluster_name": "elasticsearch",
    "cluster_uuid": "_VGr2MAHQPK7tVxPlCPmDw",
    "version": {
        "number": "2.4.1",
        "build_hash": "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
        "build_timestamp": "2016-09-27T18:57:55Z",
        "build_snapshot": false,
        "lucene_version": "5.5.2"
    },
    "tagline": "You Know, for Search"
}

Index a dataset

Now let’s index a dataset into the elasticsearch instance. Indexing is just the elasticsearch terminology for adding a record.

We’ll create a dataset for a cactus distributor in the south-western United States. I’ve provided two examples below, a command line-based curl example and a Java example. You can follow either one since they both do exactly the same thing.

Curl

First let’s create an elasticsearch mapping for our index. A mapping is like a schema for the documents we are going to index into elasticsearch.

curl -XPUT http://localhost:9200/cactus -d '{
  "mappings" : {
    "sales" : {
      "properties" : {
        "price" : { "type" : "double" },
        "date" : { "type" : "date" },
        "state" : { "type" : "keyword" },
        "category" : { "type" : "keyword" }
      }
    }
  }
}'

This mapping for the sales type contains four fields (price, date, state and category) and specifies a datatype for each one. The keyword type, which is available from elasticsearch 5.0 onwards, is critical here since it allows us to do terms aggregations on those fields, as we’ll see later on when we execute our queries.

Now let’s index a sample dataset into elasticsearch:

# pull down the sample bulk index data from github
curl https://raw.githubusercontent.com/dhagge/sitepoint-elasticsearch-aggregation/master/resources/data.index > data.index

# index the data into ES via bulk index request
curl -XPOST http://localhost:9200/_bulk --data-binary "@data.index"; echo
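
The bulk API expects newline-delimited JSON: an action line naming the target index and type, followed by the document itself on the next line, with every line (including the last) terminated by a newline. The contents of data.index aren’t reproduced here, so the following is only a sketch of what such a request body looks like. The helper names are hypothetical, and the sketch assumes the values contain no characters that need JSON escaping.

```java
import java.util.Locale;

class BulkBodySketch {

    /** Action line: tells elasticsearch which index and type the next document goes into. */
    static String actionLine(String index, String type) {
        return String.format("{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\"}}", index, type);
    }

    /** Source line: the document itself, written on a single line. */
    static String sourceLine(double price, String date, String state, String category) {
        // Locale.ROOT keeps the decimal separator a '.', whatever the JVM default locale is
        return String.format(Locale.ROOT,
                "{\"price\":%.2f,\"date\":\"%s\",\"state\":\"%s\",\"category\":\"%s\"}",
                price, date, state, category);
    }

    public static void main(String[] args) {
        // One action line plus one source line per document, each ending in a newline
        StringBuilder body = new StringBuilder();
        body.append(actionLine("cactus", "sales")).append('\n');
        body.append(sourceLine(1916.55, "2016-12-01", "AZ", "Cholla")).append('\n');
        System.out.print(body);
    }
}
```

A file built this way can be posted with the same --data-binary option used above, which preserves the newlines that the bulk format depends on.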

Now let’s verify that the data actually did index correctly. In a browser go to http://localhost:9200/cactus/_search?pretty and you should see results like:

{
    "took": 95,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
    },
    "hits": {
        "total": 2000,
        "max_score": 1,
        "hits": [
            {
                "_index": "cactus",
                "_type": "sales",
                "_id": "AVjHuSFdTUnQhGv7Lwlk",
                "_score": 1,
                "_source": {
                    "price": 1916.55,
                    "date": "2016-12-01",
                    "state": "AZ",
                    "category": "Cholla"
                }
            },
        ... etc ...

Note that anytime you want to start from scratch you can always drop the cactus index by doing:

curl -XDELETE http://localhost:9200/cactus

Java

Full source code for the Java population code can be found on GitHub.

First let’s create a method to create our test dataset:

static final List<String> STATES =
        Arrays.asList("AZ", "CA", "CO", "NM", "NV", "TX", "UT");
static final List<String> CATEGORIES = Arrays.asList(
        "Cholla", "Barrel", "Hedgehog", "Prickly Pear", "Saguaro");

static final DecimalFormat df2 = new DecimalFormat(".##");
static final Random random = new Random();
/**
 * Create a random dataset and add it to a bulk index request.
 * @param client The client used to prepare each index request
 * @param bulkRequest The bulk request builder to add the dataset to
 * @throws IOException If a document cannot be serialized to JSON
 */
public static void createDataset(Client client,
                                 BulkRequestBuilder bulkRequest)
        throws IOException {
    // put 1000 random records into the dataset
    //  - a random price between 1 and 10,000
    //  - a random date within the past year
    //  - a random state in the southwest
    //  - a random category of cactus
    for (int i=0; i<1000; i++) {
        XContentBuilder data = jsonBuilder()
            .startObject()
            .field("price", df2.format(random.nextDouble() * (10000.0 - 1.0) + 1.0))
            .field("date", LocalDate.now().minusDays(random.nextInt(365)))
            .field("state", STATES.get(random.nextInt(STATES.size())))
            .field("category", CATEGORIES.get(random.nextInt(CATEGORIES.size())))
            .endObject();
        IndexRequestBuilder builder =
                client.prepareIndex("cactus", "sales").setSource(data);
        bulkRequest.add(builder);
    }
}

This creates 1000 records, each with a random price, a random date within the past year, a random state in the south-western United States and a random category of cactus. Each record is turned into an index request via the call to client.prepareIndex("cactus", "sales").setSource(data) and then added to our bulk request via bulkRequest.add(builder).

Now let’s write some code that creates the mapping for our index:

public static XContentBuilder createMapping() throws IOException {
    return jsonBuilder()
        .startObject()
            .startObject("sales")
                .startObject("properties")
                    .startObject("price")
                        .field("type", "double")
                    .endObject()
                    .startObject("date")
                        .field("type", "date")
                    .endObject()
                    .startObject("state")
                        .field("type", "keyword")
                    .endObject()
                    .startObject("category")
                        .field("type", "keyword")
                    .endObject()
                .endObject()
            .endObject()
        .endObject();
}

Now let’s actually create logic that calls our createDataset(..) method and indexes the result into elasticsearch. A bulk index request is a request which indexes multiple records in a single call.

TransportClient client = createClient();

try {

    // create the mapping for our index
    client.admin().indices().prepareCreate("cactus")
            .addMapping("sales", createMapping())
            .execute().actionGet();

    // create a bulk request (which batches multiple requests into a single call)
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    createDataset(client, bulkRequest);

    // now call elasticsearch to index the documents
    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
        System.out.println("Oops, apparently cactus datasets are hard to create...");
        System.out.println(bulkResponse.buildFailureMessage());
    } else {
        System.out.println("Yay, we have lots of cactus sales!");
    }
} finally {
    client.close();
}

We create a TransportClient which connects to the elasticsearch transport port (9300 by default, as opposed to the HTTP port 9200 used by curl). Then we create the index via the call to prepareCreate(...) and supply our mapping. Finally we add the dataset to the BulkRequest and make the request via the call to bulkRequest.get().

Now let’s verify that the data actually did index correctly. In a browser go to http://localhost:9200/cactus/_search?pretty and you should see the data we just indexed (note that by default an ES query only returns 10 records).

Histogram queries

To obtain data results that can be graphed as a histogram we use a search aggregation. This is a search query which returns buckets of data that we can use to display a graphical representation of the data.

First let’s query elasticsearch for a histogram of total sales by month.

Curl

curl -XPOST 'http://localhost:9200/cactus/_search?pretty' -d '{
  "aggregations": {
    "salesByDate": {
      "date_histogram": {
        "interval": "month",
        "field": "date"
      },
      "aggregations": {
        "totalSales": {
          "sum": {
            "field": "price"
          }
        }
      }
    }
  },
  "size": 0
}'

We’re sending an aggregation request with a date_histogram aggregation named salesByDate which specifies an interval of month on the field date. We’re also including a totalSales sub-aggregation which is a sum on the field price.

This will return us an aggregation response with each salesByDate bucket containing a single totalSales value, which is the sum of all prices for the documents that fall in that month.

When you run the curl command you should see a response something like:

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1000,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "salesByDate" : {
      "buckets" : [
        {
          "key_as_string" : "2015-12-01T00:00:00.000Z",
          "key" : 1448928000000,
          "doc_count" : 75,
          "totalSales" : {
            "value" : 402499.5707550049
          }
        },
        {
          "key_as_string" : "2016-01-01T00:00:00.000Z",
          "key" : 1451606400000,
          "doc_count" : 82,
          "totalSales" : {
            "value" : 424961.34213256836
          }
        },
        ...etc...
      ]
    }
  }
}

As you can see, each salesByDate bucket contains a key, which is the timestamp (in milliseconds since the epoch) of the start of the month the bucket represents, and a totalSales object whose value field holds the sum of all sales for that month.
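
If you are consuming the raw JSON yourself, the key is usually the easier field to work with. As a small sketch (the class and method names are just for illustration), this is how a bucket key can be turned back into a calendar date, assuming the buckets were computed in UTC, which is the default when no time_zone is given:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

class BucketKeySketch {

    /** Converts a date_histogram bucket key (epoch milliseconds, UTC) into a calendar date. */
    static LocalDate bucketKeyToDate(long keyMillis) {
        return Instant.ofEpochMilli(keyMillis).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        // The two keys from the sample response above
        System.out.println(bucketKeyToDate(1448928000000L)); // prints 2015-12-01
        System.out.println(bucketKeyToDate(1451606400000L)); // prints 2016-01-01
    }
}
```

The dates match the key_as_string values in the response, which is a handy sanity check when wiring the buckets into a charting library.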

Java

To perform the exact same aggregation query in Java we would do the following:

    DecimalFormat PRICE_FORMATTER = new DecimalFormat(".##");
    DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy/MM/dd");

    TransportClient client = ...;

    AggregationBuilder agg =
            AggregationBuilders.dateHistogram("salesByDate").field("date")
            .dateHistogramInterval(DateHistogramInterval.MONTH)
            .subAggregation(AggregationBuilders.sum("totalSales").field("price"));

    SearchResponse sr = client.prepareSearch("cactus")
            .addAggregation(agg)
            .setSize(0) // like "size": 0 in the curl example: return only the aggregation, no hits
            .execute().actionGet();

    MultiBucketsAggregation aggResp = sr.getAggregations().get("salesByDate");

    for (MultiBucketsAggregation.Bucket bucket : aggResp.getBuckets()) {
        DateTime key = (DateTime)bucket.getKey();
        InternalSum price = bucket.getAggregations().get("totalSales");
        System.out.println("Date: " + DATE_FORMATTER.print(key) +
                ", Total Sales: " + PRICE_FORMATTER.format(price.getValue()));
    }

We’re creating the same search aggregation as the curl example: a date_histogram aggregation named salesByDate which specifies an interval of month on the field date, with a totalSales sub-aggregation which is a sum on the field price.

If you run this code you’ll see the total sales per month output, something similar to:

Date: 2015/12/01, Total Sales: 402499.57
Date: 2016/01/01, Total Sales: 424961.34
Date: 2016/02/01, Total Sales: 507919.05
Date: 2016/03/01, Total Sales: 392810.21
Date: 2016/04/01, Total Sales: 380018.01
...etc...

Other histograms

There are, of course, lots of other histograms that can be imagined. For example, let’s say we wanted to obtain average sales per month by state:

curl -XPOST 'http://localhost:9200/cactus/_search?pretty' -d '{
  "aggregations": {
    "salesByDate": {
      "date_histogram": {
        "interval": "month",
        "field": "date"
      },
      "aggregations": {
        "state": {
          "terms": { 
            "field": "state" 
          },
          "aggregations": {
            "averageSales": {
              "avg": {
                "field": "price"
              }
            }
          }
        }
      }
    }
  },
  "size": 0
}'

Or, as another example, the average price by category per day:

curl -XPOST 'http://localhost:9200/cactus/_search?pretty' -d '{
  "aggregations": {
    "salesByDate": {
      "date_histogram": {
        "interval": "day",
        "field": "date"
      },
      "aggregations": {
        "category": {
          "terms": {
            "field": "category"
          },
          "aggregations": {
            "averagePrice": {
              "avg": {
                "field": "price"
              }
            }
          }
        }
      }
    }
  },
  "size": 0
}'
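
The two queries above share one shape: a date_histogram on date, a terms sub-aggregation, and a single metric inside each term bucket. As a rough sketch of that pattern, the request body can be assembled from its parts. The class name, the method name and the naming of the metric aggregation (metric + "Price") are my own choices for illustration, not anything elasticsearch requires, and the inputs are assumed to need no JSON escaping.

```java
class HistogramQuerySketch {

    /**
     * Builds a request body with a date_histogram on "date", a terms sub-aggregation
     * on termsField and one metric (for example "avg" or "sum") on metricField.
     */
    static String build(String interval, String termsField, String metric, String metricField) {
        // Innermost part: the metric itself, e.g. {"avg":{"field":"price"}}
        String metricAgg = String.format("{\"%s\":{\"field\":\"%s\"}}", metric, metricField);

        // Terms aggregation wrapping the metric, one bucket per distinct field value
        String termsAgg = String.format(
                "{\"terms\":{\"field\":\"%s\"},\"aggregations\":{\"%sPrice\":%s}}",
                termsField, metric, metricAgg);

        // Date histogram wrapping the terms aggregation, one bucket per interval
        String histogram = String.format(
                "{\"date_histogram\":{\"interval\":\"%s\",\"field\":\"date\"},"
                        + "\"aggregations\":{\"%s\":%s}}",
                interval, termsField, termsAgg);

        // size 0: we only want the aggregation buckets, not the matching documents
        return String.format("{\"aggregations\":{\"salesByDate\":%s},\"size\":0}", histogram);
    }

    public static void main(String[] args) {
        System.out.println(build("month", "state", "avg", "price"));
        System.out.println(build("day", "category", "avg", "price"));
    }
}
```

The printed JSON can be passed to curl via -d exactly like the hand-written bodies. In real Java code the AggregationBuilders API shown earlier is the better tool, since it takes care of escaping and validation.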

Elasticsearch features rich search capabilities around histograms, which can be explored in the aggregations section of the official documentation.

Summary

We’ve seen how we can create an elasticsearch index, put a dataset into the index and then run various aggregations on that dataset to obtain raw data which represents a histogram.

Histograms are only a part of what elasticsearch can do, and I strongly encourage you to explore the full documentation to get a feel for its full capabilities.
