001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.mapred;
019
020 import java.io.FileNotFoundException;
021 import java.io.IOException;
022 import java.net.InetSocketAddress;
023 import java.net.URL;
024 import java.security.PrivilegedExceptionAction;
025 import java.util.ArrayList;
026 import java.util.Collection;
027 import java.util.List;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.conf.Configuration;
032 import org.apache.hadoop.fs.FileSystem;
033 import org.apache.hadoop.fs.Path;
034 import org.apache.hadoop.io.Text;
035 import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
036 import org.apache.hadoop.mapreduce.Cluster;
037 import org.apache.hadoop.mapreduce.ClusterMetrics;
038 import org.apache.hadoop.mapreduce.Job;
039 import org.apache.hadoop.mapreduce.QueueInfo;
040 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
041 import org.apache.hadoop.mapreduce.TaskType;
042 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
043 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
044 import org.apache.hadoop.mapreduce.tools.CLI;
045 import org.apache.hadoop.mapreduce.util.ConfigUtil;
046 import org.apache.hadoop.security.UserGroupInformation;
047 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
048 import org.apache.hadoop.security.token.Token;
049 import org.apache.hadoop.security.token.TokenRenewer;
050 import org.apache.hadoop.util.Tool;
051 import org.apache.hadoop.util.ToolRunner;
052
053 /**
054 * <code>JobClient</code> is the primary interface for the user-job to interact
055 * with the cluster.
056 *
057 * <code>JobClient</code> provides facilities to submit jobs, track their
058 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
059 * status information etc.
060 *
061 * <p>The job submission process involves:
062 * <ol>
063 * <li>
064 * Checking the input and output specifications of the job.
065 * </li>
066 * <li>
067 * Computing the {@link InputSplit}s for the job.
068 * </li>
069 * <li>
070 * Setup the requisite accounting information for the {@link DistributedCache}
071 * of the job, if necessary.
072 * </li>
073 * <li>
074 * Copying the job's jar and configuration to the map-reduce system directory
075 * on the distributed file-system.
076 * </li>
077 * <li>
078 * Submitting the job to the cluster and optionally monitoring
 * its status.
080 * </li>
081 * </ol></p>
082 *
083 * Normally the user creates the application, describes various facets of the
084 * job via {@link JobConf} and then uses the <code>JobClient</code> to submit
085 * the job and monitor its progress.
086 *
087 * <p>Here is an example on how to use <code>JobClient</code>:</p>
088 * <p><blockquote><pre>
089 * // Create a new JobConf
090 * JobConf job = new JobConf(new Configuration(), MyJob.class);
091 *
092 * // Specify various job-specific parameters
093 * job.setJobName("myjob");
094 *
095 * job.setInputPath(new Path("in"));
096 * job.setOutputPath(new Path("out"));
097 *
098 * job.setMapperClass(MyJob.MyMapper.class);
099 * job.setReducerClass(MyJob.MyReducer.class);
100 *
101 * // Submit the job, then poll for progress until the job is complete
102 * JobClient.runJob(job);
103 * </pre></blockquote></p>
104 *
105 * <h4 id="JobControl">Job Control</h4>
106 *
107 * <p>At times clients would chain map-reduce jobs to accomplish complex tasks
108 * which cannot be done via a single map-reduce job. This is fairly easy since
109 * the output of the job, typically, goes to distributed file-system and that
110 * can be used as the input for the next job.</p>
111 *
 * <p>However, this also means that the onus of ensuring jobs are complete
113 * (success/failure) lies squarely on the clients. In such situations the
114 * various job-control options are:
115 * <ol>
116 * <li>
117 * {@link #runJob(JobConf)} : submits the job and returns only after
118 * the job has completed.
119 * </li>
120 * <li>
121 * {@link #submitJob(JobConf)} : only submits the job, then poll the
122 * returned handle to the {@link RunningJob} to query status and make
123 * scheduling decisions.
124 * </li>
125 * <li>
126 * {@link JobConf#setJobEndNotificationURI(String)} : setup a notification
127 * on job-completion, thus avoiding polling.
128 * </li>
129 * </ol></p>
130 *
131 * @see JobConf
132 * @see ClusterStatus
133 * @see Tool
134 * @see DistributedCache
135 */
136 @InterfaceAudience.Public
137 @InterfaceStability.Stable
138 public class JobClient extends CLI {
139 public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
140 private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
141
142 static{
143 ConfigUtil.loadResources();
144 }
145
146 /**
147 * A NetworkedJob is an implementation of RunningJob. It holds
148 * a JobProfile object to provide some info, and interacts with the
149 * remote service to provide certain functionality.
150 */
151 static class NetworkedJob implements RunningJob {
152 Job job;
153 /**
154 * We store a JobProfile and a timestamp for when we last
155 * acquired the job profile. If the job is null, then we cannot
156 * perform any of the tasks. The job might be null if the cluster
157 * has completely forgotten about the job. (eg, 24 hours after the
158 * job completes.)
159 */
160 public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
161 job = Job.getInstance(cluster, status, new JobConf(status.getJobFile()));
162 }
163
164 public NetworkedJob(Job job) throws IOException {
165 this.job = job;
166 }
167
168 public Configuration getConfiguration() {
169 return job.getConfiguration();
170 }
171
172 /**
173 * An identifier for the job
174 */
175 public JobID getID() {
176 return JobID.downgrade(job.getJobID());
177 }
178
179 /** @deprecated This method is deprecated and will be removed. Applications should
180 * rather use {@link #getID()}.*/
181 @Deprecated
182 public String getJobID() {
183 return getID().toString();
184 }
185
186 /**
187 * The user-specified job name
188 */
189 public String getJobName() {
190 return job.getJobName();
191 }
192
193 /**
194 * The name of the job file
195 */
196 public String getJobFile() {
197 return job.getJobFile();
198 }
199
200 /**
201 * A URL where the job's status can be seen
202 */
203 public String getTrackingURL() {
204 return job.getTrackingURL();
205 }
206
207 /**
208 * A float between 0.0 and 1.0, indicating the % of map work
209 * completed.
210 */
211 public float mapProgress() throws IOException {
212 try {
213 return job.mapProgress();
214 } catch (InterruptedException ie) {
215 throw new IOException(ie);
216 }
217 }
218
219 /**
220 * A float between 0.0 and 1.0, indicating the % of reduce work
221 * completed.
222 */
223 public float reduceProgress() throws IOException {
224 try {
225 return job.reduceProgress();
226 } catch (InterruptedException ie) {
227 throw new IOException(ie);
228 }
229 }
230
231 /**
232 * A float between 0.0 and 1.0, indicating the % of cleanup work
233 * completed.
234 */
235 public float cleanupProgress() throws IOException {
236 try {
237 return job.cleanupProgress();
238 } catch (InterruptedException ie) {
239 throw new IOException(ie);
240 }
241 }
242
243 /**
244 * A float between 0.0 and 1.0, indicating the % of setup work
245 * completed.
246 */
247 public float setupProgress() throws IOException {
248 try {
249 return job.setupProgress();
250 } catch (InterruptedException ie) {
251 throw new IOException(ie);
252 }
253 }
254
255 /**
256 * Returns immediately whether the whole job is done yet or not.
257 */
258 public synchronized boolean isComplete() throws IOException {
259 try {
260 return job.isComplete();
261 } catch (InterruptedException ie) {
262 throw new IOException(ie);
263 }
264 }
265
266 /**
267 * True iff job completed successfully.
268 */
269 public synchronized boolean isSuccessful() throws IOException {
270 try {
271 return job.isSuccessful();
272 } catch (InterruptedException ie) {
273 throw new IOException(ie);
274 }
275 }
276
277 /**
278 * Blocks until the job is finished
279 */
280 public void waitForCompletion() throws IOException {
281 try {
282 job.waitForCompletion(false);
283 } catch (InterruptedException ie) {
284 throw new IOException(ie);
285 } catch (ClassNotFoundException ce) {
286 throw new IOException(ce);
287 }
288 }
289
290 /**
291 * Tells the service to get the state of the current job.
292 */
293 public synchronized int getJobState() throws IOException {
294 try {
295 return job.getJobState().getValue();
296 } catch (InterruptedException ie) {
297 throw new IOException(ie);
298 }
299 }
300
301 /**
302 * Tells the service to terminate the current job.
303 */
304 public synchronized void killJob() throws IOException {
305 try {
306 job.killJob();
307 } catch (InterruptedException ie) {
308 throw new IOException(ie);
309 }
310 }
311
312
313 /** Set the priority of the job.
314 * @param priority new priority of the job.
315 */
316 public synchronized void setJobPriority(String priority)
317 throws IOException {
318 try {
319 job.setPriority(
320 org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
321 } catch (InterruptedException ie) {
322 throw new IOException(ie);
323 }
324 }
325
326 /**
327 * Kill indicated task attempt.
328 * @param taskId the id of the task to kill.
329 * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
330 * it is just killed, w/o affecting job failure status.
331 */
332 public synchronized void killTask(TaskAttemptID taskId,
333 boolean shouldFail) throws IOException {
334 try {
335 if (shouldFail) {
336 job.failTask(taskId);
337 } else {
338 job.killTask(taskId);
339 }
340 } catch (InterruptedException ie) {
341 throw new IOException(ie);
342 }
343 }
344
345 /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
346 @Deprecated
347 public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
348 killTask(TaskAttemptID.forName(taskId), shouldFail);
349 }
350
351 /**
352 * Fetch task completion events from cluster for this job.
353 */
354 public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
355 int startFrom) throws IOException {
356 try {
357 org.apache.hadoop.mapreduce.TaskCompletionEvent[] acls =
358 job.getTaskCompletionEvents(startFrom, 10);
359 TaskCompletionEvent[] ret = new TaskCompletionEvent[acls.length];
360 for (int i = 0 ; i < acls.length; i++ ) {
361 ret[i] = TaskCompletionEvent.downgrade(acls[i]);
362 }
363 return ret;
364 } catch (InterruptedException ie) {
365 throw new IOException(ie);
366 }
367 }
368
369 /**
370 * Dump stats to screen
371 */
372 @Override
373 public String toString() {
374 return job.toString();
375 }
376
377 /**
378 * Returns the counters for this job
379 */
380 public Counters getCounters() throws IOException {
381 try {
382 Counters result = null;
383 org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
384 if(temp != null) {
385 result = Counters.downgrade(temp);
386 }
387 return result;
388 } catch (InterruptedException ie) {
389 throw new IOException(ie);
390 }
391 }
392
393 @Override
394 public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
395 try {
396 return job.getTaskDiagnostics(id);
397 } catch (InterruptedException ie) {
398 throw new IOException(ie);
399 }
400 }
401
402 public String getHistoryUrl() throws IOException {
403 try {
404 return job.getHistoryUrl();
405 } catch (InterruptedException ie) {
406 throw new IOException(ie);
407 }
408 }
409
410 public boolean isRetired() throws IOException {
411 try {
412 return job.isRetired();
413 } catch (InterruptedException ie) {
414 throw new IOException(ie);
415 }
416 }
417
418 boolean monitorAndPrintJob() throws IOException, InterruptedException {
419 return job.monitorAndPrintJob();
420 }
421
422 @Override
423 public String getFailureInfo() throws IOException {
424 try {
425 return job.getStatus().getFailureInfo();
426 } catch (InterruptedException ie) {
427 throw new IOException(ie);
428 }
429 }
430
431 @Override
432 public JobStatus getJobStatus() throws IOException {
433 try {
434 return JobStatus.downgrade(job.getStatus());
435 } catch (InterruptedException ie) {
436 throw new IOException(ie);
437 }
438 }
439 }
440
441 /**
442 * Ugi of the client. We store this ugi when the client is created and
443 * then make sure that the same ugi is used to run the various protocols.
444 */
445 UserGroupInformation clientUgi;
446
447 /**
448 * Create a job client.
449 */
450 public JobClient() {
451 }
452
/**
 * Build a job client with the given {@link JobConf}, and connect to the
 * default cluster. Delegates to {@link #init(JobConf)}.
 *
 * @param conf the job configuration.
 * @throws IOException
 */
