I recently updated mongodb from 2.4 to 2.6, and the new memory limit in aggregate() is causing my aggregation to fail with the following error:
Moped::Errors::OperationFailure: The operation: #<Moped::Protocol::Command
@selector={:aggregate=>"items", :pipeline=>[{"$group"=>{"_id"=>"$serial_number", "total"=>{"$sum"=>1}}}, {"$match"=>{"total"=>{"$gte"=>2}}}, {"$sort"=>{"total"=>-1}}, {"$limit"=>750000}]}
failed with error 16945: "exception: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in."
So, I'm trying to pass allowDiskUse: true in the query:
dupes = Item.collection.aggregate([{
'$group' => {'_id' => "$serial_number", 'total' => { "$sum" => 1 } } },
{ '$match' => { 'total' => { '$gte' => 2 } } },
{ '$sort' => {'total' => -1}},
{ '$limit' => 750000 }],
{ 'allowDiskUse' => true })
But this isnt working.... no matter how I try I get this error:
Moped::Errors::OperationFailure: The operation: #<Moped::Protocol::Command
@selector={:aggregate=>"items", :pipeline=>[{"$group"=>{"_id"=>"$serial_number", "total"=>{"$sum"=>1}}}, {"$match"=>{"total"=>{"$gte"=>2}}}, {"$sort"=>{"total"=>-1}}, {"$limit"=>750000}, {"allowDiskUse"=>true}]}
failed with error 16436: "exception: Unrecognized pipeline stage name: 'allowDiskUse'"
Does anyone know how I can structure this query appropriately to pass allowDiskUse outside of the pipeline arg?
The problem is that Moped does not currently permit options for Moped::Collection#aggregate, just a pipeline for args, as can be seen here: https://github.com/mongoid/moped/blob/master/lib/moped/collection.rb#L146 - the Mongo Ruby driver supports options for Mongo::Collection#aggregate, but Mongoid 3 uses Moped for its driver.
However, thanks to the dynamic nature of Ruby, you can work around this. The following test includes a monkey-patch for Moped::Collection#aggregate provided that you supply the pipeline as an array for the first argument, allowing you to tack on options like allowDiskUse.
Hope that this helps.
require 'test_helper'
module Moped
class Collection
def aggregate(pipeline, opts = {})
database.session.command({aggregate: name, pipeline: pipeline}.merge(opts))["result"]
class ItemTest < ActiveSupport::TestCase
def setup
test "moped aggregate with allowDiskUse" do
puts "\nMongoid::VERSION:#{Mongoid::VERSION}\nMoped::VERSION:#{Moped::VERSION}"
docs = [
{serial_number: 1},
{serial_number: 2},
{serial_number: 2},
{serial_number: 3},
{serial_number: 3},
{serial_number: 3}
assert_equal(docs.count, Item.count)
dups = Item.collection.aggregate(
[{'$group' => {'_id' => "$serial_number", 'total' => {"$sum" => 1}}},
{'$match' => {'total' => {'$gte' => 2}}},
{'$sort' => {'total' => -1}},
{'$limit' => 750000}],
{'allowDiskUse' => true})
p dups
$ rake test
Run options:
# Running tests:
[1/1] ItemTest#test_moped_aggregate_with_allowDiskUse
[{"_id"=>3, "total"=>3}, {"_id"=>2, "total"=>2}]
Finished tests in 0.027865s, 35.8873 tests/s, 35.8873 assertions/s.
1 tests, 1 assertions, 0 failures, 0 errors, 0 skips