I need to count co-occurrences between each single tag and a fixed set of tags taken as a whole. I have 10,000 different single tags, and the fixed set also contains 10k tags. I loop through all single tags under the fixed-set-of-tags context, with a fixed time range. In total there are 1 billion documents in the index, spread across 20 shards.
Here is the Elasticsearch query (Elasticsearch 6.6.0):
es.search(index=index, size=0, body={
    "query": {
        "bool": {
            "filter": [
                {"range": {
                    "created_time": {
                        "gte": fixed_start_time,
                        "lte": fixed_end_time,
                        "format": "yyyy-MM-dd-HH"
                    }
                }},
                {"term": {"tags": dynamic_single_tag}},
                {"terms": {"tags": {
                    "index": "fixed_set_tags_list",
                    "id": 2,
                    "type": "twitter",
                    "path": "tag_list"
                }}}
            ]
        }
    },
    "aggs": {
        "by_month": {
            "date_histogram": {
                "field": "created_time",
                "interval": "month",
                "min_doc_count": 0,
                "extended_bounds": {
                    "min": two_month_start_time,
                    "max": start_month_start_time
                }
            }
        }
    }
})
My question: is there any way to cache, inside Elasticsearch, the terms query for the fixed 10k-tag set and the time-range filter, so as to speed up the query? The query above takes 1.5 s for one single tag.
What you are seeing is normal behavior for Elasticsearch aggregations (actually, pretty good performance given that you have 1 billion documents).
There are a few options you may consider: using a batch of filter aggregations, re-indexing with a subset of documents, and downloading the data out of Elasticsearch to compute the co-occurrences offline. But it is probably worth first sending those 10K queries as-is and seeing whether Elasticsearch's built-in caching kicks in. Let me explain each of these options in a bit more detail.
Using a batch of filter aggregations

First, let's outline what we are doing in the original ES query:

- filter documents whose created_time falls in a certain time window;
- filter documents having the tag dynamic_single_tag;
- filter documents having at least one tag from fixed_set_tags_list;
- count the matching documents per month.

The performance is a problem because we've got 10K tags to make such queries for.
What we can do here is to move the filter on dynamic_single_tag from the query into the aggregations:
POST myindex/_doc/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "terms": { ... } }
      ]
    }
  },
  "aggs": {
    "by tag C": {
      "filter": {
        "term": {
          "tags": "C" <== here's the filter
        }
      },
      "aggs": {
        "by month": {
          "date_histogram": {
            "field": "created_time",
            "interval": "month",
            "min_doc_count": 0,
            "extended_bounds": {
              "min": "2019-01-01",
              "max": "2019-02-01"
            }
          }
        }
      }
    }
  }
}
The result will look something like this:
"aggregations" : {
"by tag C" : {
"doc_count" : 2,
"by month" : {
"buckets" : [
{
"key_as_string" : "2019-01-01T00:00:00.000Z",
"key" : 1546300800000,
"doc_count" : 2
},
{
"key_as_string" : "2019-02-01T00:00:00.000Z",
"key" : 1548979200000,
"doc_count" : 0
}
]
}
}
Now, if you are asking how this can help performance, here is the trick: adding more such filter aggregations, one for each tag: "by tag D", "by tag E", etc.
The improvement will come from doing "batch" requests, combining many initial requests into one. It might not be practical to put all 10K of them in one query, but even batches of 100 tags per query can be a game changer.
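To make this concrete, here is a minimal sketch of how such a batched request could be generated in Python. The helper build_batched_aggs, and the variables all_tags and original_filters, are illustrative names of my own, not part of any API:

# Build one `filter` sub-aggregation per tag in the batch.
# `build_batched_aggs` and `all_tags` are illustrative names, not ES API.
def build_batched_aggs(tags_batch):
    aggs = {}
    for tag in tags_batch:
        aggs["by tag " + tag] = {
            "filter": {"term": {"tags": tag}},
            "aggs": {
                "by month": {
                    "date_histogram": {
                        "field": "created_time",
                        "interval": "month",
                        "min_doc_count": 0
                        # extended_bounds omitted for brevity
                    }
                }
            }
        }
    return aggs

# One request now answers the question for a whole batch of 100 tags.
# `original_filters` stands for the time-range + terms-lookup filters
# from the question (placeholder, defined elsewhere):
es.search(index=index, size=0, body={
    "query": {"bool": {"filter": original_filters}},
    "aggs": build_batched_aggs(all_tags[0:100])
})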
(Side note: roughly the same behavior can be achieved via a terms aggregation with the include filter parameter.)
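For example, such a terms aggregation might look like this (a sketch; the bucket name "by tag" and the tag values are mine, and include with a list of exact values restricts the buckets to just those tags):

"aggs": {
  "by tag": {
    "terms": {
      "field": "tags",
      "include": ["C", "D", "E"],
      "size": 100
    },
    "aggs": {
      "by month": { ... } <== the same date_histogram as above
    }
  }
}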
This method of course requires getting your hands dirty and writing a somewhat more complex query, but it will come in handy if you need to run such queries at arbitrary times with zero preparation.
Re-indexing with a subset of documents

The idea behind the second method is to reduce the set of documents beforehand, via the reindex API. The reindex request might look like this:
POST _reindex
{
  "source": {
    "index": "myindex",
    "type": "_doc",
    "query": {
      "bool": {
        "filter": [
          {
            "range": {
              "created_time": {
                "gte": "fixed_start_time",
                "lte": "fixed_end_time",
                "format": "yyyy-MM-dd-HH"
              }
            }
          },
          {
            "terms": {
              "tags": {
                "index": "fixed_set_tags_list",
                "id": 2,
                "type": "twitter",
                "path": "tag_list"
              }
            }
          }
        ]
      }
    }
  },
  "dest": {
    "index": "myindex_reduced"
  }
}
This query will create a new index, myindex_reduced, containing only the documents that satisfy the first two filter clauses. At this point, the original query can be run without those two clauses.
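For instance, the per-tag query against the reduced index could then be as simple as this (a sketch reusing the names from the question; by_month_aggs is a placeholder for the original date histogram):

es.search(index="myindex_reduced", size=0, body={
    "query": {
        "bool": {
            "filter": [
                {"term": {"tags": dynamic_single_tag}}
            ]
        }
    },
    "aggs": by_month_aggs  # the same "by_month" date_histogram as in the original query
})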
The speed-up in this case will come from limiting the number of documents: the smaller the reduced index, the bigger the gain. So if fixed_set_tags_list leaves you with only a small fraction of the 1 billion documents, this is definitely an option worth trying.
Download the data and process it offline

To be honest, this use case looks more like a job for pandas. If data analytics is your thing, I would suggest using the scroll API to extract the data to disk and then process it with an arbitrary script. In Python it could be as simple as using the scan() helper of the elasticsearch library.
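A minimal sketch of that approach (assuming an es client; original_filters again stands for the bool filters from the question, and the counting logic is only an illustration):

from collections import Counter
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch()
tag_counts = Counter()

# scan() transparently pages through all matching documents via the scroll API.
# `original_filters` is a placeholder for the range + terms-lookup filters.
query = {"query": {"bool": {"filter": original_filters}}}
for doc in scan(es, index="myindex", query=query, _source=["tags", "created_time"]):
    for tag in doc["_source"]["tags"]:
        tag_counts[tag] += 1  # per-month co-occurrence counting would go here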
A note on Elasticsearch caching

Elasticsearch will already try to help you with your query via the request cache. It is applied only to pure-aggregation queries (size: 0), so it should work in your case.
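For completeness, the request cache can also be asked for explicitly per request (request_cache is a standard search parameter, shown here via the Python client; query_body is a placeholder for the body from the question):

es.search(index=index, size=0, request_cache=True, body=query_body)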
But it will not help here, because the content of the query is different every time: the whole JSON of the query is used as the cache key, and every request contains a new tag. Instead, a different level of caching comes into play.
Elasticsearch relies heavily on the filesystem cache, which means that under the hood the most frequently accessed blocks of the filesystem get cached (practically, loaded into RAM). For the end user this means that "warming up" comes slowly, and only with a volume of similar requests.
In your case, aggregations and filtering occur on two fields: created_time and tags. This means that after doing maybe 10 or 100 requests with different tags, the response time will drop from 1.5 s to something more bearable.
To demonstrate the point, here is a Vegeta plot from my study of Elasticsearch performance under the same heavy-aggregation query sent at a fixed rate of requests per second:

[Vegeta latency plot: response time vs. number of requests sent]
As you can see, requests initially took ~10 s, and after 100 requests the latency diminished to a brilliant 200 ms.
I would definitely suggest trying this "brute force" approach first, because if it works, great; if it does not, it cost you nothing.
Hope that helps!