/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocolPB;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;

import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
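/**
 * Server-side protobuf translator: forwards requests received on
 * {@link DatanodeProtocolPB} to the {@link DatanodeProtocol} server
 * implementation, converting each protobuf request and response to and
 * from its native Java counterpart.
 *
 * <p>A typical wiring on the NameNode side looks roughly like the sketch
 * below; the exact setup varies by Hadoop version, so treat the names
 * {@code namenodeImpl} and {@code maxDataLength} as illustrative:
 * <pre>{@code
 * DatanodeProtocolServerSideTranslatorPB translator =
 *     new DatanodeProtocolServerSideTranslatorPB(namenodeImpl, maxDataLength);
 * BlockingService service =
 *     DatanodeProtocolService.newReflectiveBlockingService(translator);
 * // the BlockingService is then registered with the RPC server
 * }</pre>
 */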
public class DatanodeProtocolServerSideTranslatorPB implements
    DatanodeProtocolPB {

  private final DatanodeProtocol impl;
  private final int maxDataLength;

  private static final ErrorReportResponseProto
      VOID_ERROR_REPORT_RESPONSE_PROTO =
          ErrorReportResponseProto.newBuilder().build();
  private static final BlockReceivedAndDeletedResponseProto
      VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE =
          BlockReceivedAndDeletedResponseProto.newBuilder().build();
  private static final ReportBadBlocksResponseProto
      VOID_REPORT_BAD_BLOCK_RESPONSE =
          ReportBadBlocksResponseProto.newBuilder().build();
  private static final CommitBlockSynchronizationResponseProto
      VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
          CommitBlockSynchronizationResponseProto.newBuilder().build();

  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl,
      int maxDataLength) {
    this.impl = impl;
    this.maxDataLength = maxDataLength;
  }

  @Override
  public RegisterDatanodeResponseProto registerDatanode(
      RpcController controller, RegisterDatanodeRequestProto request)
      throws ServiceException {
    DatanodeRegistration registration = PBHelper.convert(request
        .getRegistration());
    DatanodeRegistration registrationResp;
    try {
      registrationResp = impl.registerDatanode(registration);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return RegisterDatanodeResponseProto.newBuilder()
        .setRegistration(PBHelper.convert(registrationResp)).build();
  }
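  /**
   * Translates a datanode heartbeat: converts the per-storage reports and
   * the optional volume-failure summary from protobuf, invokes the
   * underlying {@link DatanodeProtocol#sendHeartbeat}, then packs the
   * returned commands, HA status, rolling-upgrade status, and full block
   * report lease id into the response.
   */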
  @Override
  public HeartbeatResponseProto sendHeartbeat(RpcController controller,
      HeartbeatRequestProto request) throws ServiceException {
    HeartbeatResponse response;
    try {
      final StorageReport[] report = PBHelperClient.convertStorageReports(
          request.getReportsList());
      VolumeFailureSummary volumeFailureSummary =
          request.hasVolumeFailureSummary() ?
              PBHelper.convertVolumeFailureSummary(
                  request.getVolumeFailureSummary()) : null;
      response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
          report, request.getCacheCapacity(), request.getCacheUsed(),
          request.getXmitsInProgress(),
          request.getXceiverCount(), request.getFailedVolumes(),
          volumeFailureSummary, request.getRequestFullBlockReportLease());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
        .newBuilder();
    DatanodeCommand[] cmds = response.getCommands();
    if (cmds != null) {
      for (int i = 0; i < cmds.length; i++) {
        if (cmds[i] != null) {
          builder.addCmds(PBHelper.convert(cmds[i]));
        }
      }
    }
    builder.setHaStatus(PBHelper.convert(response.getNameNodeHaState()));
    RollingUpgradeStatus rollingUpdateStatus = response
        .getRollingUpdateStatus();
    if (rollingUpdateStatus != null) {
      // V2 is always set for newer datanodes.
      // To be compatible with older datanodes, V1 is left unset
      // if the rolling upgrade was finalized.
      RollingUpgradeStatusProto rus = PBHelperClient.
          convertRollingUpgradeStatus(rollingUpdateStatus);
      builder.setRollingUpgradeStatusV2(rus);
      if (!rollingUpdateStatus.isFinalized()) {
        builder.setRollingUpgradeStatus(rus);
      }
    }

    builder.setFullBlockReportLeaseId(response.getFullBlockReportLeaseId());
    return builder.build();
  }
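  /**
   * Translates a full block report. Each per-storage report arrives in one
   * of two mutually exclusive wire encodings: newer datanodes set
   * numberOfBlocks and ship the block list in opaque buffers, while older
   * datanodes use the legacy repeated-longs encoding. The decoding below
   * rejects reports that carry both.
   */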
  @Override
  public BlockReportResponseProto blockReport(RpcController controller,
      BlockReportRequestProto request) throws ServiceException {
    DatanodeCommand cmd = null;
    StorageBlockReport[] report =
        new StorageBlockReport[request.getReportsCount()];

    int index = 0;
    for (StorageBlockReportProto s : request.getReportsList()) {
      final BlockListAsLongs blocks;
      if (s.hasNumberOfBlocks()) { // new style buffer based reports
        int num = (int)s.getNumberOfBlocks();
        Preconditions.checkState(s.getBlocksCount() == 0,
            "cannot send both blocks list and buffers");
        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList(),
            maxDataLength);
      } else {
        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList(), maxDataLength);
      }
      report[index++] = new StorageBlockReport(PBHelperClient.convert(s.getStorage()),
          blocks);
    }
    try {
      cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(), report,
          request.hasContext() ?
              PBHelper.convert(request.getContext()) : null);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    BlockReportResponseProto.Builder builder =
        BlockReportResponseProto.newBuilder();
    if (cmd != null) {
      builder.setCmd(PBHelper.convert(cmd));
    }
    return builder.build();
  }

  @Override
  public CacheReportResponseProto cacheReport(RpcController controller,
      CacheReportRequestProto request) throws ServiceException {
    DatanodeCommand cmd = null;
    try {
      cmd = impl.cacheReport(
          PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(),
          request.getBlocksList());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    CacheReportResponseProto.Builder builder =
        CacheReportResponseProto.newBuilder();
    if (cmd != null) {
      builder.setCmd(PBHelper.convert(cmd));
    }
    return builder.build();
  }

  @Override
  public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
      RpcController controller, BlockReceivedAndDeletedRequestProto request)
      throws ServiceException {
    List<StorageReceivedDeletedBlocksProto> sBlocks = request.getBlocksList();
    StorageReceivedDeletedBlocks[] info =
        new StorageReceivedDeletedBlocks[sBlocks.size()];
    for (int i = 0; i < sBlocks.size(); i++) {
      StorageReceivedDeletedBlocksProto sBlock = sBlocks.get(i);
      List<ReceivedDeletedBlockInfoProto> list = sBlock.getBlocksList();
      ReceivedDeletedBlockInfo[] rdBlocks =
          new ReceivedDeletedBlockInfo[list.size()];
      for (int j = 0; j < list.size(); j++) {
        rdBlocks[j] = PBHelper.convert(list.get(j));
      }
      if (sBlock.hasStorage()) {
        info[i] = new StorageReceivedDeletedBlocks(
            PBHelperClient.convert(sBlock.getStorage()), rdBlocks);
      } else {
        info[i] = new StorageReceivedDeletedBlocks(
            new DatanodeStorage(sBlock.getStorageUuid()), rdBlocks);
      }
    }
    try {
      impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(), info);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE;
  }

  @Override
  public ErrorReportResponseProto errorReport(RpcController controller,
      ErrorReportRequestProto request) throws ServiceException {
    try {
      // "Registartion" mirrors the misspelled field name in
      // DatanodeProtocol.proto; the generated accessor must match it.
      impl.errorReport(PBHelper.convert(request.getRegistartion()),
          request.getErrorCode(), request.getMsg());
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_ERROR_REPORT_RESPONSE_PROTO;
  }

  @Override
  public VersionResponseProto versionRequest(RpcController controller,
      VersionRequestProto request) throws ServiceException {
    NamespaceInfo info;
    try {
      info = impl.versionRequest();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VersionResponseProto.newBuilder()
        .setInfo(PBHelper.convert(info)).build();
  }

  @Override
  public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
      ReportBadBlocksRequestProto request) throws ServiceException {
    List<LocatedBlockProto> lbps = request.getBlocksList();
    LocatedBlock[] blocks = new LocatedBlock[lbps.size()];
    for (int i = 0; i < lbps.size(); i++) {
      blocks[i] = PBHelperClient.convert(lbps.get(i));
    }
    try {
      impl.reportBadBlocks(blocks);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_REPORT_BAD_BLOCK_RESPONSE;
  }
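  /**
   * Translates commitBlockSynchronization, issued during lease recovery to
   * commit a recovered block with its new generation stamp and length. The
   * "Taragets" spelling in the accessor below mirrors the field name as
   * declared in DatanodeProtocol.proto.
   */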
  @Override
  public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
      RpcController controller, CommitBlockSynchronizationRequestProto request)
      throws ServiceException {
    List<DatanodeIDProto> dnprotos = request.getNewTaragetsList();
    DatanodeID[] dns = new DatanodeID[dnprotos.size()];
    for (int i = 0; i < dnprotos.size(); i++) {
      dns[i] = PBHelperClient.convert(dnprotos.get(i));
    }
    final List<String> sidprotos = request.getNewTargetStoragesList();
    final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
    try {
      impl.commitBlockSynchronization(PBHelperClient.convert(request.getBlock()),
          request.getNewGenStamp(), request.getNewLength(),
          request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    return VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
  }
}