TLDR: I have a stream that gets consumed when I, or a task I directly created, issue DML against it. But when the task is created by a stored procedure, the stream does not get consumed.
I have a stream which behaves as expected and I can see it has data when I select on it:
SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');
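For context, the stream itself would have been created on the monitored table with something along these lines (a minimal sketch; source_table is a placeholder, not the actual table name):
CREATE STREAM IF NOT EXISTS ANALYTICS_DB.schema.stream_name
ON TABLE ANALYTICS_DB.schema.source_table;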
Using the same role that created it, I consume the stream:
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);
I check SYSTEM$STREAM_HAS_DATA again: all good, the stream has been consumed. Next, I create a task that runs the same query on a schedule:
CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
WAREHOUSE = wh
SCHEDULE = 'USING CRON * * * * * Etc/UTC'
COMMENT = 'Checking when was the last time tables got updated'
WHEN -- conditional check if the stream has new data
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS -- same previous query
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);
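One detail omitted above: a newly created task is suspended by default, so it also has to be resumed before the schedule takes effect:
ALTER TASK ANALYTICS_DB.schema.table_test RESUME;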
After a minute or so I check my stream again: all good, the task consumes the stream when it runs on its schedule.
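To verify this, the task history shows both the runs that executed and the runs that were skipped because the WHEN condition was false, e.g.:
SELECT name, state, scheduled_time, query_text
FROM TABLE(ANALYTICS_DB.INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
    TASK_NAME => 'TABLE_TEST'))
ORDER BY scheduled_time DESC;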
Now I create the same kind of task from a stored procedure. The SQL part:
CREATE PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
RETURNS STRING
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
and the JavaScript part (trimmed to the important bits for the sake of readers). It runs fine: it creates the tasks, the tasks run on schedule, the queries are issued, but the stream is not consumed. As a result, my max() calculation is done over an ever-growing table.
$$
// trimmed some stuff here getting the data
while (result_set.next())
{
var lagschema = result_set.getColumnValue(1);
var lagtable = result_set.getColumnValue(2);
var lagcolumn = result_set.getColumnValue(3);
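// presumably these feed into the generated task/stream names; the interpolation is trimmed out of this example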
var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
WAREHOUSE = wh
SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
COMMENT = 'Checking when was the last update'
WHEN
SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
AS
INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);`;
var create_task = snowflake.createStatement({sqlText: sql_task});
create_task.execute();
var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
start_task.execute();
}
// error handling
$$;
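For completeness, the procedure is called and the created tasks can be listed with standard commands:
CALL ANALYTICS_DB.schema.create_tasks();
SHOW TASKS IN SCHEMA ANALYTICS_DB.schema;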
See below how the task I created via the stored procedure runs every single time, because it never empties the stream. As soon as I manually create the same task, it empties the stream and then skips runs when there is no new data (which is the desired behavior).
Nothing could have given a clue about the problem here, because it lay in the naming of the stream itself, so this was entirely my mistake. On top of that, the tests I was running used a very active table, which hid the fact that the stream was actually behaving as expected.
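If anyone hits something similar: comparing the stream name referenced in the task definition against the streams that actually exist makes the mismatch obvious, for example:
DESCRIBE TASK schema.ppw_freshness_schema.stream_name; -- the definition and condition columns show which stream the task actually reads
SHOW STREAMS IN SCHEMA ANALYTICS_DB.schema;            -- the streams that actually exist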