/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;

import com.google.common.annotations.VisibleForTesting;

/** Provides a <i>trash</i> feature.  Files are moved to a user's trash
 * directory, a subdirectory of their home directory named ".Trash".  Files are
 * initially moved to a <i>current</i> sub-directory of the trash directory.
 * Within that sub-directory their original path is preserved.  Periodically
 * one may checkpoint the current trash and remove older checkpoints.  (This
 * design permits trash management without enumeration of the full trash
 * content, without date support in the filesystem, and without clock
 * synchronization.)
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TrashPolicyDefault extends TrashPolicy {
  private static final Log LOG =
    LogFactory.getLog(TrashPolicyDefault.class);

  /** Name of the sub-directory of a trash root that receives new deletions. */
  private static final Path CURRENT = new Path("Current");

  /** Permission applied to directories created under the trash: owner-only. */
  private static final FsPermission PERMISSION =
    new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

  /**
   * Format of checkpoint directory names. {@link SimpleDateFormat} is not
   * thread-safe, so every use is wrapped in {@code synchronized (CHECKPOINT)}.
   */
  private static final DateFormat CHECKPOINT =
    new SimpleDateFormat("yyMMddHHmmss");
  /** Format of checkpoint directories used prior to Hadoop 0.23. */
  private static final DateFormat OLD_CHECKPOINT =
    new SimpleDateFormat("yyMMddHHmm");
  private static final int MSECS_PER_MINUTE = 60*1000;

  /** Checkpoint interval in milliseconds, as read from the configuration. */
  private long emptierInterval;

  public TrashPolicyDefault() { }

  /**
   * Creates a policy that is immediately initialized for the given
   * filesystem. Used by {@link Emptier} to process each trash root.
   */
  private TrashPolicyDefault(FileSystem fs, Configuration conf)
      throws IOException {
    initialize(conf, fs);
  }

  /**
   * Initializes this policy from the configuration.
   *
   * @param conf configuration supplying the trash intervals (in minutes)
   * @param fs filesystem this policy operates on
   * @param home ignored by this implementation
   * @deprecated Use {@link #initialize(Configuration, FileSystem)} instead.
   */
  @Override
  @Deprecated
  public void initialize(Configuration conf, FileSystem fs, Path home) {
    loadSettings(conf, fs);
  }

  /**
   * Initializes this policy from the configuration.
   *
   * @param conf configuration supplying the trash intervals (in minutes)
   * @param fs filesystem this policy operates on
   */
  @Override
  public void initialize(Configuration conf, FileSystem fs) {
    loadSettings(conf, fs);
  }

  /**
   * Stores the filesystem and converts the deletion and checkpoint intervals
   * from (possibly fractional) minutes in the configuration to milliseconds.
   * Shared by both {@code initialize} overloads so that they cannot drift
   * apart.
   */
  private void loadSettings(Configuration conf, FileSystem fs) {
    this.fs = fs;
    this.deletionInterval = (long)(conf.getFloat(
        FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
        * MSECS_PER_MINUTE);
    this.emptierInterval = (long)(conf.getFloat(
        FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
        * MSECS_PER_MINUTE);
  }

  /** Re-roots {@code rmFilePath} underneath {@code basePath}. */
  private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
    return Path.mergePaths(basePath, rmFilePath);
  }

  /**
   * @return {@code true} when the deletion interval is non-zero
   */
  @Override
  public boolean isEnabled() {
    return deletionInterval != 0;
  }

  /**
   * Moves a file or directory into the current trash directory.
   *
   * @param path the path to move; a relative path is resolved against the
   *     filesystem's working directory
   * @return {@code true} if the path was moved; {@code false} if trash is
   *     disabled, the path is already inside the trash, or the trash
   *     directory could not be created
   * @throws FileNotFoundException if {@code path} does not exist
   * @throws IOException if {@code path} contains the trash, or if the move
   *     did not succeed after two attempts
   */
  @Override
  public boolean moveToTrash(Path path) throws IOException {
    if (!isEnabled()) {
      return false;
    }

    if (!path.isAbsolute()) {                     // make path absolute
      path = new Path(fs.getWorkingDirectory(), path);
    }

    if (!fs.exists(path)) {                       // check that path exists
      throw new FileNotFoundException(path.toString());
    }

    String qpath = fs.makeQualified(path).toString();

    Path trashRoot = fs.getTrashRoot(path);
    Path trashCurrent = new Path(trashRoot, CURRENT);
    // NOTE(review): both checks below are plain string-prefix comparisons, so
    // a sibling whose name merely shares a prefix would also match. Left
    // unchanged here because altering it changes caller-visible behavior.
    if (qpath.startsWith(trashRoot.toString())) {
      return false;                               // already in trash
    }

    if (trashRoot.getParent().toString().startsWith(qpath)) {
      throw new IOException("Cannot move \"" + path +
                            "\" to the trash, as it contains the trash");
    }

    Path trashPath = makeTrashRelativePath(trashCurrent, path);
    Path baseTrashPath = makeTrashRelativePath(trashCurrent, path.getParent());

    IOException cause = null;

    // try twice, in case checkpoint between the mkdirs() & rename()
    for (int i = 0; i < 2; i++) {
      try {
        if (!fs.mkdirs(baseTrashPath, PERMISSION)) {      // create current
          LOG.warn("Can't create(mkdir) trash directory: " + baseTrashPath);
          return false;
        }
      } catch (IOException e) {
        LOG.warn("Can't create trash directory: " + baseTrashPath, e);
        cause = e;
        break;
      }
      try {
        // if the target path in Trash already exists, then append with
        // a current time in millisecs.
        String orig = trashPath.toString();

        while (fs.exists(trashPath)) {
          trashPath = new Path(orig + Time.now());
        }

        if (fs.rename(path, trashPath)) {           // move to current trash
          LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
          return true;
        }
      } catch (IOException e) {
        cause = e;
      }
    }
    // cause may be null when rename() simply returned false on both attempts.
    throw new IOException("Failed to move to trash: " + path, cause);
  }

  /**
   * Checkpoints the current trash of every trash root returned by
   * {@code fs.getTrashRoots(false)}, using the present time as the
   * checkpoint name.
   */
  @SuppressWarnings("deprecation")
  @Override
  public void createCheckpoint() throws IOException {
    createCheckpoint(new Date());
  }

  /**
   * Checkpoints the current trash of every trash root returned by
   * {@code fs.getTrashRoots(false)}.
   *
   * @param date timestamp used to name the checkpoint directories
   */
  @SuppressWarnings("deprecation")
  public void createCheckpoint(Date date) throws IOException {
    Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
    for (FileStatus trashRoot: trashRoots) {
      LOG.info("TrashPolicyDefault#createCheckpoint for trashRoot: " +
          trashRoot.getPath());
      createCheckpoint(trashRoot.getPath(), date);
    }
  }

  /**
   * Deletes expired checkpoints from every trash root returned by
   * {@code fs.getTrashRoots(false)}.
   */
  @Override
  public void deleteCheckpoint() throws IOException {
    Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
    for (FileStatus trashRoot : trashRoots) {
      LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
          trashRoot.getPath());
      deleteCheckpoint(trashRoot.getPath());
    }
  }

  /**
   * @return the "Current" directory under {@code fs.getTrashRoot(null)}
   */
  @Override
  public Path getCurrentTrashDir() {
    return new Path(fs.getTrashRoot(null), CURRENT);
  }

  /**
   * @param path path whose trash root should be looked up
   * @return the "Current" directory under {@code fs.getTrashRoot(path)}
   */
  @Override
  public Path getCurrentTrashDir(Path path) throws IOException {
    return new Path(fs.getTrashRoot(path), CURRENT);
  }

  /**
   * @return a {@link Runnable} that periodically checkpoints and expires
   *     trash, built from this policy's configuration and checkpoint interval
   */
  @Override
  public Runnable getEmptier() throws IOException {
    return new Emptier(getConf(), emptierInterval);
  }

  /**
   * Background task that wakes up on every multiple of the emptier interval,
   * removes expired checkpoints and creates a new checkpoint in each trash
   * root. It operates on the enclosing policy's filesystem and closes that
   * filesystem when it exits.
   */
  protected class Emptier implements Runnable {

    private Configuration conf;
    private long emptierInterval;

    /**
     * @param conf configuration handed to the per-root policy instances
     * @param emptierInterval requested checkpoint interval in milliseconds;
     *     replaced by the deletion interval when it is non-positive or
     *     larger than the deletion interval
     */
    Emptier(Configuration conf, long emptierInterval) throws IOException {
      this.conf = conf;
      this.emptierInterval = emptierInterval;
      if (emptierInterval > deletionInterval || emptierInterval <= 0) {
        LOG.info("The configured checkpoint interval is " +
                 (emptierInterval / MSECS_PER_MINUTE) + " minutes." +
                 " Using an interval of " +
                 (deletionInterval / MSECS_PER_MINUTE) +
                 " minutes that is used for deletion instead");
        this.emptierInterval = deletionInterval;
      }
      // Report the effective interval (the field), not the constructor
      // parameter, which may just have been overridden above.
      LOG.info("Namenode trash configuration: Deletion interval = "
          + (deletionInterval / MSECS_PER_MINUTE)
          + " minutes, Emptier interval = "
          + (this.emptierInterval / MSECS_PER_MINUTE) + " minutes.");
    }

    /**
     * Loops until interrupted: sleeps until the next multiple of the emptier
     * interval, then for every trash root that is a directory deletes expired
     * checkpoints and creates a new one. Failures for a single trash root are
     * logged and that root is skipped. Returns immediately when the interval
     * is zero.
     */
    @Override
    public void run() {
      if (emptierInterval == 0) {
        return;                                   // trash disabled
      }
      long now = Time.now();
      long end;
      while (true) {
        end = ceiling(now, emptierInterval);
        try {                                     // sleep for interval
          Thread.sleep(end - now);
        } catch (InterruptedException e) {
          break;                                  // exit on interrupt
        }

        try {
          now = Time.now();
          if (now >= end) {
            Collection<FileStatus> trashRoots;
            trashRoots = fs.getTrashRoots(true);      // list all trash dirs

            for (FileStatus trashRoot : trashRoots) {   // dump each trash
              if (!trashRoot.isDirectory()) {
                continue;
              }
              try {
                TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
                trash.deleteCheckpoint(trashRoot.getPath());
                trash.createCheckpoint(trashRoot.getPath(), new Date(now));
              } catch (IOException e) {
                LOG.warn("Trash caught: "+e+". Skipping " +
                    trashRoot.getPath() + ".");
              }
            }
          }
        } catch (Exception e) {
          // Keep the emptier alive no matter what a single pass throws.
          LOG.warn("RuntimeException during Trash.Emptier.run(): ", e);
        }
      }
      try {
        fs.close();
      } catch(IOException e) {
        LOG.warn("Trash cannot close FileSystem: ", e);
      }
    }

    /** @return the smallest multiple of {@code interval} greater than {@code time} */
    private long ceiling(long time, long interval) {
      return floor(time, interval) + interval;
    }

    /** @return {@code time} rounded down to a multiple of {@code interval} */
    private long floor(long time, long interval) {
      return (time / interval) * interval;
    }

    /** @return the effective emptier interval, in minutes */
    @VisibleForTesting
    protected long getEmptierInterval() {
      return this.emptierInterval/MSECS_PER_MINUTE;
    }
  }

  /**
   * Renames the "Current" directory of one trash root to a checkpoint
   * directory named after {@code date}. Does nothing when "Current" does not
   * exist. If the checkpoint name is already taken, "-1", "-2", ... suffixes
   * are tried.
   *
   * @param trashRoot the trash root to checkpoint
   * @param date timestamp used to build the checkpoint name
   * @throws IOException if the rename fails, or if no free checkpoint name
   *     was found after 1000 collisions
   */
  private void createCheckpoint(Path trashRoot, Date date) throws IOException {
    Path current = new Path(trashRoot, CURRENT);
    if (!fs.exists(current)) {
      return;
    }
    Path checkpointBase;
    synchronized (CHECKPOINT) {
      checkpointBase = new Path(trashRoot, CHECKPOINT.format(date));
    }
    Path checkpoint = checkpointBase;

    int attempt = 0;
    while (true) {
      try {
        fs.rename(current, checkpoint, Rename.NONE);
        LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
        break;
      } catch (FileAlreadyExistsException e) {
        if (++attempt > 1000) {
          // Chain the last collision so the failure can be diagnosed.
          throw new IOException("Failed to checkpoint trash: " + checkpoint, e);
        }
        checkpoint = checkpointBase.suffix("-" + attempt);
      }
    }
  }

  /**
   * Deletes every checkpoint directory under one trash root whose timestamp
   * is older than the deletion interval. "Current" is skipped, entries whose
   * names do not parse as a checkpoint timestamp are logged and ignored, and
   * a missing trash root is treated as nothing to do.
   *
   * @param trashRoot the trash root to clean
   */
  private void deleteCheckpoint(Path trashRoot) throws IOException {
    LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);

    FileStatus[] dirs = null;
    try {
      dirs = fs.listStatus(trashRoot);            // scan trash sub-directories
    } catch (FileNotFoundException fnfe) {
      return;
    }

    long now = Time.now();
    for (int i = 0; i < dirs.length; i++) {
      Path path = dirs[i].getPath();
      String dir = path.toUri().getPath();
      String name = path.getName();
      if (name.equals(CURRENT.getName())) {       // skip current
        continue;
      }

      long time;
      try {
        time = getTimeFromCheckpoint(name);
      } catch (ParseException e) {
        LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
        continue;
      }

      if ((now - deletionInterval) > time) {
        if (fs.delete(path, true)) {
          LOG.info("Deleted trash checkpoint: "+dir);
        } else {
          LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
        }
      }
    }
  }

  /**
   * Parses a checkpoint directory name into its timestamp, trying the current
   * format first and the old format second.
   *
   * @param name checkpoint directory name
   * @return the parsed time in milliseconds
   * @throws ParseException if the name matches neither format
   */
  private long getTimeFromCheckpoint(String name) throws ParseException {
    long time;

    try {
      synchronized (CHECKPOINT) {
        time = CHECKPOINT.parse(name).getTime();
      }
    } catch (ParseException pe) {
      // Check for old-style checkpoint directories left over
      // after an upgrade from Hadoop 1.x
      synchronized (OLD_CHECKPOINT) {
        time = OLD_CHECKPOINT.parse(name).getTime();
      }
    }

    return time;
  }
}