001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hdfs.protocolPB;
020
021import java.io.IOException;
022import java.util.List;
023
024import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
025import org.apache.hadoop.hdfs.protocol.DatanodeID;
026import org.apache.hadoop.hdfs.protocol.LocatedBlock;
027import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
028import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
029import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
030import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
031import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
032import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
033import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
034import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
035import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
036import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
037import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
038import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
039import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
040import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
041import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
042import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
043import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
044import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
045import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
046import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
047import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
048import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
049import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
050import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
051import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
052import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
053import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
054import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
055import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
056import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
057import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
058import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
059import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
060import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
061import org.apache.hadoop.hdfs.server.protocol.StorageReport;
062import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
063
064import com.google.common.base.Preconditions;
065import com.google.protobuf.RpcController;
066import com.google.protobuf.ServiceException;
067
068public class DatanodeProtocolServerSideTranslatorPB implements
069    DatanodeProtocolPB {
070
071  private final DatanodeProtocol impl;
072  private final int maxDataLength;
073
074  private static final ErrorReportResponseProto
075      VOID_ERROR_REPORT_RESPONSE_PROTO = 
076          ErrorReportResponseProto.newBuilder().build();
077  private static final BlockReceivedAndDeletedResponseProto 
078      VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE = 
079          BlockReceivedAndDeletedResponseProto.newBuilder().build();
080  private static final ReportBadBlocksResponseProto
081      VOID_REPORT_BAD_BLOCK_RESPONSE = 
082          ReportBadBlocksResponseProto.newBuilder().build();
083  private static final CommitBlockSynchronizationResponseProto 
084      VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
085          CommitBlockSynchronizationResponseProto.newBuilder().build();
086
087  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl,
088      int maxDataLength) {
089    this.impl = impl;
090    this.maxDataLength = maxDataLength;
091  }
092
093  @Override
094  public RegisterDatanodeResponseProto registerDatanode(
095      RpcController controller, RegisterDatanodeRequestProto request)
096      throws ServiceException {
097    DatanodeRegistration registration = PBHelper.convert(request
098        .getRegistration());
099    DatanodeRegistration registrationResp;
100    try {
101      registrationResp = impl.registerDatanode(registration);
102    } catch (IOException e) {
103      throw new ServiceException(e);
104    }
105    return RegisterDatanodeResponseProto.newBuilder()
106        .setRegistration(PBHelper.convert(registrationResp)).build();
107  }
108
109  @Override
110  public HeartbeatResponseProto sendHeartbeat(RpcController controller,
111      HeartbeatRequestProto request) throws ServiceException {
112    HeartbeatResponse response;
113    try {
114      final StorageReport[] report = PBHelperClient.convertStorageReports(
115          request.getReportsList());
116      VolumeFailureSummary volumeFailureSummary =
117          request.hasVolumeFailureSummary() ? PBHelper.convertVolumeFailureSummary(
118              request.getVolumeFailureSummary()) : null;
119      response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
120          report, request.getCacheCapacity(), request.getCacheUsed(),
121          request.getXmitsInProgress(),
122          request.getXceiverCount(), request.getFailedVolumes(),
123          volumeFailureSummary, request.getRequestFullBlockReportLease());
124    } catch (IOException e) {
125      throw new ServiceException(e);
126    }
127    HeartbeatResponseProto.Builder builder = HeartbeatResponseProto
128        .newBuilder();
129    DatanodeCommand[] cmds = response.getCommands();
130    if (cmds != null) {
131      for (int i = 0; i < cmds.length; i++) {
132        if (cmds[i] != null) {
133          builder.addCmds(PBHelper.convert(cmds[i]));
134        }
135      }
136    }
137    builder.setHaStatus(PBHelper.convert(response.getNameNodeHaState()));
138    RollingUpgradeStatus rollingUpdateStatus = response
139        .getRollingUpdateStatus();
140    if (rollingUpdateStatus != null) {
141      // V2 is always set for newer datanodes.
142      // To be compatible with older datanodes, V1 is set to null
143      //  if the RU was finalized.
144      RollingUpgradeStatusProto rus = PBHelperClient.
145          convertRollingUpgradeStatus(rollingUpdateStatus);
146      builder.setRollingUpgradeStatusV2(rus);
147      if (!rollingUpdateStatus.isFinalized()) {
148        builder.setRollingUpgradeStatus(rus);
149      }
150    }
151
152    builder.setFullBlockReportLeaseId(response.getFullBlockReportLeaseId());
153    return builder.build();
154  }
155
156  @Override
157  public BlockReportResponseProto blockReport(RpcController controller,
158      BlockReportRequestProto request) throws ServiceException {
159    DatanodeCommand cmd = null;
160    StorageBlockReport[] report = 
161        new StorageBlockReport[request.getReportsCount()];
162    
163    int index = 0;
164    for (StorageBlockReportProto s : request.getReportsList()) {
165      final BlockListAsLongs blocks;
166      if (s.hasNumberOfBlocks()) { // new style buffer based reports
167        int num = (int)s.getNumberOfBlocks();
168        Preconditions.checkState(s.getBlocksCount() == 0,
169            "cannot send both blocks list and buffers");
170        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList(),
171            maxDataLength);
172      } else {
173        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList(), maxDataLength);
174      }
175      report[index++] = new StorageBlockReport(PBHelperClient.convert(s.getStorage()),
176          blocks);
177    }
178    try {
179      cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
180          request.getBlockPoolId(), report,
181          request.hasContext() ?
182              PBHelper.convert(request.getContext()) : null);
183    } catch (IOException e) {
184      throw new ServiceException(e);
185    }
186    BlockReportResponseProto.Builder builder = 
187        BlockReportResponseProto.newBuilder();
188    if (cmd != null) {
189      builder.setCmd(PBHelper.convert(cmd));
190    }
191    return builder.build();
192  }
193
194  @Override
195  public CacheReportResponseProto cacheReport(RpcController controller,
196      CacheReportRequestProto request) throws ServiceException {
197    DatanodeCommand cmd = null;
198    try {
199      cmd = impl.cacheReport(
200          PBHelper.convert(request.getRegistration()),
201          request.getBlockPoolId(),
202          request.getBlocksList());
203    } catch (IOException e) {
204      throw new ServiceException(e);
205    }
206    CacheReportResponseProto.Builder builder =
207        CacheReportResponseProto.newBuilder();
208    if (cmd != null) {
209      builder.setCmd(PBHelper.convert(cmd));
210    }
211    return builder.build();
212  }
213
214
215  @Override
216  public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
217      RpcController controller, BlockReceivedAndDeletedRequestProto request)
218      throws ServiceException {
219    List<StorageReceivedDeletedBlocksProto> sBlocks = request.getBlocksList();
220    StorageReceivedDeletedBlocks[] info = 
221        new StorageReceivedDeletedBlocks[sBlocks.size()];
222    for (int i = 0; i < sBlocks.size(); i++) {
223      StorageReceivedDeletedBlocksProto sBlock = sBlocks.get(i);
224      List<ReceivedDeletedBlockInfoProto> list = sBlock.getBlocksList();
225      ReceivedDeletedBlockInfo[] rdBlocks = 
226          new ReceivedDeletedBlockInfo[list.size()];
227      for (int j = 0; j < list.size(); j++) {
228        rdBlocks[j] = PBHelper.convert(list.get(j));
229      }
230      if (sBlock.hasStorage()) {
231        info[i] = new StorageReceivedDeletedBlocks(
232            PBHelperClient.convert(sBlock.getStorage()), rdBlocks);
233      } else {
234        info[i] = new StorageReceivedDeletedBlocks(
235            new DatanodeStorage(sBlock.getStorageUuid()), rdBlocks);
236      }
237    }
238    try {
239      impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),
240          request.getBlockPoolId(), info);
241    } catch (IOException e) {
242      throw new ServiceException(e);
243    }
244    return VOID_BLOCK_RECEIVED_AND_DELETE_RESPONSE;
245  }
246
247  @Override
248  public ErrorReportResponseProto errorReport(RpcController controller,
249      ErrorReportRequestProto request) throws ServiceException {
250    try {
251      impl.errorReport(PBHelper.convert(request.getRegistartion()),
252          request.getErrorCode(), request.getMsg());
253    } catch (IOException e) {
254      throw new ServiceException(e);
255    }
256    return VOID_ERROR_REPORT_RESPONSE_PROTO;
257  }
258
259  @Override
260  public VersionResponseProto versionRequest(RpcController controller,
261      VersionRequestProto request) throws ServiceException {
262    NamespaceInfo info;
263    try {
264      info = impl.versionRequest();
265    } catch (IOException e) {
266      throw new ServiceException(e);
267    }
268    return VersionResponseProto.newBuilder()
269        .setInfo(PBHelper.convert(info)).build();
270  }
271
272  @Override
273  public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
274      ReportBadBlocksRequestProto request) throws ServiceException {
275    List<LocatedBlockProto> lbps = request.getBlocksList();
276    LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
277    for(int i=0; i<lbps.size(); i++) {
278      blocks[i] = PBHelperClient.convert(lbps.get(i));
279    }
280    try {
281      impl.reportBadBlocks(blocks);
282    } catch (IOException e) {
283      throw new ServiceException(e);
284    }
285    return VOID_REPORT_BAD_BLOCK_RESPONSE;
286  }
287
288  @Override
289  public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
290      RpcController controller, CommitBlockSynchronizationRequestProto request)
291      throws ServiceException {
292    List<DatanodeIDProto> dnprotos = request.getNewTaragetsList();
293    DatanodeID[] dns = new DatanodeID[dnprotos.size()];
294    for (int i = 0; i < dnprotos.size(); i++) {
295      dns[i] = PBHelperClient.convert(dnprotos.get(i));
296    }
297    final List<String> sidprotos = request.getNewTargetStoragesList();
298    final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
299    try {
300      impl.commitBlockSynchronization(PBHelperClient.convert(request.getBlock()),
301          request.getNewGenStamp(), request.getNewLength(),
302          request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
303    } catch (IOException e) {
304      throw new ServiceException(e);
305    }
306    return VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
307  }
308}