/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.s3;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.security.AWSCredentials;

/**
 * <p>
 * This class is a tool for migrating data from an older to a newer version
 * of an S3 filesystem.
 * </p>
 * <p>
 * All files in the filesystem are migrated by re-writing the block metadata
 * - no datafiles are touched.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class MigrationTool extends Configured implements Tool {

  /** S3 client; created by {@link #initialize(URI)}. */
  private S3Service s3Service;

  /** Bucket named by the host part of the URI given to {@link #initialize(URI)}. */
  private S3Bucket bucket;

  /**
   * Command-line entry point: runs the tool through {@link ToolRunner} and
   * exits the JVM with the returned status.
   *
   * @param args command-line arguments; the first one is the S3 filesystem URI
   * @throws Exception if the tool fails
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new MigrationTool(), args);
    System.exit(res);
  }

  /**
   * Detects the version of the filesystem named by {@code args[0]} and, if it
   * is unversioned, migrates it to the version of
   * {@link Jets3tFileSystemStore}.
   *
   * <p>An object stored under the key {@code %2F} marks the filesystem as
   * unversioned. Otherwise the {@code fs-version} metadata of the object
   * stored under {@code /} is compared with the target version.</p>
   *
   * @param args command-line arguments; {@code args[0]} is the filesystem URI
   * @return {@code -1} when no argument is given, {@code 0} otherwise
   * @throws Exception if S3 cannot be reached or the migration fails
   */
  @Override
  public int run(String[] args) throws Exception {

    if (args.length == 0) {
      System.err.println("Usage: MigrationTool <S3 file system URI>");
      System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    URI uri = URI.create(args[0]);

    initialize(uri);

    FileSystemStore newStore = new Jets3tFileSystemStore();
    newStore.initialize(uri, getConf());

    if (get("%2F") != null) {
      System.err.println("Current version number is [unversioned].");
      System.err.println("Target version number is " +
          newStore.getVersion() + ".");
      Store oldStore = new UnversionedStore();
      migrate(oldStore, newStore);
      return 0;
    }

    S3Object root = get("/");
    String version =
        (root == null) ? null : (String) root.getMetadata("fs-version");
    if (version == null) {
      // Covers both a missing root object and a root object without version
      // metadata; the message is reported exactly once.
      System.err.println("Can't detect version - exiting.");
      return 0;
    }

    String newVersion = newStore.getVersion();
    System.err.println("Current version number is " + version + ".");
    System.err.println("Target version number is " + newVersion + ".");
    if (version.equals(newVersion)) {
      System.err.println("No migration required.");
      return 0;
    }
    // use version number to create Store
    //Store oldStore = ...
    //migrate(oldStore, newStore);
    System.err.println("Not currently implemented.");
    return 0;
  }

  /**
   * Creates the S3 client and bucket handle for the given filesystem URI.
   *
   * <p>Credentials are taken from the URI user-info ({@code key:secret}, or
   * just {@code key}); whichever part is missing is read from the
   * {@code fs.s3.awsAccessKeyId} / {@code fs.s3.awsSecretAccessKey}
   * configuration properties.</p>
   *
   * @param uri filesystem URI; its host is used as the bucket name
   * @throws IOException if creating the S3 service fails
   * @throws IllegalArgumentException if the access key, the secret key, or
   *         both cannot be determined
   */
  public void initialize(URI uri) throws IOException {
    try {
      String accessKey = null;
      String secretAccessKey = null;
      String userInfo = uri.getUserInfo();
      if (userInfo != null) {
        int index = userInfo.indexOf(':');
        if (index != -1) {
          accessKey = userInfo.substring(0, index);
          secretAccessKey = userInfo.substring(index + 1);
        } else {
          accessKey = userInfo;
        }
      }
      if (accessKey == null) {
        accessKey = getConf().get("fs.s3.awsAccessKeyId");
      }
      if (secretAccessKey == null) {
        secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
      }
      if (accessKey == null && secretAccessKey == null) {
        throw new IllegalArgumentException("AWS " +
                                           "Access Key ID and Secret Access Key " +
                                           "must be specified as the username " +
                                           "or password (respectively) of a s3 URL, " +
                                           "or by setting the " +
                                           "fs.s3.awsAccessKeyId or " +
                                           "fs.s3.awsSecretAccessKey properties (respectively).");
      } else if (accessKey == null) {
        throw new IllegalArgumentException("AWS " +
                                           "Access Key ID must be specified " +
                                           "as the username of a s3 URL, or by setting the " +
                                           "fs.s3.awsAccessKeyId property.");
      } else if (secretAccessKey == null) {
        throw new IllegalArgumentException("AWS " +
                                           "Secret Access Key must be specified " +
                                           "as the password of a s3 URL, or by setting the " +
                                           "fs.s3.awsSecretAccessKey property.");
      }
      AWSCredentials awsCredentials =
        new AWSCredentials(accessKey, secretAccessKey);
      this.s3Service = new RestS3Service(awsCredentials);
    } catch (S3ServiceException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
    bucket = new S3Bucket(uri.getHost());
  }

  /**
   * Moves every inode listed by {@code oldStore} into {@code newStore}.
   *
   * @param oldStore store to read inodes from and delete them from
   * @param newStore store to write the inodes to
   * @throws IOException if listing, reading, deleting or storing fails
   */
  private void migrate(Store oldStore, FileSystemStore newStore)
      throws IOException {
    for (Path path : oldStore.listAllPaths()) {
      INode inode = oldStore.retrieveINode(path);
      if (inode == null) {
        // Nothing was read for this path, so there is nothing to move; do not
        // delete the old entry or hand a null inode to the new store.
        System.err.println("Skipping " + path + ": no inode could be read.");
        continue;
      }
      // NOTE(review): the old entry is deleted before the new one is written,
      // so a failure in storeINode loses this inode. Reordering is only safe
      // if the two stores never share a key - confirm before changing.
      oldStore.deleteINode(path);
      newStore.storeINode(path, inode);
    }
  }

  /**
   * Fetches the object stored under {@code key} in the bucket.
   *
   * @param key raw S3 object key
   * @return the object, or {@code null} if S3 reports {@code NoSuchKey}
   * @throws IOException if the request fails for any other reason
   */
  private S3Object get(String key) throws IOException {
    try {
      return s3Service.getObject(bucket.getName(), key);
    } catch (S3ServiceException e) {
      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
        return null;
      }
      // Any other failure must not be mistaken for "object absent".
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new S3Exception(e);
    }
  }

  /**
   * Read/delete view of an existing (old-format) filesystem store.
   */
  interface Store {

    /**
     * @return the paths of all inodes held by this store
     * @throws IOException if the listing fails
     */
    Set<Path> listAllPaths() throws IOException;

    /**
     * @param path path of the inode to read
     * @return the inode stored for {@code path}
     * @throws IOException if the inode cannot be read
     */
    INode retrieveINode(Path path) throws IOException;

    /**
     * @param path path of the inode to remove
     * @throws IOException if the inode cannot be removed
     */
    void deleteINode(Path path) throws IOException;

  }

  /**
   * {@link Store} for the unversioned layout, in which object keys are the
   * URL-encoded (UTF-8) absolute paths.
   */
  class UnversionedStore implements Store {

    /**
     * Lists every object whose key starts with the URL-encoded path
     * separator and converts the keys back to paths.
     *
     * @return the decoded paths, held in a {@link TreeSet}
     * @throws IOException if the listing request fails
     */
    @Override
    public Set<Path> listAllPaths() throws IOException {
      try {
        String prefix = urlEncode(Path.SEPARATOR);
        S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
        Set<Path> prefixes = new TreeSet<Path>();
        for (S3Object object : objects) {
          prefixes.add(keyToPath(object.getKey()));
        }
        return prefixes;
      } catch (S3ServiceException e) {
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }
        throw new S3Exception(e);
      }
    }

    /**
     * Deletes the object stored under the key for {@code path}.
     *
     * @param path absolute path of the inode to remove
     * @throws IOException if the delete request fails
     */
    @Override
    public void deleteINode(Path path) throws IOException {
      delete(pathToKey(path));
    }

    /**
     * Deletes the object stored under {@code key} from the bucket.
     *
     * @param key raw S3 object key
     * @throws IOException if the delete request fails
     */
    private void delete(String key) throws IOException {
      try {
        s3Service.deleteObject(bucket, key);
      } catch (S3ServiceException e) {
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }
        throw new S3Exception(e);
      }
    }

    /**
     * Reads and deserializes the inode stored under the key for {@code path}.
     *
     * @param path absolute path of the inode to read
     * @return the result of {@link INode#deserialize} on the object's data
     *         stream (the stream is {@code null} when the key does not exist)
     * @throws IOException if the object cannot be fetched or deserialized
     */
    @Override
    public INode retrieveINode(Path path) throws IOException {
      return INode.deserialize(get(pathToKey(path)));
    }

    /**
     * Opens the data stream of the object stored under {@code key}.
     *
     * @param key raw S3 object key
     * @return the object's data stream, or {@code null} if S3 reports
     *         {@code NoSuchKey}
     * @throws IOException if the object cannot be fetched or its stream
     *         cannot be opened
     */
    private InputStream get(String key) throws IOException {
      try {
        S3Object object = s3Service.getObject(bucket.getName(), key);
        return object.getDataInputStream();
      } catch (S3ServiceException e) {
        if ("NoSuchKey".equals(e.getS3ErrorCode())) {
          return null;
        }
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }
        throw new S3Exception(e);
      } catch (ServiceException e) {
        // Returning null here would make an unreadable object look absent and
        // let the caller delete an inode that was never read.
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }
        throw new IOException(
            "Failed to open data stream for S3 object " + key, e);
      }
    }

    /**
     * Converts an absolute path to its object key.
     *
     * @param path path to convert
     * @return the URL-encoded path component of {@code path}
     * @throws IllegalArgumentException if {@code path} is not absolute
     */
    private String pathToKey(Path path) {
      if (!path.isAbsolute()) {
        throw new IllegalArgumentException("Path must be absolute: " + path);
      }
      return urlEncode(path.toUri().getPath());
    }

    /**
     * Converts an object key back to a path by URL-decoding it.
     *
     * @param key raw S3 object key
     * @return the path built from the decoded key
     */
    private Path keyToPath(String key) {
      return new Path(urlDecode(key));
    }

    /**
     * URL-encodes {@code s} as UTF-8.
     *
     * @param s text to encode
     * @return the encoded text
     */
    private String urlEncode(String s) {
      try {
        return URLEncoder.encode(s, "UTF-8");
      } catch (UnsupportedEncodingException e) {
        // Should never happen since every implementation of the Java Platform
        // is required to support UTF-8.
        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
        throw new IllegalStateException(e);
      }
    }

    /**
     * URL-decodes {@code s} as UTF-8.
     *
     * @param s text to decode
     * @return the decoded text
     */
    private String urlDecode(String s) {
      try {
        return URLDecoder.decode(s, "UTF-8");
      } catch (UnsupportedEncodingException e) {
        // Should never happen since every implementation of the Java Platform
        // is required to support UTF-8.
        // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
        throw new IllegalStateException(e);
      }
    }

  }

}