I have this input in Benthos to query the rows of my DB for a one-to-many relation between a main table and a child table:
input:
  sql_raw:
    driver: mysql
    dsn: user:password@tcp(ip:3306)/db
    query: |
      SELECT m.id AS mid, m.type, c.position, c.value
      FROM main m
      LEFT JOIN child c ON c.main_id = m.id
      ORDER BY m.id ASC, c.position ASC
With a simple output, I get this result:
{ "mid": 1, "type": "someType", "position": 0, "value": 2 }
{ "mid": 1, "type": "someType", "position": 1, "value": 5 }
{ "mid": 1, "type": "someType", "position": 2, "value": 20 }
{ "mid": 1, "type": "someType", "position": 3, "value": 8 }
I want an output like the following:
{
  "mid": 1,
  "type": "someType",
  "children": [
    { "position": 0, "value": 2 },
    { "position": 1, "value": 5 },
    { "position": 2, "value": 20 },
    { "position": 3, "value": 8 }
  ]
}
How should I do that? With a cache? How do I aggregate (join?) messages?
The only way I've found so far is to create a pipeline with a sql_raw processor for each main ID (roughly like the sketch below), which isn't ideal since it requires as many SQL queries as there are rows in the main table.
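For reference, this is approximately what that workaround looks like, assuming the input emits one message per main row (names taken from the tables above; the exact wiring may differ from what I actually have):

pipeline:
  processors:
    # One extra query per main row, which is what I'd like to avoid
    - branch:
        processors:
          - sql_raw:
              driver: mysql
              dsn: user:password@tcp(ip:3306)/db
              query: |
                SELECT c.position, c.value
                FROM child c
                WHERE c.main_id = ?
                ORDER BY c.position ASC
              args_mapping: root = [ this.mid ]
        result_map: root.children = this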
Does the entire dataset fit into memory? If yes, then you can use a generate input combined with a sql_select processor, which returns all the rows as a single JSON array that you can then massage into the desired structure via Bloblang. Here's an example:
input:
  generate:
    count: 1
    interval: 0s
    mapping: root = ""
  processors:
    # Returns all matching rows as a single message containing a JSON array.
    - sql_select:
        driver: sqlite
        dsn: file:./foobar.db
        table: test
        columns:
          - mid
          - type
          - position
          - value
        init_statement: |
          CREATE TABLE IF NOT EXISTS test (
            mid int,
            type varchar,
            position int,
            value int
          );
          INSERT INTO test (mid, type, position, value) VALUES
            (1, 'someType', 0, 2), (2, 'someOtherType', 1, 5),
            (1, 'someType', 2, 20), (2, 'someOtherType', 3, 8);
    # Split the array into one message per row.
    - unarchive:
        format: json_array
    # Group rows which share the same mid and type into separate batches.
    - group_by_value:
        value: ${! json("mid") }-${! json("type") }
    # Collapse each batch into a single message with a children array.
    - mapping: |
        root = if batch_index() == 0 { this.with("mid", "type").merge({"children": json().with("position", "value").from_all()}) } else { deleted() }
output:
  stdout: {}
I assume you have multiple values for mid and type, so I used group_by_value to split the single batch of rows into smaller batches based on these two fields and then collapsed each batch into a single message with the structure you requested. Since I don't have your database, this sample config uses a simple SQLite table that contains the results of your query.
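With the sample rows inserted by the init_statement above, running this config should print something along these lines (one document per mid/type group; field and row order may differ slightly):

{"mid":1,"type":"someType","children":[{"position":0,"value":2},{"position":2,"value":20}]}
{"mid":2,"type":"someOtherType","children":[{"position":1,"value":5},{"position":3,"value":8}]}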
If your dataset doesn't fit into memory, then there are workarounds, like fetching all the IDs, splitting them into small batches and then running the query you have for each of these batches. You might also want to use a cache to keep track of the processed IDs in case Benthos needs to be restarted. The config is a bit more involved, though.
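A stripped-down sketch of that idea, going one ID at a time rather than in batches to keep it short (double-check the dedupe and cache field names against the docs for your Benthos version; the file cache is used here purely as an example of persisting processed IDs across restarts):

input:
  sql_select:
    driver: mysql
    dsn: user:password@tcp(ip:3306)/db
    table: main
    columns: [ id, type ]

pipeline:
  processors:
    # Skip IDs that were already processed before a restart.
    - dedupe:
        cache: processed_ids
        key: ${! json("id") }
    # Fetch the children for this main row and attach them as an array.
    - branch:
        processors:
          - sql_raw:
              driver: mysql
              dsn: user:password@tcp(ip:3306)/db
              query: |
                SELECT c.position, c.value
                FROM child c
                WHERE c.main_id = ?
                ORDER BY c.position ASC
              args_mapping: root = [ this.id ]
        result_map: root.children = this
    # Rename id to mid to match the desired structure.
    - mapping: |
        root.mid = this.id
        root.type = this.type
        root.children = this.children

output:
  stdout: {}

cache_resources:
  - label: processed_ids
    file:
      directory: ./processed_ids

Note that dedupe records an ID as seen when the message passes through the processor rather than after it has been delivered, so for stronger guarantees you'd want to add IDs to the cache only after the output succeeds.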