001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io.compress;
019
020import java.util.*;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.CommonConfigurationKeys;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.util.ReflectionUtils;
030import org.apache.hadoop.util.StringUtils;
031
032/**
033 * A factory that will find the correct codec for a given filename.
034 */
035@InterfaceAudience.Public
036@InterfaceStability.Evolving
037public class CompressionCodecFactory {
038
039  public static final Log LOG =
040    LogFactory.getLog(CompressionCodecFactory.class.getName());
041  
042  private static final ServiceLoader<CompressionCodec> CODEC_PROVIDERS =
043    ServiceLoader.load(CompressionCodec.class);
044
045  /**
046   * A map from the reversed filename suffixes to the codecs.
047   * This is probably overkill, because the maps should be small, but it 
048   * automatically supports finding the longest matching suffix. 
049   */
050  private SortedMap<String, CompressionCodec> codecs = null;
051
052    /**
053     * A map from the reversed filename suffixes to the codecs.
054     * This is probably overkill, because the maps should be small, but it
055     * automatically supports finding the longest matching suffix.
056     */
057    private Map<String, CompressionCodec> codecsByName = null;
058
059  /**
060   * A map from class names to the codecs
061   */
062  private HashMap<String, CompressionCodec> codecsByClassName = null;
063
064  private void addCodec(CompressionCodec codec) {
065    String suffix = codec.getDefaultExtension();
066    codecs.put(new StringBuilder(suffix).reverse().toString(), codec);
067    codecsByClassName.put(codec.getClass().getCanonicalName(), codec);
068
069    String codecName = codec.getClass().getSimpleName();
070    codecsByName.put(StringUtils.toLowerCase(codecName), codec);
071    if (codecName.endsWith("Codec")) {
072      codecName = codecName.substring(0, codecName.length() - "Codec".length());
073      codecsByName.put(StringUtils.toLowerCase(codecName), codec);
074    }
075  }
076
077  /**
078   * Print the extension map out as a string.
079   */
080  @Override
081  public String toString() {
082    StringBuilder buf = new StringBuilder();
083    Iterator<Map.Entry<String, CompressionCodec>> itr = 
084      codecs.entrySet().iterator();
085    buf.append("{ ");
086    if (itr.hasNext()) {
087      Map.Entry<String, CompressionCodec> entry = itr.next();
088      buf.append(entry.getKey());
089      buf.append(": ");
090      buf.append(entry.getValue().getClass().getName());
091      while (itr.hasNext()) {
092        entry = itr.next();
093        buf.append(", ");
094        buf.append(entry.getKey());
095        buf.append(": ");
096        buf.append(entry.getValue().getClass().getName());
097      }
098    }
099    buf.append(" }");
100    return buf.toString();
101  }
102
103  /**
104   * Get the list of codecs discovered via a Java ServiceLoader, or
105   * listed in the configuration. Codecs specified in configuration come
106   * later in the returned list, and are considered to override those
107   * from the ServiceLoader.
108   * @param conf the configuration to look in
109   * @return a list of the {@link CompressionCodec} classes
110   */
111  public static List<Class<? extends CompressionCodec>> getCodecClasses(
112      Configuration conf) {
113    List<Class<? extends CompressionCodec>> result
114      = new ArrayList<Class<? extends CompressionCodec>>();
115    // Add codec classes discovered via service loading
116    synchronized (CODEC_PROVIDERS) {
117      // CODEC_PROVIDERS is a lazy collection. Synchronize so it is
118      // thread-safe. See HADOOP-8406.
119      for (CompressionCodec codec : CODEC_PROVIDERS) {
120        result.add(codec.getClass());
121      }
122    }
123    // Add codec classes from configuration
124    String codecsString = conf.get(
125        CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY);
126    if (codecsString != null) {
127      StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
128      while (codecSplit.hasMoreElements()) {
129        String codecSubstring = codecSplit.nextToken().trim();
130        if (codecSubstring.length() != 0) {
131          try {
132            Class<?> cls = conf.getClassByName(codecSubstring);
133            if (!CompressionCodec.class.isAssignableFrom(cls)) {
134              throw new IllegalArgumentException("Class " + codecSubstring +
135                                                 " is not a CompressionCodec");
136            }
137            result.add(cls.asSubclass(CompressionCodec.class));
138          } catch (ClassNotFoundException ex) {
139            throw new IllegalArgumentException("Compression codec " + 
140                                               codecSubstring + " not found.",
141                                               ex);
142          }
143        }
144      }
145    }
146    return result;
147  }
148  
149  /**
150   * Sets a list of codec classes in the configuration. In addition to any
151   * classes specified using this method, {@link CompressionCodec} classes on
152   * the classpath are discovered using a Java ServiceLoader.
153   * @param conf the configuration to modify
154   * @param classes the list of classes to set
155   */
156  public static void setCodecClasses(Configuration conf,
157                                     List<Class> classes) {
158    StringBuilder buf = new StringBuilder();
159    Iterator<Class> itr = classes.iterator();
160    if (itr.hasNext()) {
161      Class cls = itr.next();
162      buf.append(cls.getName());
163      while(itr.hasNext()) {
164        buf.append(',');
165        buf.append(itr.next().getName());
166      }
167    }
168    conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, buf.toString());
169  }
170  
171  /**
172   * Find the codecs specified in the config value io.compression.codecs 
173   * and register them. Defaults to gzip and deflate.
174   */
175  public CompressionCodecFactory(Configuration conf) {
176    codecs = new TreeMap<String, CompressionCodec>();
177    codecsByClassName = new HashMap<String, CompressionCodec>();
178    codecsByName = new HashMap<String, CompressionCodec>();
179    List<Class<? extends CompressionCodec>> codecClasses =
180        getCodecClasses(conf);
181    if (codecClasses == null || codecClasses.isEmpty()) {
182      addCodec(new GzipCodec());
183      addCodec(new DefaultCodec());      
184    } else {
185      for (Class<? extends CompressionCodec> codecClass : codecClasses) {
186        addCodec(ReflectionUtils.newInstance(codecClass, conf));
187      }
188    }
189  }
190  
191  /**
192   * Find the relevant compression codec for the given file based on its
193   * filename suffix.
194   * @param file the filename to check
195   * @return the codec object
196   */
197  public CompressionCodec getCodec(Path file) {
198    CompressionCodec result = null;
199    if (codecs != null) {
200      String filename = file.getName();
201      String reversedFilename =
202          new StringBuilder(filename).reverse().toString();
203      SortedMap<String, CompressionCodec> subMap = 
204        codecs.headMap(reversedFilename);
205      if (!subMap.isEmpty()) {
206        String potentialSuffix = subMap.lastKey();
207        if (reversedFilename.startsWith(potentialSuffix)) {
208          result = codecs.get(potentialSuffix);
209        }
210      }
211    }
212    return result;
213  }
214  
215  /**
216   * Find the relevant compression codec for the codec's canonical class name.
217   * @param classname the canonical class name of the codec
218   * @return the codec object
219   */
220  public CompressionCodec getCodecByClassName(String classname) {
221    if (codecsByClassName == null) {
222      return null;
223    }
224    return codecsByClassName.get(classname);
225  }
226
227    /**
228     * Find the relevant compression codec for the codec's canonical class name
229     * or by codec alias.
230     * <p/>
231     * Codec aliases are case insensitive.
232     * <p/>
233     * The code alias is the short class name (without the package name).
234     * If the short class name ends with 'Codec', then there are two aliases for
235     * the codec, the complete short class name and the short class name without
236     * the 'Codec' ending. For example for the 'GzipCodec' codec class name the
237     * alias are 'gzip' and 'gzipcodec'.
238     *
239     * @param codecName the canonical class name of the codec
240     * @return the codec object
241     */
242    public CompressionCodec getCodecByName(String codecName) {
243      if (codecsByClassName == null) {
244        return null;
245      }
246      CompressionCodec codec = getCodecByClassName(codecName);
247      if (codec == null) {
248        // trying to get the codec by name in case the name was specified
249        // instead a class
250        codec = codecsByName.get(StringUtils.toLowerCase(codecName));
251      }
252      return codec;
253    }
254
255    /**
256     * Find the relevant compression codec for the codec's canonical class name
257     * or by codec alias and returns its implemetation class.
258     * <p/>
259     * Codec aliases are case insensitive.
260     * <p/>
261     * The code alias is the short class name (without the package name).
262     * If the short class name ends with 'Codec', then there are two aliases for
263     * the codec, the complete short class name and the short class name without
264     * the 'Codec' ending. For example for the 'GzipCodec' codec class name the
265     * alias are 'gzip' and 'gzipcodec'.
266     *
267     * @param codecName the canonical class name of the codec
268     * @return the codec class
269     */
270    public Class<? extends CompressionCodec> getCodecClassByName(
271        String codecName) {
272      CompressionCodec codec = getCodecByName(codecName);
273      if (codec == null) {
274        return null;
275      }
276      return codec.getClass();
277    }
278
279  /**
280   * Removes a suffix from a filename, if it has it.
281   * @param filename the filename to strip
282   * @param suffix the suffix to remove
283   * @return the shortened filename
284   */
285  public static String removeSuffix(String filename, String suffix) {
286    if (filename.endsWith(suffix)) {
287      return filename.substring(0, filename.length() - suffix.length());
288    }
289    return filename;
290  }
291  
292  /**
293   * A little test program.
294   * @param args
295   */
296  public static void main(String[] args) throws Exception {
297    Configuration conf = new Configuration();
298    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
299    boolean encode = false;
300    for(int i=0; i < args.length; ++i) {
301      if ("-in".equals(args[i])) {
302        encode = true;
303      } else if ("-out".equals(args[i])) {
304        encode = false;
305      } else {
306        CompressionCodec codec = factory.getCodec(new Path(args[i]));
307        if (codec == null) {
308          System.out.println("Codec for " + args[i] + " not found.");
309        } else { 
310          if (encode) {
311            CompressionOutputStream out = null;
312            java.io.InputStream in = null;
313            try {
314              out = codec.createOutputStream(
315                  new java.io.FileOutputStream(args[i]));
316              byte[] buffer = new byte[100];
317              String inFilename = removeSuffix(args[i], 
318                  codec.getDefaultExtension());
319              in = new java.io.FileInputStream(inFilename);
320              int len = in.read(buffer);
321              while (len > 0) {
322                out.write(buffer, 0, len);
323                len = in.read(buffer);
324              }
325            } finally {
326              if(out != null) { out.close(); }
327              if(in  != null) { in.close(); }
328            }
329          } else {
330            CompressionInputStream in = null;
331            try {
332              in = codec.createInputStream(
333                  new java.io.FileInputStream(args[i]));
334              byte[] buffer = new byte[100];
335              int len = in.read(buffer);
336              while (len > 0) {
337                System.out.write(buffer, 0, len);
338                len = in.read(buffer);
339              }
340            } finally {
341              if(in != null) { in.close(); }
342            }
343          }
344        }
345      }
346    }
347  }
348}