001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.protocol.datatransfer; 019 020import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; 021 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.util.ArrayList; 026 027import com.google.common.collect.Lists; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030 031import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; 032import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 033import com.google.protobuf.TextFormat; 034import org.apache.hadoop.hdfs.util.LongBitFormat; 035 036/** Pipeline Acknowledgment **/ 037@InterfaceAudience.Private 038@InterfaceStability.Evolving 039public class PipelineAck { 040 PipelineAckProto proto; 041 public final static long UNKOWN_SEQNO = -2; 042 final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type 043 final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type 044 045 public enum ECN { 046 DISABLED(0), 047 SUPPORTED(1), 048 SUPPORTED2(2), 049 CONGESTED(3); 050 051 private final int value; 052 private static final ECN[] VALUES = values(); 053 static ECN valueOf(int value) { 054 return VALUES[value]; 055 } 056 057 ECN(int value) { 058 this.value = value; 059 } 060 061 public int getValue() { 062 return value; 063 } 064 } 065 066 private enum StatusFormat { 067 STATUS(null, 4), 068 RESERVED(STATUS.BITS, 1), 069 ECN_BITS(RESERVED.BITS, 2); 070 071 private final LongBitFormat BITS; 072 073 StatusFormat(LongBitFormat prev, int bits) { 074 BITS = new LongBitFormat(name(), prev, bits, 0); 075 } 076 077 static Status getStatus(int header) { 078 return Status.valueOf((int) STATUS.BITS.retrieve(header)); 079 } 080 081 static ECN getECN(int header) { 082 return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header)); 083 } 084 085 public static int setStatus(int old, Status status) { 086 return (int) STATUS.BITS.combine(status.getNumber(), old); 087 } 088 089 public static int setECN(int old, ECN ecn) { 090 return (int) ECN_BITS.BITS.combine(ecn.getValue(), old); 091 } 092 } 093 094 /** default constructor **/ 095 public PipelineAck() { 096 } 097 098 /** 099 * Constructor assuming no next DN in pipeline 100 * @param seqno sequence number 101 * @param replies an array of replies 102 */ 103 public PipelineAck(long seqno, int[] replies) { 104 this(seqno, replies, 0L); 105 } 106 107 /** 108 * Constructor 109 * @param seqno sequence number 110 * @param replies an array of replies 111 * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline 112 */ 113 public PipelineAck(long seqno, int[] replies, 114 long downstreamAckTimeNanos) { 115 ArrayList<Status> statusList = Lists.newArrayList(); 116 ArrayList<Integer> flagList = Lists.newArrayList(); 117 for (int r : replies) { 118 statusList.add(StatusFormat.getStatus(r)); 119 flagList.add(r); 120 } 121 proto = PipelineAckProto.newBuilder() 122 .setSeqno(seqno) 123 .addAllReply(statusList) 124 .addAllFlag(flagList) 125 .setDownstreamAckTimeNanos(downstreamAckTimeNanos) 126 .build(); 127 } 128 129 /** 130 * Get the sequence number 131 * @return the sequence number 132 */ 133 public long getSeqno() { 134 return proto.getSeqno(); 135 } 136 137 /** 138 * Get the number of replies 139 * @return the number of replies 140 */ 141 public short getNumOfReplies() { 142 return (short)proto.getReplyCount(); 143 } 144 145 /** 146 * get the header flag of ith reply 147 */ 148 public int getHeaderFlag(int i) { 149 if (proto.getFlagCount() > 0) { 150 return proto.getFlag(i); 151 } else { 152 return combineHeader(ECN.DISABLED, proto.getReply(i)); 153 } 154 } 155 156 public int getFlag(int i) { 157 return proto.getFlag(i); 158 } 159 160 /** 161 * Get the time elapsed for downstream ack RTT in nanoseconds 162 * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline 163 */ 164 public long getDownstreamAckTimeNanos() { 165 return proto.getDownstreamAckTimeNanos(); 166 } 167 168 /** 169 * Check if this ack contains error status 170 * @return true if all statuses are SUCCESS 171 */ 172 public boolean isSuccess() { 173 for (Status s : proto.getReplyList()) { 174 if (s != Status.SUCCESS) { 175 return false; 176 } 177 } 178 return true; 179 } 180 181 /** 182 * Returns the OOB status if this ack contains one. 183 * @return null if it is not an OOB ack. 184 */ 185 public Status getOOBStatus() { 186 // Normal data transfer acks will have a valid sequence number, so 187 // this will return right away in most cases. 188 if (getSeqno() != UNKOWN_SEQNO) { 189 return null; 190 } 191 for (Status s : proto.getReplyList()) { 192 // The following check is valid because protobuf guarantees to 193 // preserve the ordering of enum elements. 194 if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) { 195 return s; 196 } 197 } 198 return null; 199 } 200 201 /** Get the Restart OOB ack status */ 202 public static Status getRestartOOBStatus() { 203 return Status.OOB_RESTART; 204 } 205 206 /** return true if it is the restart OOB status code */ 207 public static boolean isRestartOOBStatus(Status st) { 208 return st.equals(Status.OOB_RESTART); 209 } 210 211 /**** Writable interface ****/ 212 public void readFields(InputStream in) throws IOException { 213 proto = PipelineAckProto.parseFrom(vintPrefixed(in)); 214 } 215 216 public void write(OutputStream out) throws IOException { 217 proto.writeDelimitedTo(out); 218 } 219 220 @Override //Object 221 public String toString() { 222 return TextFormat.shortDebugString(proto); 223 } 224 225 public static Status getStatusFromHeader(int header) { 226 return StatusFormat.getStatus(header); 227 } 228 229 public static ECN getECNFromHeader(int header) { 230 return StatusFormat.getECN(header); 231 } 232 233 public static int setStatusForHeader(int old, Status status) { 234 return StatusFormat.setStatus(old, status); 235 } 236 237 public static int combineHeader(ECN ecn, Status status) { 238 int header = 0; 239 header = StatusFormat.setStatus(header, status); 240 header = StatusFormat.setECN(header, ecn); 241 return header; 242 } 243}