001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs; 020 021import com.google.common.annotations.VisibleForTesting; 022 023import java.io.IOException; 024import java.lang.ref.WeakReference; 025import java.util.concurrent.DelayQueue; 026import java.util.concurrent.Delayed; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.apache.hadoop.classification.InterfaceAudience; 032import org.apache.hadoop.security.token.Token; 033import org.apache.hadoop.security.token.TokenIdentifier; 034import org.apache.hadoop.util.Time; 035 036/** 037 * A daemon thread that waits for the next file system to renew. 038 */ 039@InterfaceAudience.Private 040public class DelegationTokenRenewer 041 extends Thread { 042 private static final Log LOG = LogFactory 043 .getLog(DelegationTokenRenewer.class); 044 045 /** The renewable interface used by the renewer. */ 046 public interface Renewable { 047 /** @return the renew token. */ 048 public Token<?> getRenewToken(); 049 050 /** Set delegation token. */ 051 public <T extends TokenIdentifier> void setDelegationToken(Token<T> token); 052 } 053 054 /** 055 * An action that will renew and replace the file system's delegation 056 * tokens automatically. 057 */ 058 public static class RenewAction<T extends FileSystem & Renewable> 059 implements Delayed { 060 /** when should the renew happen */ 061 private long renewalTime; 062 /** a weak reference to the file system so that it can be garbage collected */ 063 private final WeakReference<T> weakFs; 064 private Token<?> token; 065 boolean isValid = true; 066 067 private RenewAction(final T fs) { 068 this.weakFs = new WeakReference<T>(fs); 069 this.token = fs.getRenewToken(); 070 updateRenewalTime(renewCycle); 071 } 072 073 public boolean isValid() { 074 return isValid; 075 } 076 077 /** Get the delay until this event should happen. */ 078 @Override 079 public long getDelay(final TimeUnit unit) { 080 final long millisLeft = renewalTime - Time.now(); 081 return unit.convert(millisLeft, TimeUnit.MILLISECONDS); 082 } 083 084 @Override 085 public int compareTo(final Delayed delayed) { 086 final RenewAction<?> that = (RenewAction<?>)delayed; 087 return this.renewalTime < that.renewalTime? -1 088 : this.renewalTime == that.renewalTime? 0: 1; 089 } 090 091 @Override 092 public int hashCode() { 093 return token.hashCode(); 094 } 095 096 @Override 097 public boolean equals(final Object that) { 098 if (this == that) { 099 return true; 100 } else if (that == null || !(that instanceof RenewAction)) { 101 return false; 102 } 103 return token.equals(((RenewAction<?>)that).token); 104 } 105 106 /** 107 * Set a new time for the renewal. 108 * It can only be called when the action is not in the queue or any 109 * collection because the hashCode may change 110 * @param newTime the new time 111 */ 112 private void updateRenewalTime(long delay) { 113 renewalTime = Time.now() + delay - delay/10; 114 } 115 116 /** 117 * Renew or replace the delegation token for this file system. 118 * It can only be called when the action is not in the queue. 119 * @return 120 * @throws IOException 121 */ 122 private boolean renew() throws IOException, InterruptedException { 123 final T fs = weakFs.get(); 124 final boolean b = fs != null; 125 if (b) { 126 synchronized(fs) { 127 try { 128 long expires = token.renew(fs.getConf()); 129 updateRenewalTime(expires - Time.now()); 130 } catch (IOException ie) { 131 try { 132 Token<?>[] tokens = fs.addDelegationTokens(null, null); 133 if (tokens.length == 0) { 134 throw new IOException("addDelegationTokens returned no tokens"); 135 } 136 token = tokens[0]; 137 updateRenewalTime(renewCycle); 138 fs.setDelegationToken(token); 139 } catch (IOException ie2) { 140 isValid = false; 141 throw new IOException("Can't renew or get new delegation token ", ie); 142 } 143 } 144 } 145 } 146 return b; 147 } 148 149 private void cancel() throws IOException, InterruptedException { 150 final T fs = weakFs.get(); 151 if (fs != null) { 152 token.cancel(fs.getConf()); 153 } 154 } 155 156 @Override 157 public String toString() { 158 Renewable fs = weakFs.get(); 159 return fs == null? "evaporated token renew" 160 : "The token will be renewed in " + getDelay(TimeUnit.SECONDS) 161 + " secs, renewToken=" + token; 162 } 163 } 164 165 /** assumes renew cycle for a token is 24 hours... */ 166 private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000; 167 168 @InterfaceAudience.Private 169 @VisibleForTesting 170 public static long renewCycle = RENEW_CYCLE; 171 172 /** Queue to maintain the RenewActions to be processed by the {@link #run()} */ 173 private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>(); 174 175 /** For testing purposes */ 176 @VisibleForTesting 177 protected int getRenewQueueLength() { 178 return queue.size(); 179 } 180 181 /** 182 * Create the singleton instance. However, the thread can be started lazily in 183 * {@link #addRenewAction(FileSystem)} 184 */ 185 private static DelegationTokenRenewer INSTANCE = null; 186 187 private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) { 188 super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName()); 189 setDaemon(true); 190 } 191 192 public static synchronized DelegationTokenRenewer getInstance() { 193 if (INSTANCE == null) { 194 INSTANCE = new DelegationTokenRenewer(FileSystem.class); 195 } 196 return INSTANCE; 197 } 198 199 @VisibleForTesting 200 static synchronized void reset() { 201 if (INSTANCE != null) { 202 INSTANCE.queue.clear(); 203 INSTANCE.interrupt(); 204 try { 205 INSTANCE.join(); 206 } catch (InterruptedException e) { 207 LOG.warn("Failed to reset renewer"); 208 } finally { 209 INSTANCE = null; 210 } 211 } 212 } 213 214 /** Add a renew action to the queue. */ 215 @SuppressWarnings("static-access") 216 public <T extends FileSystem & Renewable> RenewAction<T> addRenewAction(final T fs) { 217 synchronized (this) { 218 if (!isAlive()) { 219 start(); 220 } 221 } 222 RenewAction<T> action = new RenewAction<T>(fs); 223 if (action.token != null) { 224 queue.add(action); 225 } else { 226 fs.LOG.error("does not have a token for renewal"); 227 } 228 return action; 229 } 230 231 /** 232 * Remove the associated renew action from the queue 233 * 234 * @throws IOException 235 */ 236 public <T extends FileSystem & Renewable> void removeRenewAction( 237 final T fs) throws IOException { 238 RenewAction<T> action = new RenewAction<T>(fs); 239 if (queue.remove(action)) { 240 try { 241 action.cancel(); 242 } catch (InterruptedException ie) { 243 LOG.error("Interrupted while canceling token for " + fs.getUri() 244 + "filesystem"); 245 if (LOG.isDebugEnabled()) { 246 LOG.debug(ie.getStackTrace()); 247 } 248 } 249 } 250 } 251 252 @SuppressWarnings("static-access") 253 @Override 254 public void run() { 255 for(;;) { 256 RenewAction<?> action = null; 257 try { 258 action = queue.take(); 259 if (action.renew()) { 260 queue.add(action); 261 } 262 } catch (InterruptedException ie) { 263 return; 264 } catch (Exception ie) { 265 action.weakFs.get().LOG.warn("Failed to renew token, action=" + action, 266 ie); 267 } 268 } 269 } 270}