I'm trying to load all new files from an AWS S3 bucket into two Snowflake tables, routed by their paths, but I haven't succeeded yet even with one table. What I've tried:
Created a stage:
CREATE or replace STAGE DATA_SCIENCE.INFRA.jobs_resource_usage URL = 's3://om/jobs-resource-usage/'
storage_integration = om_s3 FILE_FORMAT=(TYPE='JSON');
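(Side note: the stage definition and the storage integration it references can be inspected with DESC, which shows the allowed locations and the AWS IAM user that Snowflake uses:)
DESC STAGE DATA_SCIENCE.INFRA.jobs_resource_usage;
DESC INTEGRATION om_s3;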
Created a table:
create or replace TABLE DATA_SCIENCE.INFRA.job_metrics (
job_name STRING,
build_number INT,
cpu_perc INT,
mem BIGINT,
"timestamp" TIMESTAMP
);
Created a pipe:
create or replace pipe DATA_SCIENCE.INFRA.job_metrics auto_ingest=true as
copy into DATA_SCIENCE.INFRA.job_metrics
from (select
REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
$1:cpu_perc::INT,
$1:mem::BIGINT,
$1:timestamp::TIMESTAMP
from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/);
Added the SQS ARN to the bucket's event notifications, using the ARN that
select parse_json(SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics')):notificationChannelName;
returned.
The stage works, because I can list files like:
ls '@DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/job_name=Ingest job';
Which returns S3 file names like the following (a sample included to show the format):
s3://om/jobs-resource-usage/metrics/job_name=Ingest job/build_number=144.json
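(To rule out the path parsing, the two regular expressions can be tested directly against that sample, using its path portion as a stand-in for METADATA$FILENAME:)
select
    REGEXP_SUBSTR('metrics/job_name=Ingest job/build_number=144.json', 'job_name=(.*)/', 1, 1, 'e', 1) as job_name,
    REGEXP_SUBSTR('metrics/job_name=Ingest job/build_number=144.json', 'build_number=([0-9]+)', 1, 1, 'e', 1) as build_number;
which should return Ingest job and 144.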
I can successfully load the file manually with:
copy into DATA_SCIENCE.INFRA.job_metrics
from (select
REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
$1:cpu_perc::INT,
$1:mem::BIGINT,
$1:timestamp::TIMESTAMP
from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/)
files=('job_name=Ingest job/build_number=144.json');
However, the pipe doesn't load anything. If I do a
select SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics');
I can see it receives the notification messages:
{"executionState":"RUNNING","pendingFileCount":0,"notificationChannelName":"arn:aws:sqs:us-west-2:494544507972:sf-snowpipe-concealed","numOutstandingMessagesOnChannel":7,"lastReceivedMessageTimestamp":"2020-08-13T09:59:21.107Z"}
but I can't see any lastForwardedMessageTimestamp entries, which suggests there's a problem with path matching. I've tried multiple permutations of the leading slash, and I've also tried uploading files directly under the metrics path with names containing no spaces or = signs, all without success.
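For what it's worth, my understanding is that any load attempts or errors made by the pipe should show up in the COPY_HISTORY table function, so a query along these lines (with an example 24-hour window) is another place to look:
select *
from table(DATA_SCIENCE.information_schema.copy_history(
    table_name => 'DATA_SCIENCE.INFRA.JOB_METRICS',
    start_time => dateadd(hours, -24, current_timestamp())));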
What did I do wrong, and how can I figure out what the problem is?
Review what stages you have pointing to your S3 bucket. Having multiple stages at different levels of granularity can cause conflicts over who reads the message queue. If the pipe is working correctly and seeing the messages, you will see a lastForwardedMessageTimestamp like you mentioned. If you don't see that, then either there are no messages in your queue, the pipe is not reading the queue correctly, or something else is consuming the queue messages first.
Do you have access to your SQS queue to confirm that messages are arriving in the first place and that the queue is working? If the queue is working correctly, I would double-check that the permissions on the queue are set correctly and that you don't have multiple stages conflicting over your integration and queue.
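A sketch of those checks in SQL, using the object names from the question (the REFRESH at the end is optional; it just asks the pipe to scan the stage for files it has not loaded yet):
show pipes in schema DATA_SCIENCE.INFRA;   -- the notification_channel column shows which SQS queue each pipe reads
show stages in schema DATA_SCIENCE.INFRA;  -- look for other stages pointing at the same bucket/prefix
alter pipe DATA_SCIENCE.INFRA.job_metrics refresh;  -- one-off: queue any staged files not yet loaded
If the REFRESH does load the files, the pipe's COPY definition and paths are fine, and the problem is on the notification side (bucket event configuration or queue permissions).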