python, python-3.x, django, django-aggregation

Aggregate not working in prefetched queryset in Django


I have a query in which I am trying to aggregate values, so that I can calculate balances more quickly than by querying multiple times to get the values needed.

Overall, I simply want to be able to run the following, but the aggregation that seems like it would allow that isn't working:

# Desired usage: walk every header account of the report, then each of its
# regular accounts, and read ledger data that was attached by the
# prefetch_related() call shown further down.
header_accounts = custom_report.account_tree_header.all()

for header_account in header_accounts:
    for regular_account in header_account.associated_regular_account_tree_accounts.all():
        gl_account = regular_account.associated_account_from_chart_of_accounts
        # `range_gl` is the to_attr of the first GeneralLedger Prefetch in the query below.
        gl_entries = gl_account.range_gl # range entries
        # this does not work below...
        # `old_gl` is the to_attr of the second Prefetch (the one that calls
        # .aggregate()); building that query is what raises the AttributeError.
        prior_credit =  gl_account.old_gl.prior_credit_amount
        prior_debit =  gl_account.old_gl.prior_debit_amount

When I run the query below with aggregate instead of annotate, I get AttributeError: 'dict' object has no attribute '_add_hints'.

How can I do this?

# Load the report together with its header accounts, their regular accounts
# and the related general-ledger rows, attaching the ledger data twice:
# once as `range_gl` (entries inside the date range) and once as `old_gl`
# (intended to hold the summed amounts up to the start date).
custom_report = AccountTree.objects.select_related().prefetch_related(
    'account_tree_total', 'account_tree_regular',
    Prefetch('account_tree_header', queryset=AccountTreeHeader.objects.select_related(
        'associated_account_from_chart_of_accounts', 'associated_total_account_tree_account__associated_account_from_chart_of_accounts'
    ).prefetch_related(
        'associated_regular_account_tree_accounts',
        # Ledger rows between start_date and end_date, oldest first -> `range_gl`.
        Prefetch('associated_regular_account_tree_accounts__associated_account_from_chart_of_accounts__general_ledger',
                 queryset=GeneralLedger.objects.select_related(
                 ).filter(Q(
                     accounts_payable_line_item__property__pk__in=property_pks,
                     journal_line_item__property__pk__in=property_pks,
                     _connector=Q.OR,
                 ), date_entered__date__gte=start_date, date_entered__date__lte=end_date).order_by('date_entered'), to_attr='range_gl'),
        # ISSUE IS HERE....
        # .aggregate() is a terminal call that returns a dict, not a QuerySet,
        # so the `queryset=` argument below receives a dict. That matches the
        # reported error: 'dict' object has no attribute '_add_hints'.
        Prefetch('associated_regular_account_tree_accounts__associated_account_from_chart_of_accounts__general_ledger',
                 queryset=GeneralLedger.objects.select_related(
                 ).filter(Q(
                     accounts_payable_line_item__property__pk__in=property_pks,
                     journal_line_item__property__pk__in=property_pks,
                     _connector=Q.OR,
                 ), date_entered__date__lte=start_date).aggregate(prior_credit_amount=Sum('credit_amount'), prior_debit_amount=Sum('debit_amount')), to_attr='old_gl'),
    )),
).get(pk=custom_report.pk)

As a note, the traceback shows that the error occurs in .get(pk=custom_report.pk).


