001    /**
002    * Licensed to the Apache Software Foundation (ASF) under one
003    * or more contributor license agreements.  See the NOTICE file
004    * distributed with this work for additional information
005    * regarding copyright ownership.  The ASF licenses this file
006    * to you under the Apache License, Version 2.0 (the
007    * "License"); you may not use this file except in compliance
008    * with the License.  You may obtain a copy of the License at
009    *
010    *     http://www.apache.org/licenses/LICENSE-2.0
011    *
012    * Unless required by applicable law or agreed to in writing, software
013    * distributed under the License is distributed on an "AS IS" BASIS,
014    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015    * See the License for the specific language governing permissions and
016    * limitations under the License.
017    */
018    
019    package org.apache.hadoop.yarn.client.api;
020    
021    import java.io.IOException;
022    import java.util.Collection;
023    import java.util.List;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceAudience.Private;
029    import org.apache.hadoop.classification.InterfaceAudience.Public;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.service.AbstractService;
032    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
033    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
034    import org.apache.hadoop.yarn.api.records.ContainerId;
035    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
036    import org.apache.hadoop.yarn.api.records.Priority;
037    import org.apache.hadoop.yarn.api.records.Resource;
038    import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
039    import org.apache.hadoop.yarn.exceptions.YarnException;
040    
041    import com.google.common.base.Preconditions;
042    import com.google.common.base.Supplier;
043    import com.google.common.collect.ImmutableList;
044    
045    @InterfaceAudience.Public
046    @InterfaceStability.Stable
047    public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
048        AbstractService {
049      private static final Log LOG = LogFactory.getLog(AMRMClient.class);
050    
051      /**
052       * Create a new instance of AMRMClient.
053       * For usage:
054       * <pre>
055       * {@code
056       * AMRMClient.<T>createAMRMClientContainerRequest()
057       * }</pre>
058       * @return the newly create AMRMClient instance.
059       */
060      @Public
061      public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {
062        AMRMClient<T> client = new AMRMClientImpl<T>();
063        return client;
064      }
065    
066      private NMTokenCache nmTokenCache;
067    
068      @Private
069      protected AMRMClient(String name) {
070        super(name);
071        nmTokenCache = NMTokenCache.getSingleton();
072      }
073    
074      /**
075       * Object to represent a single container request for resources. Scheduler
076       * documentation should be consulted for the specifics of how the parameters
077       * are honored.
078       * 
079       * By default, YARN schedulers try to allocate containers at the requested
080       * locations but they may relax the constraints in order to expedite meeting
081       * allocations limits. They first relax the constraint to the same rack as the
082       * requested node and then to anywhere in the cluster. The relaxLocality flag
083       * may be used to disable locality relaxation and request containers at only 
084       * specific locations. The following conditions apply.
085       * <ul>
086       * <li>Within a priority, all container requests must have the same value for
087       * locality relaxation. Either enabled or disabled.</li>
088       * <li>If locality relaxation is disabled, then across requests, locations at
089       * different network levels may not be specified. E.g. its invalid to make a
090       * request for a specific node and another request for a specific rack.</li>
091       * <li>If locality relaxation is disabled, then only within the same request,  
092       * a node and its rack may be specified together. This allows for a specific   
093       * rack with a preference for a specific node within that rack.</li>
094       * <li></li>
095       * </ul>
096       * To re-enable locality relaxation at a given priority, all pending requests 
097       * with locality relaxation disabled must be first removed. Then they can be 
098       * added back with locality relaxation enabled.
099       * 
100       * All getters return immutable values.
101       */
102      public static class ContainerRequest {
103        final Resource capability;
104        final List<String> nodes;
105        final List<String> racks;
106        final Priority priority;
107        final boolean relaxLocality;
108        final String nodeLabelsExpression;
109        
110        /**
111         * Instantiates a {@link ContainerRequest} with the given constraints and
112         * locality relaxation enabled.
113         * 
114         * @param capability
115         *          The {@link Resource} to be requested for each container.
116         * @param nodes
117         *          Any hosts to request that the containers are placed on.
118         * @param racks
119         *          Any racks to request that the containers are placed on. The
120         *          racks corresponding to any hosts requested will be automatically
121         *          added to this list.
122         * @param priority
123         *          The priority at which to request the containers. Higher
124         *          priorities have lower numerical values.
125         */
126        public ContainerRequest(Resource capability, String[] nodes,
127            String[] racks, Priority priority) {
128          this(capability, nodes, racks, priority, true, null);
129        }
130        
131        /**
132         * Instantiates a {@link ContainerRequest} with the given constraints.
133         * 
134         * @param capability
135         *          The {@link Resource} to be requested for each container.
136         * @param nodes
137         *          Any hosts to request that the containers are placed on.
138         * @param racks
139         *          Any racks to request that the containers are placed on. The
140         *          racks corresponding to any hosts requested will be automatically
141         *          added to this list.
142         * @param priority
143         *          The priority at which to request the containers. Higher
144         *          priorities have lower numerical values.
145         * @param relaxLocality
146         *          If true, containers for this request may be assigned on hosts
147         *          and racks other than the ones explicitly requested.
148         */
149        public ContainerRequest(Resource capability, String[] nodes,
150            String[] racks, Priority priority, boolean relaxLocality) {
151          this(capability, nodes, racks, priority, relaxLocality, null);
152        }
153              
154        /**
155         * Instantiates a {@link ContainerRequest} with the given constraints.
156         * 
157         * @param capability
158         *          The {@link Resource} to be requested for each container.
159         * @param nodes
160         *          Any hosts to request that the containers are placed on.
161         * @param racks
162         *          Any racks to request that the containers are placed on. The
163         *          racks corresponding to any hosts requested will be automatically
164         *          added to this list.
165         * @param priority
166         *          The priority at which to request the containers. Higher
167         *          priorities have lower numerical values.
168         * @param relaxLocality
169         *          If true, containers for this request may be assigned on hosts
170         *          and racks other than the ones explicitly requested.
171         * @param nodeLabelsExpression
172         *          Set node labels to allocate resource
173         */
174        public ContainerRequest(Resource capability, String[] nodes,
175            String[] racks, Priority priority, boolean relaxLocality,
176            String nodeLabelsExpression) {
177          // Validate request
178          Preconditions.checkArgument(capability != null,
179              "The Resource to be requested for each container " +
180                  "should not be null ");
181          Preconditions.checkArgument(priority != null,
182              "The priority at which to request containers should not be null ");
183          Preconditions.checkArgument(
184                  !(!relaxLocality && (racks == null || racks.length == 0) 
185                      && (nodes == null || nodes.length == 0)),
186                  "Can't turn off locality relaxation on a " + 
187                  "request with no location constraints");
188          this.capability = capability;
189          this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
190          this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
191          this.priority = priority;
192          this.relaxLocality = relaxLocality;
193          this.nodeLabelsExpression = nodeLabelsExpression;
194        }
195        
196        public Resource getCapability() {
197          return capability;
198        }
199        
200        public List<String> getNodes() {
201          return nodes;
202        }
203        
204        public List<String> getRacks() {
205          return racks;
206        }
207        
208        public Priority getPriority() {
209          return priority;
210        }
211        
212        public boolean getRelaxLocality() {
213          return relaxLocality;
214        }
215        
216        public String getNodeLabelExpression() {
217          return nodeLabelsExpression;
218        }
219        
220        public String toString() {
221          StringBuilder sb = new StringBuilder();
222          sb.append("Capability[").append(capability).append("]");
223          sb.append("Priority[").append(priority).append("]");
224          return sb.toString();
225        }
226      }
227     
228      /**
229       * Register the application master. This must be called before any 
230       * other interaction
231       * @param appHostName Name of the host on which master is running
232       * @param appHostPort Port master is listening on
233       * @param appTrackingUrl URL at which the master info can be seen
234       * @return <code>RegisterApplicationMasterResponse</code>
235       * @throws YarnException
236       * @throws IOException
237       */
238      public abstract RegisterApplicationMasterResponse 
239                   registerApplicationMaster(String appHostName,
240                                             int appHostPort,
241                                             String appTrackingUrl) 
242                   throws YarnException, IOException;
243      
244      /**
245       * Request additional containers and receive new container allocations.
246       * Requests made via <code>addContainerRequest</code> are sent to the
247       * <code>ResourceManager</code>. New containers assigned to the master are
248       * retrieved. Status of completed containers and node health updates are also
249       * retrieved. This also doubles up as a heartbeat to the ResourceManager and
250       * must be made periodically. The call may not always return any new
251       * allocations of containers. App should not make concurrent allocate
252       * requests. May cause request loss.
253       * 
254       * <p>
255       * Note : If the user has not removed container requests that have already
256       * been satisfied, then the re-register may end up sending the entire
257       * container requests to the RM (including matched requests). Which would mean
258       * the RM could end up giving it a lot of new allocated containers.
259       * </p>
260       * 
261       * @param progressIndicator Indicates progress made by the master
262       * @return the response of the allocate request
263       * @throws YarnException
264       * @throws IOException
265       */
266      public abstract AllocateResponse allocate(float progressIndicator) 
267                               throws YarnException, IOException;
268      
269      /**
270       * Unregister the application master. This must be called in the end.
271       * @param appStatus Success/Failure status of the master
272       * @param appMessage Diagnostics message on failure
273       * @param appTrackingUrl New URL to get master info
274       * @throws YarnException
275       * @throws IOException
276       */
277      public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
278                                               String appMessage,
279                                               String appTrackingUrl) 
280                   throws YarnException, IOException;
281      
282      /**
283       * Request containers for resources before calling <code>allocate</code>
284       * @param req Resource request
285       */
286      public abstract void addContainerRequest(T req);
287      
288      /**
289       * Remove previous container request. The previous container request may have 
290       * already been sent to the ResourceManager. So even after the remove request 
291       * the app must be prepared to receive an allocation for the previous request 
292       * even after the remove request
293       * @param req Resource request
294       */
295      public abstract void removeContainerRequest(T req);
296      
297      /**
298       * Release containers assigned by the Resource Manager. If the app cannot use
299       * the container or wants to give up the container then it can release them.
300       * The app needs to make new requests for the released resource capability if
301       * it still needs it. eg. it released non-local resources
302       * @param containerId
303       */
304      public abstract void releaseAssignedContainer(ContainerId containerId);
305      
306      /**
307       * Get the currently available resources in the cluster.
308       * A valid value is available after a call to allocate has been made
309       * @return Currently available resources
310       */
311      public abstract Resource getAvailableResources();
312      
313      /**
314       * Get the current number of nodes in the cluster.
315       * A valid values is available after a call to allocate has been made
316       * @return Current number of nodes in the cluster
317       */
318      public abstract int getClusterNodeCount();
319    
320      /**
321       * Get outstanding <code>ContainerRequest</code>s matching the given 
322       * parameters. These ContainerRequests should have been added via
323       * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
324       * the AMRMClient may return its internal collection directly without creating 
325       * a copy. Users should not perform mutable operations on the return value.
326       * Each collection in the list contains requests with identical 
327       * <code>Resource</code> size that fit in the given capability. In a 
328       * collection, requests will be returned in the same order as they were added.
329       * @return Collection of request matching the parameters
330       */
331      public abstract List<? extends Collection<T>> getMatchingRequests(
332                                               Priority priority, 
333                                               String resourceName, 
334                                               Resource capability);
335      
336      /**
337       * Update application's blacklist with addition or removal resources.
338       * 
339       * @param blacklistAdditions list of resources which should be added to the 
340       *        application blacklist
341       * @param blacklistRemovals list of resources which should be removed from the 
342       *        application blacklist
343       */
344      public abstract void updateBlacklist(List<String> blacklistAdditions,
345          List<String> blacklistRemovals);
346    
347      /**
348       * Set the NM token cache for the <code>AMRMClient</code>. This cache must
349       * be shared with the {@link NMClient} used to manage containers for the
350       * <code>AMRMClient</code>
351       * <p/>
352       * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
353       * singleton instance will be used.
354       *
355       * @param nmTokenCache the NM token cache to use.
356       */
357      public void setNMTokenCache(NMTokenCache nmTokenCache) {
358        this.nmTokenCache = nmTokenCache;
359      }
360    
361      /**
362       * Get the NM token cache of the <code>AMRMClient</code>. This cache must be
363       * shared with the {@link NMClient} used to manage containers for the
364       * <code>AMRMClient</code>.
365       * <p/>
366       * If a NM token cache is not set, the {@link NMTokenCache#getSingleton()}
367       * singleton instance will be used.
368       *
369       * @return the NM token cache.
370       */
371      public NMTokenCache getNMTokenCache() {
372        return nmTokenCache;
373      }
374    
375      /**
376       * Wait for <code>check</code> to return true for each 1000 ms.
377       * See also {@link #waitFor(com.google.common.base.Supplier, int)}
378       * and {@link #waitFor(com.google.common.base.Supplier, int, int)}
379       * @param check
380       */
381      public void waitFor(Supplier<Boolean> check) throws InterruptedException {
382        waitFor(check, 1000);
383      }
384    
385      /**
386       * Wait for <code>check</code> to return true for each
387       * <code>checkEveryMillis</code> ms.
388       * See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
389       * @param check user defined checker
390       * @param checkEveryMillis interval to call <code>check</code>
391       */
392      public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
393          throws InterruptedException {
394        waitFor(check, checkEveryMillis, 1);
395      }
396    
397      /**
398       * Wait for <code>check</code> to return true for each
399       * <code>checkEveryMillis</code> ms. In the main loop, this method will log
400       * the message "waiting in main loop" for each <code>logInterval</code> times
401       * iteration to confirm the thread is alive.
402       * @param check user defined checker
403       * @param checkEveryMillis interval to call <code>check</code>
404       * @param logInterval interval to log for each
405       */
406      public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
407          int logInterval) throws InterruptedException {
408        Preconditions.checkNotNull(check, "check should not be null");
409        Preconditions.checkArgument(checkEveryMillis >= 0,
410            "checkEveryMillis should be positive value");
411        Preconditions.checkArgument(logInterval >= 0,
412            "logInterval should be positive value");
413    
414        int loggingCounter = logInterval;
415        do {
416          if (LOG.isDebugEnabled()) {
417            LOG.debug("Check the condition for main loop.");
418          }
419    
420          boolean result = check.get();
421          if (result) {
422            LOG.info("Exits the main loop.");
423            return;
424          }
425          if (--loggingCounter <= 0) {
426            LOG.info("Waiting in main loop.");
427            loggingCounter = logInterval;
428          }
429    
430          Thread.sleep(checkEveryMillis);
431        } while (true);
432      }
433    
434    }