001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs.s3;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.UnsupportedEncodingException;
023import java.net.URI;
024import java.net.URLDecoder;
025import java.net.URLEncoder;
026import java.util.Set;
027import java.util.TreeSet;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configured;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.util.Tool;
034import org.apache.hadoop.util.ToolRunner;
035import org.jets3t.service.S3Service;
036import org.jets3t.service.S3ServiceException;
037import org.jets3t.service.ServiceException;
038import org.jets3t.service.impl.rest.httpclient.RestS3Service;
039import org.jets3t.service.model.S3Bucket;
040import org.jets3t.service.model.S3Object;
041import org.jets3t.service.security.AWSCredentials;
042
043/**
044 * <p>
045 * This class is a tool for migrating data from an older to a newer version
046 * of an S3 filesystem.
047 * </p>
048 * <p>
049 * All files in the filesystem are migrated by re-writing the block metadata
050 * - no datafiles are touched.
051 * </p>
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Unstable
055public class MigrationTool extends Configured implements Tool {
056  
057  private S3Service s3Service;
058  private S3Bucket bucket;
059  
060  public static void main(String[] args) throws Exception {
061    int res = ToolRunner.run(new MigrationTool(), args);
062    System.exit(res);
063  }
064  
065  @Override
066  public int run(String[] args) throws Exception {
067    
068    if (args.length == 0) {
069      System.err.println("Usage: MigrationTool <S3 file system URI>");
070      System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
071      ToolRunner.printGenericCommandUsage(System.err);
072      return -1;
073    }
074    
075    URI uri = URI.create(args[0]);
076    
077    initialize(uri);
078    
079    FileSystemStore newStore = new Jets3tFileSystemStore();
080    newStore.initialize(uri, getConf());
081    
082    if (get("%2F") != null) { 
083      System.err.println("Current version number is [unversioned].");
084      System.err.println("Target version number is " +
085          newStore.getVersion() + ".");
086      Store oldStore = new UnversionedStore();
087      migrate(oldStore, newStore);
088      return 0;
089    } else {
090      S3Object root = get("/");
091      if (root != null) {
092        String version = (String) root.getMetadata("fs-version");
093        if (version == null) {
094          System.err.println("Can't detect version - exiting.");
095        } else {
096          String newVersion = newStore.getVersion();
097          System.err.println("Current version number is " + version + ".");
098          System.err.println("Target version number is " + newVersion + ".");
099          if (version.equals(newStore.getVersion())) {
100            System.err.println("No migration required.");
101            return 0;
102          }
103          // use version number to create Store
104          //Store oldStore = ... 
105          //migrate(oldStore, newStore);
106          System.err.println("Not currently implemented.");
107          return 0;
108        }
109      }
110      System.err.println("Can't detect version - exiting.");
111      return 0;
112    }
113    
114  }
115  
116  public void initialize(URI uri) throws IOException {
117    
118    
119    
120    try {
121      String accessKey = null;
122      String secretAccessKey = null;
123      String userInfo = uri.getUserInfo();
124      if (userInfo != null) {
125        int index = userInfo.indexOf(':');
126        if (index != -1) {
127          accessKey = userInfo.substring(0, index);
128          secretAccessKey = userInfo.substring(index + 1);
129        } else {
130          accessKey = userInfo;
131        }
132      }
133      if (accessKey == null) {
134        accessKey = getConf().get("fs.s3.awsAccessKeyId");
135      }
136      if (secretAccessKey == null) {
137        secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
138      }
139      if (accessKey == null && secretAccessKey == null) {
140        throw new IllegalArgumentException("AWS " +
141                                           "Access Key ID and Secret Access Key " +
142                                           "must be specified as the username " +
143                                           "or password (respectively) of a s3 URL, " +
144                                           "or by setting the " +
145                                           "fs.s3.awsAccessKeyId or " +                         
146                                           "fs.s3.awsSecretAccessKey properties (respectively).");
147      } else if (accessKey == null) {
148        throw new IllegalArgumentException("AWS " +
149                                           "Access Key ID must be specified " +
150                                           "as the username of a s3 URL, or by setting the " +
151                                           "fs.s3.awsAccessKeyId property.");
152      } else if (secretAccessKey == null) {
153        throw new IllegalArgumentException("AWS " +
154                                           "Secret Access Key must be specified " +
155                                           "as the password of a s3 URL, or by setting the " +
156                                           "fs.s3.awsSecretAccessKey property.");         
157      }
158      AWSCredentials awsCredentials =
159        new AWSCredentials(accessKey, secretAccessKey);
160      this.s3Service = new RestS3Service(awsCredentials);
161    } catch (S3ServiceException e) {
162      if (e.getCause() instanceof IOException) {
163        throw (IOException) e.getCause();
164      }
165      throw new S3Exception(e);
166    }
167    bucket = new S3Bucket(uri.getHost());
168  }
169  
170  private void migrate(Store oldStore, FileSystemStore newStore)
171      throws IOException {
172    for (Path path : oldStore.listAllPaths()) {
173      INode inode = oldStore.retrieveINode(path);
174      oldStore.deleteINode(path);
175      newStore.storeINode(path, inode);
176    }
177  }
178  
179  private S3Object get(String key) {
180    try {
181      return s3Service.getObject(bucket.getName(), key);
182    } catch (S3ServiceException e) {
183      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
184        return null;
185      }
186    }
187    return null;
188  }
189  
190  interface Store {
191
192    Set<Path> listAllPaths() throws IOException;
193    INode retrieveINode(Path path) throws IOException;
194    void deleteINode(Path path) throws IOException;
195    
196  }
197  
198  class UnversionedStore implements Store {
199
200    @Override
201    public Set<Path> listAllPaths() throws IOException {
202      try {
203        String prefix = urlEncode(Path.SEPARATOR);
204        S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
205        Set<Path> prefixes = new TreeSet<Path>();
206        for (int i = 0; i < objects.length; i++) {
207          prefixes.add(keyToPath(objects[i].getKey()));
208        }
209        return prefixes;
210      } catch (S3ServiceException e) {
211        if (e.getCause() instanceof IOException) {
212          throw (IOException) e.getCause();
213        }
214        throw new S3Exception(e);
215      }   
216    }
217
218    @Override
219    public void deleteINode(Path path) throws IOException {
220      delete(pathToKey(path));
221    }
222    
223    private void delete(String key) throws IOException {
224      try {
225        s3Service.deleteObject(bucket, key);
226      } catch (S3ServiceException e) {
227        if (e.getCause() instanceof IOException) {
228          throw (IOException) e.getCause();
229        }
230        throw new S3Exception(e);
231      }
232    }
233    
234    @Override
235    public INode retrieveINode(Path path) throws IOException {
236      return INode.deserialize(get(pathToKey(path)));
237    }
238
239    private InputStream get(String key) throws IOException {
240      try {
241        S3Object object = s3Service.getObject(bucket.getName(), key);
242        return object.getDataInputStream();
243      } catch (S3ServiceException e) {
244        if ("NoSuchKey".equals(e.getS3ErrorCode())) {
245          return null;
246        }
247        if (e.getCause() instanceof IOException) {
248          throw (IOException) e.getCause();
249        }
250        throw new S3Exception(e);
251      } catch (ServiceException e) {
252        return null;
253      }
254    }
255    
256    private String pathToKey(Path path) {
257      if (!path.isAbsolute()) {
258        throw new IllegalArgumentException("Path must be absolute: " + path);
259      }
260      return urlEncode(path.toUri().getPath());
261    }
262    
263    private Path keyToPath(String key) {
264      return new Path(urlDecode(key));
265    }
266
267    private String urlEncode(String s) {
268      try {
269        return URLEncoder.encode(s, "UTF-8");
270      } catch (UnsupportedEncodingException e) {
271        // Should never happen since every implementation of the Java Platform
272        // is required to support UTF-8.
273        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
274        throw new IllegalStateException(e);
275      }
276    }
277    
278    private String urlDecode(String s) {
279      try {
280        return URLDecoder.decode(s, "UTF-8");
281      } catch (UnsupportedEncodingException e) {
282        // Should never happen since every implementation of the Java Platform
283        // is required to support UTF-8.
284        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
285        throw new IllegalStateException(e);
286      }
287    }
288    
289  }
290  
291}