Solution

  • Overall, this was a subquery issue: the solution was to annotate an aggregate. The difficulty was that I had to pass the filters to the aggregation over the prefetched relation, and for whatever reason that was not directly or easily possible in Django.

    For example, here is an easy way to reproduce it:

    # does not work...the general_ledger date filters are not there
    # calculates from all time...
    # Why: per the Django docs, prefetch_related() runs a separate lookup for
    # the relation and joins the results in Python, so the Prefetch queryset's
    # filter is not part of the JOIN that aggregate() builds for the
    # `general_ledger__...` lookups. aggregate() is also a terminal call that
    # returns a dict, so no prefetched objects are produced here at all.
    GLAccount.objects.filter(account_identifier='1011.00').prefetch_related(
        Prefetch('general_ledger', queryset=GeneralLedger.objects.filter(
            date_entered__date__lte=start_date).distinct()),
    ).aggregate(
        Sum('general_ledger__credit_amount'), Sum('general_ledger__debit_amount')
    )
    # {'general_ledger__credit_amount__sum': Decimal('525.00'), 'general_ledger__debit_amount__sum': Decimal('707782.18')}
    
    # works fine...and filters as expected with date range intact
    # Here the date filter and the aggregate are applied to the same
    # GeneralLedger queryset, so the filter restricts the summed rows.
    GeneralLedger.objects.filter(
        date_entered__date__lte=start_date,
        account__account_identifier='1011.00').aggregate(
        Sum('credit_amount'), Sum('debit_amount')
    )
    # {'credit_amount__sum': Decimal('525.00'), 'debit_amount__sum': Decimal('629231.18')}
    

    I ended up finding a package, django-sql-utils: https://github.com/martsberger/django-sql-utils/tree/master/sql_util

    It makes the subquery pretty easy to write, as shown below:

    # Working version: prefetch the GLAccount objects themselves (instead of
    # their ledger rows) so that each account can carry both the in-range
    # ledger entries and the annotated prior totals.
    custom_report = AccountTree.objects.select_related().prefetch_related(
        Prefetch('account_tree_header', queryset=AccountTreeHeader.objects.prefetch_related(
            Prefetch('associated_regular_account_tree_accounts__associated_account_from_chart_of_accounts',
                     queryset=GLAccount.objects.prefetch_related(
                         # Ledger rows for the selected properties between
                         # start_date and end_date, oldest first -> `range_gl`.
                         Prefetch('general_ledger', queryset=GeneralLedger.objects.filter(
                             property__pk__in=property_pks,
                             date_entered__date__gte=start_date,
                             date_entered__date__lte=end_date).order_by('date_entered'), to_attr='range_gl'),
                     )
                     # Sum credit/debit amounts of ledger rows dated on or before
                     # start_date for the selected properties; the filter is passed
                     # to SubqueryAggregate itself, and the results are annotated
                     # on each GLAccount as `prior_credit` / `prior_debit`.
                     .annotate(
                         prior_credit=SubqueryAggregate(
                             'general_ledger__credit_amount',
                             filter=(Q(date_entered__date__lte=start_date) & Q(property__pk__in=property_pks)),
                             aggregate=Sum),
                         prior_debit=SubqueryAggregate(
                             'general_ledger__debit_amount',
                             filter=(Q(date_entered__date__lte=start_date) & Q(property__pk__in=property_pks)),
                             aggregate=Sum)
                     )),
        )),
    ).get(pk=custom_report.pk)
    

    That annotated aggregation of a prefetched item is made possible by this code from the package:

    from django.core.exceptions import FieldError
    from django.db.models import Q, F, QuerySet, BooleanField, Sum, Avg, ForeignKey
    from django.db.models import Subquery as DjangoSubquery, OuterRef, IntegerField, Min, Max, Count
    from django.db.models.constants import LOOKUP_SEP
    
    class Subquery(DjangoSubquery):
        """
        Subquery expression that can be built from a field path instead of a queryset.

        When given a ``QuerySet`` it defers to Django's ``Subquery``. When given a
        field-path string (or any object with ``resolve_expression``), the inner
        queryset is constructed lazily in ``resolve_expression`` once the outer
        query is known.
        """
        # Default for the ``unordered`` keyword. ``__init__`` reads
        # ``self.unordered`` as the fallback value, so the attribute has to exist
        # on this class too; otherwise using ``Subquery`` directly with a field
        # path raises AttributeError.
        unordered = None

        def __init__(self, queryset_or_expression, **extra):
            """
            :param queryset_or_expression: a ``QuerySet``, an expression, or a
                field-path string (wrapped in ``F`` when it has no
                ``resolve_expression``).
            :param extra: for the expression form, the keys ``filter`` (default
                ``Q()``), ``distinct``, ``outer_ref`` and ``unordered`` are popped
                off; ``output_field`` is read but left in ``extra``. For the
                queryset form, ``extra`` is passed straight to Django's ``Subquery``.
            """
            if isinstance(queryset_or_expression, QuerySet):
                self.queryset = queryset_or_expression
                self.query = self.queryset.query
                super(Subquery, self).__init__(queryset_or_expression, **extra)
            else:
                expression = queryset_or_expression
                if not hasattr(expression, 'resolve_expression'):
                    expression = F(expression)
                self.expression = expression
                # Both are filled in by resolve_expression once the outer query is available.
                self.query = None
                self.queryset = None
                self.output_field = extra.get('output_field')
                # Note: the pops below mutate this same dict.
                self.extra = extra
                self.filter = extra.pop('filter', Q())
                self.distinct = extra.pop('distinct', None)
                self.outer_ref = extra.pop('outer_ref', None)
                self.unordered = extra.pop('unordered', self.unordered)

        def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
            """Build the inner queryset from the outer ``query`` if needed, then resolve as usual."""
            # The parent class, Subquery, takes queryset as an initialization parameter
            # so self.queryset needs to be set before we call `resolve_expression`.
            # We can set it here because we now have access to the outer query object,
            # which is the first parameter of this method.
            if self.query is None or self.queryset is None:
                # Don't pass allow_joins = False here
                queryset = self.get_queryset(query.clone(), True, reuse, summarize)
                self.queryset = queryset
                self.query = queryset.query
            return super(Subquery, self).resolve_expression(query, allow_joins, reuse, summarize, for_save)

        def get_queryset(self, query, allow_joins, reuse, summarize):
            """Return the inner queryset; delegates to ``_get_base_queryset`` by default."""
            # This is a customization hook for child classes to override the base queryset computed automatically
            return self._get_base_queryset(query, allow_joins, reuse, summarize)

        def _get_base_queryset(self, query, allow_joins, reuse, summarize):
            """
            Build the correlated inner queryset for ``self.expression``.

            The result is the target model's default manager filtered by
            ``self.filter`` AND ``<reverse path> = OuterRef(<outer_ref>)``, with
            ordering cleared when ``self.unordered`` is truthy, reduced to
            ``.values(<reverse path>)``.
            """
            resolved_expression = self.expression.resolve_expression(query, allow_joins, reuse, summarize)
            model = self._get_model_from_resolved_expression(resolved_expression)

            reverse, outer_ref = self._get_reverse_outer_ref_from_expression(model, query)

            # An explicitly supplied outer_ref takes precedence over the derived one.
            outer_ref = self.outer_ref or outer_ref
            q = self.filter & Q(**{reverse: OuterRef(outer_ref)})
            queryset = model._default_manager.filter(q)
            if self.unordered:
                queryset = queryset.order_by()
            return queryset.values(reverse)

        def _get_model_from_resolved_expression(self, resolved_expression):
            """
            Retrieve the correct model from the resolved_expression.

            For simple expressions like F('child__field_name'), both of these are equivalent and correct:
            resolved_expression.field.model
            resolved_expression.target.model

            For many to many relations, resolved_expression.field.model goes one table deeper than
            necessary. We get more efficient SQL only going as far as we need. In this case only
            resolved_expression.target.model is correct.

            For functions of multiple columns like Coalesce, there is no resolved_expression.target,
            we have to recursively go through the source_expressions until we get to the bottom and
            get the target from there.
            """
            def get_target(res_expr):
                # Descend into the first source expression only; the loop body
                # returns on its first iteration.
                for expression in res_expr.get_source_expressions():
                    return get_target(expression)
                return res_expr.field if res_expr.target.null else res_expr.target
            return get_target(resolved_expression).model

        def _get_fields_model_from_path(self, path, model, target_model):
            """
            Walk ``path`` backwards and collect the names that lead from ``model``
            back towards ``target_model``.

            :return: ``(fields, model)`` where ``fields`` is the list of collected
                names and ``model`` is the last ``from_opts.model`` reached.
            """
            fields = []

            # We want the paths reversed because we have the forward join info
            # and we need the string that tells us how to go back
            paths = list(reversed(path))
            for p in paths:
                if p.to_opts.model == model and ((p.from_opts.model != target_model or p.m2m) or not fields):
                    if getattr(p.join_field, 'related_query_name', None) and isinstance(p.join_field, ForeignKey):
                        try:
                            fields.append(p.join_field.related_query_name())
                        except TypeError:  # Sometimes related_query_name is a string instead of a callable that returns a string
                            fields.append(p.join_field.related_query_name)
                    elif hasattr(p.join_field, 'field'):
                        fields.append(p.join_field.field.name)
                    model = p.from_opts.model

            return fields, model

        def _get_reverse_outer_ref_from_expression(self, model, query):
            """
            Derive the reverse lookup path and the outer field name used to
            correlate the subquery with the outer ``query``.

            :return: ``(reverse, outer_ref)``; ``reverse`` is the ``LOOKUP_SEP``-joined
                path from the subquery model back to the outer model, ``outer_ref``
                is the name handed to ``OuterRef``.
            """
            source = self.expression
            # Unwrap nested expressions via their first source expression until
            # reaching an object without get_source_expressions, whose ``name``
            # is then used as the lookup path.
            while hasattr(source, 'get_source_expressions'):
                source = source.get_source_expressions()[0]
            field_list = source.name.split(LOOKUP_SEP)
            path, _, _, _ = query.names_to_path(field_list, query.get_meta(), allow_many=True, fail_on_missing=True)

            fields, model = self._get_fields_model_from_path(path, model, query.model)
            reverse = LOOKUP_SEP.join(fields)

            join_field = path[0].join_field
            if model == query.model or len(path) == 1:
                try:
                    outer_ref = join_field.get_related_field().name
                except AttributeError:
                    # The join field has no get_related_field(); fall back to the primary key.
                    outer_ref = 'pk'
            else:
                outer_ref = join_field.name

            return reverse, outer_ref
    
    
    class SubqueryAggregate(Subquery):
        """
        Aggregate whose calculation runs inside a subquery.

        Offers an API close to the regular aggregate classes (Count, Min, Max,
        Sum, ...) while generating SQL that aggregates in a subquery rather than
        adding joins to the outer query. That is commonly faster and avoids
        having to remember `distinct` when joins would duplicate rows.

        Instead of

        queryset.annotate(min_field=Min('field'))

        write

        queryset.annotate(min_field=SubqueryAggregate('field', aggregate=Min))

        A subclass that sets `aggregate = Min` makes this shorter still:

        queryset.annotate(min_field=SubqueryMin('field'))

        """
        aggregate = None  # Supplied by a subclass or via the `aggregate` kwarg
        unordered = None

        def __init__(self, *args, **extra):
            """Pop `aggregate` and `ordering` from the kwargs, then defer to Subquery."""
            self.aggregate = extra.pop('aggregate', self.aggregate)
            self.ordering = extra.pop('ordering', None)
            assert self.aggregate is not None, (
                "Error: Attempt to instantiate a "
                "SubqueryAggregate with no aggregate function"
            )
            super().__init__(*args, **extra)

        def get_queryset(self, query, allow_joins, reuse, summarize):
            """Return the base queryset annotated with the aggregate, reduced to that single value."""
            base = self._get_base_queryset(query, allow_joins, reuse, summarize)
            return base.annotate(
                **self._get_annotation(query, allow_joins, reuse, summarize)
            ).values('aggregation')

        def aggregate_kwargs(self):
            """Collect the optional keyword arguments (only truthy ones) for the aggregate call."""
            options = (
                ('distinct', self.distinct),
                ('ordering', self.ordering),
            )
            return {name: value for name, value in options if value}

        def _get_annotation(self, query, allow_joins, reuse, summarize):
            """Build the `{'aggregation': <aggregate expression>}` mapping used by get_queryset."""
            # Resolved against the outer query: tells us which model the subquery targets.
            outer_resolved = self.expression.resolve_expression(query, allow_joins, reuse, summarize)
            target_model = self._get_model_from_resolved_expression(outer_resolved)
            inner_query = target_model._default_manager.all().query
            # Resolved against the subquery: the field that actually gets aggregated.
            target = self._resolve_to_target(outer_resolved, inner_query, allow_joins, reuse, summarize)

            # Add test for output_field, distinct, and when resolved_expression.field.name isn't what we're aggregating

            if not self.output_field:
                self._output_field = self.output_field = target.field

            return {'aggregation': self.aggregate(target, **self.aggregate_kwargs())}

        def _resolve_to_target(self, resolved_expression, query, allow_joins, reuse, summarize):
            """
            Re-resolve `resolved_expression` against `query`.

            Leaves (no source expressions) are re-resolved through
            `F(<target name>)`, falling back to the expression itself on
            FieldError/AttributeError; composite expressions are copied with
            each source expression re-resolved recursively.
            """
            children = resolved_expression.get_source_expressions()
            if not children:
                try:
                    return F(resolved_expression.target.name).resolve_expression(query, allow_joins, reuse, summarize)
                except (FieldError, AttributeError):
                    return resolved_expression

            clone = resolved_expression.copy()
            clone.is_summary = summarize
            clone.set_source_expressions([
                self._resolve_to_target(child, query, allow_joins, reuse, summarize)
                for child in children
            ])
            return clone