001/** 002 * 003 * Licensed under the Apache License, Version 2.0 004 * (the "License"); you may not use this file except in compliance with 005 * the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software 010 * distributed under the License is distributed on an "AS IS" BASIS, 011 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 012 * implied. See the License for the specific language governing 013 * permissions and limitations under the License. 014 * 015 * 016 * Implements the Hadoop FS interfaces to allow applications to store 017 *files in Kosmos File System (KFS). 018 */ 019 020package org.apache.hadoop.fs.kfs; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.net.URI; 025 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceStability; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.BlockLocation; 030import org.apache.hadoop.fs.FSDataInputStream; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.FileUtil; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.fs.permission.FsPermission; 037import org.apache.hadoop.util.Progressable; 038 039/** 040 * A FileSystem backed by KFS. 041 * 042 */ 043@InterfaceAudience.Public 044@InterfaceStability.Stable 045public class KosmosFileSystem extends FileSystem { 046 047 private FileSystem localFs; 048 private IFSImpl kfsImpl = null; 049 private URI uri; 050 private Path workingDir = new Path("/"); 051 052 public KosmosFileSystem() { 053 054 } 055 056 KosmosFileSystem(IFSImpl fsimpl) { 057 this.kfsImpl = fsimpl; 058 } 059 060 @Override 061 public URI getUri() { 062 return uri; 063 } 064 065 @Override 066 public void initialize(URI uri, Configuration conf) throws IOException { 067 super.initialize(uri, conf); 068 try { 069 if (kfsImpl == null) { 070 if (uri.getHost() == null) { 071 kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""), 072 conf.getInt("fs.kfs.metaServerPort", -1), 073 statistics); 074 } else { 075 kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics); 076 } 077 } 078 079 this.localFs = FileSystem.getLocal(conf); 080 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 081 this.workingDir = new Path("/user", System.getProperty("user.name") 082 ).makeQualified(this); 083 setConf(conf); 084 085 } catch (Exception e) { 086 e.printStackTrace(); 087 System.out.println("Unable to initialize KFS"); 088 System.exit(-1); 089 } 090 } 091 092 @Override 093 public Path getWorkingDirectory() { 094 return workingDir; 095 } 096 097 @Override 098 public void setWorkingDirectory(Path dir) { 099 workingDir = makeAbsolute(dir); 100 } 101 102 private Path makeAbsolute(Path path) { 103 if (path.isAbsolute()) { 104 return path; 105 } 106 return new Path(workingDir, path); 107 } 108 109 @Override 110 public boolean mkdirs(Path path, FsPermission permission 111 ) throws IOException { 112 Path absolute = makeAbsolute(path); 113 String srep = absolute.toUri().getPath(); 114 115 int res; 116 117 // System.out.println("Calling mkdirs on: " + srep); 118 119 res = kfsImpl.mkdirs(srep); 120 121 return res == 0; 122 } 123 124 @Override 125 public boolean isDirectory(Path path) throws IOException { 126 Path absolute = makeAbsolute(path); 127 String srep = absolute.toUri().getPath(); 128 129 // System.out.println("Calling isdir on: " + srep); 130 131 return kfsImpl.isDirectory(srep); 132 } 133 134 @Override 135 public boolean isFile(Path path) throws IOException { 136 Path absolute = makeAbsolute(path); 137 String srep = absolute.toUri().getPath(); 138 return kfsImpl.isFile(srep); 139 } 140 141 @Override 142 public FileStatus[] listStatus(Path path) throws IOException { 143 Path absolute = makeAbsolute(path); 144 String srep = absolute.toUri().getPath(); 145 146 if(!kfsImpl.exists(srep)) 147 throw new FileNotFoundException("File " + path + " does not exist."); 148 149 if (kfsImpl.isFile(srep)) 150 return new FileStatus[] { getFileStatus(path) } ; 151 152 return kfsImpl.readdirplus(absolute); 153 } 154 155 @Override 156 public FileStatus getFileStatus(Path path) throws IOException { 157 Path absolute = makeAbsolute(path); 158 String srep = absolute.toUri().getPath(); 159 if (!kfsImpl.exists(srep)) { 160 throw new FileNotFoundException("File " + path + " does not exist."); 161 } 162 if (kfsImpl.isDirectory(srep)) { 163 // System.out.println("Status of path: " + path + " is dir"); 164 return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep), 165 path.makeQualified(this)); 166 } else { 167 // System.out.println("Status of path: " + path + " is file"); 168 return new FileStatus(kfsImpl.filesize(srep), false, 169 kfsImpl.getReplication(srep), 170 getDefaultBlockSize(), 171 kfsImpl.getModificationTime(srep), 172 path.makeQualified(this)); 173 } 174 } 175 176 @Override 177 public FSDataOutputStream append(Path f, int bufferSize, 178 Progressable progress) throws IOException { 179 Path parent = f.getParent(); 180 if (parent != null && !mkdirs(parent)) { 181 throw new IOException("Mkdirs failed to create " + parent); 182 } 183 184 Path absolute = makeAbsolute(f); 185 String srep = absolute.toUri().getPath(); 186 187 return kfsImpl.append(srep, bufferSize, progress); 188 } 189 190 @Override 191 public FSDataOutputStream create(Path file, FsPermission permission, 192 boolean overwrite, int bufferSize, 193 short replication, long blockSize, Progressable progress) 194 throws IOException { 195 196 if (exists(file)) { 197 if (overwrite) { 198 delete(file, true); 199 } else { 200 throw new IOException("File already exists: " + file); 201 } 202 } 203 204 Path parent = file.getParent(); 205 if (parent != null && !mkdirs(parent)) { 206 throw new IOException("Mkdirs failed to create " + parent); 207 } 208 209 Path absolute = makeAbsolute(file); 210 String srep = absolute.toUri().getPath(); 211 212 return kfsImpl.create(srep, replication, bufferSize, progress); 213 } 214 215 @Override 216 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 217 if (!exists(path)) 218 throw new IOException("File does not exist: " + path); 219 220 Path absolute = makeAbsolute(path); 221 String srep = absolute.toUri().getPath(); 222 223 return kfsImpl.open(srep, bufferSize); 224 } 225 226 @Override 227 public boolean rename(Path src, Path dst) throws IOException { 228 Path absoluteS = makeAbsolute(src); 229 String srepS = absoluteS.toUri().getPath(); 230 Path absoluteD = makeAbsolute(dst); 231 String srepD = absoluteD.toUri().getPath(); 232 233 // System.out.println("Calling rename on: " + srepS + " -> " + srepD); 234 235 return kfsImpl.rename(srepS, srepD) == 0; 236 } 237 238 // recursively delete the directory and its contents 239 @Override 240 public boolean delete(Path path, boolean recursive) throws IOException { 241 Path absolute = makeAbsolute(path); 242 String srep = absolute.toUri().getPath(); 243 if (kfsImpl.isFile(srep)) 244 return kfsImpl.remove(srep) == 0; 245 246 FileStatus[] dirEntries = listStatus(absolute); 247 if (!recursive && (dirEntries.length != 0)) { 248 throw new IOException("Directory " + path.toString() + 249 " is not empty."); 250 } 251 252 for (int i = 0; i < dirEntries.length; i++) { 253 delete(new Path(absolute, dirEntries[i].getPath()), recursive); 254 } 255 return kfsImpl.rmdir(srep) == 0; 256 } 257 258 @Override 259 public short getDefaultReplication() { 260 return 3; 261 } 262 263 @Override 264 public boolean setReplication(Path path, short replication) 265 throws IOException { 266 267 Path absolute = makeAbsolute(path); 268 String srep = absolute.toUri().getPath(); 269 270 int res = kfsImpl.setReplication(srep, replication); 271 return res >= 0; 272 } 273 274 // 64MB is the KFS block size 275 276 @Override 277 public long getDefaultBlockSize() { 278 return 1 << 26; 279 } 280 281 @Deprecated 282 public void lock(Path path, boolean shared) throws IOException { 283 284 } 285 286 @Deprecated 287 public void release(Path path) throws IOException { 288 289 } 290 291 /** 292 * Return null if the file doesn't exist; otherwise, get the 293 * locations of the various chunks of the file file from KFS. 294 */ 295 @Override 296 public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 297 long len) throws IOException { 298 299 if (file == null) { 300 return null; 301 } 302 String srep = makeAbsolute(file.getPath()).toUri().getPath(); 303 String[][] hints = kfsImpl.getDataLocation(srep, start, len); 304 if (hints == null) { 305 return null; 306 } 307 BlockLocation[] result = new BlockLocation[hints.length]; 308 long blockSize = getDefaultBlockSize(); 309 long length = len; 310 long blockStart = start; 311 for(int i=0; i < result.length; ++i) { 312 result[i] = new BlockLocation(null, hints[i], blockStart, 313 length < blockSize ? length : blockSize); 314 blockStart += blockSize; 315 length -= blockSize; 316 } 317 return result; 318 } 319 320 @Override 321 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException { 322 FileUtil.copy(localFs, src, this, dst, delSrc, getConf()); 323 } 324 325 @Override 326 public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException { 327 FileUtil.copy(this, src, localFs, dst, delSrc, getConf()); 328 } 329 330 @Override 331 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 332 throws IOException { 333 return tmpLocalFile; 334 } 335 336 @Override 337 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 338 throws IOException { 339 moveFromLocalFile(tmpLocalFile, fsOutputFile); 340 } 341}