With the amount of data that organizations generate exploding from gigabytes to terabytes to petabytes, traditional databases are unable to scale up to manage such big data sets. With these solutions, the cost of storing and processing data increases significantly as the data grows. As a result, organizations are looking for more economical solutions such as NoSQL databases, which provide the required data storage and processing capabilities, scalability and cost effectiveness. NoSQL databases do not use SQL as the query language. There are different types of these databases, such as document stores, key-value stores, graph databases and object databases.
Typical use cases for NoSQL databases include archiving old logs, event logging, ecommerce application logs, gaming data and social data, owing to their fast read-write capability. The stored data then needs to be processed to gain useful insights into customers and their usage of the applications.
The NoSQL database we use in this article is MongoDB, an open source, document-oriented NoSQL database system written in C++. It provides high-performance document-oriented storage as well as support for writing MapReduce programs to process data stored in MongoDB documents. It is easily scalable and supports auto partitioning. MapReduce can be used for aggregation of data through batch processing. MongoDB stores data in BSON (Binary JSON) format, supports a dynamic schema and allows for dynamic queries. The Mongo Query Language is expressed as JSON and is different from the SQL queries used in an RDBMS. MongoDB provides an Aggregation Framework that includes utility functions such as count, distinct and group. However, more advanced aggregation functions such as sum, average, max, min, variance and standard deviation need to be implemented using MapReduce.
This article describes the method of implementing common aggregation functions like sum, average, max, min, variance and standard deviation on a MongoDB document using its MapReduce functionality. Typical applications of aggregations include business reporting of sales data, such as calculating total sales by grouping data across geographical locations, and financial reporting.
Let's start with installing the required software for running the sample application discussed in this article.
Software Setup
We first install and set up the MongoDB server on a local machine.
- Download MongoDB from the official Mongo website and unzip the files to a preferred directory on the local machine.
For example, C:\Mongo
- Create a Data directory in the same folder.
For example, C:\Mongo\Data
- If data files are stored elsewhere, the --dbpath command line parameter needs to be specified while starting the MongoDB server using the command mongod.exe.
- Starting up the server
  - MongoDB provides a couple of executables for this purpose. mongod.exe is the database server daemon and mongo.exe is the administrative shell. These two executable files are located in the Mongo\bin folder.
  - Change the directory to the bin folder of Mongo home.
  For example, C:\> cd Mongo\bin
  - There are two ways of starting the server, as shown below.
  mongod.exe --dbpath C:\Mongo\data
  or
  mongod.exe --config mongodb.config
  where mongodb.config is a configuration file located in the Mongo\bin folder. We specify the location of the data folder (i.e. dbpath = C:\Mongo\Data) in this configuration file.
- Connecting to the server
At this point, the Mongo server is started and accepts connections on localhost at port 27017, the default port.
Now that MongoDB is up and running, let’s look at the aggregation functions.
Implementing Aggregation Functions
In a relational database, we can execute SQL queries with pre-defined aggregation functions such as SUM(), COUNT(), MAX() or MIN() on a numerical column. In MongoDB, however, MapReduce functionality is used for aggregation and batch processing of data. It is similar to the GROUP BY clause that is used for aggregating data in SQL. The next section describes the SQL way of performing aggregations in a relational database and the corresponding implementation using the MapReduce functionality provided by MongoDB.
For this discussion, let's consider a Sales table represented as shown below, in de-normalized form in MongoDB.
Sales Table
# | Column Name | Data Type
1 | OrderId | INTEGER
2 | OrderDate | STRING
3 | Quantity | INTEGER
4 | SalesAmt | DOUBLE
5 | Profit | DOUBLE
6 | CustomerName | STRING
7 | City | STRING
8 | State | STRING
9 | ZipCode | STRING
10 | Region | STRING
11 | ProductId | INTEGER
12 | ProductCategory | STRING
13 | ProductSubCategory | STRING
14 | ProductName | STRING
15 | ShipDate | STRING
Mapping SQL & Map Reduce based Implementations
We have provided a sample set of queries that use some aggregation functions, filter criteria and grouping clauses, along with their equivalent MapReduce implementations, which are the MongoDB equivalent of performing a GROUP BY in SQL. This is very useful for running aggregation operations on a MongoDB document. A limitation of this approach is that aggregation functions such as SUM, AVG, MIN or MAX have to be custom implemented in the mapper and reducer functions.
MongoDB does not support user defined functions (UDFs) out-of-the-box. But it allows creating and saving JavaScript functions using the db.system.js.save command. The JavaScript functions thus created can then be reused in the MapReduce functions. The table below shows the implementations of some commonly used aggregation functions. Later, we will discuss the usage of these functions in MapReduce jobs.
Aggregation Function | JavaScript Function
Sum | db.system.js.save( { _id : "Sum" , value : function(key,values) { var total = 0; for(var i = 0; i < values.length; i++) total += values[i]; return total; }});
Average | db.system.js.save( { _id : "Avg" , value : function(key,values) { var total = Sum(key,values); var mean = total/values.length; return mean; }});
Max | db.system.js.save( { _id : "Max" , value : function(key,values) { var maxValue=values[0]; for(var i=1;i<values.length;i++) { if(values[i]>maxValue) { maxValue=values[i]; } } return maxValue; }});
Min | db.system.js.save( { _id : "Min" , value : function(key,values) { var minValue=values[0]; for(var i=1;i<values.length;i++) { if(values[i]<minValue) { minValue=values[i]; } } return minValue; }});
Variance | db.system.js.save( { _id : "Variance" , value : function(key,values) { var squared_Diff = 0; var mean = Avg(key,values); for(var i = 0; i < values.length; i++) { var deviation = values[i] - mean; squared_Diff += deviation * deviation; } var variance = squared_Diff/(values.length); return variance; }});
Standard Deviation | db.system.js.save( { _id : "Standard_Deviation" , value : function(key,values) { var variance = Variance(key,values); return Math.sqrt(variance); }});
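As a quick sanity check outside MongoDB, the same helper logic can be run as plain JavaScript. The sketch below mirrors the stored functions with local functions; the lowercase function names and the sample array are illustrative assumptions, not part of the article's setup. Note that the variance divides by n, so it is the population variance rather than the sample variance.

```javascript
// Plain-JavaScript versions of the stored helpers, for checking the math
// outside MongoDB. Names and sample data are illustrative assumptions.
function sum(values) {
  var total = 0;
  for (var i = 0; i < values.length; i++) total += values[i];
  return total;
}

function avg(values) {
  return sum(values) / values.length;
}

// Population variance: mean of squared deviations (divides by n, not n - 1).
function variance(values) {
  var mean = avg(values);
  var squaredDiff = 0;
  for (var i = 0; i < values.length; i++) {
    var deviation = values[i] - mean;
    squaredDiff += deviation * deviation;
  }
  return squaredDiff / values.length;
}

function standardDeviation(values) {
  return Math.sqrt(variance(values));
}

var sample = [2, 4, 4, 4, 5, 5, 7, 9];
console.log(sum(sample));               // 40
console.log(avg(sample));               // 5
console.log(variance(sample));          // 4
console.log(standardDeviation(sample)); // 2
```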
The MapReduce scripts that implement the aggregation functions for four different use case scenarios, each corresponding to a SQL aggregation query, are shown below:
1. Average order quantity across geo locations
The following query is used to fetch the average order quantity across different geographic locations.
MapReduce Functions
db.sales.runCommand(
{
    mapreduce : "sales",
    map : function() {
        // emit function handles the group by
        emit(
            // Key
            { city : this.City, state : this.State, region : this.Region },
            // Values
            this.Quantity);
    },
    reduce : function(key, values) {
        var result = Avg(key, values);
        return result;
    },
    // Group By is handled by the emit(keys, values) line in the map() function above
    out : { inline : 1 }
});
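One caveat with computing the average inside reduce: MongoDB may call reduce more than once for the same key, feeding earlier reduce outputs back in as values, and an average of averages is generally not the overall average. A common way around this is to carry a running sum and count through reduce and divide only once, in a finalize function. The following plain-JavaScript sketch simulates that flow in memory. The `runMapReduce` driver, its chunking, the `emit` parameter passed to map, and the sample orders are all assumptions made for illustration, not MongoDB's actual implementation.

```javascript
// Minimal in-memory stand-in for a MapReduce run (illustrative only).
// It reduces values in chunks of `chunkSize` and then re-reduces the partial
// results, to mimic reduce being called more than once for one key.
// Unlike MongoDB, emit is passed to map as an argument instead of being global.
function runMapReduce(docs, map, reduce, finalize, chunkSize) {
  var groups = {};
  docs.forEach(function (doc) {
    map.call(doc, function emit(key, value) {
      var k = JSON.stringify(key);
      (groups[k] = groups[k] || []).push(value);
    });
  });
  var results = {};
  Object.keys(groups).forEach(function (k) {
    var key = JSON.parse(k);
    var values = groups[k];
    var partials = [];
    for (var i = 0; i < values.length; i += chunkSize) {
      partials.push(reduce(key, values.slice(i, i + chunkSize)));
    }
    var reduced = partials.length > 1 ? reduce(key, partials) : partials[0];
    results[k] = finalize ? finalize(key, reduced) : reduced;
  });
  return results;
}

var orders = [
  { City: "Pune", Quantity: 10 },
  { City: "Pune", Quantity: 20 },
  { City: "Pune", Quantity: 60 }
];

// Naive version: reduce returns the average directly.
function naiveMap(emit) { emit({ city: this.City }, this.Quantity); }
function naiveReduce(key, values) {
  var total = 0;
  values.forEach(function (v) { total += v; });
  return total / values.length;
}

// Re-reduce-safe version: carry sum and count, divide once in finalize.
function safeMap(emit) { emit({ city: this.City }, { sum: this.Quantity, count: 1 }); }
function safeReduce(key, values) {
  var out = { sum: 0, count: 0 };
  values.forEach(function (v) { out.sum += v.sum; out.count += v.count; });
  return out;
}
function safeFinalize(key, reduced) { return reduced.sum / reduced.count; }

var k = JSON.stringify({ city: "Pune" });
var naive = runMapReduce(orders, naiveMap, naiveReduce, null, 2)[k];
var safe = runMapReduce(orders, safeMap, safeReduce, safeFinalize, 2)[k];
console.log(naive); // 37.5 (average of the partial averages 15 and 60)
console.log(safe);  // 30   (90 / 3)
```

In a real MongoDB command, the same idea would mean emitting an object with a sum and a count from map and supplying a finalize function next to map and reduce.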
2. Total product sales across product categories
The following query is used to fetch the total sales amount grouped by multiple levels of product categories. The different product categories used in the following example as individual dimensions could also be defined as a complex hierarchy based dimension.
MapReduce Functions
db.sales.runCommand(
{
    mapreduce : "sales",
    map : function() {
        emit(
            // Key
            { key0 : this.ProductCategory, key1 : this.ProductSubCategory, key2 : this.ProductName },
            // Values
            this.SalesAmt);
    },
    reduce : function(key, values) {
        var result = Sum(key, values);
        return result;
    },
    // Group By is handled by the emit(keys, values) line in the map() function above
    out : { inline : 1 }
});
3. Maximum profit for a product
The following query is used to fetch the maximum profit for a given product based on the filter criteria.
MapReduce Functions
db.sales.runCommand(
{
    mapreduce : "sales",
    map : function() {
        // WHERE condition implementation is provided in map() function
        if (this.ProductId == 1)
            emit(
                { key0 : this.ProductId, key1 : this.ProductName },
                this.Profit);
    },
    reduce : function(key, values) {
        var maxValue = Max(key, values);
        return maxValue;
    },
    // Group By is handled by the emit(keys, values) line in the map() function above
    out : { inline : 1 }
});
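Max, like Sum and Min, gives the same answer whether it is applied to all values at once or to partial results that are then combined. That matters because reduce may be invoked more than once for a key. Here is a small plain-JavaScript check, using a local `maxOf` function and made-up profit values (both are assumptions for illustration):

```javascript
// Local copy of the Max logic, for checking outside MongoDB.
function maxOf(values) {
  var maxValue = values[0];
  for (var i = 1; i < values.length; i++) {
    if (values[i] > maxValue) maxValue = values[i];
  }
  return maxValue;
}

var profits = [150, 40, 20, 80]; // illustrative values

// One pass over every value.
var onePass = maxOf(profits);

// Two partial passes, then a pass over the partial results.
var twoPass = maxOf([maxOf([150, 40]), maxOf([20, 80])]);

console.log(onePass); // 150
console.log(twoPass); // 150
```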
4. Total quantity, total sales and average profit
The requirement for this scenario is to calculate the total quantity, total sales and average profit for orders whose IDs are in the range 1 to 10 and whose ShipDate is between Jan 1 and Dec 31 of year 2011. The following query performs multiple aggregations, namely the total quantity, total sales and average profit, in the specified year and for orders in the given range, across different regions and product categories.
MapReduce Functions
db.sales.runCommand(
{
    mapreduce : "sales",
    map : function() {
        emit(
            // Keys
            { region : this.Region, productCategory : this.ProductCategory, productid : this.ProductId },
            // Values
            { quantSum : this.Quantity, salesSum : this.SalesAmt, avgProfit : this.Profit });
    },
    reduce : function(key, values) {
        var result = { quantSum : 0, salesSum : 0, avgProfit : 0 };
        var count = 0;
        values.forEach(function(value) {
            // Calculation of Sum(Quantity)
            result.quantSum += value.quantSum;
            // Calculation of Sum(Sales)
            result.salesSum += value.salesSum;
            result.avgProfit += value.avgProfit;
            count++;
        });
        // Calculation of Avg(Profit); exact only if reduce is invoked once per key
        result.avgProfit = result.avgProfit / count;
        return result;
    },
    // WHERE conditions; both bounds for a field go in one sub-document
    query : {
        "OrderId" : { "$gt" : 1, "$lt" : 10 },
        "ShipDate" : { "$gt" : "01/01/2011", "$lt" : "31/12/2011" }
    },
    // Group By is handled by the emit(keys, values) line in the map() function above
    limit : 3,
    out : { inline : 1 }
});
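Two details of the query filter are worth checking in plain JavaScript. First, an object literal that repeats a key keeps only the last value, which is why both bounds for a field belong in one sub-document. Second, dates stored as dd/mm/yyyy strings are compared character by character, so a string range does not necessarily follow calendar order. The snippet below is a small illustration; the variable names, the sample dates and the `toSortable` helper are assumptions, not part of the article's setup.

```javascript
// A repeated key in an object literal is overwritten by its last occurrence.
var repeated = { OrderId: { $gt: 1 }, OrderId: { $lt: 10 } };
console.log(JSON.stringify(repeated)); // {"OrderId":{"$lt":10}}

// Putting both operators in one sub-document keeps both bounds.
var combined = { OrderId: { $gt: 1, $lt: 10 } };
console.log(Object.keys(combined.OrderId).join(", ")); // $gt, $lt

// dd/mm/yyyy strings compare lexicographically, not chronologically:
// 2 Jan 2012 sorts before 31 Dec 2011 because "0" < "3".
console.log("02/01/2012" < "31/12/2011"); // true

// Hypothetical helper: rewrite dd/mm/yyyy as yyyy-mm-dd so that string order
// matches date order.
function toSortable(ddmmyyyy) {
  var parts = ddmmyyyy.split("/");
  return parts[2] + "-" + parts[1] + "-" + parts[0];
}
console.log(toSortable("02/01/2012") < toSortable("31/12/2011")); // false
```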
Now that we have looked at the code examples of aggregation functions for different business scenarios, we are ready to test these functions.
Testing the Aggregation Functions
MapReduce functionality in MongoDB is invoked using a database command. The Map and Reduce functions are written in JavaScript, as described in the previous section. The following is the syntax used to execute MapReduce functions on the server.
db.runCommand(
{
    mapreduce : <collection>,
    map : <mapfunction>,
    reduce : <reducefunction>
    [, query : <query filter object>]
    [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>]
    [, limit : <number of objects to return from collection>]
    [, out : <see output options below>]
    [, keeptemp: <true|false>]
    [, finalize : <finalizefunction>]
    [, scope : <object where fields go into javascript global scope >]
    [, jsMode : true]
    [, verbose : true]
}
)
Where the Output Options include:
{ replace : "collectionName" }
{ merge : "collectionName" }
{ reduce : "collectionName" }
{ inline : 1 }
Shown below are the commands needed to save an aggregation function and use it in a MapReduce function.
Start Mongo Shell and setup table
- Ensure that the Mongo server is running. Then start the Mongo shell by running mongo.exe from the Mongo home folder.
- Switch the database by running the command:
> use mydb
- View the contents of the Sales table using the following command:
> db.sales.find()
Here is the output of the find command.
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, "orderDate" : "26/03/2011", "quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1", "productCategory" : "IT", "productSubCategory" : "software", "productName" : "Grad", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, "orderDate" : "23/05/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2", "productCategory" : "IT", "productSubCategory" : "hardware", "productName" : "HIM", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, "orderDate" : "22/09/2011", "quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", "productCategory" : "BT", "productSubCategory" : "services", "productName" : "VOCI", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, "orderDate" : "21/10/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, "orderDate" : "21/06/2011", "quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 1 }
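Before running MapReduce on this data, the expected total sales per product can be worked out from the five sample documents without a server. The sketch below groups the same values in plain JavaScript. Only the fields needed for the grouping are copied from the find() output, and the `totalsByProduct` helper is a hypothetical name used for illustration.

```javascript
// The grouping fields and sales amounts of the five sample documents.
var sales = [
  { productCategory: "IT", productSubCategory: "software", productName: "Grad", salesAmt: 200 },
  { productCategory: "IT", productSubCategory: "hardware", productName: "HIM", salesAmt: 200 },
  { productCategory: "BT", productSubCategory: "services", productName: "VOCI", salesAmt: 200 },
  { productCategory: "BT", productSubCategory: "hardware", productName: "CRUD", salesAmt: 200 },
  { productCategory: "BT", productSubCategory: "hardware", productName: "CRUD", salesAmt: 200 }
];

// Hypothetical helper: sum salesAmt per (category, sub-category, name) key.
function totalsByProduct(docs) {
  var totals = {};
  docs.forEach(function (doc) {
    var key = [doc.productCategory, doc.productSubCategory, doc.productName].join("/");
    totals[key] = (totals[key] || 0) + doc.salesAmt;
  });
  return totals;
}

var totals = totalsByProduct(sales);
console.log(totals["BT/hardware/CRUD"]); // 400
console.log(totals["BT/services/VOCI"]); // 200
console.log(Object.keys(totals).length); // 4
```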
Create and store the aggregation functions
- From the MongoDB command prompt, run the following command.
> db.system.js.save( { _id : "Sum" , value : function(key,values) { var total = 0; for(var i = 0; i < values.length; i++) total += values[i]; return total; }});
- Now execute the MapReduce program on the sample table "Sales". Field names are case-sensitive, so the map function refers to them exactly as they appear in the stored documents shown above.
> db.sales.runCommand(
{
    mapreduce : "sales",
    map : function() {
        emit(
            { key0 : this.productCategory, key1 : this.productSubCategory, key2 : this.productName },
            this.salesAmt);
    },
    reduce : function(key, values) {
        var result = Sum(key, values);
        return result;
    },
    out : { inline : 1 }
});
This would display the following output:
"results" : [
    { "_id" : { "key0" : "BT", "key1" : "hardware", "key2" : "CRUD" }, "value" : 400 },
    { "_id" : { "key0" : "BT", "key1" : "services", "key2" : "VOCI" }, "value" : 200 },
    { "_id" : { "key0" : "IT", "key1" : "hardware", "key2" : "HIM" }, "value" : 200 },
    { "_id" : { "key0" : "IT", "key1" : "software", "key2" : "Grad" }, "value" : 200 }
],
"timeMillis" : 1,
"timing" : { "mapTime" : NumberLong(1), "emitLoop" : 1, "total" : 1 },
"counts" : { "input" : 5, "emit" : 5, "output" : 4 },
"ok" : 1
Conclusion
MongoDB provides document-oriented storage that can easily scale to terabytes of data. It also provides MapReduce functionality that can be used for aggregation of data using SQL-like functions through batch processing. In this article, we described the process for setting up MongoDB and implementing aggregation functions using the MapReduce feature. We also provided a few sample MapReduce implementations for simple SQL-based aggregation queries. Using MapReduce functionality, more complex aggregation functions can be implemented on the data stored in MongoDB documents.
About the Authors
Arun Viswanathan works as a Technology Architect with Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Arun has around 9.5 years of experience in Java, Java EE, Cloud and Big Data application architecture definition and implementation. He is currently involved in design, development and consulting for Big Data solutions. He can be reached at Arun_Viswanathan01@infosys.com.

Shruthi Kumar works as a Technology Analyst with Cloud Center of Excellence (CoE) in Infosys Ltd, a global leader in IT & Business Consulting Services. Shruthi has 5 years of experience in Java, Grid Computing, Cloud and Big Data application architecture. She is currently involved in development and consulting for Big Data solutions. She can be reached at Shruthi_Kumar01@infosys.com.