mysql, elasticsearch, rdbms, apache-spark-sql, opentsdb

Can you suggest a solution for analyzing big, relational data?


I'm looking for suggestions on the requirements below. Feel free to contact me for any details. Suggestions on how I can describe my question more clearly are also very much appreciated :)

Requirements description

I have some data in the following format:

    router, interface, timestamp, src_ip, dst_ip, src_port, dst_port, protocol, bits
    r1, 1, 1453016443, 10.0.0.1, 10.0.0.2, 100, 200, tcp, 108
    r2, 1, 1453016448, 10.0.0.3, 10.0.0.8, 200, 200, udp, 100

As you can see, this is raw network flow data. I have omitted some columns to keep the example clear. The volume of data is very big, and it is generated very fast: about 1 billion rows every 5 minutes...

What I want is to do some real-time analysis on this data. For example:

draw a line using the timestamp

    select timestamp, sum(bits) from raw_data where router = 'r1' and interface = 1 group by timestamp;

find the 3 src_ip that send the most data on a given interface

    select src_ip, sum(bits) from raw_data where router = 'r1' and interface = 2 group by src_ip order by sum(bits) desc limit 3;
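To make the expected results concrete, here is a small plain-Python sketch (standard library only) that evaluates both queries in memory over the two sample rows above plus a few invented ones — just an illustration of the intended semantics, not a proposed solution at this scale:

```python
from collections import defaultdict

# Sample rows in the format above, plus a few invented ones:
# (router, interface, timestamp, src_ip, dst_ip, src_port, dst_port, protocol, bits)
rows = [
    ("r1", 1, 1453016443, "10.0.0.1", "10.0.0.2", 100, 200, "tcp", 108),
    ("r2", 1, 1453016448, "10.0.0.3", "10.0.0.8", 200, 200, "udp", 100),
    ("r1", 2, 1453016450, "10.0.0.4", "10.0.0.9", 300, 80, "tcp", 500),
    ("r1", 2, 1453016451, "10.0.0.4", "10.0.0.9", 300, 80, "tcp", 300),
    ("r1", 2, 1453016452, "10.0.0.5", "10.0.0.9", 301, 80, "tcp", 200),
    ("r1", 2, 1453016453, "10.0.0.6", "10.0.0.9", 302, 80, "tcp", 100),
]

# Query 1: sum(bits) per timestamp, filtered to router=r1, interface=1
per_ts = defaultdict(int)
for router, iface, ts, *_, bits in rows:
    if router == "r1" and iface == 1:
        per_ts[ts] += bits

# Query 2: top 3 src_ip by sum(bits), filtered to router=r1, interface=2
per_src = defaultdict(int)
for router, iface, ts, src_ip, *_, bits in rows:
    if router == "r1" and iface == 2:
        per_src[src_ip] += bits
top3 = sorted(per_src.items(), key=lambda kv: kv[1], reverse=True)[:3]

print(dict(per_ts))  # {1453016443: 108}
print(top3)          # [('10.0.0.4', 800), ('10.0.0.5', 200), ('10.0.0.6', 100)]
```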

I have already tried some solutions, and none of them is a good fit. For example:

RDBMS

MySQL seems fine except for a few problems:

  1. the data is too big
  2. I have a lot more columns than described here. To improve query speed, I would have to create indexes on most of the columns. But creating indexes on a big table, with indexes spanning that many columns, is not a good idea, right?

OpenTSDB

OpenTSDB is a good time-series database, but it is not suitable for my requirements either.

OpenTSDB has trouble with top-N queries: it cannot answer my requirement "get the top 3 src_ip that send the most data".

Spark

I know that Apache Spark can be used like an RDBMS via its Spark SQL feature. I have not tried it, but I guess the performance would not satisfy a real-time analysis/query requirement, right? After all, Spark is more suited to offline computation, right?

Elasticsearch

I had high hopes for ES when I learned about this project, but it is not suitable either: when you aggregate on more than one column, you have to use the so-called nested bucket aggregation in Elasticsearch, and the result of that aggregation cannot be sorted. You would have to retrieve all the results and sort them yourself. In my case there are far too many results; sorting them would be very difficult.

So... I'm stuck here. Can anyone give me some suggestions please?


Solution

  • I don't see why ES would not be able to meet your requirements. I think you misunderstood this part:

    But it is not suitable either: when you aggregate on more than one column, you have to use the so-called nested bucket aggregation in Elasticsearch, and the result of that aggregation cannot be sorted.

    Your first requirement, draw a line using the timestamp, could easily be achieved with a query/aggregation like this:

    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "interface": 1
              }
            },
            {
              "term": {
                "router": "r1"
              }
            }
          ]
        }
      },
      "aggs": {
        "by_minute": {
          "date_histogram": {
            "field": "timestamp",
            "interval": "1m"
          },
          "aggs": {
            "sum_bits": {
              "sum": {
                "field": "bits"
              }
            }
          }
        }
      }
    }
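For intuition, a date_histogram with a sum sub-aggregation is just minute-bucketing plus summation. A plain-Python sketch over invented sample documents (already filtered to router=r1, interface=1) shows the equivalent computation:

```python
from collections import defaultdict

# Invented sample documents: (epoch-seconds timestamp, bits)
docs = [(1453016443, 108), (1453016448, 50), (1453016503, 200)]

buckets = defaultdict(int)
for ts, bits in docs:
    minute = ts - ts % 60  # floor to the minute, like date_histogram with interval "1m"
    buckets[minute] += bits

print(sorted(buckets.items()))  # [(1453016400, 158), (1453016460, 200)]
```

The first two documents fall into the same minute bucket, so their bits are summed; ES does this bucketing server-side, which is why the result comes back already grouped and ready to plot.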
    

    As for your second requirement, find the 3 src_ip that send the most data on a given interface, it can also easily be achieved with a query/aggregation like this one:

    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "interface": 2
              }
            },
            {
              "term": {
                "router": "r1"
              }
            }
          ]
        }
      },
      "aggs": {
        "by_src_ip": {
          "terms": {
            "field": "src_ip",
            "size": 3,
            "order": {
              "sum_bits": "desc"
            }
          },
          "aggs": {
            "sum_bits": {
              "sum": {
                "field": "bits"
              }
            }
          }
        }
      }
    }
    

    UPDATE

    According to your comment, your second requirement above could change to finding the top 3 combinations of src_ip/dst_ip. This is doable with a terms aggregation that uses a script instead of a field: the script builds the src/dst combination, and the sub-aggregation provides the sum of bits for each pair, like this:

    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "interface": 2
              }
            },
            {
              "term": {
                "router": "r1"
              }
            }
          ]
        }
      },
      "aggs": {
        "by_src_ip": {
          "terms": {
            "script": "[doc.src_ip.value, doc.dst_ip.value].join('-')",
            "size": 3,
            "order": {
              "sum_bits": "desc"
            }
          },
          "aggs": {
            "sum_bits": {
              "sum": {
                "field": "bits"
              }
            }
          }
        }
      }
    }
    

    Note that in order to run this last query, you'll need to enable dynamic scripting. Also, since you'll have billions of documents, scripting might not be the best solution, but it's worth trying before diving further. Another possible solution is to add a combination field (src_ip-dst_ip) at indexing time, so that you can use it as a plain field in your terms aggregation without resorting to scripting.
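That indexing-time alternative could look like the following sketch; the field name `src_dst` and the helper function are assumptions, the point is only to precompute the combined key before each document is sent to ES:

```python
def add_combo_field(doc):
    """Precompute the src/dst combination so a plain terms aggregation on
    'src_dst' can replace the script-based one (field name is an assumption)."""
    enriched = dict(doc)  # copy so the original document is left untouched
    enriched["src_dst"] = f"{doc['src_ip']}-{doc['dst_ip']}"
    return enriched

doc = {"router": "r1", "interface": 2, "src_ip": "10.0.0.1",
       "dst_ip": "10.0.0.2", "bits": 108}
print(add_combo_field(doc)["src_dst"])  # 10.0.0.1-10.0.0.2
```

With documents enriched this way, the terms aggregation above simply becomes `"field": "src_dst"` instead of the script, which avoids per-document script evaluation at query time.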