Tags: sql, amazon-athena, attribution

Athena: "Query exhausted resources at this scale factor" — how can I optimize this query?


I am running the query below on AWS Athena. It fails with the error "Query exhausted resources at this scale factor." The Query stats tab shows that approximately 36 GB of data is scanned.

 -- Conversion attribution for four look-back windows: yesterday, week, month and quarter.
--
-- Compared with the previous version of this query:
--   * The final SELECT did not parse. It had a doubled "union all", a trailing comma before
--     FROM, and a quarter branch without medium_list. The week branch was also missing.
--   * The four dataset_* CTEs differed only in the look-back length. Athena (Trino) typically
--     inlines every CTE reference, so view_session was read eight times. Here each base CTE
--     is referenced once, and the windows come from a 4-row table joined to the sessions.
WITH
-- One row per (user, session) over the last 90 days, up to and including yesterday.
-- NOTE(review): "category NOT IN (...)" also drops rows whose category is NULL.
-- NOTE(review): if event_date is a partition column, wrapping it in date() may block
-- partition pruning. Confirm with EXPLAIN.
session_dataset AS (
    SELECT
        user_id,
        session_id,
        max(medium) AS medium,
        max(event_date) AS event_date
    FROM view_session
    WHERE date(event_date) <= date_add('day', -1, current_date)
        AND date(event_date) >= date_add('day', -90, current_date)
        AND category NOT IN ('Offline Sources')
    GROUP BY
        user_id,
        session_id
),
-- Named conversion events over the same 90-day range. One row per event (not aggregated).
user_conversion AS (
    SELECT
        user_id,
        session_id,
        name,
        event_date,
        has_crm,
        customer_retention_type
    FROM view_session
    WHERE cohort_type = 'conversion'
        AND name IS NOT NULL
        AND date(event_date) <= date_add('day', -1, current_date)
        AND date(event_date) >= date_add('day', -90, current_date)
),
-- Look-back windows. A touchpoint session counts for a window when its date is on or after
-- current_date - days_back.
attribution_window AS (
    SELECT
        window_label,
        days_back
    FROM (
        VALUES
            ('yesterday', 1),
            ('week', 7),
            ('month', 30),
            ('quarter', 90)
    ) AS w (window_label, days_back)
),
-- One row per (window, user, conversion session, conversion name). It holds the media of
-- the user's sessions that precede the conversion and fall inside the window.
conversion_path AS (
    SELECT
        aw.window_label,
        uc.user_id,
        uc.name,
        max(uc.has_crm) AS has_crm,
        max(uc.customer_retention_type) AS customer_retention_type,
        count(sd.session_id) AS view_count,
        date_diff('day', date(min(sd.event_date)), date(max(uc.event_date))) AS days_convert,
        -- Sessions are ordered by event_date, then by the first '_'-separated token of session_id.
        -- NOTE(review): that token is compared as a string. Confirm it sorts chronologically.
        array_agg(
            sd.medium
            ORDER BY sd.event_date, split(sd.session_id, '_')[1] ASC
        ) AS medium_list
    FROM user_conversion AS uc
    INNER JOIN session_dataset AS sd
        ON sd.user_id = uc.user_id
        AND date(sd.event_date) <= date(uc.event_date)
        AND split(uc.session_id, '_')[1] >= split(sd.session_id, '_')[1]
    -- The window table has 4 rows, so this non-equi join just fans each session out to every
    -- window it belongs to.
    INNER JOIN attribution_window AS aw
        ON date(sd.event_date) >= date_add('day', -aw.days_back, current_date)
    GROUP BY
        aw.window_label,
        uc.user_id,
        uc.session_id,
        uc.name
)
-- Aggregate the conversion paths per window, conversion name and exact media path.
SELECT
    window_label AS window,
    name,
    sum(days_convert) AS days_convert,
    count(name) AS total_conversion,
    sum(view_count) AS total_view,
    count(DISTINCT IF(has_crm = '1', user_id, NULL)) AS customer_count,
    count(DISTINCT IF(has_crm != '1' OR has_crm IS NULL, user_id, NULL)) AS anonymous_customer_count,
    count(DISTINCT IF(lower(customer_retention_type) = 'returning', user_id, NULL)) AS returning_customer_count,
    count(DISTINCT IF(lower(customer_retention_type) = 'new', user_id, NULL)) AS new_customer_count,
    medium_list[1] AS first_click,
    medium_list[cardinality(medium_list)] AS last_click,
    medium_list
FROM conversion_path
GROUP BY
    window_label,
    name,
    medium_list

I found similar questions on Stack Overflow; one answer suggested removing the ORDER BY clause.

How can I do that in the query above?


