In today’s world, data is money. Companies are scrambling to collect as much data as possible, in an attempt to find hidden patterns that can be acted upon to drive revenue. However, if those companies aren’t using that data, and they’re not analyzing it to find those hidden gems, the data is worthless.
One of the most challenging tasks when getting started with Hadoop and building a big data solution is figuring out how to take the tools you have and put them together. The Hadoop ecosystem encompasses about a dozen different open-source projects. How do we pick the right tools for the job?
Just Another Data Management System
Most data management systems have a minimum of three pieces: data ingestion, data storage, and data analysis. The flow of these systems looks like the following:
A data ingestion system is the connection between the data source and the storage location where the data will reside, while at rest. A data analysis system is used to process the data, and produce actionable insights. Translating into a relational architecture, we can replace the generic terms:
We can take this basic architecture of ingestion, storage, and processing, and map it onto the Hadoop ecosystem, as well:
Certainly, this is not the only Hadoop architecture, and by pulling in other ecosystem projects, much more complicated systems can be built. However, it is a very common one, and can be used to bootstrap a foray into the world of big data. In the rest of this article, we’ll work through an example application, and use Apache Flume, Apache HDFS, Apache Oozie, and Apache Hive to design an end-to-end data pipeline that will enable us to analyze Twitter data. All the code and instructions necessary to reproduce this pipeline are available on the Cloudera GitHub.
Motivation: Measuring Influence
Social media has gained immense popularity with marketing teams, and Twitter is an effective tool for a company to get people excited about its products. Twitter makes it easy to engage users and communicate directly with them, and in turn, users can provide word-of-mouth marketing for companies by discussing the products. Given limited resources, and knowing that we may not be able to talk directly to everyone we want to target, we can be more efficient as a marketing department by being selective about whom we reach out to.
In order to understand whom we should target, let’s take a step back and try to understand the mechanics of Twitter. A user -- let’s call him Joe -- follows a set of people, and has a set of followers. When Joe sends an update out, that update is seen by all of his followers. Joe can also retweet other users’ updates. A retweet is a repost of an update, much like you might forward an email. If Joe sees a tweet from Sue, and retweets it, all of Joe’s followers see Sue’s tweet, even if they don’t follow Sue. Through retweets, messages can get passed much further than just the followers of the person who sent the original tweet. Knowing that, we can try to engage users whose updates tend to generate lots of retweets. Since Twitter tracks retweet counts for all tweets, we can find the users we’re looking for by analyzing Twitter data.
Now we know the question we want to ask: which Twitter users get the most retweets? Who is influential within our industry?
How do we answer these questions?
SQL queries can be used to answer these questions: we want to rank users by the number of retweets they are responsible for, in descending order. However, querying Twitter data in a traditional RDBMS is inconvenient, since the Twitter Streaming API outputs tweets in a JSON format which can be arbitrarily complex. In the Hadoop ecosystem, the Hive project provides a query interface which can be used to query data that resides in HDFS. The query language looks very similar to SQL, but it also lets us model complex types, so we can query the kind of nested data we have. That seems like a good place to start. So how do we get Twitter data into Hive? First, we need to get Twitter data into HDFS, and then we’ll be able to tell Hive where the data resides and how to read it.
The diagram above shows a high-level view of how some of the CDH components can be pieced together to build the data pipeline we need to answer the questions we have.
Gathering Data with Apache Flume
The Twitter Streaming API will give us a constant stream of tweets coming from the service. One option would be to use a simple utility like curl to access the API and then periodically load the files. However, this would require us to write code to control where the data goes in HDFS, and if we have a secure cluster, we will have to integrate with security mechanisms. It will be much simpler to use components within CDH to automatically move the files from the API to HDFS, without our manual intervention.
Apache Flume is a data ingestion system that is configured by defining the endpoints of a data flow, called sources and sinks. In Flume, each individual piece of data (a tweet, in our case) is called an event. Sources produce events and send them through a channel, which connects the source to the sink. The sink then writes the events out to a predefined location. Flume supports some standard data sources, such as syslog or netcat. For this use case, we’ll need to design a custom source that accesses the Twitter Streaming API, and sends the tweets through a channel to a sink that writes to HDFS files. Additionally, we can use the custom source to filter the tweets on a set of search keywords, so that we collect relevant tweets rather than a pure sample of the entire Twitter firehose. The custom Flume source code can be found here.
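To make the source, channel, and sink roles concrete, here is a toy model in Python. It is only a sketch of the data flow described above, not Flume's actual API (Flume components are written in Java and wired together in a configuration file). The function names, the sample tweets, the keyword list, and the in-memory list standing in for an HDFS file are all hypothetical, and the keyword filtering is done locally purely for illustration.

```python
import json
import queue

KEYWORDS = ["hadoop", "big data"]  # hypothetical filter terms


def matches_keywords(tweet_text, keywords):
    """Return True if any keyword appears in the tweet text (case-insensitive)."""
    lowered = tweet_text.lower()
    return any(keyword in lowered for keyword in keywords)


def run_source(raw_tweets, channel, keywords):
    """Source: put each matching tweet on the channel as one event."""
    for raw in raw_tweets:
        tweet = json.loads(raw)
        if matches_keywords(tweet.get("text", ""), keywords):
            channel.put(raw)


def run_sink(channel, output):
    """Sink: drain the channel and append each event to the output."""
    written = 0
    while not channel.empty():
        output.append(channel.get())
        written += 1
    return written


# Hypothetical sample data standing in for the Twitter stream.
sample_stream = [
    json.dumps({"text": "Learning Hadoop this week"}),
    json.dumps({"text": "Nice weather today"}),
    json.dumps({"text": "Big Data meetup tonight"}),
]

channel = queue.Queue()   # connects the source to the sink
hdfs_file = []            # stand-in for a file in HDFS
run_source(sample_stream, channel, KEYWORDS)
events_written = run_sink(channel, hdfs_file)
print(events_written, "events written")
```

The point of the channel is decoupling: the source only knows how to produce events, and the sink only knows how to write them, so either end can be swapped without touching the other.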
Partition Management with Apache Oozie
Once we have the Twitter data loaded into HDFS, we can stage it for querying by creating an external table in Hive. Using an external table will allow us to query the table without moving the data from the location where it ends up in HDFS. To ensure scalability, as we add more and more data, we’ll need to also partition the table. A partitioned table allows us to prune the files that we read when querying, which results in better performance when dealing with large data sets. However, the Twitter API will continue to stream tweets and Flume will perpetually create new files. We can automate the periodic process of adding partitions to our table as the new data comes in.
Apache Oozie is a workflow coordination system that can be used to solve this problem. Oozie is an extremely flexible system for designing job workflows, which can be scheduled to run based on a set of criteria. We can configure the workflow to run an ALTER TABLE command that adds a partition containing the last hour's worth of data into Hive, and we can schedule the workflow to run every hour. This will ensure that we're always looking at up-to-date data.
The configuration files for the Oozie workflow are located here.
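As a rough illustration of what the hourly step has to produce, the Python sketch below builds the ALTER TABLE statement for the hour that has just finished. It is not the Oozie workflow itself. The partition column name (datehour), the directory layout, and the base path are assumptions made for this example; only the table name tweets comes from the queries in this article.

```python
from datetime import datetime, timedelta


def add_partition_statement(run_time, table="tweets",
                            base_path="/user/flume/tweets"):
    """Build the statement that registers the previous hour's directory.

    base_path and the datehour column are hypothetical; adjust them to
    match wherever the sink actually writes its files.
    """
    # Truncate to the top of the hour, then step back one hour.
    previous_hour = (run_time.replace(minute=0, second=0, microsecond=0)
                     - timedelta(hours=1))
    partition_value = previous_hour.strftime("%Y%m%d%H")
    location = f"{base_path}/{previous_hour:%Y/%m/%d/%H}"
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (datehour = {partition_value}) "
        f"LOCATION '{location}';"
    )


statement = add_partition_statement(datetime(2012, 9, 10, 22, 5))
print(statement)
```

Using IF NOT EXISTS keeps the step safe to re-run, which matters for a job that fires every hour and may occasionally be retried.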
Querying Complex Data with Hive
Before we can query the data, we need to ensure that the Hive table can properly interpret the JSON data. By default, Hive expects that input files use a delimited row format, but our Twitter data is in a JSON format, which will not work with the defaults. Handling cases like this is actually one of Hive's biggest strengths: Hive allows us to flexibly define, and redefine, how the data is represented on disk. The schema is only really enforced when we read the data, and we can use the Hive SerDe interface to specify how to interpret what we've loaded. SerDe stands for Serializer and Deserializer, which are interfaces that tell Hive how to translate the data into something it can process. In particular, the Deserializer interface is used when we read data from disk, and it converts the data into objects that Hive knows how to manipulate. We can write a custom SerDe that reads the JSON data in and translates the objects for Hive. Once that's put into place, we can start querying. The JSON SerDe code can be found here. The SerDe will take a tweet in JSON form, and translate the JSON entities into queryable columns:
SELECT created_at, entities, text, user
FROM tweets
WHERE user.screen_name='ParvezJugon'
  AND retweeted_status.user.screen_name='ScottOstby';
results in:
created_at | entities | text | user |
Mon Sep 10 21:19:23 +0000 2012 | {"urls":[],"user_mentions": [ {"screen_name":"ScottOstby", "name":"Scott Ostby"}], "hashtags":[{"text":"Crowdsourcing"}]} | RT @ScottOstby: #Crowdsourcing – drivers already generate traffic data for your smartphone to suggest alternative routes when a road is ... | {"screen_name":"ParvezJugon", "name":"Parvez Jugon", "friends_count":299, "followers_count":70, "statuses_count":1294, "verified":false, "utc_offset":null, "time_zone":null} |
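The deserialization step can be pictured with a small Python sketch. This is not the Hive SerDe interface (which is Java) and not the SerDe linked above; it only mimics the idea of turning one JSON line into a row of named columns, with nested fields reachable by a dotted path. The function names and the column list are hypothetical, and the sample tweet is an abbreviated version of the one shown above.

```python
import json

# Hypothetical column list, standing in for the table definition.
COLUMNS = ["created_at", "entities", "text", "user", "retweeted_status"]


def deserialize(line, columns=COLUMNS):
    """Turn one JSON-encoded tweet into a row with one value per column."""
    record = json.loads(line)
    # Columns missing from the JSON become None, much like NULL in a result.
    return {column: record.get(column) for column in columns}


def get_path(row, dotted_path):
    """Follow a path such as 'user.screen_name'; None if a step is missing."""
    value = row
    for key in dotted_path.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return value


line = json.dumps({
    "created_at": "Mon Sep 10 21:19:23 +0000 2012",
    "text": "RT @ScottOstby: #Crowdsourcing ...",
    "user": {"screen_name": "ParvezJugon", "followers_count": 70},
    "retweeted_status": {"user": {"screen_name": "ScottOstby"}},
})

row = deserialize(line)
print(get_path(row, "user.screen_name"))
print(get_path(row, "retweeted_status.user.screen_name"))
```

Because the mapping is applied at read time, the files on disk stay as raw JSON, and the column list can change without rewriting any data.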
We've now managed to put together an end-to-end system, which gathers data from the Twitter Streaming API, sends the tweets to files on HDFS through Flume, and uses Oozie to periodically load the files into Hive, where we can query the raw JSON data, through the use of a Hive SerDe.
Some Results
In my own testing, I let Flume collect data for about three days, filtering on a set of keywords:
hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
The collected data was about half a GB of JSON data, like the tweet above. The data has some structure, but certain fields may or may not exist. The retweeted_status field, for example, will only be present if the tweet was a retweet. Additionally, some of the fields may be arbitrarily complex. The hashtags field is an array of all the hashtags present in the tweet, but most RDBMSs do not support arrays as a column type. This semi-structured quality makes the data very difficult to query in a traditional RDBMS. Hive can handle this data much more gracefully.
The query below will find usernames, and the number of retweets they have generated across all the tweets that we have data for:
SELECT t.retweeted_screen_name,
       sum(retweets) AS total_retweets,
       count(*) AS tweet_count
FROM (SELECT retweeted_status.user.screen_name AS retweeted_screen_name,
             retweeted_status.text,
             max(retweet_count) AS retweets
      FROM tweets
      GROUP BY retweeted_status.user.screen_name, retweeted_status.text) t
GROUP BY t.retweeted_screen_name
ORDER BY total_retweets DESC
LIMIT 10;
For the few days of data, I found that these were the most retweeted users for the industry:
retweeted_screen_name | total_retweets | tweet_count |
mauricefreedman | 493 | 1 |
HarvardBiz | 362 | 6 |
TechCrunch | 314 | 7 |
googleanalytics | 244 | 10 |
BigDataBorat | 201 | 6 |
stephen_wolfram | 182 | 1 |
CloudExpo | 153 | 28 |
TheNextWeb | 150 | 1 |
GonzalezCarmen | 121 | 10 |
IBMbigdata | 100 | 37 |
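For readers who find the nested query hard to follow, the same two-step aggregation can be sketched in Python. The sample tweets are made up for illustration and do not come from the collected data. One deliberate difference from the Hive query: this sketch skips tweets that are not retweets, whereas the query as written would group them under a NULL screen name.

```python
from collections import defaultdict


def top_retweeted(tweets, limit=10):
    """Rank original authors by total retweets across their distinct tweets."""
    # Inner query: for each (screen name, text) pair, keep the highest
    # retweet_count seen, since the same original tweet can be captured
    # several times as its count grows.
    max_per_tweet = {}
    for tweet in tweets:
        original = tweet.get("retweeted_status")
        if original is None:
            continue  # not a retweet
        key = (original["user"]["screen_name"], original["text"])
        max_per_tweet[key] = max(max_per_tweet.get(key, 0),
                                 tweet["retweet_count"])

    # Outer query: per screen name, sum the maxima and count distinct tweets.
    totals = defaultdict(lambda: [0, 0])
    for (screen_name, _text), retweets in max_per_tweet.items():
        totals[screen_name][0] += retweets
        totals[screen_name][1] += 1

    ranked = sorted(
        ((name, total, count) for name, (total, count) in totals.items()),
        key=lambda result: result[1],
        reverse=True,
    )
    return ranked[:limit]


# Hypothetical sample data.
sample_tweets = [
    {"retweet_count": 5,
     "retweeted_status": {"user": {"screen_name": "alice"}, "text": "a1"}},
    {"retweet_count": 8,
     "retweeted_status": {"user": {"screen_name": "alice"}, "text": "a1"}},
    {"retweet_count": 3,
     "retweeted_status": {"user": {"screen_name": "alice"}, "text": "a2"}},
    {"retweet_count": 20,
     "retweeted_status": {"user": {"screen_name": "bob"}, "text": "b1"}},
    {"retweet_count": 0, "text": "an original tweet"},
]

results = top_retweeted(sample_tweets)
for name, total_retweets, tweet_count in results:
    print(name, total_retweets, tweet_count)
```

Taking the maximum before summing is what prevents double counting: without it, a tweet captured at 5 retweets and again at 8 would contribute 13 instead of 8.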
From these results, we can see whose tweets are getting heard by the widest audience, and also determine whether these people are communicating on a regular basis or not. We can use this information to more carefully target our messaging, in order to get them talking about our products, which, in turn, will get other people talking about our products.
Conclusion
In this article, we’ve seen how we can take some of the components of CDH and combine them to create an end-to-end data management system. This same architecture could be used for a variety of applications designed to look at Twitter data, such as identifying spam accounts, or identifying clusters of keywords. Taking the system even further, the more general architecture can be reused well beyond Twitter. By plugging in different Flume sources and Hive SerDes, it can be adapted to many other uses, such as analyzing web logs. Grab the code, and give it a shot yourself.
About the Author
Jon Natkins (@nattybnatkins) is a software engineer at Cloudera, where he is focused on educating users to more effectively use Hadoop and CDH. He is a periodic contributor to the related Apache projects, and previously, he was a developer on Cloudera Manager.