001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.namenode;
019
020import java.io.File;
021import java.io.FileInputStream;
022import java.io.FileNotFoundException;
023import java.io.FileOutputStream;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.net.HttpURLConnection;
028import java.net.URISyntaxException;
029import java.net.URL;
030import java.security.DigestInputStream;
031import java.security.MessageDigest;
032import java.util.ArrayList;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036
037import javax.servlet.http.HttpServletRequest;
038import javax.servlet.http.HttpServletResponse;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileUtil;
045import org.apache.hadoop.hdfs.DFSConfigKeys;
046import org.apache.hadoop.hdfs.DFSUtilClient;
047import org.apache.hadoop.hdfs.HdfsConfiguration;
048import org.apache.hadoop.hdfs.server.common.Storage;
049import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
050import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
051import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
052import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
053import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
054import org.apache.hadoop.hdfs.util.Canceler;
055import org.apache.hadoop.hdfs.util.DataTransferThrottler;
056import org.apache.hadoop.hdfs.web.URLConnectionFactory;
057import org.apache.hadoop.io.IOUtils;
058import org.apache.hadoop.io.MD5Hash;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.hadoop.security.authentication.client.AuthenticationException;
061import org.apache.hadoop.util.Time;
062import org.apache.http.client.utils.URIBuilder;
063
064import com.google.common.annotations.VisibleForTesting;
065import com.google.common.collect.Lists;
066import org.mortbay.jetty.EofException;
067
068/**
069 * This class provides fetching a specified file from the NameNode.
070 */
071@InterfaceAudience.Private
072public class TransferFsImage {
073  
074  public final static String CONTENT_LENGTH = "Content-Length";
075  public final static String FILE_LENGTH = "File-Length";
076  public final static String MD5_HEADER = "X-MD5-Digest";
077
078  private final static String CONTENT_TYPE = "Content-Type";
079  private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
080  private final static int IO_FILE_BUFFER_SIZE;
081
082  @VisibleForTesting
083  static int timeout = 0;
084  private static final URLConnectionFactory connectionFactory;
085  private static final boolean isSpnegoEnabled;
086
087  static {
088    Configuration conf = new Configuration();
089    connectionFactory = URLConnectionFactory
090        .newDefaultURLConnectionFactory(conf);
091    isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
092    IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf);
093  }
094
095  private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
096  
097  public static void downloadMostRecentImageToDirectory(URL infoServer,
098      File dir) throws IOException {
099    String fileId = ImageServlet.getParamStringForMostRecentImage();
100    getFileClient(infoServer, fileId, Lists.newArrayList(dir),
101        null, false);
102  }
103
104  public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
105      Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
106      throws IOException {
107    String fileid = ImageServlet.getParamStringForImage(null,
108        imageTxId, dstStorage, isBootstrapStandby);
109    String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
110    
111    List<File> dstFiles = dstStorage.getFiles(
112        NameNodeDirType.IMAGE, fileName);
113    if (dstFiles.isEmpty()) {
114      throw new IOException("No targets in destination storage!");
115    }
116    
117    MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
118    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
119        dstFiles.get(0).length() + " bytes.");
120    return hash;
121  }
122
123  static MD5Hash handleUploadImageRequest(HttpServletRequest request,
124      long imageTxId, Storage dstStorage, InputStream stream,
125      long advertisedSize, DataTransferThrottler throttler) throws IOException {
126
127    String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
128
129    List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
130    if (dstFiles.isEmpty()) {
131      throw new IOException("No targets in destination storage!");
132    }
133
134    MD5Hash advertisedDigest = parseMD5Header(request);
135    MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
136        advertisedSize, advertisedDigest, fileName, stream, throttler);
137    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
138        + dstFiles.get(0).length() + " bytes.");
139    return hash;
140  }
141
142  static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
143      NNStorage dstStorage) throws IOException {
144    assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
145      "bad log: " + log;
146    String fileid = ImageServlet.getParamStringForLog(
147        log, dstStorage);
148    String finalFileName = NNStorage.getFinalizedEditsFileName(
149        log.getStartTxId(), log.getEndTxId());
150
151    List<File> finalFiles = dstStorage.getFiles(NameNodeDirType.EDITS,
152        finalFileName);
153    assert !finalFiles.isEmpty() : "No checkpoint targets.";
154    
155    for (File f : finalFiles) {
156      if (f.exists() && FileUtil.canRead(f)) {
157        LOG.info("Skipping download of remote edit log " +
158            log + " since it already is stored locally at " + f);
159        return;
160      } else if (LOG.isDebugEnabled()) {
161        LOG.debug("Dest file: " + f);
162      }
163    }
164
165    final long milliTime = Time.monotonicNow();
166    String tmpFileName = NNStorage.getTemporaryEditsFileName(
167        log.getStartTxId(), log.getEndTxId(), milliTime);
168    List<File> tmpFiles = dstStorage.getFiles(NameNodeDirType.EDITS,
169        tmpFileName);
170    getFileClient(fsName, fileid, tmpFiles, dstStorage, false);
171    LOG.info("Downloaded file " + tmpFiles.get(0).getName() + " size " +
172        finalFiles.get(0).length() + " bytes.");
173
174    CheckpointFaultInjector.getInstance().beforeEditsRename();
175
176    for (StorageDirectory sd : dstStorage.dirIterable(NameNodeDirType.EDITS)) {
177      File tmpFile = NNStorage.getTemporaryEditsFile(sd,
178          log.getStartTxId(), log.getEndTxId(), milliTime);
179      File finalizedFile = NNStorage.getFinalizedEditsFile(sd,
180          log.getStartTxId(), log.getEndTxId());
181      if (LOG.isDebugEnabled()) {
182        LOG.debug("Renaming " + tmpFile + " to " + finalizedFile);
183      }
184      boolean success = tmpFile.renameTo(finalizedFile);
185      if (!success) {
186        LOG.warn("Unable to rename edits file from " + tmpFile
187            + " to " + finalizedFile);
188      }
189    }
190  }
191 
192  /**
193   * Requests that the NameNode download an image from this node.
194   *
195   * @param fsName the http address for the remote NN
196   * @param conf Configuration
197   * @param storage the storage directory to transfer the image from
198   * @param nnf the NameNodeFile type of the image
199   * @param txid the transaction ID of the image to be uploaded
200   * @throws IOException if there is an I/O error
201   */
202  public static void uploadImageFromStorage(URL fsName, Configuration conf,
203      NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
204    uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
205  }
206
207  /**
208   * Requests that the NameNode download an image from this node.  Allows for
209   * optional external cancelation.
210   *
211   * @param fsName the http address for the remote NN
212   * @param conf Configuration
213   * @param storage the storage directory to transfer the image from
214   * @param nnf the NameNodeFile type of the image
215   * @param txid the transaction ID of the image to be uploaded
216   * @param canceler optional canceler to check for abort of upload
217   * @throws IOException if there is an I/O error or cancellation
218   */
219  public static void uploadImageFromStorage(URL fsName, Configuration conf,
220      NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
221      throws IOException {
222    URL url = new URL(fsName, ImageServlet.PATH_SPEC);
223    long startTime = Time.monotonicNow();
224    try {
225      uploadImage(url, conf, storage, nnf, txid, canceler);
226    } catch (HttpPutFailedException e) {
227      if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
228        // this is OK - this means that a previous attempt to upload
229        // this checkpoint succeeded even though we thought it failed.
230        LOG.info("Image upload with txid " + txid + 
231            " conflicted with a previous image upload to the " +
232            "same NameNode. Continuing...", e);
233        return;
234      } else {
235        throw e;
236      }
237    }
238    double xferSec = Math.max(
239        ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
240    LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
241        + " in " + xferSec + " seconds");
242  }
243
244  /*
245   * Uploads the imagefile using HTTP PUT method
246   */
247  private static void uploadImage(URL url, Configuration conf,
248      NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
249      throws IOException {
250
251    File imageFile = storage.findImageFile(nnf, txId);
252    if (imageFile == null) {
253      throw new IOException("Could not find image with txid " + txId);
254    }
255
256    HttpURLConnection connection = null;
257    try {
258      URIBuilder uriBuilder = new URIBuilder(url.toURI());
259
260      // write all params for image upload request as query itself.
261      // Request body contains the image to be uploaded.
262      Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
263          txId, imageFile.length(), nnf);
264      for (Entry<String, String> entry : params.entrySet()) {
265        uriBuilder.addParameter(entry.getKey(), entry.getValue());
266      }
267
268      URL urlWithParams = uriBuilder.build().toURL();
269      connection = (HttpURLConnection) connectionFactory.openConnection(
270          urlWithParams, UserGroupInformation.isSecurityEnabled());
271      // Set the request to PUT
272      connection.setRequestMethod("PUT");
273      connection.setDoOutput(true);
274
275      
276      int chunkSize = conf.getInt(
277          DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
278          DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
279      if (imageFile.length() > chunkSize) {
280        // using chunked streaming mode to support upload of 2GB+ files and to
281        // avoid internal buffering.
282        // this mode should be used only if more than chunkSize data is present
283        // to upload. otherwise upload may not happen sometimes.
284        connection.setChunkedStreamingMode(chunkSize);
285      }
286
287      setTimeout(connection);
288
289      // set headers for verification
290      ImageServlet.setVerificationHeadersForPut(connection, imageFile);
291
292      // Write the file to output stream.
293      writeFileToPutRequest(conf, connection, imageFile, canceler);
294
295      int responseCode = connection.getResponseCode();
296      if (responseCode != HttpURLConnection.HTTP_OK) {
297        throw new HttpPutFailedException(String.format(
298            "Image uploading failed, status: %d, url: %s, message: %s",
299            responseCode, urlWithParams, connection.getResponseMessage()),
300            responseCode);
301      }
302    } catch (AuthenticationException e) {
303      throw new IOException(e);
304    } catch (URISyntaxException e) {
305      throw new IOException(e);
306    } finally {
307      if (connection != null) {
308        connection.disconnect();
309      }
310    }
311  }
312
313  private static void writeFileToPutRequest(Configuration conf,
314      HttpURLConnection connection, File imageFile, Canceler canceler)
315      throws FileNotFoundException, IOException {
316    connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
317    connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
318    OutputStream output = connection.getOutputStream();
319    FileInputStream input = new FileInputStream(imageFile);
320    try {
321      copyFileToStream(output, imageFile, input,
322          ImageServlet.getThrottler(conf), canceler);
323    } finally {
324      IOUtils.closeStream(input);
325      IOUtils.closeStream(output);
326    }
327  }
328
329  /**
330   * A server-side method to respond to a getfile http request
331   * Copies the contents of the local file into the output stream.
332   */
333  public static void copyFileToStream(OutputStream out, File localfile,
334      FileInputStream infile, DataTransferThrottler throttler)
335    throws IOException {
336    copyFileToStream(out, localfile, infile, throttler, null);
337  }
338
339  private static void copyFileToStream(OutputStream out, File localfile,
340      FileInputStream infile, DataTransferThrottler throttler,
341      Canceler canceler) throws IOException {
342    byte buf[] = new byte[IO_FILE_BUFFER_SIZE];
343    try {
344      CheckpointFaultInjector.getInstance()
345          .aboutToSendFile(localfile);
346
347      if (CheckpointFaultInjector.getInstance().
348            shouldSendShortFile(localfile)) {
349          // Test sending image shorter than localfile
350          long len = localfile.length();
351          buf = new byte[(int)Math.min(len/2, IO_FILE_BUFFER_SIZE)];
352          // This will read at most half of the image
353          // and the rest of the image will be sent over the wire
354          infile.read(buf);
355      }
356      int num = 1;
357      while (num > 0) {
358        if (canceler != null && canceler.isCancelled()) {
359          throw new SaveNamespaceCancelledException(
360            canceler.getCancellationReason());
361        }
362        num = infile.read(buf);
363        if (num <= 0) {
364          break;
365        }
366        if (CheckpointFaultInjector.getInstance()
367              .shouldCorruptAByte(localfile)) {
368          // Simulate a corrupted byte on the wire
369          LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!");
370          buf[0]++;
371        }
372        
373        out.write(buf, 0, num);
374        if (throttler != null) {
375          throttler.throttle(num, canceler);
376        }
377      }
378    } catch (EofException e) {
379      LOG.info("Connection closed by client");
380      out = null; // so we don't close in the finally
381    } finally {
382      if (out != null) {
383        out.close();
384      }
385    }
386  }
387
388  /**
389   * Client-side Method to fetch file from a server
390   * Copies the response from the URL to a list of local files.
391   * @param dstStorage if an error occurs writing to one of the files,
392   *                   this storage object will be notified. 
393   * @Return a digest of the received file if getChecksum is true
394   */
395  static MD5Hash getFileClient(URL infoServer,
396      String queryString, List<File> localPaths,
397      Storage dstStorage, boolean getChecksum) throws IOException {
398    URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
399    LOG.info("Opening connection to " + url);
400    return doGetUrl(url, localPaths, dstStorage, getChecksum);
401  }
402  
403  public static MD5Hash doGetUrl(URL url, List<File> localPaths,
404      Storage dstStorage, boolean getChecksum) throws IOException {
405    HttpURLConnection connection;
406    try {
407      connection = (HttpURLConnection)
408        connectionFactory.openConnection(url, isSpnegoEnabled);
409    } catch (AuthenticationException e) {
410      throw new IOException(e);
411    }
412
413    setTimeout(connection);
414
415    if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
416      throw new HttpGetFailedException(
417          "Image transfer servlet at " + url +
418          " failed with status code " + connection.getResponseCode() +
419          "\nResponse message:\n" + connection.getResponseMessage(),
420          connection);
421    }
422    
423    long advertisedSize;
424    String contentLength = connection.getHeaderField(CONTENT_LENGTH);
425    if (contentLength != null) {
426      advertisedSize = Long.parseLong(contentLength);
427    } else {
428      throw new IOException(CONTENT_LENGTH + " header is not provided " +
429                            "by the namenode when trying to fetch " + url);
430    }
431    MD5Hash advertisedDigest = parseMD5Header(connection);
432    String fsImageName = connection
433        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
434    InputStream stream = connection.getInputStream();
435
436    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
437        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
438        null);
439  }
440
441  private static void setTimeout(HttpURLConnection connection) {
442    if (timeout <= 0) {
443      Configuration conf = new HdfsConfiguration();
444      timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
445          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
446      LOG.info("Image Transfer timeout configured to " + timeout
447          + " milliseconds");
448    }
449
450    if (timeout > 0) {
451      connection.setConnectTimeout(timeout);
452      connection.setReadTimeout(timeout);
453    }
454  }
455
456  private static MD5Hash receiveFile(String url, List<File> localPaths,
457      Storage dstStorage, boolean getChecksum, long advertisedSize,
458      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
459      DataTransferThrottler throttler) throws IOException {
460    long startTime = Time.monotonicNow();
461    if (localPaths != null) {
462      // If the local paths refer to directories, use the server-provided header
463      // as the filename within that directory
464      List<File> newLocalPaths = new ArrayList<File>();
465      for (File localPath : localPaths) {
466        if (localPath.isDirectory()) {
467          if (fsImageName == null) {
468            throw new IOException("No filename header provided by server");
469          }
470          newLocalPaths.add(new File(localPath, fsImageName));
471        } else {
472          newLocalPaths.add(localPath);
473        }
474      }
475      localPaths = newLocalPaths;
476    }
477    
478
479    long received = 0;
480    MessageDigest digester = null;
481    if (getChecksum) {
482      digester = MD5Hash.getDigester();
483      stream = new DigestInputStream(stream, digester);
484    }
485    boolean finishedReceiving = false;
486
487    List<FileOutputStream> outputStreams = Lists.newArrayList();
488
489    try {
490      if (localPaths != null) {
491        for (File f : localPaths) {
492          try {
493            if (f.exists()) {
494              LOG.warn("Overwriting existing file " + f
495                  + " with file downloaded from " + url);
496            }
497            outputStreams.add(new FileOutputStream(f));
498          } catch (IOException ioe) {
499            LOG.warn("Unable to download file " + f, ioe);
500            // This will be null if we're downloading the fsimage to a file
501            // outside of an NNStorage directory.
502            if (dstStorage != null &&
503                (dstStorage instanceof StorageErrorReporter)) {
504              ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
505            }
506          }
507        }
508        
509        if (outputStreams.isEmpty()) {
510          throw new IOException(
511              "Unable to download to any storage directory");
512        }
513      }
514      
515      int num = 1;
516      byte[] buf = new byte[IO_FILE_BUFFER_SIZE];
517      while (num > 0) {
518        num = stream.read(buf);
519        if (num > 0) {
520          received += num;
521          for (FileOutputStream fos : outputStreams) {
522            fos.write(buf, 0, num);
523          }
524          if (throttler != null) {
525            throttler.throttle(num);
526          }
527        }
528      }
529      finishedReceiving = true;
530    } finally {
531      stream.close();
532      for (FileOutputStream fos : outputStreams) {
533        fos.getChannel().force(true);
534        fos.close();
535      }
536
537      // Something went wrong and did not finish reading.
538      // Remove the temporary files.
539      if (!finishedReceiving) {
540        deleteTmpFiles(localPaths);
541      }
542
543      if (finishedReceiving && received != advertisedSize) {
544        // only throw this exception if we think we read all of it on our end
545        // -- otherwise a client-side IOException would be masked by this
546        // exception that makes it look like a server-side problem!
547        deleteTmpFiles(localPaths);
548        throw new IOException("File " + url + " received length " + received +
549                              " is not of the advertised size " +
550                              advertisedSize);
551      }
552    }
553    double xferSec = Math.max(
554        ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001);
555    long xferKb = received / 1024;
556    LOG.info(String.format("Transfer took %.2fs at %.2f KB/s",
557        xferSec, xferKb / xferSec));
558
559    if (digester != null) {
560      MD5Hash computedDigest = new MD5Hash(digester.digest());
561      
562      if (advertisedDigest != null &&
563          !computedDigest.equals(advertisedDigest)) {
564        deleteTmpFiles(localPaths);
565        throw new IOException("File " + url + " computed digest " +
566            computedDigest + " does not match advertised digest " + 
567            advertisedDigest);
568      }
569      return computedDigest;
570    } else {
571      return null;
572    }    
573  }
574
575  private static void deleteTmpFiles(List<File> files) {
576    if (files == null) {
577      return;
578    }
579
580    LOG.info("Deleting temporary files: " + files);
581    for (File file : files) {
582      if (!file.delete()) {
583        LOG.warn("Deleting " + file + " has failed");
584      }
585    }
586  }
587
588  private static MD5Hash parseMD5Header(HttpURLConnection connection) {
589    String header = connection.getHeaderField(MD5_HEADER);
590    return (header != null) ? new MD5Hash(header) : null;
591  }
592
593  private static MD5Hash parseMD5Header(HttpServletRequest request) {
594    String header = request.getHeader(MD5_HEADER);
595    return (header != null) ? new MD5Hash(header) : null;
596  }
597
598  public static class HttpGetFailedException extends IOException {
599    private static final long serialVersionUID = 1L;
600    private final int responseCode;
601
602    HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException {
603      super(msg);
604      this.responseCode = connection.getResponseCode();
605    }
606    
607    public int getResponseCode() {
608      return responseCode;
609    }
610  }
611
612  public static class HttpPutFailedException extends IOException {
613    private static final long serialVersionUID = 1L;
614    private final int responseCode;
615
616    HttpPutFailedException(String msg, int responseCode) throws IOException {
617      super(msg);
618      this.responseCode = responseCode;
619    }
620
621    public int getResponseCode() {
622      return responseCode;
623    }
624  }
625
626}