001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs.s3; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.UnsupportedEncodingException; 023import java.net.URI; 024import java.net.URLDecoder; 025import java.net.URLEncoder; 026import java.util.Set; 027import java.util.TreeSet; 028 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.conf.Configured; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.util.Tool; 034import org.apache.hadoop.util.ToolRunner; 035import org.jets3t.service.S3Service; 036import org.jets3t.service.S3ServiceException; 037import org.jets3t.service.impl.rest.httpclient.RestS3Service; 038import org.jets3t.service.model.S3Bucket; 039import org.jets3t.service.model.S3Object; 040import org.jets3t.service.security.AWSCredentials; 041 042/** 043 * <p> 044 * This class is a tool for migrating data from an older to a newer version 045 * of an S3 filesystem. 046 * </p> 047 * <p> 048 * All files in the filesystem are migrated by re-writing the block metadata 049 * - no datafiles are touched. 050 * </p> 051 */ 052@InterfaceAudience.Public 053@InterfaceStability.Unstable 054public class MigrationTool extends Configured implements Tool { 055 056 private S3Service s3Service; 057 private S3Bucket bucket; 058 059 public static void main(String[] args) throws Exception { 060 int res = ToolRunner.run(new MigrationTool(), args); 061 System.exit(res); 062 } 063 064 public int run(String[] args) throws Exception { 065 066 if (args.length == 0) { 067 System.err.println("Usage: MigrationTool <S3 file system URI>"); 068 System.err.println("\t<S3 file system URI>\tfilesystem to migrate"); 069 ToolRunner.printGenericCommandUsage(System.err); 070 return -1; 071 } 072 073 URI uri = URI.create(args[0]); 074 075 initialize(uri); 076 077 FileSystemStore newStore = new Jets3tFileSystemStore(); 078 newStore.initialize(uri, getConf()); 079 080 if (get("%2F") != null) { 081 System.err.println("Current version number is [unversioned]."); 082 System.err.println("Target version number is " + 083 newStore.getVersion() + "."); 084 Store oldStore = new UnversionedStore(); 085 migrate(oldStore, newStore); 086 return 0; 087 } else { 088 S3Object root = get("/"); 089 if (root != null) { 090 String version = (String) root.getMetadata("fs-version"); 091 if (version == null) { 092 System.err.println("Can't detect version - exiting."); 093 } else { 094 String newVersion = newStore.getVersion(); 095 System.err.println("Current version number is " + version + "."); 096 System.err.println("Target version number is " + newVersion + "."); 097 if (version.equals(newStore.getVersion())) { 098 System.err.println("No migration required."); 099 return 0; 100 } 101 // use version number to create Store 102 //Store oldStore = ... 103 //migrate(oldStore, newStore); 104 System.err.println("Not currently implemented."); 105 return 0; 106 } 107 } 108 System.err.println("Can't detect version - exiting."); 109 return 0; 110 } 111 112 } 113 114 public void initialize(URI uri) throws IOException { 115 116 117 118 try { 119 String accessKey = null; 120 String secretAccessKey = null; 121 String userInfo = uri.getUserInfo(); 122 if (userInfo != null) { 123 int index = userInfo.indexOf(':'); 124 if (index != -1) { 125 accessKey = userInfo.substring(0, index); 126 secretAccessKey = userInfo.substring(index + 1); 127 } else { 128 accessKey = userInfo; 129 } 130 } 131 if (accessKey == null) { 132 accessKey = getConf().get("fs.s3.awsAccessKeyId"); 133 } 134 if (secretAccessKey == null) { 135 secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey"); 136 } 137 if (accessKey == null && secretAccessKey == null) { 138 throw new IllegalArgumentException("AWS " + 139 "Access Key ID and Secret Access Key " + 140 "must be specified as the username " + 141 "or password (respectively) of a s3 URL, " + 142 "or by setting the " + 143 "fs.s3.awsAccessKeyId or " + 144 "fs.s3.awsSecretAccessKey properties (respectively)."); 145 } else if (accessKey == null) { 146 throw new IllegalArgumentException("AWS " + 147 "Access Key ID must be specified " + 148 "as the username of a s3 URL, or by setting the " + 149 "fs.s3.awsAccessKeyId property."); 150 } else if (secretAccessKey == null) { 151 throw new IllegalArgumentException("AWS " + 152 "Secret Access Key must be specified " + 153 "as the password of a s3 URL, or by setting the " + 154 "fs.s3.awsSecretAccessKey property."); 155 } 156 AWSCredentials awsCredentials = 157 new AWSCredentials(accessKey, secretAccessKey); 158 this.s3Service = new RestS3Service(awsCredentials); 159 } catch (S3ServiceException e) { 160 if (e.getCause() instanceof IOException) { 161 throw (IOException) e.getCause(); 162 } 163 throw new S3Exception(e); 164 } 165 bucket = new S3Bucket(uri.getHost()); 166 } 167 168 private void migrate(Store oldStore, FileSystemStore newStore) 169 throws IOException { 170 for (Path path : oldStore.listAllPaths()) { 171 INode inode = oldStore.retrieveINode(path); 172 oldStore.deleteINode(path); 173 newStore.storeINode(path, inode); 174 } 175 } 176 177 private S3Object get(String key) { 178 try { 179 return s3Service.getObject(bucket, key); 180 } catch (S3ServiceException e) { 181 if ("NoSuchKey".equals(e.getS3ErrorCode())) { 182 return null; 183 } 184 } 185 return null; 186 } 187 188 interface Store { 189 190 Set<Path> listAllPaths() throws IOException; 191 INode retrieveINode(Path path) throws IOException; 192 void deleteINode(Path path) throws IOException; 193 194 } 195 196 class UnversionedStore implements Store { 197 198 public Set<Path> listAllPaths() throws IOException { 199 try { 200 String prefix = urlEncode(Path.SEPARATOR); 201 S3Object[] objects = s3Service.listObjects(bucket, prefix, null); 202 Set<Path> prefixes = new TreeSet<Path>(); 203 for (int i = 0; i < objects.length; i++) { 204 prefixes.add(keyToPath(objects[i].getKey())); 205 } 206 return prefixes; 207 } catch (S3ServiceException e) { 208 if (e.getCause() instanceof IOException) { 209 throw (IOException) e.getCause(); 210 } 211 throw new S3Exception(e); 212 } 213 } 214 215 public void deleteINode(Path path) throws IOException { 216 delete(pathToKey(path)); 217 } 218 219 private void delete(String key) throws IOException { 220 try { 221 s3Service.deleteObject(bucket, key); 222 } catch (S3ServiceException e) { 223 if (e.getCause() instanceof IOException) { 224 throw (IOException) e.getCause(); 225 } 226 throw new S3Exception(e); 227 } 228 } 229 230 public INode retrieveINode(Path path) throws IOException { 231 return INode.deserialize(get(pathToKey(path))); 232 } 233 234 private InputStream get(String key) throws IOException { 235 try { 236 S3Object object = s3Service.getObject(bucket, key); 237 return object.getDataInputStream(); 238 } catch (S3ServiceException e) { 239 if ("NoSuchKey".equals(e.getS3ErrorCode())) { 240 return null; 241 } 242 if (e.getCause() instanceof IOException) { 243 throw (IOException) e.getCause(); 244 } 245 throw new S3Exception(e); 246 } 247 } 248 249 private String pathToKey(Path path) { 250 if (!path.isAbsolute()) { 251 throw new IllegalArgumentException("Path must be absolute: " + path); 252 } 253 return urlEncode(path.toUri().getPath()); 254 } 255 256 private Path keyToPath(String key) { 257 return new Path(urlDecode(key)); 258 } 259 260 private String urlEncode(String s) { 261 try { 262 return URLEncoder.encode(s, "UTF-8"); 263 } catch (UnsupportedEncodingException e) { 264 // Should never happen since every implementation of the Java Platform 265 // is required to support UTF-8. 266 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html 267 throw new IllegalStateException(e); 268 } 269 } 270 271 private String urlDecode(String s) { 272 try { 273 return URLDecoder.decode(s, "UTF-8"); 274 } catch (UnsupportedEncodingException e) { 275 // Should never happen since every implementation of the Java Platform 276 // is required to support UTF-8. 277 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html 278 throw new IllegalStateException(e); 279 } 280 } 281 282 } 283 284}