
Accessing query results from DBT in python script


I am running a bunch of dbt models programmatically from a Python script. Part of this requires accessing the results of a simple query to decide what to do next. The dbtRunnerResult object never seems to contain the query results, whether I run a model or an operation. My workaround so far is to execute a macro that logs each row of the query result, and then parse those lines from the stdout of a Python subprocess, like so:

The dbt macro:

{% macro execute_sql(query) %}
{% set results = run_query(query) %}
{% for row in results %}
{{ log('counter=='~row[0], info=True) }}
{% endfor %}
{% endmacro %}

And here's the Python script:

import subprocess
import json
import re

# Define a function to run the dbt command and capture the output
def run_dbt_command(command):
    result = subprocess.run(command, capture_output=True, text=True)
    return result.stdout, result.stderr

# Define the SQL query
sql_query = "SELECT COUNT(*) FROM dbt.model_x"

# Run the SQL query using the execute_sql macro
command = [
    'dbt',
    'run-operation',
    'execute_sql',
    '--args',
    json.dumps({"query": sql_query})
]

stdout, stderr = run_dbt_command(command)

# Access the query results from the log output
if "counter==" in stdout:
    print("Query executed successfully")
    # Extract the rows from the log output
    rows = re.findall(r'counter==(\d+)', stdout)
    for row in rows:
        print(row)
else:
    print("Query execution failed")
    print(stderr)
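A slightly sturdier version of the parsing step, as a sketch: it converts each `counter==<number>` match to an `int` and treats a non-zero exit code from dbt as failure, instead of relying only on whether the marker appears in stdout. The names `parse_counter_rows` and `run_and_parse` are hypothetical, and the sketch assumes the macro keeps logging one `counter==<number>` token per row and that dbt exits non-zero when the operation fails.

```python
import re
import subprocess

# Matches the "counter==<digits>" token that the execute_sql macro logs for each row
COUNTER_PATTERN = re.compile(r"counter==(\d+)")


def parse_counter_rows(stdout: str) -> list[int]:
    """Return every logged counter value, in order, as integers."""
    return [int(value) for value in COUNTER_PATTERN.findall(stdout)]


def run_and_parse(command: list[str]) -> list[int]:
    """Run a dbt CLI command and return the parsed rows; raise if it exits non-zero."""
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # Surface both streams so the dbt error is visible to the caller
        raise RuntimeError(f"dbt failed:\n{result.stdout}\n{result.stderr}")
    return parse_counter_rows(result.stdout)


if __name__ == "__main__":
    sample_log = "11:10:27  counter==42\n11:10:27  counter==7\n"
    print(parse_counter_rows(sample_log))  # [42, 7]
```

Returning integers keeps the branching logic (e.g. `if rows and rows[0] > 0:`) free of string handling.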

This does let me access the log output from the macro, but it feels really hacky, and I'd much rather do something like:

import json

from dbt.cli.main import dbtRunner, dbtRunnerResult
from dbt.contracts.graph.manifest import Manifest

# Initialize the dbt runner and parse the manifest once, so later invocations can reuse it
res: dbtRunnerResult = dbtRunner().invoke(["parse"])
manifest: Manifest = res.result
dbt = dbtRunner(manifest=manifest)

sql_query = "SELECT COUNT(*) FROM dbt.model_x"

run: dbtRunnerResult = dbt.invoke([
    'run-operation',
    'execute_sql',
    '--args',
    json.dumps({"query": sql_query})
])

if run.success:
    print("Query executed successfully")
    # Wishful thinking: ".something" is a placeholder for whatever attribute would hold the rows
    results = run.result.results.something

I'm running dbt version 1.8, and the various old suggestions around the internet involving dbt-rpc or dbt.get_results(query) no longer exist.

EDIT: The reason I want dbt to run this query is so that I don't need a separate database client (e.g. psycopg2, clickhouse-connect / clickhouse-driver) alongside dbt just to run queries, handle auth, manage connections and so on.


Solution

  • So I found a more "dbt" way for my use case. It doesn't give me query results back per se, but since I only need them for some if/else branching logic, I made a test for it.

    -- tests/check_model_x.sql
    with tests as (
        SELECT COUNT(*) as cnt FROM {{ ref('model_x') }} WHERE some_condition = 1
    )
    SELECT 'name_of_test', cnt FROM tests where cnt > 0
    

    Then my python script looks like this:

    from dbt.cli.main import dbtRunner, dbtRunnerResult
    from dbt.contracts.graph.manifest import Manifest
    
    class MyLooper():
    
        def __init__(self):
            # Initialize the dbt runner and parse the manifest
            res: dbtRunnerResult = dbtRunner().invoke(["parse"])
            manifest: Manifest = res.result
            self.dbt = dbtRunner(manifest=manifest)
    
            self.loop_counter = 0
            self.is_done = False
    
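        def start_base(self):
            print('running full refresh')
            # The comma after 'model_x' matters: without it, Python silently joins the two
            # adjacent string literals into the single argument 'model_x--full-refresh'
            self.dbt.invoke([
                'run',
                '-s',
                'model_x',
                '--full-refresh',
                '--quiet'
            ])
    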
        def check_x(self):
            self.loop_counter += 1
            run: dbtRunnerResult = self.dbt.invoke([
                'test',
                '-s',
                'check_model_x',
                '--quiet'
            ])
    
            # run.success is False when the test returns any rows (or errors),
            # i.e. when model_x still needs another run
            if run.success:
                print(f"test success after run {self.loop_counter}")
                return True
            print(f"test failed after run {self.loop_counter}")
            return False
    
        def reduce_x(self, max_tries=10):
            while not self.is_done and self.loop_counter <= max_tries:
                self.is_done = self.check_x()
                if not self.is_done:
                    print('running reducer')
                    self.dbt.invoke([
                        'run',
                        '-s',
                        'model_x',
                        '--quiet'
                    ])
            # The loop only exits without success once loop_counter exceeds max_tries
            if not self.is_done:
                print("error, stopped reducing after max tries")
    
    
    if __name__ == "__main__":
        runner = MyLooper()
        runner.start_base()
        runner.reduce_x()
    

    By default, a dbt test fails if its query returns any rows. So by wrapping my count(*) query in a CTE and selecting from it only when the count is > 0, I get a test that fails exactly when my incremental model needs to run again.
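To see why the CTE trick works, here is the same query pattern run against an in-memory SQLite table standing in for the real warehouse. This is an illustration under assumptions: `{{ ref('model_x') }}` is replaced by a literal table named `model_x`, and the sample rows are made up. The query returns one row while matching rows remain (so the dbt test would fail) and no rows once they are gone (so it would pass).

```python
import sqlite3

# Same shape as tests/check_model_x.sql, with {{ ref('model_x') }} replaced by a plain table name
CHECK_SQL = """
with tests as (
    SELECT COUNT(*) as cnt FROM model_x WHERE some_condition = 1
)
SELECT 'name_of_test', cnt FROM tests where cnt > 0
"""


def failing_rows(conn: sqlite3.Connection) -> list[tuple]:
    """Rows the test query returns; with default settings a dbt test fails if this is non-empty."""
    return conn.execute(CHECK_SQL).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE model_x (id INTEGER, some_condition INTEGER)")
    # Illustrative data: two rows still match the condition
    conn.executemany("INSERT INTO model_x VALUES (?, ?)", [(1, 1), (2, 1), (3, 0)])
    print(failing_rows(conn))  # [('name_of_test', 2)] -> the test would fail

    # Simulate the reducer clearing the matching rows
    conn.execute("UPDATE model_x SET some_condition = 0")
    print(failing_rows(conn))  # [] -> the test would pass
```

Without the `where cnt > 0` filter, `COUNT(*)` would always return exactly one row and the test would never pass.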

    The loop_counter variable is only there to guard against an infinite loop.
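The check-then-reduce loop can also be written as a small function that takes the two dbt steps as callables, which makes the max-tries guard testable without a database. This is a sketch: `reduce_until_done`, `check` and `reduce` are hypothetical names. In the script above, `check` would wrap the `dbt test -s check_model_x` invocation (returning `run.success`) and `reduce` the `dbt run -s model_x` invocation.

```python
from typing import Callable


def reduce_until_done(check: Callable[[], bool],
                      reduce: Callable[[], None],
                      max_tries: int = 10) -> int:
    """Alternate check/reduce until check() passes; return how many checks were made.

    Raises RuntimeError if check() still fails after max_tries reduce steps.
    """
    for attempt in range(1, max_tries + 2):
        if check():
            return attempt
        if attempt > max_tries:
            break
        reduce()
    raise RuntimeError(f"still failing after {max_tries} reduce runs")


if __name__ == "__main__":
    # Fake check that starts passing on the third call, standing in for the dbt test
    outcomes = iter([False, False, True])
    reduce_calls = []
    checks = reduce_until_done(lambda: next(outcomes), lambda: reduce_calls.append(1))
    print(checks, len(reduce_calls))  # 3 2
```

Raising instead of printing lets the calling script stop (or alert) when the limit is hit, and every reduce step is followed by a check, so no run goes unverified.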

    The DBT STDOUT looks something like this when I run it:

    11:10:27  Running with dbt=1.8.8
    11:10:27  Registered adapter: clickhouse=1.8.4
    11:10:28  Performance info: /path/to/project/target/perf_info.json
    11:10:29  1 of 1 FAIL 1 check_x ...................................................... [FAIL 1 in 0.07s]
    11:10:29  Failure in test check_edges (tests/check_model_x.sql)
    11:10:29    Got 1 result, configured to fail if != 0
    test failed after run 1
    ...................................................... [FAIL 1 in 0.14s]
    ....
    ...more  runs...
    ....
    11:10:46  1 of 1 FAIL 1 check_x ...................................................... [FAIL 1 in 0.06s]
    11:10:46  Failure in test check_edges (tests/check_model_x.sql)
    11:10:46    Got 1 result, configured to fail if != 0
    test failed after run 6
    running reducer
    test success after run 7
    

    As I said, this isn't actually a way to get query results cleanly back from dbt runs, but it serves the purpose of making decisions based on a query result. It could likely be extended with custom test logic in dbt that reports more than a pass/fail in run.result.results[0].message, but this is good enough for me at the moment.
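If a number is more useful than pass/fail, the test result objects may already carry one. Assuming the dbt-core 1.8 result shape (each entry in `run.result.results` has `status`, `message` and `failures` attributes, mirroring the fields in `run_results.json`; check this against your installed version), a helper can read `failures` directly. `failing_row_count` is a hypothetical name, and the `SimpleNamespace` objects are stand-ins so the sketch runs without dbt.

```python
from types import SimpleNamespace
from typing import Any, Optional


def failing_row_count(run: Any) -> Optional[int]:
    """Return the `failures` value of the first test result, or None if there is none.

    Assumes the dbtRunnerResult shape run.result.results[0].failures (dbt-core 1.8).
    """
    result = getattr(run, "result", None)
    results = getattr(result, "results", None) or []
    if not results:
        return None
    return getattr(results[0], "failures", None)


if __name__ == "__main__":
    # Stand-in mimicking a failed `dbt test -s check_model_x` invocation;
    # the message text is copied from the log output above
    fake_run = SimpleNamespace(
        success=False,
        result=SimpleNamespace(results=[SimpleNamespace(
            status="fail",
            failures=1,
            message="Got 1 result, configured to fail if != 0",
        )]),
    )
    print(failing_row_count(fake_run))  # 1
```

With dbt's default fail calculation (`count(*)`), `failures` is the number of rows the test query returned, which is always 1 for check_model_x as written; a test that returns one row per offending record would turn it into a real count.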