001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs.s3;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.UnsupportedEncodingException;
023import java.net.URI;
024import java.net.URLDecoder;
025import java.net.URLEncoder;
026import java.util.Set;
027import java.util.TreeSet;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configured;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.util.Tool;
034import org.apache.hadoop.util.ToolRunner;
035import org.jets3t.service.S3Service;
036import org.jets3t.service.S3ServiceException;
037import org.jets3t.service.impl.rest.httpclient.RestS3Service;
038import org.jets3t.service.model.S3Bucket;
039import org.jets3t.service.model.S3Object;
040import org.jets3t.service.security.AWSCredentials;
041
042/**
043 * <p>
044 * This class is a tool for migrating data from an older to a newer version
045 * of an S3 filesystem.
046 * </p>
047 * <p>
048 * All files in the filesystem are migrated by re-writing the block metadata
049 * - no datafiles are touched.
050 * </p>
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Unstable
054public class MigrationTool extends Configured implements Tool {
055  
056  private S3Service s3Service;
057  private S3Bucket bucket;
058  
059  public static void main(String[] args) throws Exception {
060    int res = ToolRunner.run(new MigrationTool(), args);
061    System.exit(res);
062  }
063  
064  public int run(String[] args) throws Exception {
065    
066    if (args.length == 0) {
067      System.err.println("Usage: MigrationTool <S3 file system URI>");
068      System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
069      ToolRunner.printGenericCommandUsage(System.err);
070      return -1;
071    }
072    
073    URI uri = URI.create(args[0]);
074    
075    initialize(uri);
076    
077    FileSystemStore newStore = new Jets3tFileSystemStore();
078    newStore.initialize(uri, getConf());
079    
080    if (get("%2F") != null) { 
081      System.err.println("Current version number is [unversioned].");
082      System.err.println("Target version number is " +
083          newStore.getVersion() + ".");
084      Store oldStore = new UnversionedStore();
085      migrate(oldStore, newStore);
086      return 0;
087    } else {
088      S3Object root = get("/");
089      if (root != null) {
090        String version = (String) root.getMetadata("fs-version");
091        if (version == null) {
092          System.err.println("Can't detect version - exiting.");
093        } else {
094          String newVersion = newStore.getVersion();
095          System.err.println("Current version number is " + version + ".");
096          System.err.println("Target version number is " + newVersion + ".");
097          if (version.equals(newStore.getVersion())) {
098            System.err.println("No migration required.");
099            return 0;
100          }
101          // use version number to create Store
102          //Store oldStore = ... 
103          //migrate(oldStore, newStore);
104          System.err.println("Not currently implemented.");
105          return 0;
106        }
107      }
108      System.err.println("Can't detect version - exiting.");
109      return 0;
110    }
111    
112  }
113  
114  public void initialize(URI uri) throws IOException {
115    
116    
117    
118    try {
119      String accessKey = null;
120      String secretAccessKey = null;
121      String userInfo = uri.getUserInfo();
122      if (userInfo != null) {
123        int index = userInfo.indexOf(':');
124        if (index != -1) {
125          accessKey = userInfo.substring(0, index);
126          secretAccessKey = userInfo.substring(index + 1);
127        } else {
128          accessKey = userInfo;
129        }
130      }
131      if (accessKey == null) {
132        accessKey = getConf().get("fs.s3.awsAccessKeyId");
133      }
134      if (secretAccessKey == null) {
135        secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
136      }
137      if (accessKey == null && secretAccessKey == null) {
138        throw new IllegalArgumentException("AWS " +
139                                           "Access Key ID and Secret Access Key " +
140                                           "must be specified as the username " +
141                                           "or password (respectively) of a s3 URL, " +
142                                           "or by setting the " +
143                                           "fs.s3.awsAccessKeyId or " +                         
144                                           "fs.s3.awsSecretAccessKey properties (respectively).");
145      } else if (accessKey == null) {
146        throw new IllegalArgumentException("AWS " +
147                                           "Access Key ID must be specified " +
148                                           "as the username of a s3 URL, or by setting the " +
149                                           "fs.s3.awsAccessKeyId property.");
150      } else if (secretAccessKey == null) {
151        throw new IllegalArgumentException("AWS " +
152                                           "Secret Access Key must be specified " +
153                                           "as the password of a s3 URL, or by setting the " +
154                                           "fs.s3.awsSecretAccessKey property.");         
155      }
156      AWSCredentials awsCredentials =
157        new AWSCredentials(accessKey, secretAccessKey);
158      this.s3Service = new RestS3Service(awsCredentials);
159    } catch (S3ServiceException e) {
160      if (e.getCause() instanceof IOException) {
161        throw (IOException) e.getCause();
162      }
163      throw new S3Exception(e);
164    }
165    bucket = new S3Bucket(uri.getHost());
166  }
167  
168  private void migrate(Store oldStore, FileSystemStore newStore)
169      throws IOException {
170    for (Path path : oldStore.listAllPaths()) {
171      INode inode = oldStore.retrieveINode(path);
172      oldStore.deleteINode(path);
173      newStore.storeINode(path, inode);
174    }
175  }
176  
177  private S3Object get(String key) {
178    try {
179      return s3Service.getObject(bucket, key);
180    } catch (S3ServiceException e) {
181      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
182        return null;
183      }
184    }
185    return null;
186  }
187  
188  interface Store {
189
190    Set<Path> listAllPaths() throws IOException;
191    INode retrieveINode(Path path) throws IOException;
192    void deleteINode(Path path) throws IOException;
193    
194  }
195  
196  class UnversionedStore implements Store {
197
198    public Set<Path> listAllPaths() throws IOException {
199      try {
200        String prefix = urlEncode(Path.SEPARATOR);
201        S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
202        Set<Path> prefixes = new TreeSet<Path>();
203        for (int i = 0; i < objects.length; i++) {
204          prefixes.add(keyToPath(objects[i].getKey()));
205        }
206        return prefixes;
207      } catch (S3ServiceException e) {
208        if (e.getCause() instanceof IOException) {
209          throw (IOException) e.getCause();
210        }
211        throw new S3Exception(e);
212      }   
213    }
214
215    public void deleteINode(Path path) throws IOException {
216      delete(pathToKey(path));
217    }
218    
219    private void delete(String key) throws IOException {
220      try {
221        s3Service.deleteObject(bucket, key);
222      } catch (S3ServiceException e) {
223        if (e.getCause() instanceof IOException) {
224          throw (IOException) e.getCause();
225        }
226        throw new S3Exception(e);
227      }
228    }
229    
230    public INode retrieveINode(Path path) throws IOException {
231      return INode.deserialize(get(pathToKey(path)));
232    }
233
234    private InputStream get(String key) throws IOException {
235      try {
236        S3Object object = s3Service.getObject(bucket, key);
237        return object.getDataInputStream();
238      } catch (S3ServiceException e) {
239        if ("NoSuchKey".equals(e.getS3ErrorCode())) {
240          return null;
241        }
242        if (e.getCause() instanceof IOException) {
243          throw (IOException) e.getCause();
244        }
245        throw new S3Exception(e);
246      }
247    }
248    
249    private String pathToKey(Path path) {
250      if (!path.isAbsolute()) {
251        throw new IllegalArgumentException("Path must be absolute: " + path);
252      }
253      return urlEncode(path.toUri().getPath());
254    }
255    
256    private Path keyToPath(String key) {
257      return new Path(urlDecode(key));
258    }
259
260    private String urlEncode(String s) {
261      try {
262        return URLEncoder.encode(s, "UTF-8");
263      } catch (UnsupportedEncodingException e) {
264        // Should never happen since every implementation of the Java Platform
265        // is required to support UTF-8.
266        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
267        throw new IllegalStateException(e);
268      }
269    }
270    
271    private String urlDecode(String s) {
272      try {
273        return URLDecoder.decode(s, "UTF-8");
274      } catch (UnsupportedEncodingException e) {
275        // Should never happen since every implementation of the Java Platform
276        // is required to support UTF-8.
277        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
278        throw new IllegalStateException(e);
279      }
280    }
281    
282  }
283  
284}