001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import com.google.common.annotations.VisibleForTesting;
022
023import java.io.IOException;
024import java.lang.ref.WeakReference;
025import java.util.concurrent.DelayQueue;
026import java.util.concurrent.Delayed;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.security.token.Token;
033import org.apache.hadoop.security.token.TokenIdentifier;
034import org.apache.hadoop.util.Time;
035
036/**
037 * A daemon thread that waits for the next file system to renew.
038 */
039@InterfaceAudience.Private
040public class DelegationTokenRenewer
041    extends Thread {
042  private static final Log LOG = LogFactory
043      .getLog(DelegationTokenRenewer.class);
044
045  /** The renewable interface used by the renewer. */
046  public interface Renewable {
047    /** @return the renew token. */
048    public Token<?> getRenewToken();
049
050    /** Set delegation token. */
051    public <T extends TokenIdentifier> void setDelegationToken(Token<T> token);
052  }
053
054  /**
055   * An action that will renew and replace the file system's delegation 
056   * tokens automatically.
057   */
058  public static class RenewAction<T extends FileSystem & Renewable>
059      implements Delayed {
060    /** when should the renew happen */
061    private long renewalTime;
062    /** a weak reference to the file system so that it can be garbage collected */
063    private final WeakReference<T> weakFs;
064    private Token<?> token; 
065    boolean isValid = true;
066
067    private RenewAction(final T fs) {
068      this.weakFs = new WeakReference<T>(fs);
069      this.token = fs.getRenewToken();
070      updateRenewalTime(renewCycle);
071    }
072 
073    public boolean isValid() {
074      return isValid;
075    }
076    
077    /** Get the delay until this event should happen. */
078    @Override
079    public long getDelay(final TimeUnit unit) {
080      final long millisLeft = renewalTime - Time.now();
081      return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
082    }
083
084    @Override
085    public int compareTo(final Delayed delayed) {
086      final RenewAction<?> that = (RenewAction<?>)delayed;
087      return this.renewalTime < that.renewalTime? -1
088          : this.renewalTime == that.renewalTime? 0: 1;
089    }
090
091    @Override
092    public int hashCode() {
093      return token.hashCode();
094    }
095
096    @Override
097    public boolean equals(final Object that) {
098      if (this == that) {
099        return true;
100      } else if (that == null || !(that instanceof RenewAction)) {
101        return false;
102      }
103      return token.equals(((RenewAction<?>)that).token);
104    }
105
106    /**
107     * Set a new time for the renewal.
108     * It can only be called when the action is not in the queue or any
109     * collection because the hashCode may change
110     * @param newTime the new time
111     */
112    private void updateRenewalTime(long delay) {
113      renewalTime = Time.now() + delay - delay/10;
114    }
115
116    /**
117     * Renew or replace the delegation token for this file system.
118     * It can only be called when the action is not in the queue.
119     * @return
120     * @throws IOException
121     */
122    private boolean renew() throws IOException, InterruptedException {
123      final T fs = weakFs.get();
124      final boolean b = fs != null;
125      if (b) {
126        synchronized(fs) {
127          try {
128            long expires = token.renew(fs.getConf());
129            updateRenewalTime(expires - Time.now());
130          } catch (IOException ie) {
131            try {
132              Token<?>[] tokens = fs.addDelegationTokens(null, null);
133              if (tokens.length == 0) {
134                throw new IOException("addDelegationTokens returned no tokens");
135              }
136              token = tokens[0];
137              updateRenewalTime(renewCycle);
138              fs.setDelegationToken(token);
139            } catch (IOException ie2) {
140              isValid = false;
141              throw new IOException("Can't renew or get new delegation token ", ie);
142            }
143          }
144        }
145      }
146      return b;
147    }
148
149    private void cancel() throws IOException, InterruptedException {
150      final T fs = weakFs.get();
151      if (fs != null) {
152        token.cancel(fs.getConf());
153      }
154    }
155
156    @Override
157    public String toString() {
158      Renewable fs = weakFs.get();
159      return fs == null? "evaporated token renew"
160          : "The token will be renewed in " + getDelay(TimeUnit.SECONDS)
161            + " secs, renewToken=" + token;
162    }
163  }
164
165  /** assumes renew cycle for a token is 24 hours... */
166  private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000; 
167
168  @InterfaceAudience.Private
169  @VisibleForTesting
170  public static long renewCycle = RENEW_CYCLE;
171
172  /** Queue to maintain the RenewActions to be processed by the {@link #run()} */
173  private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
174  
175  /** For testing purposes */
176  @VisibleForTesting
177  protected int getRenewQueueLength() {
178    return queue.size();
179  }
180
181  /**
182   * Create the singleton instance. However, the thread can be started lazily in
183   * {@link #addRenewAction(FileSystem)}
184   */
185  private static DelegationTokenRenewer INSTANCE = null;
186
187  private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
188    super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
189    setDaemon(true);
190  }
191
192  public static synchronized DelegationTokenRenewer getInstance() {
193    if (INSTANCE == null) {
194      INSTANCE = new DelegationTokenRenewer(FileSystem.class);
195    }
196    return INSTANCE;
197  }
198
199  @VisibleForTesting
200  static synchronized void reset() {
201    if (INSTANCE != null) {
202      INSTANCE.queue.clear();
203      INSTANCE.interrupt();
204      try {
205        INSTANCE.join();
206      } catch (InterruptedException e) {
207        LOG.warn("Failed to reset renewer");
208      } finally {
209        INSTANCE = null;
210      }
211    }
212  }
213  
214  /** Add a renew action to the queue. */
215  @SuppressWarnings("static-access")
216  public <T extends FileSystem & Renewable> RenewAction<T> addRenewAction(final T fs) {
217    synchronized (this) {
218      if (!isAlive()) {
219        start();
220      }
221    }
222    RenewAction<T> action = new RenewAction<T>(fs);
223    if (action.token != null) {
224      queue.add(action);
225    } else {
226      fs.LOG.error("does not have a token for renewal");
227    }
228    return action;
229  }
230
231  /**
232   * Remove the associated renew action from the queue
233   * 
234   * @throws IOException
235   */
236  public <T extends FileSystem & Renewable> void removeRenewAction(
237      final T fs) throws IOException {
238    RenewAction<T> action = new RenewAction<T>(fs);
239    if (queue.remove(action)) {
240      try {
241        action.cancel();
242      } catch (InterruptedException ie) {
243        LOG.error("Interrupted while canceling token for " + fs.getUri()
244            + "filesystem");
245        if (LOG.isDebugEnabled()) {
246          LOG.debug(ie.getStackTrace());
247        }
248      }
249    }
250  }
251
252  @SuppressWarnings("static-access")
253  @Override
254  public void run() {
255    for(;;) {
256      RenewAction<?> action = null;
257      try {
258        action = queue.take();
259        if (action.renew()) {
260          queue.add(action);
261        }
262      } catch (InterruptedException ie) {
263        return;
264      } catch (Exception ie) {
265        action.weakFs.get().LOG.warn("Failed to renew token, action=" + action,
266            ie);
267      }
268    }
269  }
270}