001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs.s3;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.UnsupportedEncodingException;
023import java.net.URI;
024import java.net.URLDecoder;
025import java.net.URLEncoder;
026import java.util.Set;
027import java.util.TreeSet;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configured;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.util.Tool;
034import org.apache.hadoop.util.ToolRunner;
035import org.jets3t.service.S3Service;
036import org.jets3t.service.S3ServiceException;
037import org.jets3t.service.ServiceException;
038import org.jets3t.service.impl.rest.httpclient.RestS3Service;
039import org.jets3t.service.model.S3Bucket;
040import org.jets3t.service.model.S3Object;
041import org.jets3t.service.security.AWSCredentials;
042
043/**
044 * <p>
045 * This class is a tool for migrating data from an older to a newer version
046 * of an S3 filesystem.
047 * </p>
048 * <p>
049 * All files in the filesystem are migrated by re-writing the block metadata
050 * - no datafiles are touched.
051 * </p>
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Unstable
055@Deprecated
056public class MigrationTool extends Configured implements Tool {
057  
058  private S3Service s3Service;
059  private S3Bucket bucket;
060  
061  public static void main(String[] args) throws Exception {
062    int res = ToolRunner.run(new MigrationTool(), args);
063    System.exit(res);
064  }
065  
066  @Override
067  public int run(String[] args) throws Exception {
068    
069    if (args.length == 0) {
070      System.err.println("Usage: MigrationTool <S3 file system URI>");
071      System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
072      ToolRunner.printGenericCommandUsage(System.err);
073      return -1;
074    }
075    
076    URI uri = URI.create(args[0]);
077    
078    initialize(uri);
079    
080    FileSystemStore newStore = new Jets3tFileSystemStore();
081    newStore.initialize(uri, getConf());
082    
083    if (get("%2F") != null) { 
084      System.err.println("Current version number is [unversioned].");
085      System.err.println("Target version number is " +
086          newStore.getVersion() + ".");
087      Store oldStore = new UnversionedStore();
088      migrate(oldStore, newStore);
089      return 0;
090    } else {
091      S3Object root = get("/");
092      if (root != null) {
093        String version = (String) root.getMetadata("fs-version");
094        if (version == null) {
095          System.err.println("Can't detect version - exiting.");
096        } else {
097          String newVersion = newStore.getVersion();
098          System.err.println("Current version number is " + version + ".");
099          System.err.println("Target version number is " + newVersion + ".");
100          if (version.equals(newStore.getVersion())) {
101            System.err.println("No migration required.");
102            return 0;
103          }
104          // use version number to create Store
105          //Store oldStore = ... 
106          //migrate(oldStore, newStore);
107          System.err.println("Not currently implemented.");
108          return 0;
109        }
110      }
111      System.err.println("Can't detect version - exiting.");
112      return 0;
113    }
114    
115  }
116  
117  public void initialize(URI uri) throws IOException {
118    
119    
120    
121    try {
122      String accessKey = null;
123      String secretAccessKey = null;
124      String userInfo = uri.getUserInfo();
125      if (userInfo != null) {
126        int index = userInfo.indexOf(':');
127        if (index != -1) {
128          accessKey = userInfo.substring(0, index);
129          secretAccessKey = userInfo.substring(index + 1);
130        } else {
131          accessKey = userInfo;
132        }
133      }
134      if (accessKey == null) {
135        accessKey = getConf().get("fs.s3.awsAccessKeyId");
136      }
137      if (secretAccessKey == null) {
138        secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
139      }
140      if (accessKey == null && secretAccessKey == null) {
141        throw new IllegalArgumentException("AWS " +
142                                           "Access Key ID and Secret Access Key " +
143                                           "must be specified as the username " +
144                                           "or password (respectively) of a s3 URL, " +
145                                           "or by setting the " +
146                                           "fs.s3.awsAccessKeyId or " +                         
147                                           "fs.s3.awsSecretAccessKey properties (respectively).");
148      } else if (accessKey == null) {
149        throw new IllegalArgumentException("AWS " +
150                                           "Access Key ID must be specified " +
151                                           "as the username of a s3 URL, or by setting the " +
152                                           "fs.s3.awsAccessKeyId property.");
153      } else if (secretAccessKey == null) {
154        throw new IllegalArgumentException("AWS " +
155                                           "Secret Access Key must be specified " +
156                                           "as the password of a s3 URL, or by setting the " +
157                                           "fs.s3.awsSecretAccessKey property.");         
158      }
159      AWSCredentials awsCredentials =
160        new AWSCredentials(accessKey, secretAccessKey);
161      this.s3Service = new RestS3Service(awsCredentials);
162    } catch (S3ServiceException e) {
163      if (e.getCause() instanceof IOException) {
164        throw (IOException) e.getCause();
165      }
166      throw new S3Exception(e);
167    }
168    bucket = new S3Bucket(uri.getHost());
169  }
170  
171  private void migrate(Store oldStore, FileSystemStore newStore)
172      throws IOException {
173    for (Path path : oldStore.listAllPaths()) {
174      INode inode = oldStore.retrieveINode(path);
175      oldStore.deleteINode(path);
176      newStore.storeINode(path, inode);
177    }
178  }
179  
180  private S3Object get(String key) {
181    try {
182      return s3Service.getObject(bucket.getName(), key);
183    } catch (S3ServiceException e) {
184      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
185        return null;
186      }
187    }
188    return null;
189  }
190  
191  interface Store {
192
193    Set<Path> listAllPaths() throws IOException;
194    INode retrieveINode(Path path) throws IOException;
195    void deleteINode(Path path) throws IOException;
196    
197  }
198  
199  class UnversionedStore implements Store {
200
201    @Override
202    public Set<Path> listAllPaths() throws IOException {
203      try {
204        String prefix = urlEncode(Path.SEPARATOR);
205        S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
206        Set<Path> prefixes = new TreeSet<Path>();
207        for (int i = 0; i < objects.length; i++) {
208          prefixes.add(keyToPath(objects[i].getKey()));
209        }
210        return prefixes;
211      } catch (S3ServiceException e) {
212        if (e.getCause() instanceof IOException) {
213          throw (IOException) e.getCause();
214        }
215        throw new S3Exception(e);
216      }   
217    }
218
219    @Override
220    public void deleteINode(Path path) throws IOException {
221      delete(pathToKey(path));
222    }
223    
224    private void delete(String key) throws IOException {
225      try {
226        s3Service.deleteObject(bucket, key);
227      } catch (S3ServiceException e) {
228        if (e.getCause() instanceof IOException) {
229          throw (IOException) e.getCause();
230        }
231        throw new S3Exception(e);
232      }
233    }
234    
235    @Override
236    public INode retrieveINode(Path path) throws IOException {
237      return INode.deserialize(get(pathToKey(path)));
238    }
239
240    private InputStream get(String key) throws IOException {
241      try {
242        S3Object object = s3Service.getObject(bucket.getName(), key);
243        return object.getDataInputStream();
244      } catch (S3ServiceException e) {
245        if ("NoSuchKey".equals(e.getS3ErrorCode())) {
246          return null;
247        }
248        if (e.getCause() instanceof IOException) {
249          throw (IOException) e.getCause();
250        }
251        throw new S3Exception(e);
252      } catch (ServiceException e) {
253        return null;
254      }
255    }
256    
257    private String pathToKey(Path path) {
258      if (!path.isAbsolute()) {
259        throw new IllegalArgumentException("Path must be absolute: " + path);
260      }
261      return urlEncode(path.toUri().getPath());
262    }
263    
264    private Path keyToPath(String key) {
265      return new Path(urlDecode(key));
266    }
267
268    private String urlEncode(String s) {
269      try {
270        return URLEncoder.encode(s, "UTF-8");
271      } catch (UnsupportedEncodingException e) {
272        // Should never happen since every implementation of the Java Platform
273        // is required to support UTF-8.
274        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
275        throw new IllegalStateException(e);
276      }
277    }
278    
279    private String urlDecode(String s) {
280      try {
281        return URLDecoder.decode(s, "UTF-8");
282      } catch (UnsupportedEncodingException e) {
283        // Should never happen since every implementation of the Java Platform
284        // is required to support UTF-8.
285        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
286        throw new IllegalStateException(e);
287      }
288    }
289    
290  }
291  
292}