Tags: ruby, ruby-on-rails-3, mongodb, mongoid, mongoid3

Adding the allowDiskUse parameter to a db.collection.aggregate() query using Mongoid


I recently upgraded MongoDB from 2.4 to 2.6, and the new memory limit in aggregate() is causing my aggregation to fail with the following error:

Moped::Errors::OperationFailure: The operation: #<Moped::Protocol::Command
  @length=251
  @request_id=6
  @response_to=0
  @op_code=2004
  @flags=[:slave_ok]
  @full_collection_name="items.$cmd"
  @skip=0
  @limit=-1
  @selector={:aggregate=>"items", :pipeline=>[{"$group"=>{"_id"=>"$serial_number", "total"=>{"$sum"=>1}}}, {"$match"=>{"total"=>{"$gte"=>2}}}, {"$sort"=>{"total"=>-1}}, {"$limit"=>750000}]}
  @fields=nil>
failed with error 16945: "exception: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in."

So, I'm trying to pass allowDiskUse: true in the query:

dupes = Item.collection.aggregate(
  [{ '$group' => { '_id' => "$serial_number", 'total' => { "$sum" => 1 } } },
   { '$match' => { 'total' => { '$gte' => 2 } } },
   { '$sort' => { 'total' => -1 } },
   { '$limit' => 750000 }],
  { 'allowDiskUse' => true })

But this isn't working. No matter how I structure the call, I get this error:

Moped::Errors::OperationFailure: The operation: #<Moped::Protocol::Command
  @length=274
  @request_id=2
  @response_to=0
  @op_code=2004
  @flags=[:slave_ok]
  @full_collection_name="items.$cmd"
  @skip=0
  @limit=-1
  @selector={:aggregate=>"items", :pipeline=>[{"$group"=>{"_id"=>"$serial_number", "total"=>{"$sum"=>1}}}, {"$match"=>{"total"=>{"$gte"=>2}}}, {"$sort"=>{"total"=>-1}}, {"$limit"=>750000}, {"allowDiskUse"=>true}]}
  @fields=nil>
failed with error 16436: "exception: Unrecognized pipeline stage name: 'allowDiskUse'"

Does anyone know how I can structure this query appropriately to pass allowDiskUse outside of the pipeline arg?


Solution

  • The problem is that Moped does not currently accept options in Moped::Collection#aggregate; it takes only a pipeline as its arguments, as can be seen here: https://github.com/mongoid/moped/blob/master/lib/moped/collection.rb#L146. Your options hash is therefore sent as one more pipeline stage, which is why the server complains about an unrecognized stage name. The Mongo Ruby driver supports options for Mongo::Collection#aggregate, but Mongoid 3 uses Moped as its driver.
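
    To make the difference concrete, here is a minimal sketch that only builds the command hashes and needs no database. Both methods are hypothetical stand-ins, not Moped's source: the first assumes every argument is collected into the pipeline (which is what the second error in the question suggests), the second keeps the options separate and merges them into the top level of the command.

    ```ruby
    # Hypothetical stand-in for a pipeline-only aggregate: every argument
    # after the collection name is treated as part of the pipeline.
    def pipeline_only_command(name, *pipeline)
      { aggregate: name, pipeline: pipeline.flatten }
    end

    # Hypothetical stand-in for an aggregate that takes a separate options
    # hash and merges it into the top level of the command.
    def command_with_options(name, pipeline, opts = {})
      { aggregate: name, pipeline: pipeline }.merge(opts)
    end

    stages = [
      { '$group' => { '_id' => '$serial_number', 'total' => { '$sum' => 1 } } },
      { '$match' => { 'total' => { '$gte' => 2 } } }
    ]
    options = { 'allowDiskUse' => true }

    broken = pipeline_only_command('items', stages, options)
    fixed  = command_with_options('items', stages, options)

    # In the first command the options hash has become a third "stage";
    # in the second it sits next to :aggregate and :pipeline.
    puts broken[:pipeline].length
    puts fixed[:pipeline].length
    puts fixed.key?('allowDiskUse')
    ```

    The server only looks for allowDiskUse at the top level of the aggregate command, so the second shape is the one to aim for.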

    However, thanks to the dynamic nature of Ruby, you can work around this. The following test includes a monkey-patch for Moped::Collection#aggregate that accepts an options hash as a second argument, provided that you supply the pipeline as an array in the first argument. This lets you tack on options like allowDiskUse.

    Hope that this helps.

    test/unit/item_test.rb

    require 'test_helper'
    
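    # Monkey-patch: take the pipeline as a single array plus an options hash,
    # and merge the options into the top level of the aggregate command.
    module Moped
      class Collection
        def aggregate(pipeline, opts = {})
          database.session.command({aggregate: name, pipeline: pipeline}.merge(opts))["result"]
        end
      end
    end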
    
    class ItemTest < ActiveSupport::TestCase
      def setup
        Item.delete_all
      end
    
      test "moped aggregate with allowDiskUse" do
        puts "\nMongoid::VERSION:#{Mongoid::VERSION}\nMoped::VERSION:#{Moped::VERSION}"
        docs = [
            {serial_number: 1},
            {serial_number: 2},
            {serial_number: 2},
            {serial_number: 3},
            {serial_number: 3},
            {serial_number: 3}
        ]
        Item.create(docs)
        assert_equal(docs.count, Item.count)
        dups = Item.collection.aggregate(
            [{'$group' => {'_id' => "$serial_number", 'total' => {"$sum" => 1}}},
             {'$match' => {'total' => {'$gte' => 2}}},
             {'$sort' => {'total' => -1}},
             {'$limit' => 750000}],
            {'allowDiskUse' => true})
        p dups
      end
    end
    

    $ rake test

    Run options:
    
    # Running tests:
    
    [1/1] ItemTest#test_moped_aggregate_with_allowDiskUse
    Mongoid::VERSION:3.1.6
    Moped::VERSION:1.5.2
    [{"_id"=>3, "total"=>3}, {"_id"=>2, "total"=>2}]
    Finished tests in 0.027865s, 35.8873 tests/s, 35.8873 assertions/s.
    1 tests, 1 assertions, 0 failures, 0 errors, 0 skips
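
    If you would rather not reopen Moped::Collection, the same command can be sent from a small helper. This is only a sketch: aggregate_with_options is a hypothetical name, and it assumes that database and name are publicly readable on the collection, as their use inside the patch above suggests. The Fake* structs exist only so the sketch runs without a MongoDB server; with Mongoid 3 you would pass Item.collection instead.

    ```ruby
    # Hypothetical helper (not part of Moped or Mongoid): builds the aggregate
    # command with top-level options and sends it through the session.
    def aggregate_with_options(collection, pipeline, opts = {})
      command = { aggregate: collection.name, pipeline: pipeline }.merge(opts)
      collection.database.session.command(command)['result']
    end

    # Stand-ins that mimic only the calls used above and record the command.
    FakeSession = Struct.new(:last_command) do
      def command(cmd)
        self.last_command = cmd
        { 'result' => [], 'ok' => 1.0 }
      end
    end
    FakeDatabase = Struct.new(:session)
    FakeCollection = Struct.new(:database, :name)

    session = FakeSession.new
    collection = FakeCollection.new(FakeDatabase.new(session), 'items')

    pipeline = [
      { '$group' => { '_id' => '$serial_number', 'total' => { '$sum' => 1 } } },
      { '$match' => { 'total' => { '$gte' => 2 } } }
    ]
    result = aggregate_with_options(collection, pipeline, { 'allowDiskUse' => true })

    # The recorded command carries allowDiskUse beside the pipeline, not in it.
    puts session.last_command.key?('allowDiskUse')
    puts session.last_command[:pipeline].length
    ```

    The trade-off is that call sites have to use the helper instead of Item.collection.aggregate, but nothing in Moped is overridden.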