001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs.s3;
019    
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.io.UnsupportedEncodingException;
023    import java.net.URI;
024    import java.net.URLDecoder;
025    import java.net.URLEncoder;
026    import java.util.Set;
027    import java.util.TreeSet;
028    
029    import org.apache.hadoop.classification.InterfaceAudience;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.conf.Configured;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.util.Tool;
034    import org.apache.hadoop.util.ToolRunner;
035    import org.jets3t.service.S3Service;
036    import org.jets3t.service.S3ServiceException;
037    import org.jets3t.service.ServiceException;
038    import org.jets3t.service.impl.rest.httpclient.RestS3Service;
039    import org.jets3t.service.model.S3Bucket;
040    import org.jets3t.service.model.S3Object;
041    import org.jets3t.service.security.AWSCredentials;
042    
043    /**
044     * <p>
045     * This class is a tool for migrating data from an older to a newer version
046     * of an S3 filesystem.
047     * </p>
048     * <p>
049     * All files in the filesystem are migrated by re-writing the block metadata
050     * - no datafiles are touched.
051     * </p>
052     */
053    @InterfaceAudience.Public
054    @InterfaceStability.Unstable
055    public class MigrationTool extends Configured implements Tool {
056      
057      private S3Service s3Service;
058      private S3Bucket bucket;
059      
060      public static void main(String[] args) throws Exception {
061        int res = ToolRunner.run(new MigrationTool(), args);
062        System.exit(res);
063      }
064      
065      @Override
066      public int run(String[] args) throws Exception {
067        
068        if (args.length == 0) {
069          System.err.println("Usage: MigrationTool <S3 file system URI>");
070          System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
071          ToolRunner.printGenericCommandUsage(System.err);
072          return -1;
073        }
074        
075        URI uri = URI.create(args[0]);
076        
077        initialize(uri);
078        
079        FileSystemStore newStore = new Jets3tFileSystemStore();
080        newStore.initialize(uri, getConf());
081        
082        if (get("%2F") != null) { 
083          System.err.println("Current version number is [unversioned].");
084          System.err.println("Target version number is " +
085              newStore.getVersion() + ".");
086          Store oldStore = new UnversionedStore();
087          migrate(oldStore, newStore);
088          return 0;
089        } else {
090          S3Object root = get("/");
091          if (root != null) {
092            String version = (String) root.getMetadata("fs-version");
093            if (version == null) {
094              System.err.println("Can't detect version - exiting.");
095            } else {
096              String newVersion = newStore.getVersion();
097              System.err.println("Current version number is " + version + ".");
098              System.err.println("Target version number is " + newVersion + ".");
099              if (version.equals(newStore.getVersion())) {
100                System.err.println("No migration required.");
101                return 0;
102              }
103              // use version number to create Store
104              //Store oldStore = ... 
105              //migrate(oldStore, newStore);
106              System.err.println("Not currently implemented.");
107              return 0;
108            }
109          }
110          System.err.println("Can't detect version - exiting.");
111          return 0;
112        }
113        
114      }
115      
116      public void initialize(URI uri) throws IOException {
117        
118        
119        
120        try {
121          String accessKey = null;
122          String secretAccessKey = null;
123          String userInfo = uri.getUserInfo();
124          if (userInfo != null) {
125            int index = userInfo.indexOf(':');
126            if (index != -1) {
127              accessKey = userInfo.substring(0, index);
128              secretAccessKey = userInfo.substring(index + 1);
129            } else {
130              accessKey = userInfo;
131            }
132          }
133          if (accessKey == null) {
134            accessKey = getConf().get("fs.s3.awsAccessKeyId");
135          }
136          if (secretAccessKey == null) {
137            secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
138          }
139          if (accessKey == null && secretAccessKey == null) {
140            throw new IllegalArgumentException("AWS " +
141                                               "Access Key ID and Secret Access Key " +
142                                               "must be specified as the username " +
143                                               "or password (respectively) of a s3 URL, " +
144                                               "or by setting the " +
145                                               "fs.s3.awsAccessKeyId or " +                         
146                                               "fs.s3.awsSecretAccessKey properties (respectively).");
147          } else if (accessKey == null) {
148            throw new IllegalArgumentException("AWS " +
149                                               "Access Key ID must be specified " +
150                                               "as the username of a s3 URL, or by setting the " +
151                                               "fs.s3.awsAccessKeyId property.");
152          } else if (secretAccessKey == null) {
153            throw new IllegalArgumentException("AWS " +
154                                               "Secret Access Key must be specified " +
155                                               "as the password of a s3 URL, or by setting the " +
156                                               "fs.s3.awsSecretAccessKey property.");         
157          }
158          AWSCredentials awsCredentials =
159            new AWSCredentials(accessKey, secretAccessKey);
160          this.s3Service = new RestS3Service(awsCredentials);
161        } catch (S3ServiceException e) {
162          if (e.getCause() instanceof IOException) {
163            throw (IOException) e.getCause();
164          }
165          throw new S3Exception(e);
166        }
167        bucket = new S3Bucket(uri.getHost());
168      }
169      
170      private void migrate(Store oldStore, FileSystemStore newStore)
171          throws IOException {
172        for (Path path : oldStore.listAllPaths()) {
173          INode inode = oldStore.retrieveINode(path);
174          oldStore.deleteINode(path);
175          newStore.storeINode(path, inode);
176        }
177      }
178      
179      private S3Object get(String key) {
180        try {
181          return s3Service.getObject(bucket.getName(), key);
182        } catch (S3ServiceException e) {
183          if ("NoSuchKey".equals(e.getS3ErrorCode())) {
184            return null;
185          }
186        }
187        return null;
188      }
189      
190      interface Store {
191    
192        Set<Path> listAllPaths() throws IOException;
193        INode retrieveINode(Path path) throws IOException;
194        void deleteINode(Path path) throws IOException;
195        
196      }
197      
198      class UnversionedStore implements Store {
199    
200        @Override
201        public Set<Path> listAllPaths() throws IOException {
202          try {
203            String prefix = urlEncode(Path.SEPARATOR);
204            S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
205            Set<Path> prefixes = new TreeSet<Path>();
206            for (int i = 0; i < objects.length; i++) {
207              prefixes.add(keyToPath(objects[i].getKey()));
208            }
209            return prefixes;
210          } catch (S3ServiceException e) {
211            if (e.getCause() instanceof IOException) {
212              throw (IOException) e.getCause();
213            }
214            throw new S3Exception(e);
215          }   
216        }
217    
218        @Override
219        public void deleteINode(Path path) throws IOException {
220          delete(pathToKey(path));
221        }
222        
223        private void delete(String key) throws IOException {
224          try {
225            s3Service.deleteObject(bucket, key);
226          } catch (S3ServiceException e) {
227            if (e.getCause() instanceof IOException) {
228              throw (IOException) e.getCause();
229            }
230            throw new S3Exception(e);
231          }
232        }
233        
234        @Override
235        public INode retrieveINode(Path path) throws IOException {
236          return INode.deserialize(get(pathToKey(path)));
237        }
238    
239        private InputStream get(String key) throws IOException {
240          try {
241            S3Object object = s3Service.getObject(bucket.getName(), key);
242            return object.getDataInputStream();
243          } catch (S3ServiceException e) {
244            if ("NoSuchKey".equals(e.getS3ErrorCode())) {
245              return null;
246            }
247            if (e.getCause() instanceof IOException) {
248              throw (IOException) e.getCause();
249            }
250            throw new S3Exception(e);
251          } catch (ServiceException e) {
252            return null;
253          }
254        }
255        
256        private String pathToKey(Path path) {
257          if (!path.isAbsolute()) {
258            throw new IllegalArgumentException("Path must be absolute: " + path);
259          }
260          return urlEncode(path.toUri().getPath());
261        }
262        
263        private Path keyToPath(String key) {
264          return new Path(urlDecode(key));
265        }
266    
267        private String urlEncode(String s) {
268          try {
269            return URLEncoder.encode(s, "UTF-8");
270          } catch (UnsupportedEncodingException e) {
271            // Should never happen since every implementation of the Java Platform
272            // is required to support UTF-8.
273            // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
274            throw new IllegalStateException(e);
275          }
276        }
277        
278        private String urlDecode(String s) {
279          try {
280            return URLDecoder.decode(s, "UTF-8");
281          } catch (UnsupportedEncodingException e) {
282            // Should never happen since every implementation of the Java Platform
283            // is required to support UTF-8.
284            // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
285            throw new IllegalStateException(e);
286          }
287        }
288        
289      }
290      
291    }