001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.protocol.datatransfer;
019
020import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.util.ArrayList;
026
027import com.google.common.collect.Lists;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030
031import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
032import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
033import com.google.protobuf.TextFormat;
034import org.apache.hadoop.hdfs.util.LongBitFormat;
035
036/** Pipeline Acknowledgment **/
037@InterfaceAudience.Private
038@InterfaceStability.Evolving
039public class PipelineAck {
040  PipelineAckProto proto;
041  public final static long UNKOWN_SEQNO = -2;
042  final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
043  final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
044
045  public enum ECN {
046    DISABLED(0),
047    SUPPORTED(1),
048    SUPPORTED2(2),
049    CONGESTED(3);
050
051    private final int value;
052    private static final ECN[] VALUES = values();
053    static ECN valueOf(int value) {
054      return VALUES[value];
055    }
056
057    ECN(int value) {
058      this.value = value;
059    }
060
061    public int getValue() {
062      return value;
063    }
064  }
065
066  private enum StatusFormat {
067    STATUS(null, 4),
068    RESERVED(STATUS.BITS, 1),
069    ECN_BITS(RESERVED.BITS, 2);
070
071    private final LongBitFormat BITS;
072
073    StatusFormat(LongBitFormat prev, int bits) {
074      BITS = new LongBitFormat(name(), prev, bits, 0);
075    }
076
077    static Status getStatus(int header) {
078      return Status.valueOf((int) STATUS.BITS.retrieve(header));
079    }
080
081    static ECN getECN(int header) {
082      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
083    }
084
085    public static int setStatus(int old, Status status) {
086      return (int) STATUS.BITS.combine(status.getNumber(), old);
087    }
088
089    public static int setECN(int old, ECN ecn) {
090      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
091    }
092  }
093
094  /** default constructor **/
095  public PipelineAck() {
096  }
097
098  /**
099   * Constructor assuming no next DN in pipeline
100   * @param seqno sequence number
101   * @param replies an array of replies
102   */
103  public PipelineAck(long seqno, int[] replies) {
104    this(seqno, replies, 0L);
105  }
106
107  /**
108   * Constructor
109   * @param seqno sequence number
110   * @param replies an array of replies
111   * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
112   */
113  public PipelineAck(long seqno, int[] replies,
114                     long downstreamAckTimeNanos) {
115    ArrayList<Status> statusList = Lists.newArrayList();
116    ArrayList<Integer> flagList = Lists.newArrayList();
117    for (int r : replies) {
118      statusList.add(StatusFormat.getStatus(r));
119      flagList.add(r);
120    }
121    proto = PipelineAckProto.newBuilder()
122      .setSeqno(seqno)
123      .addAllReply(statusList)
124      .addAllFlag(flagList)
125      .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
126      .build();
127  }
128
129  /**
130   * Get the sequence number
131   * @return the sequence number
132   */
133  public long getSeqno() {
134    return proto.getSeqno();
135  }
136
137  /**
138   * Get the number of replies
139   * @return the number of replies
140   */
141  public short getNumOfReplies() {
142    return (short)proto.getReplyCount();
143  }
144
145  /**
146   * get the header flag of ith reply
147   */
148  public int getHeaderFlag(int i) {
149    if (proto.getFlagCount() > 0) {
150      return proto.getFlag(i);
151    } else {
152      return combineHeader(ECN.DISABLED, proto.getReply(i));
153    }
154  }
155
156  public int getFlag(int i) {
157    return proto.getFlag(i);
158  }
159
160  /**
161   * Get the time elapsed for downstream ack RTT in nanoseconds
162   * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
163   */
164  public long getDownstreamAckTimeNanos() {
165    return proto.getDownstreamAckTimeNanos();
166  }
167
168  /**
169   * Check if this ack contains error status
170   * @return true if all statuses are SUCCESS
171   */
172  public boolean isSuccess() {
173    for (Status s : proto.getReplyList()) {
174      if (s != Status.SUCCESS) {
175        return false;
176      }
177    }
178    return true;
179  }
180
181  /**
182   * Returns the OOB status if this ack contains one.
183   * @return null if it is not an OOB ack.
184   */
185  public Status getOOBStatus() {
186    // Normal data transfer acks will have a valid sequence number, so
187    // this will return right away in most cases.
188    if (getSeqno() != UNKOWN_SEQNO) {
189      return null;
190    }
191    for (Status s : proto.getReplyList()) {
192      // The following check is valid because protobuf guarantees to
193      // preserve the ordering of enum elements.
194      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
195        return s;
196      }
197    }
198    return null;
199  }
200
201  /** Get the Restart OOB ack status */
202  public static Status getRestartOOBStatus() {
203    return Status.OOB_RESTART;
204  }
205
206  /** return true if it is the restart OOB status code  */
207  public static boolean isRestartOOBStatus(Status st) {
208    return st.equals(Status.OOB_RESTART);
209  }
210
211  /**** Writable interface ****/
212  public void readFields(InputStream in) throws IOException {
213    proto = PipelineAckProto.parseFrom(vintPrefixed(in));
214  }
215
216  public void write(OutputStream out) throws IOException {
217    proto.writeDelimitedTo(out);
218  }
219
220  @Override //Object
221  public String toString() {
222    return TextFormat.shortDebugString(proto);
223  }
224
225  public static Status getStatusFromHeader(int header) {
226    return StatusFormat.getStatus(header);
227  }
228
229  public static ECN getECNFromHeader(int header) {
230    return StatusFormat.getECN(header);
231  }
232
233  public static int setStatusForHeader(int old, Status status) {
234    return StatusFormat.setStatus(old, status);
235  }
236
237  public static int combineHeader(ECN ecn, Status status) {
238    int header = 0;
239    header = StatusFormat.setStatus(header, status);
240    header = StatusFormat.setECN(header, ecn);
241    return header;
242  }
243}