Tags: snowflake-cloud-data-platform, snowflake-task

STREAM not consumed from a task created by a stored procedure


TLDR: I have a stream that gets consumed when I, or a task I created directly, issue a DML statement against it. But when the task is created by a stored procedure, the stream does not get consumed.

I have a stream which behaves as expected, and I can see it has data when I query it: SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');
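(For reference, the stream contents can also be inspected directly. The metadata columns below are the standard stream columns; RECORD_MODIFIED_AT is just one of my own table's columns.)

SELECT METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID, RECORD_MODIFIED_AT
FROM ANALYTICS_DB.schema.stream_name
LIMIT 10;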

Using the same role that created it, I consume the stream:

INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);

I run SYSTEM$STREAM_HAS_DATA again: all good, the stream has been consumed.
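Worth noting why an INSERT is needed at all: a plain SELECT does not advance the stream offset; only a DML statement that reads from the stream (and commits) consumes it. A quick illustration:

-- Reading the stream alone does not consume it...
SELECT COUNT(*) FROM ANALYTICS_DB.schema.stream_name;
-- ...so this still returns TRUE until a DML statement reads the stream and commits.
SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');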

Now, I am bundling that into a task:

CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON * * * * * Etc/UTC'
            COMMENT = 'Checking when was the last time tables got updated'
        WHEN -- conditional check if the stream has new data
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS -- same previous query
    INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
    FROM ANALYTICS_DB.schema.stream_name;

After a minute or so I check my stream again: all good, the task consumes the stream when running on its schedule.
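The runs themselves can be checked from the task history (a quick sketch; the default INFORMATION_SCHEMA scope is enough here):

SELECT name, state, scheduled_time, query_text
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY scheduled_time DESC
LIMIT 20;

Runs where the WHEN condition is false show up with state 'SKIPPED', which is what I expect to see once the stream is empty.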

Now, the stored procedure that creates the tasks.

My SQL part:

create PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
    returns string
    language javascript
    EXECUTE AS CALLER
as

and the JavaScript part (trimmed to the important bits for readers' sake). It runs fine and creates the tasks; the tasks run on schedule and the queries are issued, but the stream is not consumed. As a result, my max() calculation is done on an ever-growing table.

$$
// trimmed some stuff here getting the data

    while (result_set.next())
    {
        var lagschema = result_set.getColumnValue(1);
        var lagtable = result_set.getColumnValue(2);
        var lagcolumn = result_set.getColumnValue(3);

        var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
            COMMENT = 'Checking when was the last update'
        WHEN 
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS
                INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
    FROM ANALYTICS_DB.schema.stream_name;`;

        var create_task = snowflake.createStatement({sqlText: sql_task});
        create_task.execute();
        var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
        start_task.execute();
    }
// error handling
    $$;
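To compare what the procedure actually created with my manually created task, the task definitions (including the WHEN condition and the stream name each one references) can be inspected, for example:

SHOW TASKS IN SCHEMA ANALYTICS_DB.schema;
-- full definition of a single task, including the WHEN condition
DESCRIBE TASK ANALYTICS_DB.schema.table_test;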

See below how the task created via the stored procedure runs every single time, because it never empties the stream. As soon as I manually create the same task, it empties the stream and then skips runs when there is no new data (the desired behavior).

[Image: task_runs — task run history showing the SP-created task executing on every run]


Solution

  • Nothing could have given a clue about the problem here, as it lay in the naming of the stream itself, so it was entirely my mistake. On top of that, the tests I was running used a very active table, which hid the fact that the stream was actually performing as expected (see the check below).
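For anyone running into something similar: since the problem was the stream name itself, one quick sanity check is to list the streams matching the name the task references and look at which table each one actually tracks (a sketch; the LIKE pattern is just an example):

SHOW STREAMS LIKE '%stream_name%' IN DATABASE ANALYTICS_DB;
-- the "table_name" column shows which table each stream tracks,
-- which makes a wrongly named (or duplicated) stream easy to spot.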