mongodbmeteoraggregation-frameworkreal-timeminimongo

merging two collections time stamped data and showing real time result


The Inocmes and Expenses collections used complete separately in many places in whole app. there is only one page which have the below requirement. I don't believe there is no workaround in Mongodb, which really have millions of users :(

I am using React-Meteor in a project have two collection named Incomes and Expenses. income Doc look like below

{
    "_id" : "euAeJYArsAyFWLJs6",
    "account" : "3m5Zxsije9b6ZaNpu",
    "amount" : 3,
    "receivedAt" : ISODate("2017-07-07T06:21:00.000Z"),
    "type" : "project",
    "project" : {
        "_id" : "ec2WLt3GzHNwhK7oK",
        "name" : "test"
    },
    "owner" : "nM4ToQEbYBsC3NoQB",
    "createdAt" : ISODate("2017-07-07T06:21:37.293Z")
}

and below how the expense Doc look like

{
    "_id" : "snWDusDbkLHHY2Yry",
    "account" : "3m5Zxsije9b6ZaNpu",
    "amount" : 4,
    "spentAt" : ISODate("2017-07-07T06:21:00.000Z"),
    "description" : "4",
    "category" : {
        "_id" : "vh593tw9dZgNdNwtr",
        "name" : "test",
        "icon" : "icon-icons_tution-fee"
    },
    "owner" : "nM4ToQEbYBsC3NoQB",
    "createdAt" : ISODate("2017-07-07T06:22:04.215Z")
}

Now I have a page called transactions where I have to show all transaction (incomes and expenses) based on Time so my publication code for transactions look like below

import { Meteor } from 'meteor/meteor';
import { Incomes } from '../../incomes/incomes.js';
import { Expenses } from '../../expences/expenses.js';
import { Counter } from 'meteor/natestrauser:publish-performant-counts';


let datefilter = (options, query) => {
    let dateQuery = {$gte: new Date(options.dateFilter.start), $lte: new Date(options.dateFilter.end)};
    let temp = {$or: [{receivedAt: dateQuery}, {spentAt: dateQuery}]};
    query.$and.push(temp);
};
Meteor.publish('transactions', function(options) {
    let query = {
        owner: this.userId,
        $and: []
    };

    if(options.accounts.length)
        query['account'] = {$in: options.accounts};

    options.dateFilter && datefilter(options, query);
    //here i also apply other filter based on category and project which does not matter so i removed

    if(!query.$and.length) delete query.$and;

    //computing 'Transactions' below
    return [
        Incomes.find(query, {
            sort: {
                receivedAt: -1
            },
            limit: options.limit,
            skip: options.skip
        }),
        Expenses.find(query, {
            sort: {
                spentAt: -1
            },
            limit: options.limit,
            skip: options.skip
        })
    ]
}); 

Till here every thing working fine until I have to implement pagination on transaction page, so here data have to be sorted by Date. that's the real problem. Assume my both collections have 10 record each and my Template page have to contain first 10 result, so I sent skip 0 and limit 10, in return I got 10 incomes and 10 expenses records and on second page there is no record because skip and limit sent 10 for both. so how to deal with it? I also used counter Technic but that didn't work. remember my data is real time too. any help will be greatly appreciated :)


Solution

  • This is a Temporary Solution for a quick join Issue, You should consider your data-set amount and passed all checks before apply Meantime I will change my schema as @NeilLunn suggested and plan migrations shortly.

    For the adhoc Fix that meet the display requirements I applied the aggregate with the combination of $out. Now the code look like below

    Meteor.publish('transaction', function(options){
        //here I perform many filter based on query and options
        //which deleted to shorten code on SO
    
        //call asynchronous method just to get result delay :)
        Meteor.call('copyTransactions', (err, res) => {
            //something do here
        });
        //note the we return results from **Transactions** which is comes as third collection
        return [
            Transactions.find(query, {
                sort: sortbyDate,
                skip: options.skip,
                limit: options.limit,
            }),
            new Counter('transactionsCount', Transactions.find(query, {
                sort: sortbyDate
            }))
        ];
    });
    

    Now publish Transactions (a separate collection) which I desired as a merge collection . Note this one ends with s in name as transactions so don't confuse with above one (transaction)

    publish the merge collection separately as "transactions"

    Meteor.publish('transactions', function(limit){
        return Transactions.find(
            {
                owner: this.userId
            });
    });
    

    and here is the most important method which called in publication to merge two collection in third collection in which I first aggregated all result with $out and then append second collection with batch insert

    import { Expenses } from '../../../api/expences/expenses'
    import { Incomes } from '../../../api/incomes/incomes'
    import { Transactions } from '../transactions'
    import Future from 'fibers/future';
    export const copyTransactions = new ValidatedMethod({
        name: 'copyTransactions',
        validate:null,
        run() {
            //we are using future here to make it asynchronous
            let fu = new Future();
            //here Expenses directly copied in new collection name Transactions
            // TODO: use $rename or $addField in aggregate instead of $project
            Expenses.aggregate([{
                $project: {
                    account : "$account",
                    amount : "$amount",
                    transactionAt : "$spentAt",
                    description : "$description",
                    category : "$category",
                    type: {
                        $literal: 'expense'
                    },
                    owner : "$owner",
                    createdAt : "$createdAt"
                }
            }, {
                $out: "transactions"
            } ]);
            //now append Transactions collection with incomes with batch insert
            Incomes.aggregate([{
                $project: {
                    account : "$account",
                    amount : "$amount",
                    transactionAt : "$receivedAt",
                    type:{
                        $literal: 'income'
                    },
                    project : "$project",
                    owner : "$owner",
                    createdAt : "$createdAt"
                }
            }], function (err, result) {
                //if no doc found then just return
                if(!result.length){
                    fu.return('completed')
                }
                else{
                    Transactions.batchInsert(result, function(err, res){
                        fu.return('completed')
                    })
                }
    
            });
            return fu.wait();
        }
    });
    

    If Second Collection aggregated too with $out then it will overwrite :(

    @client I just have to subscribe the 'transaction' with my options and query and got the real time merge results from Transactions