001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.server.namenode; 019 020import java.io.File; 021import java.io.FileInputStream; 022import java.io.FileNotFoundException; 023import java.io.FileOutputStream; 024import java.io.IOException; 025import java.io.InputStream; 026import java.io.OutputStream; 027import java.net.HttpURLConnection; 028import java.net.URISyntaxException; 029import java.net.URL; 030import java.security.DigestInputStream; 031import java.security.MessageDigest; 032import java.util.ArrayList; 033import java.util.List; 034import java.util.Map; 035import java.util.Map.Entry; 036 037import javax.servlet.http.HttpServletRequest; 038import javax.servlet.http.HttpServletResponse; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.apache.hadoop.classification.InterfaceAudience; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FileUtil; 045import org.apache.hadoop.hdfs.DFSConfigKeys; 046import org.apache.hadoop.hdfs.DFSUtilClient; 047import org.apache.hadoop.hdfs.HdfsConfiguration; 048import org.apache.hadoop.hdfs.server.common.Storage; 049import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; 050import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; 051import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; 052import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; 053import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; 054import org.apache.hadoop.hdfs.util.Canceler; 055import org.apache.hadoop.hdfs.util.DataTransferThrottler; 056import org.apache.hadoop.hdfs.web.URLConnectionFactory; 057import org.apache.hadoop.io.IOUtils; 058import org.apache.hadoop.io.MD5Hash; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.hadoop.security.authentication.client.AuthenticationException; 061import org.apache.hadoop.util.Time; 062import org.apache.http.client.utils.URIBuilder; 063 064import com.google.common.annotations.VisibleForTesting; 065import com.google.common.collect.Lists; 066import org.mortbay.jetty.EofException; 067 068/** 069 * This class provides fetching a specified file from the NameNode. 070 */ 071@InterfaceAudience.Private 072public class TransferFsImage { 073 074 public final static String CONTENT_LENGTH = "Content-Length"; 075 public final static String FILE_LENGTH = "File-Length"; 076 public final static String MD5_HEADER = "X-MD5-Digest"; 077 078 private final static String CONTENT_TYPE = "Content-Type"; 079 private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding"; 080 private final static int IO_FILE_BUFFER_SIZE; 081 082 @VisibleForTesting 083 static int timeout = 0; 084 private static final URLConnectionFactory connectionFactory; 085 private static final boolean isSpnegoEnabled; 086 087 static { 088 Configuration conf = new Configuration(); 089 connectionFactory = URLConnectionFactory 090 .newDefaultURLConnectionFactory(conf); 091 isSpnegoEnabled = UserGroupInformation.isSecurityEnabled(); 092 IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize(conf); 093 } 094 095 private static final Log LOG = LogFactory.getLog(TransferFsImage.class); 096 097 public static void downloadMostRecentImageToDirectory(URL infoServer, 098 File dir) throws IOException { 099 String fileId = ImageServlet.getParamStringForMostRecentImage(); 100 getFileClient(infoServer, fileId, Lists.newArrayList(dir), 101 null, false); 102 } 103 104 public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId, 105 Storage dstStorage, boolean needDigest, boolean isBootstrapStandby) 106 throws IOException { 107 String fileid = ImageServlet.getParamStringForImage(null, 108 imageTxId, dstStorage, isBootstrapStandby); 109 String fileName = NNStorage.getCheckpointImageFileName(imageTxId); 110 111 List<File> dstFiles = dstStorage.getFiles( 112 NameNodeDirType.IMAGE, fileName); 113 if (dstFiles.isEmpty()) { 114 throw new IOException("No targets in destination storage!"); 115 } 116 117 MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest); 118 LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " + 119 dstFiles.get(0).length() + " bytes."); 120 return hash; 121 } 122 123 static MD5Hash handleUploadImageRequest(HttpServletRequest request, 124 long imageTxId, Storage dstStorage, InputStream stream, 125 long advertisedSize, DataTransferThrottler throttler) throws IOException { 126 127 String fileName = NNStorage.getCheckpointImageFileName(imageTxId); 128 129 List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName); 130 if (dstFiles.isEmpty()) { 131 throw new IOException("No targets in destination storage!"); 132 } 133 134 MD5Hash advertisedDigest = parseMD5Header(request); 135 MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true, 136 advertisedSize, advertisedDigest, fileName, stream, throttler); 137 LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " 138 + dstFiles.get(0).length() + " bytes."); 139 return hash; 140 } 141 142 static void downloadEditsToStorage(URL fsName, RemoteEditLog log, 143 NNStorage dstStorage) throws IOException { 144 assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : 145 "bad log: " + log; 146 String fileid = ImageServlet.getParamStringForLog( 147 log, dstStorage); 148 String finalFileName = NNStorage.getFinalizedEditsFileName( 149 log.getStartTxId(), log.getEndTxId()); 150 151 List<File> finalFiles = dstStorage.getFiles(NameNodeDirType.EDITS, 152 finalFileName); 153 assert !finalFiles.isEmpty() : "No checkpoint targets."; 154 155 for (File f : finalFiles) { 156 if (f.exists() && FileUtil.canRead(f)) { 157 LOG.info("Skipping download of remote edit log " + 158 log + " since it already is stored locally at " + f); 159 return; 160 } else if (LOG.isDebugEnabled()) { 161 LOG.debug("Dest file: " + f); 162 } 163 } 164 165 final long milliTime = Time.monotonicNow(); 166 String tmpFileName = NNStorage.getTemporaryEditsFileName( 167 log.getStartTxId(), log.getEndTxId(), milliTime); 168 List<File> tmpFiles = dstStorage.getFiles(NameNodeDirType.EDITS, 169 tmpFileName); 170 getFileClient(fsName, fileid, tmpFiles, dstStorage, false); 171 LOG.info("Downloaded file " + tmpFiles.get(0).getName() + " size " + 172 finalFiles.get(0).length() + " bytes."); 173 174 CheckpointFaultInjector.getInstance().beforeEditsRename(); 175 176 for (StorageDirectory sd : dstStorage.dirIterable(NameNodeDirType.EDITS)) { 177 File tmpFile = NNStorage.getTemporaryEditsFile(sd, 178 log.getStartTxId(), log.getEndTxId(), milliTime); 179 File finalizedFile = NNStorage.getFinalizedEditsFile(sd, 180 log.getStartTxId(), log.getEndTxId()); 181 if (LOG.isDebugEnabled()) { 182 LOG.debug("Renaming " + tmpFile + " to " + finalizedFile); 183 } 184 boolean success = tmpFile.renameTo(finalizedFile); 185 if (!success) { 186 LOG.warn("Unable to rename edits file from " + tmpFile 187 + " to " + finalizedFile); 188 } 189 } 190 } 191 192 /** 193 * Requests that the NameNode download an image from this node. 194 * 195 * @param fsName the http address for the remote NN 196 * @param conf Configuration 197 * @param storage the storage directory to transfer the image from 198 * @param nnf the NameNodeFile type of the image 199 * @param txid the transaction ID of the image to be uploaded 200 * @throws IOException if there is an I/O error 201 */ 202 public static void uploadImageFromStorage(URL fsName, Configuration conf, 203 NNStorage storage, NameNodeFile nnf, long txid) throws IOException { 204 uploadImageFromStorage(fsName, conf, storage, nnf, txid, null); 205 } 206 207 /** 208 * Requests that the NameNode download an image from this node. Allows for 209 * optional external cancelation. 210 * 211 * @param fsName the http address for the remote NN 212 * @param conf Configuration 213 * @param storage the storage directory to transfer the image from 214 * @param nnf the NameNodeFile type of the image 215 * @param txid the transaction ID of the image to be uploaded 216 * @param canceler optional canceler to check for abort of upload 217 * @throws IOException if there is an I/O error or cancellation 218 */ 219 public static void uploadImageFromStorage(URL fsName, Configuration conf, 220 NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler) 221 throws IOException { 222 URL url = new URL(fsName, ImageServlet.PATH_SPEC); 223 long startTime = Time.monotonicNow(); 224 try { 225 uploadImage(url, conf, storage, nnf, txid, canceler); 226 } catch (HttpPutFailedException e) { 227 if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) { 228 // this is OK - this means that a previous attempt to upload 229 // this checkpoint succeeded even though we thought it failed. 230 LOG.info("Image upload with txid " + txid + 231 " conflicted with a previous image upload to the " + 232 "same NameNode. Continuing...", e); 233 return; 234 } else { 235 throw e; 236 } 237 } 238 double xferSec = Math.max( 239 ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001); 240 LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName 241 + " in " + xferSec + " seconds"); 242 } 243 244 /* 245 * Uploads the imagefile using HTTP PUT method 246 */ 247 private static void uploadImage(URL url, Configuration conf, 248 NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler) 249 throws IOException { 250 251 File imageFile = storage.findImageFile(nnf, txId); 252 if (imageFile == null) { 253 throw new IOException("Could not find image with txid " + txId); 254 } 255 256 HttpURLConnection connection = null; 257 try { 258 URIBuilder uriBuilder = new URIBuilder(url.toURI()); 259 260 // write all params for image upload request as query itself. 261 // Request body contains the image to be uploaded. 262 Map<String, String> params = ImageServlet.getParamsForPutImage(storage, 263 txId, imageFile.length(), nnf); 264 for (Entry<String, String> entry : params.entrySet()) { 265 uriBuilder.addParameter(entry.getKey(), entry.getValue()); 266 } 267 268 URL urlWithParams = uriBuilder.build().toURL(); 269 connection = (HttpURLConnection) connectionFactory.openConnection( 270 urlWithParams, UserGroupInformation.isSecurityEnabled()); 271 // Set the request to PUT 272 connection.setRequestMethod("PUT"); 273 connection.setDoOutput(true); 274 275 276 int chunkSize = conf.getInt( 277 DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY, 278 DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT); 279 if (imageFile.length() > chunkSize) { 280 // using chunked streaming mode to support upload of 2GB+ files and to 281 // avoid internal buffering. 282 // this mode should be used only if more than chunkSize data is present 283 // to upload. otherwise upload may not happen sometimes. 284 connection.setChunkedStreamingMode(chunkSize); 285 } 286 287 setTimeout(connection); 288 289 // set headers for verification 290 ImageServlet.setVerificationHeadersForPut(connection, imageFile); 291 292 // Write the file to output stream. 293 writeFileToPutRequest(conf, connection, imageFile, canceler); 294 295 int responseCode = connection.getResponseCode(); 296 if (responseCode != HttpURLConnection.HTTP_OK) { 297 throw new HttpPutFailedException(String.format( 298 "Image uploading failed, status: %d, url: %s, message: %s", 299 responseCode, urlWithParams, connection.getResponseMessage()), 300 responseCode); 301 } 302 } catch (AuthenticationException e) { 303 throw new IOException(e); 304 } catch (URISyntaxException e) { 305 throw new IOException(e); 306 } finally { 307 if (connection != null) { 308 connection.disconnect(); 309 } 310 } 311 } 312 313 private static void writeFileToPutRequest(Configuration conf, 314 HttpURLConnection connection, File imageFile, Canceler canceler) 315 throws FileNotFoundException, IOException { 316 connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream"); 317 connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary"); 318 OutputStream output = connection.getOutputStream(); 319 FileInputStream input = new FileInputStream(imageFile); 320 try { 321 copyFileToStream(output, imageFile, input, 322 ImageServlet.getThrottler(conf), canceler); 323 } finally { 324 IOUtils.closeStream(input); 325 IOUtils.closeStream(output); 326 } 327 } 328 329 /** 330 * A server-side method to respond to a getfile http request 331 * Copies the contents of the local file into the output stream. 332 */ 333 public static void copyFileToStream(OutputStream out, File localfile, 334 FileInputStream infile, DataTransferThrottler throttler) 335 throws IOException { 336 copyFileToStream(out, localfile, infile, throttler, null); 337 } 338 339 private static void copyFileToStream(OutputStream out, File localfile, 340 FileInputStream infile, DataTransferThrottler throttler, 341 Canceler canceler) throws IOException { 342 byte buf[] = new byte[IO_FILE_BUFFER_SIZE]; 343 try { 344 CheckpointFaultInjector.getInstance() 345 .aboutToSendFile(localfile); 346 347 if (CheckpointFaultInjector.getInstance(). 348 shouldSendShortFile(localfile)) { 349 // Test sending image shorter than localfile 350 long len = localfile.length(); 351 buf = new byte[(int)Math.min(len/2, IO_FILE_BUFFER_SIZE)]; 352 // This will read at most half of the image 353 // and the rest of the image will be sent over the wire 354 infile.read(buf); 355 } 356 int num = 1; 357 while (num > 0) { 358 if (canceler != null && canceler.isCancelled()) { 359 throw new SaveNamespaceCancelledException( 360 canceler.getCancellationReason()); 361 } 362 num = infile.read(buf); 363 if (num <= 0) { 364 break; 365 } 366 if (CheckpointFaultInjector.getInstance() 367 .shouldCorruptAByte(localfile)) { 368 // Simulate a corrupted byte on the wire 369 LOG.warn("SIMULATING A CORRUPT BYTE IN IMAGE TRANSFER!"); 370 buf[0]++; 371 } 372 373 out.write(buf, 0, num); 374 if (throttler != null) { 375 throttler.throttle(num, canceler); 376 } 377 } 378 } catch (EofException e) { 379 LOG.info("Connection closed by client"); 380 out = null; // so we don't close in the finally 381 } finally { 382 if (out != null) { 383 out.close(); 384 } 385 } 386 } 387 388 /** 389 * Client-side Method to fetch file from a server 390 * Copies the response from the URL to a list of local files. 391 * @param dstStorage if an error occurs writing to one of the files, 392 * this storage object will be notified. 393 * @Return a digest of the received file if getChecksum is true 394 */ 395 static MD5Hash getFileClient(URL infoServer, 396 String queryString, List<File> localPaths, 397 Storage dstStorage, boolean getChecksum) throws IOException { 398 URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString); 399 LOG.info("Opening connection to " + url); 400 return doGetUrl(url, localPaths, dstStorage, getChecksum); 401 } 402 403 public static MD5Hash doGetUrl(URL url, List<File> localPaths, 404 Storage dstStorage, boolean getChecksum) throws IOException { 405 HttpURLConnection connection; 406 try { 407 connection = (HttpURLConnection) 408 connectionFactory.openConnection(url, isSpnegoEnabled); 409 } catch (AuthenticationException e) { 410 throw new IOException(e); 411 } 412 413 setTimeout(connection); 414 415 if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { 416 throw new HttpGetFailedException( 417 "Image transfer servlet at " + url + 418 " failed with status code " + connection.getResponseCode() + 419 "\nResponse message:\n" + connection.getResponseMessage(), 420 connection); 421 } 422 423 long advertisedSize; 424 String contentLength = connection.getHeaderField(CONTENT_LENGTH); 425 if (contentLength != null) { 426 advertisedSize = Long.parseLong(contentLength); 427 } else { 428 throw new IOException(CONTENT_LENGTH + " header is not provided " + 429 "by the namenode when trying to fetch " + url); 430 } 431 MD5Hash advertisedDigest = parseMD5Header(connection); 432 String fsImageName = connection 433 .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER); 434 InputStream stream = connection.getInputStream(); 435 436 return receiveFile(url.toExternalForm(), localPaths, dstStorage, 437 getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, 438 null); 439 } 440 441 private static void setTimeout(HttpURLConnection connection) { 442 if (timeout <= 0) { 443 Configuration conf = new HdfsConfiguration(); 444 timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, 445 DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT); 446 LOG.info("Image Transfer timeout configured to " + timeout 447 + " milliseconds"); 448 } 449 450 if (timeout > 0) { 451 connection.setConnectTimeout(timeout); 452 connection.setReadTimeout(timeout); 453 } 454 } 455 456 private static MD5Hash receiveFile(String url, List<File> localPaths, 457 Storage dstStorage, boolean getChecksum, long advertisedSize, 458 MD5Hash advertisedDigest, String fsImageName, InputStream stream, 459 DataTransferThrottler throttler) throws IOException { 460 long startTime = Time.monotonicNow(); 461 if (localPaths != null) { 462 // If the local paths refer to directories, use the server-provided header 463 // as the filename within that directory 464 List<File> newLocalPaths = new ArrayList<File>(); 465 for (File localPath : localPaths) { 466 if (localPath.isDirectory()) { 467 if (fsImageName == null) { 468 throw new IOException("No filename header provided by server"); 469 } 470 newLocalPaths.add(new File(localPath, fsImageName)); 471 } else { 472 newLocalPaths.add(localPath); 473 } 474 } 475 localPaths = newLocalPaths; 476 } 477 478 479 long received = 0; 480 MessageDigest digester = null; 481 if (getChecksum) { 482 digester = MD5Hash.getDigester(); 483 stream = new DigestInputStream(stream, digester); 484 } 485 boolean finishedReceiving = false; 486 487 List<FileOutputStream> outputStreams = Lists.newArrayList(); 488 489 try { 490 if (localPaths != null) { 491 for (File f : localPaths) { 492 try { 493 if (f.exists()) { 494 LOG.warn("Overwriting existing file " + f 495 + " with file downloaded from " + url); 496 } 497 outputStreams.add(new FileOutputStream(f)); 498 } catch (IOException ioe) { 499 LOG.warn("Unable to download file " + f, ioe); 500 // This will be null if we're downloading the fsimage to a file 501 // outside of an NNStorage directory. 502 if (dstStorage != null && 503 (dstStorage instanceof StorageErrorReporter)) { 504 ((StorageErrorReporter)dstStorage).reportErrorOnFile(f); 505 } 506 } 507 } 508 509 if (outputStreams.isEmpty()) { 510 throw new IOException( 511 "Unable to download to any storage directory"); 512 } 513 } 514 515 int num = 1; 516 byte[] buf = new byte[IO_FILE_BUFFER_SIZE]; 517 while (num > 0) { 518 num = stream.read(buf); 519 if (num > 0) { 520 received += num; 521 for (FileOutputStream fos : outputStreams) { 522 fos.write(buf, 0, num); 523 } 524 if (throttler != null) { 525 throttler.throttle(num); 526 } 527 } 528 } 529 finishedReceiving = true; 530 } finally { 531 stream.close(); 532 for (FileOutputStream fos : outputStreams) { 533 fos.getChannel().force(true); 534 fos.close(); 535 } 536 537 // Something went wrong and did not finish reading. 538 // Remove the temporary files. 539 if (!finishedReceiving) { 540 deleteTmpFiles(localPaths); 541 } 542 543 if (finishedReceiving && received != advertisedSize) { 544 // only throw this exception if we think we read all of it on our end 545 // -- otherwise a client-side IOException would be masked by this 546 // exception that makes it look like a server-side problem! 547 deleteTmpFiles(localPaths); 548 throw new IOException("File " + url + " received length " + received + 549 " is not of the advertised size " + 550 advertisedSize); 551 } 552 } 553 double xferSec = Math.max( 554 ((float)(Time.monotonicNow() - startTime)) / 1000.0, 0.001); 555 long xferKb = received / 1024; 556 LOG.info(String.format("Transfer took %.2fs at %.2f KB/s", 557 xferSec, xferKb / xferSec)); 558 559 if (digester != null) { 560 MD5Hash computedDigest = new MD5Hash(digester.digest()); 561 562 if (advertisedDigest != null && 563 !computedDigest.equals(advertisedDigest)) { 564 deleteTmpFiles(localPaths); 565 throw new IOException("File " + url + " computed digest " + 566 computedDigest + " does not match advertised digest " + 567 advertisedDigest); 568 } 569 return computedDigest; 570 } else { 571 return null; 572 } 573 } 574 575 private static void deleteTmpFiles(List<File> files) { 576 if (files == null) { 577 return; 578 } 579 580 LOG.info("Deleting temporary files: " + files); 581 for (File file : files) { 582 if (!file.delete()) { 583 LOG.warn("Deleting " + file + " has failed"); 584 } 585 } 586 } 587 588 private static MD5Hash parseMD5Header(HttpURLConnection connection) { 589 String header = connection.getHeaderField(MD5_HEADER); 590 return (header != null) ? new MD5Hash(header) : null; 591 } 592 593 private static MD5Hash parseMD5Header(HttpServletRequest request) { 594 String header = request.getHeader(MD5_HEADER); 595 return (header != null) ? new MD5Hash(header) : null; 596 } 597 598 public static class HttpGetFailedException extends IOException { 599 private static final long serialVersionUID = 1L; 600 private final int responseCode; 601 602 HttpGetFailedException(String msg, HttpURLConnection connection) throws IOException { 603 super(msg); 604 this.responseCode = connection.getResponseCode(); 605 } 606 607 public int getResponseCode() { 608 return responseCode; 609 } 610 } 611 612 public static class HttpPutFailedException extends IOException { 613 private static final long serialVersionUID = 1L; 614 private final int responseCode; 615 616 HttpPutFailedException(String msg, int responseCode) throws IOException { 617 super(msg); 618 this.responseCode = responseCode; 619 } 620 621 public int getResponseCode() { 622 return responseCode; 623 } 624 } 625 626}