001/**
002 *
003 * Licensed under the Apache License, Version 2.0
004 * (the "License"); you may not use this file except in compliance with
005 * the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software
010 * distributed under the License is distributed on an "AS IS" BASIS,
011 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
012 * implied. See the License for the specific language governing
013 * permissions and limitations under the License.
014 *
015 * 
016 * Implements the Hadoop FS interfaces to allow applications to store
017 * files in the Kosmos File System (KFS).
018 */
019
020package org.apache.hadoop.fs.kfs;
021
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.net.URI;
025
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.BlockLocation;
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.FileUtil;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.fs.permission.FsPermission;
037import org.apache.hadoop.util.Progressable;
038
039/**
040 * A FileSystem backed by KFS.
041 *
042 */
043@InterfaceAudience.Public
044@InterfaceStability.Stable
045public class KosmosFileSystem extends FileSystem {
046
047    private FileSystem localFs;
048    private IFSImpl kfsImpl = null;
049    private URI uri;
050    private Path workingDir = new Path("/");
051
052    public KosmosFileSystem() {
053
054    }
055
056    KosmosFileSystem(IFSImpl fsimpl) {
057        this.kfsImpl = fsimpl;
058    }
059
060    @Override
061    public URI getUri() {
062        return uri;
063    }
064
065    @Override
066    public void initialize(URI uri, Configuration conf) throws IOException {
067      super.initialize(uri, conf);
068      try {
069        if (kfsImpl == null) {
070          if (uri.getHost() == null) {
071            kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
072                                  conf.getInt("fs.kfs.metaServerPort", -1),
073                                  statistics);
074          } else {
075            kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
076          }
077        }
078
079        this.localFs = FileSystem.getLocal(conf);
080        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
081        this.workingDir = new Path("/user", System.getProperty("user.name")
082                                   ).makeQualified(this);
083        setConf(conf);
084
085      } catch (Exception e) {
086        e.printStackTrace();
087        System.out.println("Unable to initialize KFS");
088        System.exit(-1);
089      }
090    }
091
092    @Override
093    public Path getWorkingDirectory() {
094        return workingDir;
095    }
096
097    @Override
098    public void setWorkingDirectory(Path dir) {
099        workingDir = makeAbsolute(dir);
100    }
101
102    private Path makeAbsolute(Path path) {
103        if (path.isAbsolute()) {
104            return path;
105        }
106        return new Path(workingDir, path);
107    }
108
109    @Override
110    public boolean mkdirs(Path path, FsPermission permission
111        ) throws IOException {
112        Path absolute = makeAbsolute(path);
113        String srep = absolute.toUri().getPath();
114
115        int res;
116
117        // System.out.println("Calling mkdirs on: " + srep);
118
119        res = kfsImpl.mkdirs(srep);
120        
121        return res == 0;
122    }
123
124    @Override
125    public boolean isDirectory(Path path) throws IOException {
126        Path absolute = makeAbsolute(path);
127        String srep = absolute.toUri().getPath();
128
129        // System.out.println("Calling isdir on: " + srep);
130
131        return kfsImpl.isDirectory(srep);
132    }
133
134    @Override
135    public boolean isFile(Path path) throws IOException {
136        Path absolute = makeAbsolute(path);
137        String srep = absolute.toUri().getPath();
138        return kfsImpl.isFile(srep);
139    }
140
141    @Override
142    public FileStatus[] listStatus(Path path) throws IOException {
143        Path absolute = makeAbsolute(path);
144        String srep = absolute.toUri().getPath();
145
146        if(!kfsImpl.exists(srep))
147          throw new FileNotFoundException("File " + path + " does not exist.");
148
149        if (kfsImpl.isFile(srep))
150                return new FileStatus[] { getFileStatus(path) } ;
151
152        return kfsImpl.readdirplus(absolute);
153    }
154
155    @Override
156    public FileStatus getFileStatus(Path path) throws IOException {
157        Path absolute = makeAbsolute(path);
158        String srep = absolute.toUri().getPath();
159        if (!kfsImpl.exists(srep)) {
160          throw new FileNotFoundException("File " + path + " does not exist.");
161        }
162        if (kfsImpl.isDirectory(srep)) {
163            // System.out.println("Status of path: " + path + " is dir");
164            return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep), 
165                                  path.makeQualified(this));
166        } else {
167            // System.out.println("Status of path: " + path + " is file");
168            return new FileStatus(kfsImpl.filesize(srep), false, 
169                                  kfsImpl.getReplication(srep),
170                                  getDefaultBlockSize(),
171                                  kfsImpl.getModificationTime(srep),
172                                  path.makeQualified(this));
173        }
174    }
175    
176    @Override
177    public FSDataOutputStream append(Path f, int bufferSize,
178        Progressable progress) throws IOException {
179        Path parent = f.getParent();
180        if (parent != null && !mkdirs(parent)) {
181            throw new IOException("Mkdirs failed to create " + parent);
182        }
183
184        Path absolute = makeAbsolute(f);
185        String srep = absolute.toUri().getPath();
186
187        return kfsImpl.append(srep, bufferSize, progress);
188    }
189
190    @Override
191    public FSDataOutputStream create(Path file, FsPermission permission,
192                                     boolean overwrite, int bufferSize,
193                                     short replication, long blockSize, Progressable progress)
194        throws IOException {
195
196        if (exists(file)) {
197            if (overwrite) {
198                delete(file, true);
199            } else {
200                throw new IOException("File already exists: " + file);
201            }
202        }
203
204        Path parent = file.getParent();
205        if (parent != null && !mkdirs(parent)) {
206            throw new IOException("Mkdirs failed to create " + parent);
207        }
208
209        Path absolute = makeAbsolute(file);
210        String srep = absolute.toUri().getPath();
211
212        return kfsImpl.create(srep, replication, bufferSize, progress);
213    }
214
215    @Override
216    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
217        if (!exists(path))
218            throw new IOException("File does not exist: " + path);
219
220        Path absolute = makeAbsolute(path);
221        String srep = absolute.toUri().getPath();
222
223        return kfsImpl.open(srep, bufferSize);
224    }
225
226    @Override
227    public boolean rename(Path src, Path dst) throws IOException {
228        Path absoluteS = makeAbsolute(src);
229        String srepS = absoluteS.toUri().getPath();
230        Path absoluteD = makeAbsolute(dst);
231        String srepD = absoluteD.toUri().getPath();
232
233        // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
234
235        return kfsImpl.rename(srepS, srepD) == 0;
236    }
237
238    // recursively delete the directory and its contents
239    @Override
240    public boolean delete(Path path, boolean recursive) throws IOException {
241      Path absolute = makeAbsolute(path);
242      String srep = absolute.toUri().getPath();
243      if (kfsImpl.isFile(srep))
244        return kfsImpl.remove(srep) == 0;
245
246      FileStatus[] dirEntries = listStatus(absolute);
247      if (!recursive && (dirEntries.length != 0)) {
248        throw new IOException("Directory " + path.toString() + 
249        " is not empty.");
250      }
251
252      for (int i = 0; i < dirEntries.length; i++) {
253        delete(new Path(absolute, dirEntries[i].getPath()), recursive);
254      }
255      return kfsImpl.rmdir(srep) == 0;
256    }
257    
258    @Override
259    public short getDefaultReplication() {
260        return 3;
261    }
262
263    @Override
264    public boolean setReplication(Path path, short replication)
265        throws IOException {
266
267        Path absolute = makeAbsolute(path);
268        String srep = absolute.toUri().getPath();
269
270        int res = kfsImpl.setReplication(srep, replication);
271        return res >= 0;
272    }
273
274    // 64MB is the KFS block size
275
276    @Override
277    public long getDefaultBlockSize() {
278        return 1 << 26;
279    }
280
281    @Deprecated            
282    public void lock(Path path, boolean shared) throws IOException {
283
284    }
285
286    @Deprecated            
287    public void release(Path path) throws IOException {
288
289    }
290
291    /**
292     * Return null if the file doesn't exist; otherwise, get the
293     * locations of the various chunks of the file file from KFS.
294     */
295    @Override
296    public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
297        long len) throws IOException {
298
299      if (file == null) {
300        return null;
301      }
302      String srep = makeAbsolute(file.getPath()).toUri().getPath();
303      String[][] hints = kfsImpl.getDataLocation(srep, start, len);
304      if (hints == null) {
305        return null;
306      }
307      BlockLocation[] result = new BlockLocation[hints.length];
308      long blockSize = getDefaultBlockSize();
309      long length = len;
310      long blockStart = start;
311      for(int i=0; i < result.length; ++i) {
312        result[i] = new BlockLocation(null, hints[i], blockStart, 
313                                      length < blockSize ? length : blockSize);
314        blockStart += blockSize;
315        length -= blockSize;
316      }
317      return result;
318    }
319
320    @Override
321    public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
322        FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
323    }
324
325    @Override
326    public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
327        FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
328    }
329
330    @Override
331    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
332        throws IOException {
333        return tmpLocalFile;
334    }
335
336    @Override
337    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
338        throws IOException {
339        moveFromLocalFile(tmpLocalFile, fsOutputFile);
340    }
341}