Solution

  • We resolved this by using temporary tables created with CTAS (CREATE TABLE AS SELECT). The query in the question computed everything inside a single query, held in memory, so it consumed more DPUs (data processing units).

    At runtime we now create temporary tables that store the data for session_dataset and user_conversion. We then run the final SELECT statement against these temporary tables.

    We use dbt, so all of these steps can be handled programmatically. The solution is below:

    {#-
        Conversion attribution model for Athena.

        The work is split across several CTAS statements instead of one large query:
          1. session_calc_for_attribution_window
             One row per (global_user_id, session_id) over the last 90 days, up to
             yesterday, with offline sources excluded.
          2. conversion_calc_for_attribution_window
             Conversion events that have a simple_name, over the same range.
          3. attribution_window_<grain>
             For each grain, every touchpoint session of a user that started no later than
             one of that user's conversions, with first-click, last-click and linear columns.
        The model's own result is the UNION ALL of the attribution_window_<grain> tables.
        If either upstream relation is missing, the model returns an empty result with the
        same columns.
    -#}
    {{
        config(
            materialized='table',
            external_location="s3://" + env_var('BUCKET') + "/" + env_var('SCHEMA') + "/full_refresh/" + this.identifier + "/" + started_at_formatted(),
            tags=['table_zero_downtime'],
            post_hook=[
                '{{ clean_up(this.identifier) }}'
            ]
        )
    }}
    {%- set check_relation_unified_events = adapter.get_relation(
            database=env_var('DATABASE'),
            schema=env_var('SCHEMA'),
            identifier=unified_events_relation())
    -%}
    {%- set check_relation_session = adapter.get_relation(
            database=env_var('DATABASE'),
            schema=env_var('SCHEMA'),
            identifier=session_relation())
    -%}

    {#- Grain names and their look-back length in days. The two lists are index-aligned. -#}
    {%- set time_grain = ["yesterday", "week", "month", "quarter"] -%}
    {%- set time_grain_period = [1, 7, 30, 90] -%}
    {#- Every intermediate table is created in, and read from, this one schema. -#}
    {%- set temp_schema = env_var('SCHEMA') -%}

    {% if check_relation_unified_events is not none and check_relation_session is not none %}
        {#- Drop the intermediate tables of a previous run before recreating them. -#}
        {{ attribution_pre_clean_up() }}

        {#- Stage 1: one row per (user, session).
            event_date is kept as the last column because it is the partition column.
            NOTE(review): Athena CTAS writes at most 100 partitions per query. This assumes
            event_date has one value per day (about 90 partitions). Confirm. -#}
        {% call statement() -%}
            CREATE TABLE {{ temp_schema }}.session_calc_for_attribution_window WITH (
                partitioned_by = ARRAY [ 'event_date' ]
            ) AS
            SELECT
                global_user_id,
                MAX(referrer_by_medium) AS referrer_by_medium,
                session_id,
                MAX(session_start_epoch_tz) AS session_start_epoch_tz,
                MAX(event_date) AS event_date
            FROM view_session
            WHERE DATE(event_date) <= date_add('day', -1, CURRENT_DATE)
                AND DATE(event_date) >= date_add('day', -90, CURRENT_DATE)
                AND referrer_by_category NOT IN ('Offline Sources')
            GROUP BY
                global_user_id,
                session_id
        {%- endcall %}

        {#- Stage 2: named conversion events over the same 90-day range (not aggregated). -#}
        {% call statement() -%}
            CREATE TABLE {{ temp_schema }}.conversion_calc_for_attribution_window WITH (
                partitioned_by = ARRAY [ 'event_date' ]
            ) AS
            SELECT
                global_user_id,
                session_id,
                session_start_epoch_tz,
                simple_name,
                has_crm,
                customer_retention_type,
                event_date
            FROM view_session
            WHERE cohort_type = 'conversion'
                AND simple_name IS NOT NULL
                AND DATE(event_date) <= date_add('day', -1, CURRENT_DATE)
                AND DATE(event_date) >= date_add('day', -90, CURRENT_DATE)
        {%- endcall %}

        {#- Stage 3: one table per grain with the attributed touchpoint sessions.
            A conversion path is identified by (user, conversion session, conversion name).
            Partitioning only by the conversion session would count each touchpoint once per
            conversion row in that session, inflating the count and linear columns.
            session_id breaks ties between sessions that start at the same epoch, so that
            first_click and last_click are deterministic. last_click is the FIRST_VALUE of
            the descending order. -#}
        {%- for grain in time_grain %}
            {%- set days_back = time_grain_period[loop.index0] %}
            {% call statement() -%}
                CREATE TABLE {{ temp_schema }}.attribution_window_{{ grain }} AS
                WITH dataset_{{ grain }} AS (
                    SELECT
                        sd.global_user_id,
                        sd.referrer_by_medium,
                        sd.session_id,
                        sd.session_start_epoch_tz,
                        uc.simple_name,
                        uc.has_crm,
                        uc.customer_retention_type,
                        uc.session_id AS conversion_path_id
                    FROM (
                        SELECT *
                        FROM {{ temp_schema }}.conversion_calc_for_attribution_window
                        WHERE DATE(event_date) <= date_add('day', -1, CURRENT_DATE)
                            AND DATE(event_date) >= date_add('day', -{{ days_back }}, CURRENT_DATE)
                    ) AS uc
                    INNER JOIN (
                        SELECT *
                        FROM {{ temp_schema }}.session_calc_for_attribution_window
                        WHERE DATE(event_date) <= date_add('day', -1, CURRENT_DATE)
                            AND DATE(event_date) >= date_add('day', -{{ days_back }}, CURRENT_DATE)
                    ) AS sd
                        ON uc.global_user_id = sd.global_user_id
                        AND uc.session_start_epoch_tz >= sd.session_start_epoch_tz
                )
                SELECT
                    '{{ grain }}' AS window,
                    simple_name,
                    global_user_id,
                    referrer_by_medium,
                    has_crm,
                    customer_retention_type,
                    conversion_path_id,
                    FIRST_VALUE(referrer_by_medium) OVER (
                        PARTITION BY global_user_id, conversion_path_id, simple_name
                        ORDER BY session_start_epoch_tz ASC, session_id ASC
                    ) AS first_click,
                    FIRST_VALUE(referrer_by_medium) OVER (
                        PARTITION BY global_user_id, conversion_path_id, simple_name
                        ORDER BY session_start_epoch_tz DESC, session_id DESC
                    ) AS last_click,
                    COUNT(*) OVER (
                        PARTITION BY global_user_id, conversion_path_id, simple_name
                    ) AS session_count_in_conversion,
                    ROUND(1.0 / COUNT(*) OVER (
                        PARTITION BY global_user_id, conversion_path_id, simple_name
                    ), 2) AS linear_points
                FROM dataset_{{ grain }}
            {%- endcall %}
        {%- endfor -%}

        {#- Model result: all grains stacked, with the same columns as the empty fallback below. -#}
        {%- for grain in time_grain %}
            SELECT
                window,
                simple_name,
                global_user_id,
                referrer_by_medium,
                has_crm,
                customer_retention_type,
                conversion_path_id,
                first_click,
                last_click,
                session_count_in_conversion,
                linear_points
            FROM {{ temp_schema }}.attribution_window_{{ grain }}
            {%- if not loop.last %}
            UNION ALL
            {% endif -%}
        {%- endfor -%}

    {% else %}
        {#- Upstream relations are missing: return an empty, correctly typed result. -#}
        select
            cast(null as varchar) as window,
            cast(null as varchar) as simple_name,
            cast(null as varchar) as global_user_id,
            cast(null as varchar) as referrer_by_medium,
            cast(null as varchar) as has_crm,
            cast(null as varchar) as customer_retention_type,
            cast(null as varchar) as conversion_path_id,
            cast(null as varchar) as first_click,
            cast(null as varchar) as last_click,
            cast(null as integer) as session_count_in_conversion,
            cast(null as DOUBLE) as linear_points
        where false
    {% endif %}
    

    ===============

    {#-
        Drops the intermediate tables that the attribution model recreates on each run:
        the two staging tables and one attribution_window_<grain> table per grain.

        Args:
            time_grain: grain suffixes whose attribution_window_<grain> tables are dropped.
                        Defaults to ["yesterday", "week", "month", "quarter"], so existing
                        calls without arguments behave exactly as before.

        NOTE(review): for external tables, Athena's DROP TABLE presumably removes only the
        catalog entry and leaves the S3 data written by earlier CTAS runs. Confirm.
    -#}
    {% macro attribution_pre_clean_up(time_grain=["yesterday", "week", "month", "quarter"]) %}
        {%- set temp_schema = env_var('SCHEMA') -%}
        {%- set staging_tables = [
            "session_calc_for_attribution_window",
            "conversion_calc_for_attribution_window"
        ] -%}
        {%- for table_name in staging_tables %}
            {% call statement() -%}
                DROP TABLE IF EXISTS {{ temp_schema }}.{{ table_name }}
            {%- endcall %}
        {%- endfor %}
        {%- for grain in time_grain %}
            {% call statement() -%}
                DROP TABLE IF EXISTS {{ temp_schema }}.attribution_window_{{ grain }}
            {%- endcall %}
        {%- endfor -%}
    {% endmacro %}