mysqlbenthos

Aggregate sql_raw records with Benthos


I have this input in Benthos that queries rows from my DB for a one-to-many relation between a main table and a child table:

input:
  sql_raw:
    driver: mysql
    dsn: user:password@tcp(ip:3306)/db
    query: |
      SELECT m.id AS mid, m.type, c.position, c.value
      FROM main m
      LEFT JOIN child c ON c.main_id = m.id
      ORDER BY m.id ASC, c.position ASC

With a simple output, I get this result:

{ "mid": 1, "type": "someType", "position": 0, "value": 2 }
{ "mid": 1, "type": "someType", "position": 1, "value": 5 }
{ "mid": 1, "type": "someType", "position": 2, "value": 20 }
{ "mid": 1, "type": "someType", "position": 3, "value": 8 }

I want an output like the following :

{
    "mid": 1,
    "type": "someType",
    "children": [
        {
            "position": 0,
            "value": 2
        },
        {
            "position": 1,
            "value": 5
        },
        {
            "position": 2,
            "value": 20
        },
        {
            "position": 3,
            "value": 8
        }
    ]
}

How should I do that? With a cache? How do I aggregate (join?) messages?

The only way I've found so far is to create a pipeline with a sql_raw processor for each main ID, which isn't ideal since it requires as many SQL queries as there are rows in the main table.


Solution

  • Does the entire dataset fit into memory? If yes, you can use a generate input combined with a sql_select processor, which returns a JSON array of rows that you can then massage into the desired structure via Bloblang. Here's an example:

    input:
      generate:
        count: 1
        interval: 0s
        mapping: root = ""
      processors:
        - sql_select:
            driver: sqlite
            dsn: file:./foobar.db
            table: test
            columns:
              - mid
              - type
              - position
              - value
            init_statement: |
              CREATE TABLE IF NOT EXISTS test (
                mid int,
                type varchar,
                position int,
                value int
              );
              INSERT INTO test (mid, type, position, value) VALUES (1, 'someType', 0, 2), (2, 'someOtherType', 1, 5), (1, 'someType', 2, 20), (2, 'someOtherType', 3, 8);
    
        - unarchive:
            format: json_array
        - group_by_value:
            value: ${! json("mid") }-${! json("type") }
        - mapping: |
            root = if batch_index() == 0 {
              this.with("mid", "type").merge({
                "children": json().with("position", "value").from_all()
              })
            } else {
              deleted()
            }
    
    output:
      stdout: {}
    

    I assume you have multiple values for mid and type, so I used group_by_value to split the single batch of rows into smaller batches based on these two fields, and then collapsed each batch into a single message with the structure you requested. Since I don't have your database, this sample config uses a simple SQLite table containing the results of your query.
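    For reference, running this sample config should print one line per (mid, type) pair, along these lines (an expectation based on the sample rows inserted above, not a captured run):

        {"mid":1,"type":"someType","children":[{"position":0,"value":2},{"position":2,"value":20}]}
        {"mid":2,"type":"someOtherType","children":[{"position":1,"value":5},{"position":3,"value":8}]}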

    If your dataset doesn't fit into memory, then there are workarounds, like fetching all the IDs, splitting them into small batches and then running your existing query once per batch. You might also want to use a cache to keep track of the processed IDs in case Benthos needs to be restarted. The config is a bit more involved, though.
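    Here's a rough sketch of that batched approach (untested; the batch size of 100 and the use of `unsafe_dynamic_query` interpolation to build the IN clause are assumptions to adapt to your setup):

        input:
          generate:
            count: 1
            interval: 0s
            mapping: root = ""
          processors:
            # 1. Fetch only the main IDs (much smaller than the full join result).
            - sql_select:
                driver: mysql
                dsn: user:password@tcp(ip:3306)/db
                table: main
                columns: [ id ]
            - unarchive:
                format: json_array
            # 2. Regroup the per-ID messages into batches of 100 IDs,
            #    then collapse each batch into a single array message.
            - split:
                size: 100
            - archive:
                format: json_array
            # 3. Run the join query once per batch of IDs.
            - sql_raw:
                driver: mysql
                dsn: user:password@tcp(ip:3306)/db
                unsafe_dynamic_query: true
                query: |
                  SELECT m.id AS mid, m.type, c.position, c.value
                  FROM main m
                  LEFT JOIN child c ON c.main_id = m.id
                  WHERE m.id IN (${! json().map_each(row -> row.id.string()).join(",") })
                  ORDER BY m.id ASC, c.position ASC
            # 4. Aggregate each batch of rows as in the in-memory version.
            - unarchive:
                format: json_array
            - group_by_value:
                value: ${! json("mid") }-${! json("type") }
            - mapping: |
                root = if batch_index() == 0 {
                  this.with("mid", "type").merge({"children": json().with("position", "value").from_all()})
                } else { deleted() }

        output:
          stdout: {}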