/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information, etc.
 *
 * <p>The job submission process involves:
 * <ol>
 * <li>
 * Checking the input and output specifications of the job.
 * </li>
 * <li>
 * Computing the {@link InputSplit}s for the job.
 * </li>
 * <li>
 * Setting up the requisite accounting information for the
 * {@link DistributedCache} of the job, if necessary.
 * </li>
 * <li>
 * Copying the job's jar and configuration to the map-reduce system directory
 * on the distributed file-system.
 * </li>
 * <li>
 * Submitting the job to the cluster and optionally monitoring
 * its status.
 * </li>
 * </ol>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example of how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     FileInputFormat.setInputPaths(job, new Path("in"));
 *     FileOutputFormat.setOutputPath(job, new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote></p>
 *
 * <h4 id="JobControl">Job Control</h4>
 *
 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy since
 * the output of the job typically goes to the distributed file-system, and
 * that can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs are complete
 * (success/failure) lies squarely on the clients. In such situations the
 * various job-control options are:
 * <ol>
 * <li>
 * {@link #runJob(JobConf)} : submits the job and returns only after
 * the job has completed.
 * </li>
 * <li>
 * {@link #submitJob(JobConf)} : only submits the job, then poll the
 * returned handle to the {@link RunningJob} to query status and make
 * scheduling decisions (see the sketch below).
 * </li>
 * <li>
 * {@link JobConf#setJobEndNotificationURI(String)} : sets up a notification
 * upon job-completion, thus avoiding polling.
 * </li>
 * </ol>
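 *
 * <p>As a rough sketch of the polling approach (assuming a fully configured
 * {@link JobConf}; the poll interval is illustrative):</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);   // illustrative poll interval
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job failed: " + running.getFailureInfo());
 *     }
 * </pre></blockquote></p>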
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // t1,n1,t2,n2,...

  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob. It wraps a
   * {@link Job} object to provide job information, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;

    /**
     * We store the wrapped {@link Job}. If the job is null, then we cannot
     * perform any of the tasks. The job might be null if the cluster
     * has completely forgotten about the job (e.g., 24 hours after the
     * job completes).
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }

    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /** @deprecated This method is deprecated and will be removed.
     * Applications should use {@link #getID()} instead. */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true the task is failed and added to the failed
     * tasks list, otherwise it is just killed, without affecting the job's
     * failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /** @deprecated Applications should use
     * {@link #killTask(TaskAttemptID, boolean)} instead. */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
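     *
     * <p>Events are returned in pages of up to 10, starting at
     * <code>startFrom</code>; a sketch of paging through all of them
     * (<code>runningJob</code> is an illustrative handle):</p>
     * <pre>
     *     int from = 0;
     *     TaskCompletionEvent[] events;
     *     do {
     *       events = runningJob.getTaskCompletionEvents(from);
     *       from += events.length;
     *     } while (events.length &gt; 0);
     * </pre>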
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Return the job's stats as a string.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }

  /**
   * Ugi of the client. We store this ugi when the client is created and
   * then make sure that the same ugi is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  public synchronized void close() throws IOException {
    cluster.close();
  }

  /**
   * Get a filesystem handle. We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to track
   * the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track the
   *         running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });
      // Update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge).
      cluster = job.getCluster();
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job. Returns
   * null if the id does not correspond to any known job.
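   *
   * <p>A sketch (the job id string is illustrative):</p>
   * <pre>
   *     RunningJob running =
   *         client.getJob(JobID.forName("job_201301010000_0001"));
   *     if (running != null &amp;&amp; !running.isComplete()) {
   *       System.out.println(running.getTrackingURL());
   *     }
   * </pre>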
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /** @deprecated Applications should use {@link #getJob(JobID)} instead. */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];

  /**
   * Get the information on the current state of the map tasks of a job.
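   *
   * <p>For example, to print per-task progress (a sketch;
   * <code>client</code> is an initialized <code>JobClient</code>):</p>
   * <pre>
   *     for (TaskReport report : client.getMapTaskReports(jobId)) {
   *       System.out.println(report.getTaskID() + " " + report.getProgress());
   *     }
   * </pre>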
   *
   * @param jobId the job to query.
   * @return the list of reports for all map tasks.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /** @deprecated Applications should use {@link #getMapTaskReports(JobID)}
   * instead. */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information on the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of reports for all reduce tasks.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information on the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of reports for all cleanup tasks.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information on the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of reports for all setup tasks.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /** @deprecated Applications should use {@link #getReduceTaskReports(JobID)}
   * instead. */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
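   *
   * <p>For example (a sketch):
   * <code>client.displayTasks(jobId, "map", "completed");</code></p>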
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   *        (pending/running/completed/failed/killed)
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the Map-Reduce cluster.
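   *
   * <p>For example, to check spare map-slot capacity (a sketch):</p>
   * <pre>
   *     ClusterStatus status = client.getClusterStatus();
   *     int spareMapSlots = status.getMaxMapTasks() - status.getMapTasks();
   * </pre>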
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(), metrics
              .getBlackListedTaskTrackerCount(), cluster
              .getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(), cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(), metrics
              .getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(
      TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *        tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and tasks
   * fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, RunningJob job)
      throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks. Only those tasks are printed whose
   * output matches the filter.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
        "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
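   *
   * <p>For example, to echo the output of all tasks rather than only failed
   * ones (a sketch):
   * <code>JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);</code></p>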
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
                                         TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the maximum number of map slots available
   * in the cluster.
   *
   * @return the maximum number of map slots available in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the maximum number of reduce slots available
   * in the cluster.
   *
   * @return the maximum number of reduce slots available in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
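   *
   * <p>A sketch (the path is illustrative):</p>
   * <pre>
   *     FileSystem fs = FileSystem.get(conf);
   *     Path jobDir = new Path("/mapred/system/job_201301010000_0001");
   *     if (JobClient.isJobDirValid(jobDir, fs)) {
   *       // safe to (re)start the job
   *     }
   * </pre>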
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   *
   * @return path to the staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw a RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
          .getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level queues
   * configured.
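   *
   * <p>A sketch of walking the queue hierarchy one level down:</p>
   * <pre>
   *     for (JobQueueInfo root : client.getRootQueues()) {
   *       for (JobQueueInfo child :
   *           client.getChildQueues(root.getQueueName())) {
   *         System.out.println(root.getQueueName() + " : "
   *             + child.getQueueName());
   *       }
   *     }
   * </pre>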
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate
   * children of queue queueName.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the job queues
   * configured.
   *
   * @return array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular job queue.
   *
   * @param queueName name of the job queue
   * @return array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
            @Override
            public QueueInfo run() throws IOException, InterruptedException {
              return cluster.getQueue(queueName);
            }
          });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue information associated with a particular job queue.
   *
   * @param queueName name of the job queue.
   * @return queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
            public QueueInfo run() throws IOException, InterruptedException {
              return cluster.getQueue(queueName);
            }
          });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
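   *
   * <p>A sketch (the renewer name is illustrative):</p>
   * <pre>
   *     Token&lt;DelegationTokenIdentifier&gt; token =
   *         client.getDelegationToken(new Text("myRenewerUser"));
   * </pre>
   *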
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer)
      throws IOException, InterruptedException {
    return clientUgi.doAs(
        new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
          public Token<DelegationTokenIdentifier> run() throws IOException,
              InterruptedException {
            return cluster.getDelegationToken(renewer);
          }
        });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time of the token
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * The main entry point; runs the JobClient command-line interface via
   * {@link ToolRunner}.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}