001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.io.Writable;
028import org.apache.hadoop.io.WritableUtils;
029
030/**
031 * This is used to track task completion events on 
032 * job tracker. 
033 */
034@InterfaceAudience.Public
035@InterfaceStability.Evolving
036public class TaskCompletionEvent implements Writable{
037  @InterfaceAudience.Public
038  @InterfaceStability.Evolving
039  /**
040   * Task Completion Statuses
041   */
042  static public enum Status {
043    /**
044     * Task Event Attempt failed but there are attempts remaining.
045     */
046    FAILED,
047    /**
048     * Task Event was killed.
049     */
050    KILLED,
051    /**
052     * Task Event was successful.
053     */
054    SUCCEEDED,
055    /**
056     * Used to Override a previously successful event status.
057     * Example:  Map attempt runs and a SUCCEEDED event is sent. Later a task
058     * is retroactively failed due to excessive fetch failure during shuffle
059     * phase. When the retroactive attempt failure occurs, an OBSOLETE event is
060     * sent for the map attempt indicating the prior event is no longer valid.
061     */
062    OBSOLETE,
063    /**
064     * Task Event attempt failed and no further attempts exist.
065     * reached MAX attempts. When a reducer receives a TIPFAILED event it
066     * gives up trying to shuffle data from that map task.
067     */
068    TIPFAILED
069  }
070    
071  private int eventId; 
072  private String taskTrackerHttp;
073  private int taskRunTime; // using int since runtime is the time difference
074  private TaskAttemptID taskId;
075  Status status; 
076  boolean isMap = false;
077  private int idWithinJob;
078  public static final TaskCompletionEvent[] EMPTY_ARRAY = 
079    new TaskCompletionEvent[0];
080  /**
081   * Default constructor for Writable.
082   *
083   */
084  public TaskCompletionEvent(){
085    taskId = new TaskAttemptID();
086  }
087
088  /**
089   * Constructor. eventId should be created externally and incremented
090   * per event for each job. 
091   * @param eventId event id, event id should be unique and assigned in
092   *  incrementally, starting from 0. 
093   * @param taskId task id
094   * @param status task's status 
095   * @param taskTrackerHttp task tracker's host:port for http. 
096   */
097  public TaskCompletionEvent(int eventId, 
098                             TaskAttemptID taskId,
099                             int idWithinJob,
100                             boolean isMap,
101                             Status status, 
102                             String taskTrackerHttp){
103      
104    this.taskId = taskId;
105    this.idWithinJob = idWithinJob;
106    this.isMap = isMap;
107    this.eventId = eventId; 
108    this.status =status; 
109    this.taskTrackerHttp = taskTrackerHttp;
110  }
111  /**
112   * Returns event Id. 
113   * @return event id
114   */
115  public int getEventId() {
116    return eventId;
117  }
118  
119  /**
120   * Returns task id. 
121   * @return task id
122   */
123  public TaskAttemptID getTaskAttemptId() {
124    return taskId;
125  }
126  
127  /**
128   * Returns {@link Status}
129   * @return task completion status
130   */
131  public Status getStatus() {
132    return status;
133  }
134  /**
135   * http location of the tasktracker where this task ran. 
136   * @return http location of tasktracker user logs
137   */
138  public String getTaskTrackerHttp() {
139    return taskTrackerHttp;
140  }
141
142  /**
143   * Returns time (in millisec) the task took to complete. 
144   */
145  public int getTaskRunTime() {
146    return taskRunTime;
147  }
148
149  /**
150   * Set the task completion time
151   * @param taskCompletionTime time (in millisec) the task took to complete
152   */
153  protected void setTaskRunTime(int taskCompletionTime) {
154    this.taskRunTime = taskCompletionTime;
155  }
156
157  /**
158   * set event Id. should be assigned incrementally starting from 0. 
159   * @param eventId
160   */
161  protected void setEventId(int eventId) {
162    this.eventId = eventId;
163  }
164
165  /**
166   * Sets task id. 
167   * @param taskId
168   */
169  protected void setTaskAttemptId(TaskAttemptID taskId) {
170    this.taskId = taskId;
171  }
172  
173  /**
174   * Set task status. 
175   * @param status
176   */
177  protected void setTaskStatus(Status status) {
178    this.status = status;
179  }
180  
181  /**
182   * Set task tracker http location. 
183   * @param taskTrackerHttp
184   */
185  protected void setTaskTrackerHttp(String taskTrackerHttp) {
186    this.taskTrackerHttp = taskTrackerHttp;
187  }
188    
189  @Override
190  public String toString(){
191    StringBuffer buf = new StringBuffer(); 
192    buf.append("Task Id : "); 
193    buf.append(taskId); 
194    buf.append(", Status : ");  
195    buf.append(status.name());
196    return buf.toString();
197  }
198    
199  @Override
200  public boolean equals(Object o) {
201    if(o == null)
202      return false;
203    if(o.getClass().equals(this.getClass())) {
204      TaskCompletionEvent event = (TaskCompletionEvent) o;
205      return this.isMap == event.isMapTask() 
206             && this.eventId == event.getEventId()
207             && this.idWithinJob == event.idWithinJob() 
208             && this.status.equals(event.getStatus())
209             && this.taskId.equals(event.getTaskAttemptId()) 
210             && this.taskRunTime == event.getTaskRunTime()
211             && this.taskTrackerHttp.equals(event.getTaskTrackerHttp());
212    }
213    return false;
214  }
215
216  @Override
217  public int hashCode() {
218    return toString().hashCode(); 
219  }
220
221  public boolean isMapTask() {
222    return isMap;
223  }
224    
225  public int idWithinJob() {
226    return idWithinJob;
227  }
228  //////////////////////////////////////////////
229  // Writable
230  //////////////////////////////////////////////
231  public void write(DataOutput out) throws IOException {
232    taskId.write(out); 
233    WritableUtils.writeVInt(out, idWithinJob);
234    out.writeBoolean(isMap);
235    WritableUtils.writeEnum(out, status); 
236    WritableUtils.writeString(out, taskTrackerHttp);
237    WritableUtils.writeVInt(out, taskRunTime);
238    WritableUtils.writeVInt(out, eventId);
239  }
240  
241  public void readFields(DataInput in) throws IOException {
242    taskId.readFields(in); 
243    idWithinJob = WritableUtils.readVInt(in);
244    isMap = in.readBoolean();
245    status = WritableUtils.readEnum(in, Status.class);
246    taskTrackerHttp = WritableUtils.readString(in);
247    taskRunTime = WritableUtils.readVInt(in);
248    eventId = WritableUtils.readVInt(in);
249  }
250}