
Implementing Aggregation Functions in MongoDB

As the volume of data that organizations generate grows from gigabytes to terabytes to petabytes, traditional databases struggle to scale to such big data sets, and the cost of storing and processing the data rises significantly as it grows. As a result, organizations are looking for more economical alternatives such as NoSQL databases, which provide the required data storage and processing capabilities along with scalability and cost effectiveness. NoSQL databases do not use SQL as their query language. They come in several types, including document stores, key-value stores, graph databases and object databases.

Typical use cases for NoSQL databases include archiving old logs, event logging, e-commerce application logs, gaming data and social data, owing to their fast read-write capability. The stored data then needs to be processed to gain useful insights into customers and their usage of the applications.

The NoSQL database we use in this article is MongoDB, an open source, document-oriented NoSQL database system written in C++. It provides high-performance document-oriented storage as well as support for writing MapReduce programs to process data stored in MongoDB documents. It is easily scalable and supports auto partitioning. MapReduce can be used for aggregation of data through batch processing. MongoDB stores data in BSON (Binary JSON) format, supports a dynamic schema and allows for dynamic queries. The Mongo Query Language is expressed as JSON and is different from the SQL queries used in an RDBMS. MongoDB provides an Aggregation Framework that includes utility functions such as count, distinct and group. However, more advanced aggregation functions such as sum, average, max, min, variance and standard deviation need to be implemented using MapReduce.

This article describes the method of implementing common aggregation functions like sum, average, max, min, variance and standard deviation on a MongoDB document using its MapReduce functionality. Typical applications of aggregations include business reporting of sales data such as calculation of total sales by grouping data across geographical locations, financial reporting, etc.

Let's start with installing the required software for running the sample application discussed in this article.

Software Setup

We first install and set up the MongoDB server on a local machine.

  • Download MongoDB from the official Mongo website and unzip the files to a preferred directory on the local machine.
            For example, C:\Mongo
  • Create a Data directory in the same folder.
            For example, C:\Mongo\Data
    • If the data files are stored elsewhere, the --dbpath command line parameter must be specified when starting the MongoDB server with mongod.exe.
  • Starting up the server
    • MongoDB provides two executables for this purpose: mongod.exe is the database server daemon and mongo.exe is the administrative shell. Both are located in the Mongo\bin folder.
    • Change the directory to the bin folder of the Mongo home.
      For example, C:\> cd Mongo\bin
    • There are two ways of starting the server, as shown below.
      mongod.exe --dbpath C:\Mongo\Data
      or
      mongod.exe --config mongodb.config
      where mongodb.config is a configuration file located in the Mongo\bin folder. We specify the location of the data folder (i.e. dbpath=C:\Mongo\Data) in this configuration file.
  • Connecting to the server
    • At this point the MongoDB server is running and accepts connections on localhost at the default port 27017. This port speaks MongoDB's own protocol rather than HTTP, so connect with the mongo.exe shell instead of a browser.

Now that MongoDB is up and running, let’s look at the aggregation functions.

Implementing Aggregation Functions

In a relational database, we can execute SQL queries with pre-defined aggregation functions such as SUM(), COUNT(), MAX() or MIN() on a numerical column. But in MongoDB, MapReduce functionality is used for aggregation and batch processing of data. It is similar to the GROUP BY clause that is used for aggregating data in SQL. The next section describes the SQL way of performing aggregations in a relational database and the corresponding implementation using the MapReduce functionality provided by MongoDB.
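To make the GROUP BY analogy concrete, here is a minimal in-memory sketch of how map, emit and reduce cooperate. It is not MongoDB's implementation: `simulateMapReduce` and the `sampleSales` data are hypothetical names made up for illustration, and `emit` is passed to `map` as an argument here, whereas MongoDB makes it available as a global inside the map function.

```javascript
// Minimal in-memory imitation of MapReduce grouping (illustration only).
function simulateMapReduce(docs, map, reduce) {
    const groups = new Map(); // serialized key -> { key, values }

    // emit() collects values under their key, which is what produces the grouping.
    function emit(key, value) {
        const id = JSON.stringify(key);
        if (!groups.has(id)) {
            groups.set(id, { key: key, values: [] });
        }
        groups.get(id).values.push(value);
    }

    // Run map() once per document, with `this` bound to the document.
    docs.forEach(function (doc) {
        map.call(doc, emit);
    });

    // Run reduce() once per distinct key. (MongoDB skips reduce for keys
    // that have a single value; this sketch always calls it.)
    return Array.from(groups.values()).map(function (group) {
        return { _id: group.key, value: reduce(group.key, group.values) };
    });
}

// Hypothetical documents, roughly: SELECT Region, SUM(SalesAmt) ... GROUP BY Region
const sampleSales = [
    { Region: "East", SalesAmt: 100 },
    { Region: "West", SalesAmt: 250 },
    { Region: "East", SalesAmt: 50 }
];

const totals = simulateMapReduce(
    sampleSales,
    function (emit) { emit({ region: this.Region }, this.SalesAmt); },
    function (key, values) {
        return values.reduce(function (a, b) { return a + b; }, 0);
    }
);

console.log(JSON.stringify(totals));
// [{"_id":{"region":"East"},"value":150},{"_id":{"region":"West"},"value":250}]
```

The emitted key plays the role of the GROUP BY columns, and the reduce function plays the role of the aggregate in the SELECT list.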

For this discussion, let's consider a Sales table represented as shown below, in de-normalized form in MongoDB.

Sales Table

#    Column Name          Data Type
1    OrderId              INTEGER
2    OrderDate            STRING
3    Quantity             INTEGER
4    SalesAmt             DOUBLE
5    Profit               DOUBLE
6    CustomerName         STRING
7    City                 STRING
8    State                STRING
9    ZipCode              STRING
10   Region               STRING
11   ProductId            INTEGER
12   ProductCategory      STRING
13   ProductSubCategory   STRING
14   ProductName          STRING
15   ShipDate             STRING

Mapping SQL & Map Reduce based Implementations

We provide a sample set of queries that use aggregation functions, filter criteria and grouping clauses, together with their equivalent MapReduce implementations, which are the MongoDB equivalent of performing a GROUP BY in SQL. This approach is useful for running aggregation operations on a MongoDB collection. A limitation is that aggregation functions such as SUM, AVG, MIN or MAX have to be implemented by hand in the mapper and reducer functions.

MongoDB does not support user defined functions (UDFs) out-of-the-box. But it allows creating and saving JavaScript functions using the db.system.js.save command. The JavaScript functions thus created can then be reused in the MapReduce functions. The table below shows the implementations of some commonly used aggregation functions. Later, we will discuss the usage of these functions in MapReduce jobs.

 

Aggregation Function

JavaScript Function

SUM

db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}});

AVERAGE

db.system.js.save( { _id : "Avg" ,
value : function(key,values)
{
    var total = Sum(key,values);
    var mean = total/values.length;
    return mean;
}});

MAX

db.system.js.save( { _id : "Max" ,
value : function(key,values)
{
    var maxValue=values[0];
    for(var i=1;i<values.length;i++)
    {
        if(values[i]>maxValue)
        {
            maxValue=values[i];
        }
    }
    return maxValue;
}});

MIN

db.system.js.save( { _id : "Min" ,
value : function(key,values)
{
    var minValue=values[0];
    for(var i=1;i<values.length;i++)
    {
        if(values[i]<minValue)
        {
            minValue=values[i];
        }
    }
    return minValue;
}});

VARIANCE

db.system.js.save( { _id : "Variance" ,
value : function(key,values)
{
    var squared_Diff = 0;
    var mean = Avg(key,values);
    for(var i = 0; i < values.length; i++)
    {
        var deviation = values[i] - mean;
        squared_Diff += deviation * deviation;
    }
    var variance = squared_Diff/(values.length);
    return variance;
}});

STD DEVIATION

db.system.js.save( { _id : "Standard_Deviation"
, value : function(key,values)
{
    var variance = Variance(key,values);
    return Math.sqrt(variance);
}});
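As a quick check of the arithmetic, the functions above can be exercised as plain JavaScript outside MongoDB. The sketch below uses local copies of the function bodies; the `quantities` array is illustrative sample input, not data taken from a real run.

```javascript
// Local copies of the stored functions, so the arithmetic can be checked
// outside MongoDB. The `key` argument is unused, as in the originals.
function Sum(key, values) {
    var total = 0;
    for (var i = 0; i < values.length; i++) {
        total += values[i];
    }
    return total;
}

function Avg(key, values) {
    return Sum(key, values) / values.length;
}

function Variance(key, values) {
    var mean = Avg(key, values);
    var squaredDiff = 0;
    for (var i = 0; i < values.length; i++) {
        var deviation = values[i] - mean;
        squaredDiff += deviation * deviation;
    }
    return squaredDiff / values.length; // population variance (divides by n)
}

function Standard_Deviation(key, values) {
    return Math.sqrt(Variance(key, values));
}

var quantities = [20, 30, 40, 30, 50]; // illustrative sample values

console.log(Sum(null, quantities));                           // 170
console.log(Avg(null, quantities));                           // 34
console.log(Variance(null, quantities));                      // 104
console.log(Standard_Deviation(null, quantities).toFixed(3)); // 10.198
```

Note that Variance divides by the number of values, so it computes the population variance rather than the sample variance (which would divide by n - 1).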

The code snippets for SQL and MapReduce scripts for implementing the aggregation functions in four different use case scenarios are shown in the table below:

1. Average order quantity across geo locations

The following query is used to fetch the average order quantity across different geographic locations.

SQL Query

SELECT City, State, Region, AVG(Quantity)
FROM sales
GROUP BY City, State, Region

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales",
    map : function()
    {
        // The key passed to emit() handles the GROUP BY
        emit(
            // Key
            { city : this.City, state : this.State, region : this.Region },
            // Value
            this.Quantity);
    },
    reduce : function(key, values)
    {
        var result = Avg(key, values);
        return result;
    },
    out : { inline : 1 }
});
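One caveat about returning an average straight from reduce: MongoDB may call reduce more than once for the same key and feed earlier reduce output back in as input values. An average of averages is generally not the overall average, so a common alternative is to carry a running sum and count through reduce and divide only at the end, in a function passed as the finalize option. The sketch below demonstrates the difference in plain JavaScript; the function names and batch values are hypothetical.

```javascript
// Reduce that returns a mean directly: not safe if its output is reduced again.
function avgReduce(key, values) {
    var total = 0;
    values.forEach(function (v) { total += v; });
    return total / values.length;
}

// Re-reduce-safe variant: carry the running sum and count, divide at the end.
function mapValue(quantity) {             // what map() would emit as the value
    return { sum: quantity, count: 1 };
}

function sumCountReduce(key, values) {
    var result = { sum: 0, count: 0 };
    values.forEach(function (v) {
        result.sum += v.sum;
        result.count += v.count;
    });
    return result;
}

function finalizeAvg(key, reduced) {      // would be passed as the finalize option
    return reduced.sum / reduced.count;
}

// Simulate reduce being called on two partial batches, then on their outputs.
var batch1 = [10, 20, 30];
var batch2 = [100];

var naive = avgReduce(null, [avgReduce(null, batch1), avgReduce(null, batch2)]);
var safe = finalizeAvg(null, sumCountReduce(null, [
    sumCountReduce(null, batch1.map(mapValue)),
    sumCountReduce(null, batch2.map(mapValue))
]));

console.log(naive); // 60, although the true mean of 10, 20, 30, 100 is 40
console.log(safe);  // 40
```

For small groups that are reduced in a single call, both approaches give the same answer; the difference only shows up when reduce output is reduced again.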

2. Total product sales across product categories

The following query is used to fetch the total sales amount grouped by multiple levels of product categories. The different product categories used in the following example as individual dimensions could also be defined as a complex hierarchy based dimension.

SQL Query

SELECT ProductCategory, ProductSubCategory, ProductName, SUM(SalesAmt)
FROM sales
GROUP BY ProductCategory, ProductSubCategory, ProductName

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales",
    map : function()
    {
        // The key passed to emit() handles the GROUP BY
        emit(
            // Key
            { key0 : this.ProductCategory,
              key1 : this.ProductSubCategory,
              key2 : this.ProductName },
            // Value
            this.SalesAmt);
    },
    reduce : function(key, values)
    {
        var result = Sum(key, values);
        return result;
    },
    out : { inline : 1 }
});

 

3. Maximum profit for a product

The following query is used to fetch the maximum profit for a given product based on the filter criteria.

SQL Query

SELECT ProductId, ProductName, MAX(Profit)
FROM sales
WHERE ProductId = 1
GROUP BY ProductId, ProductName

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales",
    map : function()
    {
        // The WHERE condition is implemented here, in the map() function
        if (this.ProductId == 1)
            // The key passed to emit() handles the GROUP BY
            emit(
                { key0 : this.ProductId, key1 : this.ProductName },
                this.Profit);
    },
    reduce : function(key, values)
    {
        var maxValue = Max(key, values);
        return maxValue;
    },
    out : { inline : 1 }
});
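The filter in this scenario is applied inside map(). The query option listed in the command syntax later in this article can express the same condition so that only matching documents reach map(), which may also let MongoDB use an index on the filtered field. The sketch below builds such a command document as a plain object; `maxProfitCommand` is a hypothetical name, and `Math.max` stands in for the stored Max function so the snippet is self-contained.

```javascript
// Hypothetical command document: the same aggregation, with the filter moved
// from map() into the `query` option.
var maxProfitCommand = {
    mapreduce: "sales",
    query: { ProductId: 1 },            // plays the role of WHERE ProductId = 1
    map: function () {
        // `emit` is provided by MongoDB when the job runs on the server.
        emit({ key0: this.ProductId, key1: this.ProductName }, this.Profit);
    },
    reduce: function (key, values) {
        // Math.max stands in for the stored Max function here.
        return Math.max.apply(null, values);
    },
    out: { inline: 1 }
};

// In the mongo shell this could be run as: db.runCommand(maxProfitCommand)

// The reduce function can be checked locally on illustrative profit values.
console.log(maxProfitCommand.reduce(null, [150, 40, 20])); // 150
```

Taking a maximum is safe even if reduce is applied to its own output, because the maximum of partial maxima is the overall maximum.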

4. Total quantity, total sales and average profit

The requirement for this scenario is to calculate the total quantity, total sales and average profit for orders whose IDs are in the range 1 to 10 and whose ShipDate is between Jan 1 and Dec 31, 2011. The following query performs these aggregations for the specified year and order range, grouped by region, product category and product.

SQL Query

SELECT Region, ProductCategory, ProductId,
       SUM(Quantity), SUM(SalesAmt), AVG(Profit)
FROM sales
WHERE OrderId BETWEEN 1 AND 10
  AND ShipDate BETWEEN '01/01/2011' AND '12/31/2011'
GROUP BY Region, ProductCategory, ProductId
LIMIT 3;

MapReduce Functions

db.sales.runCommand(
{
    mapreduce : "sales",
    map : function()
    {
        // The key passed to emit() handles the GROUP BY
        emit(
            // Keys
            { region : this.Region,
              productCategory : this.ProductCategory,
              productid : this.ProductId },
            // Values
            { quantSum : this.Quantity,
              salesSum : this.SalesAmt,
              avgProfit : this.Profit });
    },
    reduce : function(key, values)
    {
        var result = { quantSum : 0, salesSum : 0, avgProfit : 0 };
        var count = 0;
        values.forEach(function(value)
        {
            // Calculation of Sum(Quantity)
            result.quantSum += value.quantSum;
            // Calculation of Sum(Sales)
            result.salesSum += value.salesSum;
            result.avgProfit += value.avgProfit;
            count++;
        });
        // Calculation of Avg(Profit); correct only if reduce runs once per key
        result.avgProfit = result.avgProfit / count;
        return result;
    },
    // WHERE clause. BETWEEN is inclusive, hence $gte and $lte. Both bounds
    // go under a single key, because a repeated key keeps only its last value.
    query : {
        "OrderId" : { "$gte" : 1, "$lte" : 10 },
        // ShipDate is stored as a string, so these bounds are compared as
        // text rather than chronologically
        "ShipDate" : { "$gte" : "01/01/2011", "$lte" : "31/12/2011" }
    },
    // limit caps the number of input documents passed to map(), which is
    // not the same as SQL LIMIT on the result rows
    limit : 3,
    out : { inline : 1 }
});
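Two details of the query filter in this scenario are easy to get wrong: a JavaScript object literal keeps only the last value of a repeated key, and range comparisons on date strings are lexicographic. The plain JavaScript sketch below illustrates both; the `between` helper and the sample date strings are hypothetical and only mimic how string bounds compare.

```javascript
// A repeated key in an object literal silently keeps only the last value.
var overwritten = { OrderId: { $gt: 1 }, OrderId: { $lt: 10 } };
console.log(JSON.stringify(overwritten)); // {"OrderId":{"$lt":10}}

// Putting both bounds under one key expresses the range;
// $gte and $lte are inclusive, like SQL BETWEEN.
var orderRange = { OrderId: { $gte: 1, $lte: 10 } };
console.log(JSON.stringify(orderRange)); // {"OrderId":{"$gte":1,"$lte":10}}

// Strings compare character by character, so dd/mm/yyyy text does not sort
// chronologically, while yyyy-mm-dd text does.
function between(value, low, high) {
    return value >= low && value <= high;
}

console.log(between("26/03/2012", "01/01/2011", "31/12/2011")); // true (wrong year matches)
console.log(between("2012-03-26", "2011-01-01", "2011-12-31")); // false
console.log(between("2011-03-26", "2011-01-01", "2011-12-31")); // true
```

If date ranges matter, storing dates in a sortable text format such as yyyy-mm-dd, or as a native date type, avoids the first pitfall shown in the comparison.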

Now that we have looked at the code examples of aggregation functions for different business scenarios, we are ready to test these functions.

Testing the Aggregation Functions

MapReduce functionality in MongoDB is invoked using a database command. The map and reduce functions are written in JavaScript, as described in the previous section. The following is the syntax used to execute MapReduce functions on the server.

db.runCommand(
    { mapreduce : <collection>,
        map : <mapfunction>,
        reduce : <reducefunction>
        [, query : <query filter object>]
        [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
        [, limit : <number of objects to return from collection>]
        [, out : <see output options below>]
        [, keeptemp: <true|false>]
        [, finalize : <finalizefunction>]
        [, scope : <object where fields go into javascript global scope >]
        [, jsMode : true]
        [, verbose : true]
    }
)


The output options include:

{ replace : "collectionName" }
{ merge : "collectionName" }
{ reduce : "collectionName" }
{ inline : 1 }

 

Shown below are the commands needed to save an aggregation function and use it in a MapReduce function.

Start Mongo Shell and set up table

  • Ensure that the Mongo server is running. Then start the Mongo shell by running mongo.exe from the Mongo\bin folder.
  • Switch the database by running the command:
     > use mydb
  • View the contents of the sales collection (the Sales table) using the following command:
     > db.sales.find()

Here is the output of the find command.

{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, "orderDate" : "26/03/2011",
"quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1", "productCategory" : "IT", "productSubCategory" : "software", 
"productName" : "Grad", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, "orderDate" : "23/05/2011", 
"quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2", "productCategory" : "IT", "productSubCategory" : "hardware",
 "productName" : "HIM", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, "orderDate" : "22/09/2011",
 "quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", "productCategory" : "BT", "productSubCategory" : "services",
 "productName" : "VOCI", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, "orderDate" : "21/10/2011", 
"quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", 
"productName" : "CRUD", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, "orderDate" : "21/06/2011", 
"quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", 
"productName" : "CRUD", "productId" : 1 }

Create and store the aggregation functions

  • From the MongoDB shell prompt, run the following command to store the Sum function.
> db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}});
  • Now execute the MapReduce program on the sample collection "sales". Field names in MongoDB are case-sensitive, so the map function uses the camelCase names that appear in the find() output above.
> db.sales.runCommand(
{
    mapreduce : "sales" ,
    map : function()
    {
        emit(
            { key0 : this.productCategory,
              key1 : this.productSubCategory,
              key2 : this.productName },
            this.salesAmt);
    },
    reduce : function(key,values)
    {
        var result = Sum(key, values);
        return result;
    },
    out : { inline : 1 }
});

This would display the following output:

"results" : [
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "hardware",
                        "key2" : "CRUD"
                },
                "value" : 400
        },
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "services",
                        "key2" : "VOCI"
                },
                "value" : 200
        },
        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "hardware",
                        "key2" : "HIM"
                },
                "value" : 200
        },

        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "software",
                        "key2" : "Grad"
                },
                "value" : 200
        }
],
"timeMillis" : 1,
"timing" : {
        "mapTime" : NumberLong(1),
        "emitLoop" : 1,
        "total" : 1
},
"counts" : {
        "input" : 5,
        "emit" : 5,
        "output" : 4
},
"ok" : 1
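The totals can be cross-checked by hand against the five sample documents. The sketch below repeats the grouping in plain JavaScript with an ordinary object instead of MapReduce; the `docs` array copies only the four fields the job uses from the find() output, and `totalsByKey` is a hypothetical name.

```javascript
// The five sample documents, reduced to the fields the job uses.
var docs = [
    { productCategory: "IT", productSubCategory: "software", productName: "Grad", salesAmt: 200 },
    { productCategory: "IT", productSubCategory: "hardware", productName: "HIM", salesAmt: 200 },
    { productCategory: "BT", productSubCategory: "services", productName: "VOCI", salesAmt: 200 },
    { productCategory: "BT", productSubCategory: "hardware", productName: "CRUD", salesAmt: 200 },
    { productCategory: "BT", productSubCategory: "hardware", productName: "CRUD", salesAmt: 200 }
];

// Group by the three key fields and add up salesAmt per group.
var totalsByKey = {};
docs.forEach(function (doc) {
    var key = [doc.productCategory, doc.productSubCategory, doc.productName].join("/");
    totalsByKey[key] = (totalsByKey[key] || 0) + doc.salesAmt;
});

console.log(totalsByKey["BT/hardware/CRUD"]);  // 400
console.log(Object.keys(totalsByKey).length);  // 4 groups from 5 documents
```

This matches the "counts" section of the output: five input documents and five emits collapse into four output groups, because two documents share the BT / hardware / CRUD key.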

Conclusion

MongoDB provides document-oriented storage that can easily scale to terabytes of data. It also provides MapReduce functionality that can be used to aggregate data with SQL-like functions through batch processing. In this article, we described how to set up MongoDB and implement aggregation functions using its MapReduce feature, and we provided a few sample MapReduce implementations of simple SQL-based aggregation queries. The same MapReduce functionality can be used to implement more complex aggregation functions on the data stored in MongoDB documents.

About the Authors

Arun Viswanathan works as a Technology Architect with Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Arun has around 9.5 years of experience in Java, Java EE, Cloud and Big Data application architecture definition and implementation. He is currently involved in design, development and consulting for Big Data solutions. He can be reached at Arun_Viswanathan01@infosys.com.

Shruthi Kumar works as a Technology Analyst with Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Shruthi has 5 years of experience in Java, Grid Computing, Cloud and Big Data application architecture. She is currently involved in development and consulting for Big Data solutions. She can be reached at Shruthi_Kumar01@infosys.com.

 
