Hadoopでタスクを処理するとき、複数のMap/Reduceを繋げてひとつの結果を得なければならない場合があります[1]。Hadoopのエコシステムの比較的新しいコンポーネントであるOozie[2]は、複数のMap/Reduceジョブをひとつの論理的な単位にまとめることで大きなタスクを処理することができます。この記事ではOozieの使い方を紹介します。
Oozieとは何か
OozieはJavaサーブレットコンテナで動作するJavaのWebアプリケーションです。コンテナにはTomcatを使います。また、次の2つを保存するためにデータベースを使います。
- ワークフローの定義
- 現在実行しているワークフローのインスタンス。インスタンスの状態や変数を含む
Oozieのワークフローにはアクション(例えばHadoop Map/ReduceのジョブやPigのジョブ)のコレクションで、DAG(無閉路有向グラフ)に従って並べられており、アクションの実行順が定義されています。このグラフはhPDL(XMLプロセス定義言語)が定義されています。
hPDLはかなり小さい言語で、限定されたフローコントロールとアクションノードを利用します。コントロールノードはフローの実行を定義し、フローの開始と終了(開始、終了、失敗ノード)、ワークフローの実行パスを制御する仕組み(決定、分岐、結合ノード)を含みます。アクションノードによって、ワークフローは計算/処理タスク実行を開始します。Oozieは、次のタイプのアクションをサポートします。Hadoop map-reduce、Hadoop file system、Pig、Java、Oozieのサブワークフロー(SSHアクションはOozieスキーマ0.2の時点で削除されています)。
アクションノードによって起動される計算/処理タスクはOozieとは独立しています。これらのタスクはHadoop Map/Reduceによって実行されます。こうすることでOozieは既存のHadoopの負荷分散やフェールオーバーの仕組みが利用できます。これらのタスクの大部分は非同期(ファイルシステムアクションは同期処理になる)で実行されます。つまり、ワークフローのジョブは計算/処理タスクが完了するまで次のノードへの遷移を待たなければなません。Oozieは2つの方法で計算/処理タスクの完了を検知します。それはコールバックとポーリングです。Oozieが計算/処理タスクを開始するとき、タスクに対してユニークなコールバックURLを与えます。タスクは処理が完了したことを通知するためにこのURLを実行します。タスクが何らかの理由(例えばネットワークの瞬断)でコールバックURLの実行に失敗した場合やタスク完了時にURLを実行できなかった場合に備えて、Oozieはポーリングで計算/処理タスクの完了を検知する仕組みも備えています。
Oozieのワークフローはパラメータ化(ワークフロー定義の${inputDir}のような変数を使って)できます。ワークフローのジョブを実行する場合は、パラメータの値を提供する必要があります。適切にパラメータ化(例えば出力ディレクトリを別にする)できれば複数の独立したワークフロージョブを同時に実行できます。
要求に従って実行できるワークフローもありますが、ほとんどのワークフローは一定間隔で定期的に実行されます。また、データの利用可能性や外部イベントに従って実行されることもあります。Oozie Coordinatorを使えばユーザはこれらのパラメータに従ってワークフローの実行を定義できます。また、述語の形式でワークフロー実行トリガをモデル化することができます。この述語はデータや時間、外部イベントを参照できます。この述語が満たされた場合にワークフロージョブが実行されます。
また、定期的かつ不規則な間隔で実行されるワークフロージョブを繋げる必要がある場合もあります。ワークフローで実行された複数の処理の結果が次のワークフローの入力になるからです。このようなワークフローの複数の結果を数珠つなぎにするのをデータアプリケーションパイプラインと言います。Oozie Coordinatorを使えばこのようなデータアプリケーションパイプラインの作成も行えます。
Oozieのインストール
Oozieは既存のHadoopシステムにインストールできます。tarballからでもRPMやDebian Packageからでもインストールできます。私がこの記事を書くにあたり、利用したHadoopはClouderaのCDH3です。これには元からOozieが含まれていますので、yumでCDH3を取得してエッジノードにインストールを実行すればいいだけです[1]。Oozieの配布物には2つのコンポーネント、OozieクライアントとOozieサーバが含まれています。クラスタの大きさによっては、同じエッジサーバに配置する場合もあるでしょうし、別々のサーバに置くこともあります。Oozieサーバにはジョブを準備し、制御するためのコンポーネントが含まれています。また、クライアントにはユーザがOozieのジョブを起動したりサーバと通信するためのコンポーネントが含まれています。
インストールについての詳細な情報はClouderaのウェブサイト[2]を参照ください。
注意:インストールとともに、シェル変数OOZIE_URLを
(export OOZIE_URL=http://localhost:11000/oozie)
.login, .kshrc、またはシェルのスタートアップファイルに定義しておくことをお勧めします。
簡単な例
Oozieの使い方を紹介するため簡単なサンプルを示します。ふたつのMap/Reduceジョブ[3]のうちのひとつはデータの取り込みを行い、ふたつ目は与えられた型にデータをマージします。実際に行いたいデータの取り込みは、まずデータを取り込んでLidarとMulticamという2つの型にデータを統合します。この処理を自動化するために下記の簡単なOozie Workflowを作成(Listing 1)しました。
to ="fail"/>name ="merging">start ="mergeLidar"/>start ="mergeSignage"/>name ='mergeLidar'>${jobTracker} ${nameNode} mapred.job.queue.name default com.navteq.assetmgmt.hdfs.merge.MergerLoader -Xmx2048m -drive ${driveID} -type Lidar -chunk ${lidarChunk} to ="completed"/>to ="fail"/>name ='mergeSignage'>${jobTracker} ${nameNode} mapred.job.queue.name default com.navteq.assetmgmt.hdfs.merge.MergerLoader -Xmx2048m -drive ${driveID} -type MultiCam -chunk ${signageChunk} to ="completed"/>to ="fail"/>name ="completed" to="end"/>name ="fail">Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}] name ='end'/>
Listing 1:単純なOozie Workflow
このワークフローには3つのアクションが定義されています。ingestor、mergeLidar、mergeSignageの3つです。各アクションはMap/Reduce[4]のジョブとして実装されています。ワークフローはスタートノードから始まり、Ingestorアクションに制御が移ります。Ingestorが終わると、分岐コントロールノードが実行されます。これは、 mergeLidarとmergeSignageを平行[5]して実行します。両方のアクションが完了すると、結合コントロールノード[6]が実行されます。結合ノードが正常に完了すると、制御はエンドノードに遷移し、処理が終了します。
ワークフローを作成したら、正しく配置する必要があります。Oozieの一般的な配置場所はHDFSデレクトリです。このデレクトリにworkflow.xml (Listing 1)、config-default.xml、libデレクトリを置きます。libデレクトリにはworkflowのアクションで使われるjarファイルを置きます。
(画像をクリックすると大きくなります)
Figure 1: Oozieの配置
A config-default.xmlファイルはオプションで、すべてのワークフローインスタンスに共通するのパラメータが記述されます。Listing 2がconfig-default.xmlの簡単な例です。
jobTracker sachicn003:2010 nameNode hdfs://sachicn001:8020 queueName default
Listing 2: Config-default.xml
ワークフローを配置したら、Oozieが提供するコマンドラインツールを使ってワークフローの起動や操作を行います。このコマンドラインツールは普通、Hadoopクラスタのエッジノード[7]上で実行します。また、ジョブのプロパティファイル(see sidebar – configuring workflow properties) - Listing 3.
oozie.wf.application.path=hdfs://sachicn001:8020/user/blublins/workflows/IPSIngestion jobTracker=sachicn003:2010 nameNode=hdfs://sachicn001:8020
Listing 3: ジョブのプロパティファイル
プロパティファイルを適切に配置し、Listing 4に示すコマンドを実行することでOozieのワークフローが走ります。
oozie job –oozie http://sachidn002.hq.navteq.com:11000/oozie/ -D driveID=729-pp00002-2011-02-08-09-59-34 -D lidarChunk=4 -D signageChunk=20 -config job.properties –run
Listing 4: Run workflow command
Workflowのプロパティを構成するconfig-default.xmlとジョブのプロパティファイルとコマンドラインの一部としてOozieに渡すことができるジョブの引数には重複する値があります。どれを使えばいいのかドキュメントは明確に記述していません。これについての推奨は下記の通りです。
Oozieは下記の通り、この3つのパラメータを処理します。
|
Oozieコンソール(Figure 2)はワークフローの進捗と結果を確認できる。
(クリックしてイメージ拡大)
Figure 2: Oozieコンソール
Oozieコンソールを使えば、ジョブの実行の詳細も確認できる。例えばジョブのログ[8]だ(Figure 3)。
(クリックしてイメージ拡大)
Figure 3: Oozieコンソール–ジョブのログ
プログラムからのワークフローの実行
コマンドラインインターフェイスを使えば、上記のように手動でOozieを実行できますが、プログラムから実行できた方が便利な場合があります。Oozieのワークフローを特定のアプリケーションや企業規模の処理の一部として使う場合です。このようなプログラムからの実行はOozie Web Services API[6] または Oozie Java client API[7]を使います。上述した処理をJavaから実行すると下記Listing 5のようになります。
package com.navteq.assetmgmt.oozie;import java.util.LinkedList;
import java.util.List;
import java.util.Properties;import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;public class WorkflowClient {
private static String OOZIE_URL = "http://sachidn002.hq.navteq.com:11000/oozie/";
private static String JOB_PATH = "hdfs://sachicn001:8020/user/blublins/workflows/IPSIngestion";
private static String JOB_Tracker = "sachicn003:2010";
private static String NAMENode = "hdfs://sachicn001:8020";OozieClient wc = null;
public WorkflowClient(String url){
wc = new OozieClient(url);
}public String startJob(String wfDefinition, List
wfParameters)
throws OozieClientException{// create a workflow job configuration and set the workflow application path
Properties conf = wc.createConfiguration();
conf.setProperty(OozieClient.APP_PATH, wfDefinition);// setting workflow parameters
conf.setProperty("jobTracker", JOB_Tracker);
conf.setProperty("nameNode", NAMENode);
if((wfParameters != null) && (wfParameters.size() > 0)){
for(WorkflowParameter parameter : wfParameters)
conf.setProperty(parameter.getName(), parameter.getValue());
}
// submit and start the workflow job
return wc.run(conf);
}public Status getJobStatus(String jobID) throws OozieClientException{
WorkflowJob job = wc.getJobInfo(jobID);
return job.getStatus();
}public static void main(String[] args) throws OozieClientException, InterruptedException{
// Create client
WorkflowClient client = new WorkflowClient(OOZIE_URL);
// Create parameters
ListwfParameters = new LinkedList ();
WorkflowParameter drive = new WorkflowParameter("driveID","729-pp00004-2010-09-01-09-46");
WorkflowParameter lidar = new WorkflowParameter("lidarChunk","4");
WorkflowParameter signage = new WorkflowParameter("signageChunk","4");
wfParameters.add(drive);
wfParameters.add(lidar);
wfParameters.add(signage);
// Start Oozing
String jobId = client.startJob(JOB_PATH, wfParameters);
Status status = client.getJobStatus(jobId);
if(status == Status.RUNNING)
System.out.println("Workflow job running");
else
System.out.println("Problem starting Workflow job");
}
}
Listing 5: 単純なOozieのJavaクライアントの例
まず、ワークフロークライアントがOozieサーバのURLを使って初期化されます。そして、初期化が成功すると、このクライアントを使ってジョブのスタート(startJobメソッド)、実行中のジョブのステータスの取得(getStatusメソッド)やそのほかの操作が可能になります。
Javaアクションの作成、ワークフローへのパラメータの受け渡し
これまでの例では<arg>を使ってJavaノードへのパラメータの受け渡しを説明してきました。独自の計算処理をOozieに追加する場合、主に使われるのがJavaノードであることを考慮すると、JavaノードからOozieへ値を渡すことも重要です。
Javaノードのドキュメントによれば、“capture-output”エレメントを使うとJavaノードからOozieのコンテキストに値を渡すことができます。これらの値はワークフロー内の他のステップからEL関数を使ってアクセスできます。戻り値はJavaのプロパティファイルの形式で書き出す必要があります。このファイルの名前は“JavaMainMapper.OOZIE_JAVA_MAIN_CAPTURE_OUTPUT_FILE”定数で定義されているシステムプロパティから取得できます。 下記に簡単な例を示します。
package com.navteq.oozie; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; import java.util.Calendar; import java.util.GregorianCalendar; import java.util.Properties;public class GenerateLookupDirs {
/**
* @param args
*/
public static final long dayMillis = 1000 * 60 * 60 * 24;
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";public static void main(String[] args) throws Exception {
Calendar curDate = new GregorianCalendar();
int year, month, date;
String propKey, propVal;String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
if (oozieProp != null) {
File propFile = new File(oozieProp);
Properties props = new Properties();for (int i = 0; I < 8; ++i)
{
year = curDate.get(Calendar.YEAR);
month = curDate.get(Calendar.MONTH) + 1;
date = curDate.get(Calendar.DATE);
propKey = "dir"+i;
propVal = year + "-" +
(month < 10 ? "0" + month : month) + "-" +
(date < 10 ? "0" + date : date);
props.setProperty(propKey, propVal);
curDate.setTimeInMillis(curDate.getTimeInMillis() - dayMillis);
}
OutputStream os = new FileOutputStream(propFile);
props.store(os, "");
os.close();
} else
throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
+ " System property not defined");
}
}
Listing 6:Oozieにパラメータを渡す
この例ではHDFSに各日付のデレクトリがあると仮定しています。このクラスは現在の日付を取得して、直近7日間(今日を含む)の日付を計算し、デレクトリの名前をOozieに渡しています。
結論
この記事ではHadoop向けワークフローエンジンであるOozieを紹介し、いくつかの簡単なサンプルを提示しました。次の記事では複雑な利用例を紹介し、Oozieの機能をさらに紹介したいと思います。
謝辞
Navteqの同僚であるいくつかの使用例を提供してくれました。感謝いたします。
著者について
Boris LublinskyはNAVTEQのプリンシパルアーキテクト。大規模データのマネジメントと処理についてのアーキテクチャの定義、SOA、NAVTEQの様々な製品の実装に携わっています。また、InfoQのSOAの編集者であり、OASISのSOA RAワーキンググループにも参加しています。講演、執筆活動も行っています。最新の著書は"Applied SOA"です。
Michael Segelは20年以上顧客の問題の発見と解決に努め、様々な業界で様々な役割を果たしてきました。独立したコンサルタントであり、解決すべき取り組みがいのある問題をいつも探しています。オハイオ州立大学のソフトウエアエンジニアリングの学位を持っています。
[1]エッジノードとはHadoopのクラスタではなく、Hadoopのライブラリをインストールしたマシンのことです。このマシンはクラスタに接続しクラスタに直接アクセスする補助的なサービスやアプリケーションをホストすることができます。
[2] Oozieインストールにはこのリンク先を参考にしてください。
[3]詳細はこの記事とは無関係なのでここでは扱いません。
[4]Oozie上でMap/Reduceジョブを実装するには2つの方法があります。Map/Reduceのアクション[2]としてMapperクラスとReducerクラスを定義する方法と構成ファイルやjavaアクション[3]でHadoop APIを使ってMap/Reduceを開始するクラスを定義する方法です。私たちはHadoop APIを使い、さらに機能を追加したJavaのコードを既に持っていますので、ふたつ目の方法を採用しています。
[5]Oozieはこの2つのアクションが並列でJobトラッカーに提供します。実際の並列実行はOozieの制御外で、ジョブの要求やインストールしたMap/Reduceのクラスタのやスケジューラの能力に依存します。
[6]結合アクションの役割は分岐アクションによって開始されたマルチスレッドの並列処理の同期を行うことです。分岐が適切に完了すれば、結合アクションは分岐したすべての処理が終了するのを待ちます。スレッドの実行が失敗した場合は停止ノードが実行している残りのスレッドを“kill”します。
[7]このノードはOozieをインストールしたマシンでなくてもかまいません。
[8]Oozieのジョブのログにはワークフローの実行に関する情報が詳細に出力されますが、アクションの実行の詳細を確認するにはHadoop Map/Reduce管理ページに切り替える必要があります。