001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.filecache;
020    
021    import java.io.File;
022    import java.io.IOException;
023    import java.net.URI;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.fs.FileStatus;
029    import org.apache.hadoop.fs.FileSystem;
030    import org.apache.hadoop.fs.Path;
031    import org.apache.hadoop.mapreduce.MRJobConfig;
032    
033    /**
034     * Distribute application-specific large, read-only files efficiently.
035     * 
036     * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce
037     * framework to cache files (text, archives, jars etc.) needed by applications.
038     * </p>
039     * 
040     * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
041     * via the {@link org.apache.hadoop.mapred.JobConf}. The
042     * <code>DistributedCache</code> assumes that the files specified via urls are
043     * already present on the {@link FileSystem} at the path specified by the url
044     * and are accessible by every machine in the cluster.</p>
045     * 
046     * <p>The framework will copy the necessary files on to the slave node before 
047     * any tasks for the job are executed on that node. Its efficiency stems from 
048     * the fact that the files are only copied once per job and the ability to 
049     * cache archives which are un-archived on the slaves.</p> 
050     *
051     * <p><code>DistributedCache</code> can be used to distribute simple, read-only
052     * data/text files and/or more complex types such as archives, jars etc. 
053     * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
054     * Jars may be optionally added to the classpath of the tasks, a rudimentary 
055     * software distribution mechanism.  Files have execution permissions.
056     * In older version of Hadoop Map/Reduce users could optionally ask for symlinks
057     * to be created in the working directory of the child task.  In the current 
058     * version symlinks are always created.  If the URL does not have a fragment 
059     * the name of the file or directory will be used. If multiple files or 
060     * directories map to the same link name, the last one added, will be used.  All
061     * others will not even be downloaded.</p>
062     * 
063     * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
064     * files. Clearly the cache files should not be modified by the application 
065     * or externally while the job is executing.</p>
066     * 
067     * <p>Here is an illustrative example on how to use the 
068     * <code>DistributedCache</code>:</p>
069     * <p><blockquote><pre>
070     *     // Setting up the cache for the application
071     *     
072     *     1. Copy the requisite files to the <code>FileSystem</code>:
073     *     
074     *     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat  
075     *     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip  
076     *     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
077     *     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
078     *     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
079     *     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
080     *     
081     *     2. Setup the application's <code>JobConf</code>:
082     *     
083     *     JobConf job = new JobConf();
084     *     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
085     *                                   job);
086     *     DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
087     *     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
088     *     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
089     *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
090     *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
091     *     
092     *     3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
093     *     or {@link org.apache.hadoop.mapred.Reducer}:
094     *     
095     *     public static class MapClass extends MapReduceBase  
096     *     implements Mapper&lt;K, V, K, V&gt; {
097     *     
098     *       private Path[] localArchives;
099     *       private Path[] localFiles;
100     *       
101     *       public void configure(JobConf job) {
102     *         // Get the cached archives/files
103     *         File f = new File("./map.zip/some/file/in/zip.txt");
104     *       }
105     *       
106     *       public void map(K key, V value, 
107     *                       OutputCollector&lt;K, V&gt; output, Reporter reporter) 
108     *       throws IOException {
109     *         // Use data from the cached archives/files here
110     *         // ...
111     *         // ...
112     *         output.collect(k, v);
113     *       }
114     *     }
115     *     
116     * </pre></blockquote></p>
117     *
118     * It is also very common to use the DistributedCache by using
119     * {@link org.apache.hadoop.util.GenericOptionsParser}.
120     *
121     * This class includes methods that should be used by users
122     * (specifically those mentioned in the example above, as well
123     * as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
124     * as well as methods intended for use by the MapReduce framework
125     * (e.g., {@link org.apache.hadoop.mapred.JobClient}).
126     *
127     * @see org.apache.hadoop.mapred.JobConf
128     * @see org.apache.hadoop.mapred.JobClient
129     * @see org.apache.hadoop.mapreduce.Job
130     */
131    @SuppressWarnings("deprecation")
132    @InterfaceAudience.Public
133    @InterfaceStability.Stable
134    @Deprecated
135    public class DistributedCache extends
136        org.apache.hadoop.mapreduce.filecache.DistributedCache {
137      /**
138       * Warning: {@link #CACHE_FILES_SIZES} is not a *public* constant.
139       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
140       * use {@link MRJobConfig#CACHE_FILES_SIZES}
141       */
142      @Deprecated
143      public static final String CACHE_FILES_SIZES =
144          "mapred.cache.files.filesizes";
145    
146      /**
147       * Warning: {@link #CACHE_ARCHIVES_SIZES} is not a *public* constant.
148       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
149       * use {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
150       */
151      @Deprecated
152      public static final String CACHE_ARCHIVES_SIZES =
153        "mapred.cache.archives.filesizes";
154    
155      /**
156       * Warning: {@link #CACHE_ARCHIVES_TIMESTAMPS} is not a *public* constant.
157       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
158       * use {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}
159       */
160      @Deprecated
161      public static final String CACHE_ARCHIVES_TIMESTAMPS =
162          "mapred.cache.archives.timestamps";
163    
164      /**
165       * Warning: {@link #CACHE_FILES_TIMESTAMPS} is not a *public* constant.
166       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
167       * use {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}
168       */
169      @Deprecated
170      public static final String CACHE_FILES_TIMESTAMPS =
171          "mapred.cache.files.timestamps";
172    
173      /**
174       * Warning: {@link #CACHE_ARCHIVES} is not a *public* constant.
175       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
176       * use {@link MRJobConfig#CACHE_ARCHIVES}
177       */
178      @Deprecated
179      public static final String CACHE_ARCHIVES = "mapred.cache.archives";
180    
181      /**
182       * Warning: {@link #CACHE_FILES} is not a *public* constant.
183       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
184       * use {@link MRJobConfig#CACHE_FILES}
185       */
186      @Deprecated
187      public static final String CACHE_FILES = "mapred.cache.files";
188    
189      /**
190       * Warning: {@link #CACHE_LOCALARCHIVES} is not a *public* constant.
191       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
192       * use {@link MRJobConfig#CACHE_LOCALARCHIVES}
193       */
194      @Deprecated
195      public static final String CACHE_LOCALARCHIVES =
196          "mapred.cache.localArchives";
197    
198      /**
199       * Warning: {@link #CACHE_LOCALFILES} is not a *public* constant.
200       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
201       * use {@link MRJobConfig#CACHE_LOCALFILES}
202       */
203      @Deprecated
204      public static final String CACHE_LOCALFILES = "mapred.cache.localFiles";
205    
206      /**
207       * Warning: {@link #CACHE_SYMLINK} is not a *public* constant.
208       * The variable is kept for M/R 1.x applications, M/R 2.x applications should
209       * use {@link MRJobConfig#CACHE_SYMLINK}
210       */
211      @Deprecated
212      public static final String CACHE_SYMLINK = "mapred.create.symlink";
213    
214      /**
215       * Add a archive that has been localized to the conf.  Used
216       * by internal DistributedCache code.
217       * @param conf The conf to modify to contain the localized caches
218       * @param str a comma separated list of local archives
219       */
220      @Deprecated
221      public static void addLocalArchives(Configuration conf, String str) {
222        String archives = conf.get(CACHE_LOCALARCHIVES);
223        conf.set(CACHE_LOCALARCHIVES, archives == null ? str
224            : archives + "," + str);
225      }
226    
227      /**
228       * Add a file that has been localized to the conf..  Used
229       * by internal DistributedCache code.
230       * @param conf The conf to modify to contain the localized caches
231       * @param str a comma separated list of local files
232       */
233      @Deprecated
234      public static void addLocalFiles(Configuration conf, String str) {
235        String files = conf.get(CACHE_LOCALFILES);
236        conf.set(CACHE_LOCALFILES, files == null ? str
237            : files + "," + str);
238      }
239    
240      /**
241       * This method create symlinks for all files in a given dir in another
242       * directory. Currently symlinks cannot be disabled. This is a NO-OP.
243       *
244       * @param conf the configuration
245       * @param jobCacheDir the target directory for creating symlinks
246       * @param workDir the directory in which the symlinks are created
247       * @throws IOException
248       * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
249       * instead.
250       */
251      @Deprecated
252      public static void createAllSymlink(
253          Configuration conf, File jobCacheDir, File workDir)
254        throws IOException{
255        // Do nothing
256      }
257    
258      /**
259       * Returns {@link FileStatus} of a given cache file on hdfs. Internal to
260       * MapReduce.
261       * @param conf configuration
262       * @param cache cache file
263       * @return <code>FileStatus</code> of a given cache file on hdfs
264       * @throws IOException
265       */
266      @Deprecated
267      public static FileStatus getFileStatus(Configuration conf, URI cache)
268        throws IOException {
269        FileSystem fileSystem = FileSystem.get(cache, conf);
270        return fileSystem.getFileStatus(new Path(cache.getPath()));
271      }
272    
273      /**
274       * Returns mtime of a given cache file on hdfs. Internal to MapReduce.
275       * @param conf configuration
276       * @param cache cache file
277       * @return mtime of a given cache file on hdfs
278       * @throws IOException
279       */
280      @Deprecated
281      public static long getTimestamp(Configuration conf, URI cache)
282        throws IOException {
283        return getFileStatus(conf, cache).getModificationTime();
284      }
285    
286      /**
287       * This is to check the timestamp of the archives to be localized.
288       * Used by internal MapReduce code.
289       * @param conf Configuration which stores the timestamp's
290       * @param timestamps comma separated list of timestamps of archives.
291       * The order should be the same as the order in which the archives are added.
292       */
293      @Deprecated
294      public static void setArchiveTimestamps(Configuration conf, String timestamps) {
295        conf.set(CACHE_ARCHIVES_TIMESTAMPS, timestamps);
296      }
297    
298      /**
299       * This is to check the timestamp of the files to be localized.
300       * Used by internal MapReduce code.
301       * @param conf Configuration which stores the timestamp's
302       * @param timestamps comma separated list of timestamps of files.
303       * The order should be the same as the order in which the files are added.
304       */
305      @Deprecated
306      public static void setFileTimestamps(Configuration conf, String timestamps) {
307        conf.set(CACHE_FILES_TIMESTAMPS, timestamps);
308      }
309    
310      /**
311       * Set the conf to contain the location for localized archives.  Used
312       * by internal DistributedCache code.
313       * @param conf The conf to modify to contain the localized caches
314       * @param str a comma separated list of local archives
315       */
316      @Deprecated
317      public static void setLocalArchives(Configuration conf, String str) {
318        conf.set(CACHE_LOCALARCHIVES, str);
319      }
320    
321      /**
322       * Set the conf to contain the location for localized files.  Used
323       * by internal DistributedCache code.
324       * @param conf The conf to modify to contain the localized caches
325       * @param str a comma separated list of local files
326       */
327      @Deprecated
328      public static void setLocalFiles(Configuration conf, String str) {
329        conf.set(CACHE_LOCALFILES, str);
330      }
331    }