001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs.s3; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.UnsupportedEncodingException; 023import java.net.URI; 024import java.net.URLDecoder; 025import java.net.URLEncoder; 026import java.util.Set; 027import java.util.TreeSet; 028 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.conf.Configured; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.util.Tool; 034import org.apache.hadoop.util.ToolRunner; 035import org.jets3t.service.S3Service; 036import org.jets3t.service.S3ServiceException; 037import org.jets3t.service.ServiceException; 038import org.jets3t.service.impl.rest.httpclient.RestS3Service; 039import org.jets3t.service.model.S3Bucket; 040import org.jets3t.service.model.S3Object; 041import org.jets3t.service.security.AWSCredentials; 042 043/** 044 * <p> 045 * This class is a tool for migrating data from an older to a newer version 046 * of an S3 filesystem. 047 * </p> 048 * <p> 049 * All files in the filesystem are migrated by re-writing the block metadata 050 * - no datafiles are touched. 051 * </p> 052 */ 053@InterfaceAudience.Public 054@InterfaceStability.Unstable 055@Deprecated 056public class MigrationTool extends Configured implements Tool { 057 058 private S3Service s3Service; 059 private S3Bucket bucket; 060 061 public static void main(String[] args) throws Exception { 062 int res = ToolRunner.run(new MigrationTool(), args); 063 System.exit(res); 064 } 065 066 @Override 067 public int run(String[] args) throws Exception { 068 069 if (args.length == 0) { 070 System.err.println("Usage: MigrationTool <S3 file system URI>"); 071 System.err.println("\t<S3 file system URI>\tfilesystem to migrate"); 072 ToolRunner.printGenericCommandUsage(System.err); 073 return -1; 074 } 075 076 URI uri = URI.create(args[0]); 077 078 initialize(uri); 079 080 FileSystemStore newStore = new Jets3tFileSystemStore(); 081 newStore.initialize(uri, getConf()); 082 083 if (get("%2F") != null) { 084 System.err.println("Current version number is [unversioned]."); 085 System.err.println("Target version number is " + 086 newStore.getVersion() + "."); 087 Store oldStore = new UnversionedStore(); 088 migrate(oldStore, newStore); 089 return 0; 090 } else { 091 S3Object root = get("/"); 092 if (root != null) { 093 String version = (String) root.getMetadata("fs-version"); 094 if (version == null) { 095 System.err.println("Can't detect version - exiting."); 096 } else { 097 String newVersion = newStore.getVersion(); 098 System.err.println("Current version number is " + version + "."); 099 System.err.println("Target version number is " + newVersion + "."); 100 if (version.equals(newStore.getVersion())) { 101 System.err.println("No migration required."); 102 return 0; 103 } 104 // use version number to create Store 105 //Store oldStore = ... 106 //migrate(oldStore, newStore); 107 System.err.println("Not currently implemented."); 108 return 0; 109 } 110 } 111 System.err.println("Can't detect version - exiting."); 112 return 0; 113 } 114 115 } 116 117 public void initialize(URI uri) throws IOException { 118 119 120 121 try { 122 String accessKey = null; 123 String secretAccessKey = null; 124 String userInfo = uri.getUserInfo(); 125 if (userInfo != null) { 126 int index = userInfo.indexOf(':'); 127 if (index != -1) { 128 accessKey = userInfo.substring(0, index); 129 secretAccessKey = userInfo.substring(index + 1); 130 } else { 131 accessKey = userInfo; 132 } 133 } 134 if (accessKey == null) { 135 accessKey = getConf().get("fs.s3.awsAccessKeyId"); 136 } 137 if (secretAccessKey == null) { 138 secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey"); 139 } 140 if (accessKey == null && secretAccessKey == null) { 141 throw new IllegalArgumentException("AWS " + 142 "Access Key ID and Secret Access Key " + 143 "must be specified as the username " + 144 "or password (respectively) of a s3 URL, " + 145 "or by setting the " + 146 "fs.s3.awsAccessKeyId or " + 147 "fs.s3.awsSecretAccessKey properties (respectively)."); 148 } else if (accessKey == null) { 149 throw new IllegalArgumentException("AWS " + 150 "Access Key ID must be specified " + 151 "as the username of a s3 URL, or by setting the " + 152 "fs.s3.awsAccessKeyId property."); 153 } else if (secretAccessKey == null) { 154 throw new IllegalArgumentException("AWS " + 155 "Secret Access Key must be specified " + 156 "as the password of a s3 URL, or by setting the " + 157 "fs.s3.awsSecretAccessKey property."); 158 } 159 AWSCredentials awsCredentials = 160 new AWSCredentials(accessKey, secretAccessKey); 161 this.s3Service = new RestS3Service(awsCredentials); 162 } catch (S3ServiceException e) { 163 if (e.getCause() instanceof IOException) { 164 throw (IOException) e.getCause(); 165 } 166 throw new S3Exception(e); 167 } 168 bucket = new S3Bucket(uri.getHost()); 169 } 170 171 private void migrate(Store oldStore, FileSystemStore newStore) 172 throws IOException { 173 for (Path path : oldStore.listAllPaths()) { 174 INode inode = oldStore.retrieveINode(path); 175 oldStore.deleteINode(path); 176 newStore.storeINode(path, inode); 177 } 178 } 179 180 private S3Object get(String key) { 181 try { 182 return s3Service.getObject(bucket.getName(), key); 183 } catch (S3ServiceException e) { 184 if ("NoSuchKey".equals(e.getS3ErrorCode())) { 185 return null; 186 } 187 } 188 return null; 189 } 190 191 interface Store { 192 193 Set<Path> listAllPaths() throws IOException; 194 INode retrieveINode(Path path) throws IOException; 195 void deleteINode(Path path) throws IOException; 196 197 } 198 199 class UnversionedStore implements Store { 200 201 @Override 202 public Set<Path> listAllPaths() throws IOException { 203 try { 204 String prefix = urlEncode(Path.SEPARATOR); 205 S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null); 206 Set<Path> prefixes = new TreeSet<Path>(); 207 for (int i = 0; i < objects.length; i++) { 208 prefixes.add(keyToPath(objects[i].getKey())); 209 } 210 return prefixes; 211 } catch (S3ServiceException e) { 212 if (e.getCause() instanceof IOException) { 213 throw (IOException) e.getCause(); 214 } 215 throw new S3Exception(e); 216 } 217 } 218 219 @Override 220 public void deleteINode(Path path) throws IOException { 221 delete(pathToKey(path)); 222 } 223 224 private void delete(String key) throws IOException { 225 try { 226 s3Service.deleteObject(bucket, key); 227 } catch (S3ServiceException e) { 228 if (e.getCause() instanceof IOException) { 229 throw (IOException) e.getCause(); 230 } 231 throw new S3Exception(e); 232 } 233 } 234 235 @Override 236 public INode retrieveINode(Path path) throws IOException { 237 return INode.deserialize(get(pathToKey(path))); 238 } 239 240 private InputStream get(String key) throws IOException { 241 try { 242 S3Object object = s3Service.getObject(bucket.getName(), key); 243 return object.getDataInputStream(); 244 } catch (S3ServiceException e) { 245 if ("NoSuchKey".equals(e.getS3ErrorCode())) { 246 return null; 247 } 248 if (e.getCause() instanceof IOException) { 249 throw (IOException) e.getCause(); 250 } 251 throw new S3Exception(e); 252 } catch (ServiceException e) { 253 return null; 254 } 255 } 256 257 private String pathToKey(Path path) { 258 if (!path.isAbsolute()) { 259 throw new IllegalArgumentException("Path must be absolute: " + path); 260 } 261 return urlEncode(path.toUri().getPath()); 262 } 263 264 private Path keyToPath(String key) { 265 return new Path(urlDecode(key)); 266 } 267 268 private String urlEncode(String s) { 269 try { 270 return URLEncoder.encode(s, "UTF-8"); 271 } catch (UnsupportedEncodingException e) { 272 // Should never happen since every implementation of the Java Platform 273 // is required to support UTF-8. 274 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html 275 throw new IllegalStateException(e); 276 } 277 } 278 279 private String urlDecode(String s) { 280 try { 281 return URLDecoder.decode(s, "UTF-8"); 282 } catch (UnsupportedEncodingException e) { 283 // Should never happen since every implementation of the Java Platform 284 // is required to support UTF-8. 285 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html 286 throw new IllegalStateException(e); 287 } 288 } 289 290 } 291 292}