public JobClient(JobConf conf) throws IOException {
  this.init(conf);
}
463
/**
 * Build a job client with the given {@link Configuration},
 * and connect to the default cluster. The configuration is wrapped in a new
 * {@link JobConf} before being handed to {@link #init(JobConf)}.
 *
 * @param conf the configuration.
 * @throws IOException
 */
public JobClient(Configuration conf) throws IOException {
  JobConf jobConf = new JobConf(conf);
  init(jobConf);
}
474
475 /**
476 * Connect to the default cluster
477 * @param conf the job configuration.
478 * @throws IOException
479 */
480 public void init(JobConf conf) throws IOException {
481 setConf(conf);
482 cluster = new Cluster(conf);
483 clientUgi = UserGroupInformation.getCurrentUser();
484 }
485
/**
 * Build a job client, connect to the indicated job tracker.
 *
 * <p>The supplied configuration is also stored on this client through
 * <code>setConf</code>, in the same way {@link #init(JobConf)} does, so that
 * clients built with this constructor carry their configuration too.</p>
 *
 * @param jobTrackAddr the job tracker to connect to.
 * @param conf configuration.
 * @throws IOException
 */
public JobClient(InetSocketAddress jobTrackAddr,
    Configuration conf) throws IOException {
  // Keep this constructor consistent with init(JobConf), which records the
  // configuration before creating the cluster handle.
  setConf(conf);
  cluster = new Cluster(jobTrackAddr, conf);
  clientUgi = UserGroupInformation.getCurrentUser();
}
497
498 /**
499 * Close the <code>JobClient</code>.
500 */
501 public synchronized void close() throws IOException {
502 cluster.close();
503 }
504
505 /**
506 * Get a filesystem handle. We need this to prepare jobs
507 * for submission to the MapReduce system.
508 *
509 * @return the filesystem handle.
510 */
511 public synchronized FileSystem getFs() throws IOException {
512 try {
513 return cluster.getFileSystem();
514 } catch (InterruptedException ie) {
515 throw new IOException(ie);
516 }
517 }
518
519 /**
520 * Get a handle to the Cluster
521 */
522 public Cluster getClusterHandle() {
523 return cluster;
524 }
525
526 /**
527 * Submit a job to the MR system.
528 *
529 * This returns a handle to the {@link RunningJob} which can be used to track
530 * the running-job.
531 *
532 * @param jobFile the job configuration.
533 * @return a handle to the {@link RunningJob} which can be used to track the
534 * running-job.
535 * @throws FileNotFoundException
536 * @throws InvalidJobConfException
537 * @throws IOException
538 */
539 public RunningJob submitJob(String jobFile) throws FileNotFoundException,
540 InvalidJobConfException,
541 IOException {
542 // Load in the submitted job details
543 JobConf job = new JobConf(jobFile);
544 return submitJob(job);
545 }
546
547 /**
548 * Submit a job to the MR system.
549 * This returns a handle to the {@link RunningJob} which can be used to track
550 * the running-job.
551 *
552 * @param conf the job configuration.
553 * @return a handle to the {@link RunningJob} which can be used to track the
554 * running-job.
555 * @throws FileNotFoundException
556 * @throws IOException
557 */
558 public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
559 IOException {
560 try {
561 conf.setBooleanIfUnset("mapred.mapper.new-api", false);
562 conf.setBooleanIfUnset("mapred.reducer.new-api", false);
563 Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
564 @Override
565 public Job run() throws IOException, ClassNotFoundException,
566 InterruptedException {
567 Job job = Job.getInstance(conf);
568 job.submit();
569 return job;
570 }
571 });
572 // update our Cluster instance with the one created by Job for submission
573 // (we can't pass our Cluster instance to Job, since Job wraps the config
574 // instance, and the two configs would then diverge)
575 cluster = job.getCluster();
576 return new NetworkedJob(job);
577 } catch (InterruptedException ie) {
578 throw new IOException("interrupted", ie);
579 }
580 }
581
582 private Job getJobUsingCluster(final JobID jobid) throws IOException,
583 InterruptedException {
584 return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
585 public Job run() throws IOException, InterruptedException {
586 return cluster.getJob(jobid);
587 }
588 });
589 }
590 /**
591 * Get an {@link RunningJob} object to track an ongoing job. Returns
592 * null if the id does not correspond to any known job.
593 *
594 * @param jobid the jobid of the job.
595 * @return the {@link RunningJob} handle to track the job, null if the
596 * <code>jobid</code> doesn't correspond to any known job.
597 * @throws IOException
598 */
599 public RunningJob getJob(final JobID jobid) throws IOException {
600 try {
601
602 Job job = getJobUsingCluster(jobid);
603 if (job != null) {
604 JobStatus status = JobStatus.downgrade(job.getStatus());
605 if (status != null) {
606 return new NetworkedJob(status, cluster);
607 }
608 }
609 } catch (InterruptedException ie) {
610 throw new IOException(ie);
611 }
612 return null;
613 }
614
615 /**@deprecated Applications should rather use {@link #getJob(JobID)}.
616 */
617 @Deprecated
618 public RunningJob getJob(String jobid) throws IOException {
619 return getJob(JobID.forName(jobid));
620 }
621
622 private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
623
624 /**
625 * Get the information of the current state of the map tasks of a job.
626 *
627 * @param jobId the job to query.
628 * @return the list of all of the map tips.
629 * @throws IOException
630 */
631 public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
632 return getTaskReports(jobId, TaskType.MAP);
633 }
634
635 private TaskReport[] getTaskReports(final JobID jobId, TaskType type) throws
636 IOException {
637 try {
638 Job j = getJobUsingCluster(jobId);
639 if(j == null) {
640 return EMPTY_TASK_REPORTS;
641 }
642 return TaskReport.downgradeArray(j.getTaskReports(type));
643 } catch (InterruptedException ie) {
644 throw new IOException(ie);
645 }
646 }
647
648 /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
649 @Deprecated
650 public TaskReport[] getMapTaskReports(String jobId) throws IOException {
651 return getMapTaskReports(JobID.forName(jobId));
652 }
653
654 /**
655 * Get the information of the current state of the reduce tasks of a job.
656 *
657 * @param jobId the job to query.
658 * @return the list of all of the reduce tips.
659 * @throws IOException
660 */
661 public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
662 return getTaskReports(jobId, TaskType.REDUCE);
663 }
664
665 /**
666 * Get the information of the current state of the cleanup tasks of a job.
667 *
668 * @param jobId the job to query.
669 * @return the list of all of the cleanup tips.
670 * @throws IOException
671 */
672 public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
673 return getTaskReports(jobId, TaskType.JOB_CLEANUP);
674 }
675
676 /**
677 * Get the information of the current state of the setup tasks of a job.
678 *
679 * @param jobId the job to query.
680 * @return the list of all of the setup tips.
681 * @throws IOException
682 */
683 public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
684 return getTaskReports(jobId, TaskType.JOB_SETUP);
685 }
686
687
688 /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
689 @Deprecated
690 public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
691 return getReduceTaskReports(JobID.forName(jobId));
692 }
693
694 /**
695 * Display the information about a job's tasks, of a particular type and
696 * in a particular state
697 *
698 * @param jobId the ID of the job
699 * @param type the type of the task (map/reduce/setup/cleanup)
700 * @param state the state of the task
701 * (pending/running/completed/failed/killed)
702 */
703 public void displayTasks(final JobID jobId, String type, String state)
704 throws IOException {
705 try {
706 Job job = getJobUsingCluster(jobId);
707 super.displayTasks(job, type, state);
708 } catch (InterruptedException ie) {
709 throw new IOException(ie);
710 }
711 }
712
713 /**
714 * Get status information about the Map-Reduce cluster.
715 *
716 * @return the status information about the Map-Reduce cluster as an object
717 * of {@link ClusterStatus}.
718 * @throws IOException
719 */
720 public ClusterStatus getClusterStatus() throws IOException {
721 try {
722 return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
723 public ClusterStatus run() throws IOException, InterruptedException {
724 ClusterMetrics metrics = cluster.getClusterStatus();
725 return new ClusterStatus(metrics.getTaskTrackerCount(),
726 metrics.getBlackListedTaskTrackerCount(), cluster.getTaskTrackerExpiryInterval(),
727 metrics.getOccupiedMapSlots(),
728 metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
729 metrics.getReduceSlotCapacity(),
730 cluster.getJobTrackerStatus(),
731 metrics.getDecommissionedTaskTrackerCount());
732 }
733 });
734 }
735 catch (InterruptedException ie) {
736 throw new IOException(ie);
737 }
738 }
739
740 private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
741 Collection<String> list = new ArrayList<String>();
742 for (TaskTrackerInfo info: objs) {
743 list.add(info.getTaskTrackerName());
744 }
745 return list;
746 }
747
748 private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
749 Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
750 for (TaskTrackerInfo info: objs) {
751 BlackListInfo binfo = new BlackListInfo();
752 binfo.setTrackerName(info.getTaskTrackerName());
753 binfo.setReasonForBlackListing(info.getReasonForBlacklist());
754 binfo.setBlackListReport(info.getBlacklistReport());
755 list.add(binfo);
756 }
757 return list;
758 }
759
760 /**
761 * Get status information about the Map-Reduce cluster.
762 *
763 * @param detailed if true then get a detailed status including the
764 * tracker names
765 * @return the status information about the Map-Reduce cluster as an object
766 * of {@link ClusterStatus}.
767 * @throws IOException
768 */
769 public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
770 try {
771 return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
772 public ClusterStatus run() throws IOException, InterruptedException {
773 ClusterMetrics metrics = cluster.getClusterStatus();
774 return new ClusterStatus(arrayToStringList(cluster.getActiveTaskTrackers()),
775 arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
776 cluster.getTaskTrackerExpiryInterval(), metrics.getOccupiedMapSlots(),
777 metrics.getOccupiedReduceSlots(), metrics.getMapSlotCapacity(),
778 metrics.getReduceSlotCapacity(),
779 cluster.getJobTrackerStatus());
780 }
781 });
782 } catch (InterruptedException ie) {
783 throw new IOException(ie);
784 }
785 }
786
787
788 /**
789 * Get the jobs that are not completed and not failed.
790 *
791 * @return array of {@link JobStatus} for the running/to-be-run jobs.
792 * @throws IOException
793 */
794 public JobStatus[] jobsToComplete() throws IOException {
795 List<JobStatus> stats = new ArrayList<JobStatus>();
796 for (JobStatus stat : getAllJobs()) {
797 if (!stat.isJobComplete()) {
798 stats.add(stat);
799 }
800 }
801 return stats.toArray(new JobStatus[0]);
802 }
803
804 /**
805 * Get the jobs that are submitted.
806 *
807 * @return array of {@link JobStatus} for the submitted jobs.
808 * @throws IOException
809 */
810 public JobStatus[] getAllJobs() throws IOException {
811 try {
812 org.apache.hadoop.mapreduce.JobStatus[] jobs =
813 clientUgi.doAs(new PrivilegedExceptionAction<
814 org.apache.hadoop.mapreduce.JobStatus[]> () {
815 public org.apache.hadoop.mapreduce.JobStatus[] run()
816 throws IOException, InterruptedException {
817 return cluster.getAllJobStatuses();
818 }
819 });
820 JobStatus[] stats = new JobStatus[jobs.length];
821 for (int i = 0; i < jobs.length; i++) {
822 stats[i] = JobStatus.downgrade(jobs[i]);
823 }
824 return stats;
825 } catch (InterruptedException ie) {
826 throw new IOException(ie);
827 }
828 }
829
830 /**
831 * Utility that submits a job, then polls for progress until the job is
832 * complete.
833 *
834 * @param job the job configuration.
835 * @throws IOException if the job fails
836 */
837 public static RunningJob runJob(JobConf job) throws IOException {
838 JobClient jc = new JobClient(job);
839 RunningJob rj = jc.submitJob(job);
840 try {
841 if (!jc.monitorAndPrintJob(job, rj)) {
842 throw new IOException("Job failed!");
843 }
844 } catch (InterruptedException ie) {
845 Thread.currentThread().interrupt();
846 }
847 return rj;
848 }
849
850 /**
851 * Monitor a job and print status in real-time as progress is made and tasks
852 * fail.
853 * @param conf the job's configuration
854 * @param job the job to track
855 * @return true if the job succeeded
856 * @throws IOException if communication to the JobTracker fails
857 */
858 public boolean monitorAndPrintJob(JobConf conf,
859 RunningJob job
860 ) throws IOException, InterruptedException {
861 return ((NetworkedJob)job).monitorAndPrintJob();
862 }
863
864 static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
865 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
866 }
867
868 static Configuration getConfiguration(String jobTrackerSpec)
869 {
870 Configuration conf = new Configuration();
871 if (jobTrackerSpec != null) {
872 if (jobTrackerSpec.indexOf(":") >= 0) {
873 conf.set("mapred.job.tracker", jobTrackerSpec);
874 } else {
875 String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
876 URL validate = conf.getResource(classpathFile);
877 if (validate == null) {
878 throw new RuntimeException(classpathFile + " not found on CLASSPATH");
879 }
880 conf.addResource(classpathFile);
881 }
882 }
883 return conf;
884 }
885
886 /**
887 * Sets the output filter for tasks. only those tasks are printed whose
888 * output matches the filter.
889 * @param newValue task filter.
890 */
891 @Deprecated
892 public void setTaskOutputFilter(TaskStatusFilter newValue){
893 this.taskOutputFilter = newValue;
894 }
895
896 /**
897 * Get the task output filter out of the JobConf.
898 *
899 * @param job the JobConf to examine.
900 * @return the filter level.
901 */
902 public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
903 return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
904 "FAILED"));
905 }
906
907 /**
908 * Modify the JobConf to set the task output filter.
909 *
910 * @param job the JobConf to modify.
911 * @param newValue the value to set.
912 */
913 public static void setTaskOutputFilter(JobConf job,
914 TaskStatusFilter newValue) {
915 job.set("jobclient.output.filter", newValue.toString());
916 }
917
918 /**
919 * Returns task output filter.
920 * @return task filter.
921 */
922 @Deprecated
923 public TaskStatusFilter getTaskOutputFilter(){
924 return this.taskOutputFilter;
925 }
926
927 protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
928 String counterGroupName, String counterName) throws IOException {
929 Counters counters = Counters.downgrade(cntrs);
930 return counters.findCounter(counterGroupName, counterName).getValue();
931 }
932
933 /**
934 * Get status information about the max available Maps in the cluster.
935 *
936 * @return the max available Maps in the cluster
937 * @throws IOException
938 */
939 public int getDefaultMaps() throws IOException {
940 try {
941 return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
942 @Override
943 public Integer run() throws IOException, InterruptedException {
944 return cluster.getClusterStatus().getMapSlotCapacity();
945 }
946 });
947 } catch (InterruptedException ie) {
948 throw new IOException(ie);
949 }
950 }
951
952 /**
953 * Get status information about the max available Reduces in the cluster.
954 *
955 * @return the max available Reduces in the cluster
956 * @throws IOException
957 */
958 public int getDefaultReduces() throws IOException {
959 try {
960 return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
961 @Override
962 public Integer run() throws IOException, InterruptedException {
963 return cluster.getClusterStatus().getReduceSlotCapacity();
964 }
965 });
966 } catch (InterruptedException ie) {
967 throw new IOException(ie);
968 }
969 }
970
971 /**
972 * Grab the jobtracker system directory path where job-specific files are to be placed.
973 *
974 * @return the system directory where job-specific files are to be placed.
975 */
976 public Path getSystemDir() {
977 try {
978 return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
979 @Override
980 public Path run() throws IOException, InterruptedException {
981 return cluster.getSystemDir();
982 }
983 });
984 } catch (IOException ioe) {
985 return null;
986 } catch (InterruptedException ie) {
987 return null;
988 }
989 }
990
991 private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
992 JobQueueInfo ret = new JobQueueInfo(queue);
993 // make sure to convert any children
994 if (queue.getQueueChildren().size() > 0) {
995 List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(queue
996 .getQueueChildren().size());
997 for (QueueInfo child : queue.getQueueChildren()) {
998 childQueues.add(getJobQueueInfo(child));
999 }
1000 ret.setChildren(childQueues);
1001 }
1002 return ret;
1003 }
1004
1005 private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
1006 throws IOException {
1007 JobQueueInfo[] ret = new JobQueueInfo[queues.length];
1008 for (int i = 0; i < queues.length; i++) {
1009 ret[i] = getJobQueueInfo(queues[i]);
1010 }
1011 return ret;
1012 }
1013
1014 /**
1015 * Returns an array of queue information objects about root level queues
1016 * configured
1017 *
1018 * @return the array of root level JobQueueInfo objects
1019 * @throws IOException
1020 */
1021 public JobQueueInfo[] getRootQueues() throws IOException {
1022 try {
1023 return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1024 public JobQueueInfo[] run() throws IOException, InterruptedException {
1025 return getJobQueueInfoArray(cluster.getRootQueues());
1026 }
1027 });
1028 } catch (InterruptedException ie) {
1029 throw new IOException(ie);
1030 }
1031 }
1032
1033 /**
1034 * Returns an array of queue information objects about immediate children
1035 * of queue queueName.
1036 *
1037 * @param queueName
1038 * @return the array of immediate children JobQueueInfo objects
1039 * @throws IOException
1040 */
1041 public JobQueueInfo[] getChildQueues(final String queueName) throws IOException {
1042 try {
1043 return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1044 public JobQueueInfo[] run() throws IOException, InterruptedException {
1045 return getJobQueueInfoArray(cluster.getChildQueues(queueName));
1046 }
1047 });
1048 } catch (InterruptedException ie) {
1049 throw new IOException(ie);
1050 }
1051 }
1052
1053 /**
1054 * Return an array of queue information objects about all the Job Queues
1055 * configured.
1056 *
1057 * @return Array of JobQueueInfo objects
1058 * @throws IOException
1059 */
1060 public JobQueueInfo[] getQueues() throws IOException {
1061 try {
1062 return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
1063 public JobQueueInfo[] run() throws IOException, InterruptedException {
1064 return getJobQueueInfoArray(cluster.getQueues());
1065 }
1066 });
1067 } catch (InterruptedException ie) {
1068 throw new IOException(ie);
1069 }
1070 }
1071
1072 /**
1073 * Gets all the jobs which were added to particular Job Queue
1074 *
1075 * @param queueName name of the Job Queue
1076 * @return Array of jobs present in the job queue
1077 * @throws IOException
1078 */
1079
1080 public JobStatus[] getJobsFromQueue(final String queueName) throws IOException {
1081 try {
1082 QueueInfo queue = clientUgi.doAs(new PrivilegedExceptionAction<QueueInfo>() {
1083 @Override
1084 public QueueInfo run() throws IOException, InterruptedException {
1085 return cluster.getQueue(queueName);
1086 }
1087 });
1088 if (queue == null) {
1089 return null;
1090 }
1091 org.apache.hadoop.mapreduce.JobStatus[] stats =
1092 queue.getJobStatuses();
1093 JobStatus[] ret = new JobStatus[stats.length];
1094 for (int i = 0 ; i < stats.length; i++ ) {
1095 ret[i] = JobStatus.downgrade(stats[i]);
1096 }
1097 return ret;
1098 } catch (InterruptedException ie) {
1099 throw new IOException(ie);
1100 }
1101 }
1102
1103 /**
1104 * Gets the queue information associated to a particular Job Queue
1105 *
1106 * @param queueName name of the job queue.
1107 * @return Queue information associated to particular queue.
1108 * @throws IOException
1109 */
1110 public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
1111 try {
1112 QueueInfo queueInfo = clientUgi.doAs(new
1113 PrivilegedExceptionAction<QueueInfo>() {
1114 public QueueInfo run() throws IOException, InterruptedException {
1115 return cluster.getQueue(queueName);
1116 }
1117 });
1118 if (queueInfo != null) {
1119 return new JobQueueInfo(queueInfo);
1120 }
1121 return null;
1122 } catch (InterruptedException ie) {
1123 throw new IOException(ie);
1124 }
1125 }
1126
1127 /**
1128 * Gets the Queue ACLs for current user
1129 * @return array of QueueAclsInfo object for current user.
1130 * @throws IOException
1131 */
1132 public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
1133 try {
1134 org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
1135 clientUgi.doAs(new
1136 PrivilegedExceptionAction
1137 <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
1138 public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
1139 throws IOException, InterruptedException {
1140 return cluster.getQueueAclsForCurrentUser();
1141 }
1142 });
1143 QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
1144 for (int i = 0 ; i < acls.length; i++ ) {
1145 ret[i] = QueueAclsInfo.downgrade(acls[i]);
1146 }
1147 return ret;
1148 } catch (InterruptedException ie) {
1149 throw new IOException(ie);
1150 }
1151 }
1152
1153 /**
1154 * Get a delegation token for the user from the JobTracker.
1155 * @param renewer the user who can renew the token
1156 * @return the new token
1157 * @throws IOException
1158 */
1159 public Token<DelegationTokenIdentifier>
1160 getDelegationToken(final Text renewer) throws IOException, InterruptedException {
1161 return clientUgi.doAs(new
1162 PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
1163 public Token<DelegationTokenIdentifier> run() throws IOException,
1164 InterruptedException {
1165 return cluster.getDelegationToken(renewer);
1166 }
1167 });
1168 }
1169
1170 /**
1171 * Renew a delegation token
1172 * @param token the token to renew
1173 * @return true if the renewal went well
1174 * @throws InvalidToken
1175 * @throws IOException
1176 * @deprecated Use {@link Token#renew} instead
1177 */
1178 public long renewDelegationToken(Token<DelegationTokenIdentifier> token
1179 ) throws InvalidToken, IOException,
1180 InterruptedException {
1181 return token.renew(getConf());
1182 }
1183
1184 /**
1185 * Cancel a delegation token from the JobTracker
1186 * @param token the token to cancel
1187 * @throws IOException
1188 * @deprecated Use {@link Token#cancel} instead
1189 */
1190 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
1191 ) throws InvalidToken, IOException,
1192 InterruptedException {
1193 token.cancel(getConf());
1194 }
1195
1196 /**
1197 */
1198 public static void main(String argv[]) throws Exception {
1199 int res = ToolRunner.run(new JobClient(), argv);
1200 System.exit(res);
1201 }
1202 }
1203