001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
021import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
022import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
023import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
024
025import java.io.FileNotFoundException;
026import java.io.IOException;
027import java.text.DateFormat;
028import java.text.ParseException;
029import java.text.SimpleDateFormat;
030import java.util.Collection;
031import java.util.Date;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Options.Rename;
039import org.apache.hadoop.fs.permission.FsAction;
040import org.apache.hadoop.fs.permission.FsPermission;
041import org.apache.hadoop.util.Time;
042
043import com.google.common.annotations.VisibleForTesting;
044
045/** Provides a <i>trash</i> feature.  Files are moved to a user's trash
046 * directory, a subdirectory of their home directory named ".Trash".  Files are
047 * initially moved to a <i>current</i> sub-directory of the trash directory.
048 * Within that sub-directory their original path is preserved.  Periodically
049 * one may checkpoint the current trash and remove older checkpoints.  (This
050 * design permits trash management without enumeration of the full trash
051 * content, without date support in the filesystem, and without clock
052 * synchronization.)
053 */
054@InterfaceAudience.Private
055@InterfaceStability.Evolving
056public class TrashPolicyDefault extends TrashPolicy {
057  private static final Log LOG =
058    LogFactory.getLog(TrashPolicyDefault.class);
059
060  private static final Path CURRENT = new Path("Current");
061
062  private static final FsPermission PERMISSION =
063    new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
064
065  private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
066  /** Format of checkpoint directories used prior to Hadoop 0.23. */
067  private static final DateFormat OLD_CHECKPOINT =
068      new SimpleDateFormat("yyMMddHHmm");
069  private static final int MSECS_PER_MINUTE = 60*1000;
070
071  private long emptierInterval;
072
073  public TrashPolicyDefault() { }
074
075  private TrashPolicyDefault(FileSystem fs, Configuration conf)
076      throws IOException {
077    initialize(conf, fs);
078  }
079
080  /**
081   * @deprecated Use {@link #initialize(Configuration, FileSystem)} instead.
082   */
083  @Override
084  @Deprecated
085  public void initialize(Configuration conf, FileSystem fs, Path home) {
086    this.fs = fs;
087    this.deletionInterval = (long)(conf.getFloat(
088        FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
089        * MSECS_PER_MINUTE);
090    this.emptierInterval = (long)(conf.getFloat(
091        FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
092        * MSECS_PER_MINUTE);
093   }
094
095  @Override
096  public void initialize(Configuration conf, FileSystem fs) {
097    this.fs = fs;
098    this.deletionInterval = (long)(conf.getFloat(
099        FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT)
100        * MSECS_PER_MINUTE);
101    this.emptierInterval = (long)(conf.getFloat(
102        FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
103        * MSECS_PER_MINUTE);
104  }
105
106  private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
107    return Path.mergePaths(basePath, rmFilePath);
108  }
109
110  @Override
111  public boolean isEnabled() {
112    return deletionInterval != 0;
113  }
114
115  @Override
116  public boolean moveToTrash(Path path) throws IOException {
117    if (!isEnabled())
118      return false;
119
120    if (!path.isAbsolute())                       // make path absolute
121      path = new Path(fs.getWorkingDirectory(), path);
122
123    if (!fs.exists(path))                         // check that path exists
124      throw new FileNotFoundException(path.toString());
125
126    String qpath = fs.makeQualified(path).toString();
127
128    Path trashRoot = fs.getTrashRoot(path);
129    Path trashCurrent = new Path(trashRoot, CURRENT);
130    if (qpath.startsWith(trashRoot.toString())) {
131      return false;                               // already in trash
132    }
133
134    if (trashRoot.getParent().toString().startsWith(qpath)) {
135      throw new IOException("Cannot move \"" + path +
136                            "\" to the trash, as it contains the trash");
137    }
138
139    Path trashPath = makeTrashRelativePath(trashCurrent, path);
140    Path baseTrashPath = makeTrashRelativePath(trashCurrent, path.getParent());
141    
142    IOException cause = null;
143
144    // try twice, in case checkpoint between the mkdirs() & rename()
145    for (int i = 0; i < 2; i++) {
146      try {
147        if (!fs.mkdirs(baseTrashPath, PERMISSION)) {      // create current
148          LOG.warn("Can't create(mkdir) trash directory: " + baseTrashPath);
149          return false;
150        }
151      } catch (IOException e) {
152        LOG.warn("Can't create trash directory: " + baseTrashPath, e);
153        cause = e;
154        break;
155      }
156      try {
157        // if the target path in Trash already exists, then append with 
158        // a current time in millisecs.
159        String orig = trashPath.toString();
160        
161        while(fs.exists(trashPath)) {
162          trashPath = new Path(orig + Time.now());
163        }
164        
165        if (fs.rename(path, trashPath)) {           // move to current trash
166          LOG.info("Moved: '" + path + "' to trash at: " + trashPath);
167          return true;
168        }
169      } catch (IOException e) {
170        cause = e;
171      }
172    }
173    throw (IOException)
174      new IOException("Failed to move to trash: " + path).initCause(cause);
175  }
176
177  @SuppressWarnings("deprecation")
178  @Override
179  public void createCheckpoint() throws IOException {
180    createCheckpoint(new Date());
181  }
182
183  @SuppressWarnings("deprecation")
184  public void createCheckpoint(Date date) throws IOException {
185    Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
186    for (FileStatus trashRoot: trashRoots) {
187      LOG.info("TrashPolicyDefault#createCheckpoint for trashRoot: " +
188          trashRoot.getPath());
189      createCheckpoint(trashRoot.getPath(), date);
190    }
191  }
192
193  @Override
194  public void deleteCheckpoint() throws IOException {
195    Collection<FileStatus> trashRoots = fs.getTrashRoots(false);
196    for (FileStatus trashRoot : trashRoots) {
197      LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " +
198          trashRoot.getPath());
199      deleteCheckpoint(trashRoot.getPath());
200    }
201  }
202
203  @Override
204  public Path getCurrentTrashDir() {
205    return new Path(fs.getTrashRoot(null), CURRENT);
206  }
207
208  @Override
209  public Path getCurrentTrashDir(Path path) throws IOException {
210    return new Path(fs.getTrashRoot(path), CURRENT);
211  }
212
213  @Override
214  public Runnable getEmptier() throws IOException {
215    return new Emptier(getConf(), emptierInterval);
216  }
217
218  protected class Emptier implements Runnable {
219
220    private Configuration conf;
221    private long emptierInterval;
222
223    Emptier(Configuration conf, long emptierInterval) throws IOException {
224      this.conf = conf;
225      this.emptierInterval = emptierInterval;
226      if (emptierInterval > deletionInterval || emptierInterval <= 0) {
227        LOG.info("The configured checkpoint interval is " +
228                 (emptierInterval / MSECS_PER_MINUTE) + " minutes." +
229                 " Using an interval of " +
230                 (deletionInterval / MSECS_PER_MINUTE) +
231                 " minutes that is used for deletion instead");
232        this.emptierInterval = deletionInterval;
233      }
234      LOG.info("Namenode trash configuration: Deletion interval = "
235          + (deletionInterval / MSECS_PER_MINUTE)
236          + " minutes, Emptier interval = "
237          + (emptierInterval / MSECS_PER_MINUTE) + " minutes.");
238    }
239
240    @Override
241    public void run() {
242      if (emptierInterval == 0)
243        return;                                   // trash disabled
244      long now = Time.now();
245      long end;
246      while (true) {
247        end = ceiling(now, emptierInterval);
248        try {                                     // sleep for interval
249          Thread.sleep(end - now);
250        } catch (InterruptedException e) {
251          break;                                  // exit on interrupt
252        }
253
254        try {
255          now = Time.now();
256          if (now >= end) {
257            Collection<FileStatus> trashRoots;
258            trashRoots = fs.getTrashRoots(true);      // list all trash dirs
259
260            for (FileStatus trashRoot : trashRoots) {   // dump each trash
261              if (!trashRoot.isDirectory())
262                continue;
263              try {
264                TrashPolicyDefault trash = new TrashPolicyDefault(fs, conf);
265                trash.deleteCheckpoint(trashRoot.getPath());
266                trash.createCheckpoint(trashRoot.getPath(), new Date(now));
267              } catch (IOException e) {
268                LOG.warn("Trash caught: "+e+". Skipping " +
269                    trashRoot.getPath() + ".");
270              } 
271            }
272          }
273        } catch (Exception e) {
274          LOG.warn("RuntimeException during Trash.Emptier.run(): ", e); 
275        }
276      }
277      try {
278        fs.close();
279      } catch(IOException e) {
280        LOG.warn("Trash cannot close FileSystem: ", e);
281      }
282    }
283
284    private long ceiling(long time, long interval) {
285      return floor(time, interval) + interval;
286    }
287    private long floor(long time, long interval) {
288      return (time / interval) * interval;
289    }
290
291    @VisibleForTesting
292    protected long getEmptierInterval() {
293      return this.emptierInterval/MSECS_PER_MINUTE;
294    }
295  }
296
297  private void createCheckpoint(Path trashRoot, Date date) throws IOException {
298    if (!fs.exists(new Path(trashRoot, CURRENT))) {
299      return;
300    }
301    Path checkpointBase;
302    synchronized (CHECKPOINT) {
303      checkpointBase = new Path(trashRoot, CHECKPOINT.format(date));
304    }
305    Path checkpoint = checkpointBase;
306    Path current = new Path(trashRoot, CURRENT);
307
308    int attempt = 0;
309    while (true) {
310      try {
311        fs.rename(current, checkpoint, Rename.NONE);
312        LOG.info("Created trash checkpoint: " + checkpoint.toUri().getPath());
313        break;
314      } catch (FileAlreadyExistsException e) {
315        if (++attempt > 1000) {
316          throw new IOException("Failed to checkpoint trash: " + checkpoint);
317        }
318        checkpoint = checkpointBase.suffix("-" + attempt);
319      }
320    }
321  }
322
323  private void deleteCheckpoint(Path trashRoot) throws IOException {
324    LOG.info("TrashPolicyDefault#deleteCheckpoint for trashRoot: " + trashRoot);
325
326    FileStatus[] dirs = null;
327    try {
328      dirs = fs.listStatus(trashRoot); // scan trash sub-directories
329    } catch (FileNotFoundException fnfe) {
330      return;
331    }
332
333    long now = Time.now();
334    for (int i = 0; i < dirs.length; i++) {
335      Path path = dirs[i].getPath();
336      String dir = path.toUri().getPath();
337      String name = path.getName();
338      if (name.equals(CURRENT.getName())) {         // skip current
339        continue;
340      }
341
342      long time;
343      try {
344        time = getTimeFromCheckpoint(name);
345      } catch (ParseException e) {
346        LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
347        continue;
348      }
349
350      if ((now - deletionInterval) > time) {
351        if (fs.delete(path, true)) {
352          LOG.info("Deleted trash checkpoint: "+dir);
353        } else {
354          LOG.warn("Couldn't delete checkpoint: " + dir + " Ignoring.");
355        }
356      }
357    }
358  }
359
360  private long getTimeFromCheckpoint(String name) throws ParseException {
361    long time;
362
363    try {
364      synchronized (CHECKPOINT) {
365        time = CHECKPOINT.parse(name).getTime();
366      }
367    } catch (ParseException pe) {
368      // Check for old-style checkpoint directories left over
369      // after an upgrade from Hadoop 1.x
370      synchronized (OLD_CHECKPOINT) {
371        time = OLD_CHECKPOINT.parse(name).getTime();
372      }
373    }
374
375    return time;
376  }
377}