001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.server.namenode;
019
020import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
021import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
022import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
023import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
024import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
025import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
026import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
027import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
028import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
029import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
030import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
031import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
032import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
033import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
034import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
035import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
036import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
037import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
038import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
039import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
040import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
041import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
042import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
043import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
044import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
045import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
046import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
047import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
048import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
049import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_ACL;
050import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_FINALIZE;
051import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ROLLING_UPGRADE_START;
052import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
053import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
054import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
055import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
056import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
057import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
058import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
059import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XATTR;
060import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
061import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
062import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
063import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TRUNCATE;
064import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
065import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
066import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_STORAGE_POLICY;
067import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA_BY_STORAGETYPE;
068
069import java.io.DataInput;
070import java.io.DataInputStream;
071import java.io.DataOutput;
072import java.io.DataOutputStream;
073import java.io.EOFException;
074import java.io.IOException;
075import java.util.ArrayList;
076import java.util.Arrays;
077import java.util.EnumMap;
078import java.util.List;
079import java.util.zip.CheckedInputStream;
080import java.util.zip.Checksum;
081
082import org.apache.commons.codec.DecoderException;
083import org.apache.commons.codec.binary.Hex;
084import org.apache.hadoop.classification.InterfaceAudience;
085import org.apache.hadoop.classification.InterfaceStability;
086import org.apache.hadoop.fs.ChecksumException;
087import org.apache.hadoop.fs.Options.Rename;
088import org.apache.hadoop.fs.XAttr;
089import org.apache.hadoop.fs.XAttrCodec;
090import org.apache.hadoop.fs.permission.AclEntry;
091import org.apache.hadoop.fs.permission.AclEntryScope;
092import org.apache.hadoop.fs.permission.AclEntryType;
093import org.apache.hadoop.fs.permission.FsAction;
094import org.apache.hadoop.fs.permission.FsPermission;
095import org.apache.hadoop.fs.permission.PermissionStatus;
096import org.apache.hadoop.fs.StorageType;
097import org.apache.hadoop.hdfs.DFSConfigKeys;
098import org.apache.hadoop.hdfs.DeprecatedUTF8;
099import org.apache.hadoop.hdfs.protocol.Block;
100import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
101import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
102import org.apache.hadoop.hdfs.protocol.ClientProtocol;
103import org.apache.hadoop.hdfs.protocol.HdfsConstants;
104import org.apache.hadoop.hdfs.protocol.LayoutVersion;
105import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
106import org.apache.hadoop.hdfs.protocol.proto.EditLogProtos.AclEditLogProto;
107import org.apache.hadoop.hdfs.protocol.proto.EditLogProtos.XAttrEditLogProto;
108import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
109import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
110import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
111import org.apache.hadoop.hdfs.util.XMLUtils;
112import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
113import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
114import org.apache.hadoop.io.ArrayWritable;
115import org.apache.hadoop.io.BytesWritable;
116import org.apache.hadoop.io.DataOutputBuffer;
117import org.apache.hadoop.io.IOUtils;
118import org.apache.hadoop.io.Text;
119import org.apache.hadoop.io.Writable;
120import org.apache.hadoop.io.WritableFactories;
121import org.apache.hadoop.io.WritableFactory;
122import org.apache.hadoop.ipc.ClientId;
123import org.apache.hadoop.ipc.RpcConstants;
124import org.apache.hadoop.security.token.delegation.DelegationKey;
125import org.apache.hadoop.util.DataChecksum;
126import org.apache.hadoop.util.StringUtils;
127import org.xml.sax.ContentHandler;
128import org.xml.sax.SAXException;
129import org.xml.sax.helpers.AttributesImpl;
130
131import com.google.common.annotations.VisibleForTesting;
132import com.google.common.base.Joiner;
133import com.google.common.base.Preconditions;
134import com.google.common.collect.ImmutableMap;
135import com.google.common.collect.Lists;
136
137/**
138 * Helper classes for reading the ops from an InputStream.
139 * All ops derive from FSEditLogOp and are only
140 * instantiated from Reader#readOp()
141 */
142@InterfaceAudience.Private
143@InterfaceStability.Unstable
144public abstract class FSEditLogOp {
  /** Op code identifying the concrete kind of this op; fixed at construction. */
  public final FSEditLogOpCodes opCode;
  /** Transaction id; {@link HdfsServerConstants#INVALID_TXID} after a reset. */
  long txid;
  /** RPC client id; {@link RpcConstants#DUMMY_CLIENT_ID} after a reset. */
  byte[] rpcClientId;
  /** RPC call id; {@link RpcConstants#INVALID_CALL_ID} after a reset. */
  int rpcCallId;
149
150  public static class OpInstanceCache {
151    private static ThreadLocal<OpInstanceCacheMap> cache =
152        new ThreadLocal<OpInstanceCacheMap>() {
153      @Override
154      protected OpInstanceCacheMap initialValue() {
155        return new OpInstanceCacheMap();
156      }
157    };
158
159    @SuppressWarnings("serial")
160    static final class OpInstanceCacheMap extends
161        EnumMap<FSEditLogOpCodes, FSEditLogOp> {
162      OpInstanceCacheMap() {
163        super(FSEditLogOpCodes.class);
164        for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
165          put(opCode, newInstance(opCode));
166        }
167      }
168    }
169
170    private boolean useCache = true;
171
172    void disableCache() {
173      useCache = false;
174    }
175
176    public OpInstanceCache get() {
177      return this;
178    }
179
180    @SuppressWarnings("unchecked")
181    public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) {
182      return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode);
183    }
184
185    private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) {
186      FSEditLogOp instance = null;
187      Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
188      if (clazz != null) {
189        try {
190          instance = clazz.newInstance();
191        } catch (Exception ex) {
192          throw new RuntimeException("Failed to instantiate "+opCode, ex);
193        }
194      }
195      return instance;
196    }
197  }
198
199  final void reset() {
200    txid = HdfsServerConstants.INVALID_TXID;
201    rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
202    rpcCallId = RpcConstants.INVALID_CALL_ID;
203    resetSubFields();
204  }
205
  /**
   * Clears the fields declared by the concrete op class. Invoked by
   * {@link #reset()}, which itself runs from the base-class constructor.
   */
  abstract void resetSubFields();
207
208  private static ImmutableMap<String, FsAction> fsActionMap() {
209    ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder();
210    for (FsAction v : FsAction.values())
211      b.put(v.SYMBOL, v);
212    return b.build();
213  }
214
215  private static final ImmutableMap<String, FsAction> FSACTION_SYMBOL_MAP
216    = fsActionMap();
217
  /**
   * Constructor for an EditLog Op. EditLog ops cannot be constructed
   * directly, but only through Reader#readOp.
   * <p>
   * The op code is recorded and {@link #reset()} is run, which in turn calls
   * the subclass's {@link #resetSubFields()}. That call happens before the
   * subclass constructor body executes, so anything the subclass assigns in
   * its own constructor overrides the reset values.
   *
   * @param opCode the op code this instance represents
   */
  @VisibleForTesting
  protected FSEditLogOp(FSEditLogOpCodes opCode) {
    this.opCode = opCode;
    reset();
  }
227
228  public long getTransactionId() {
229    Preconditions.checkState(txid != HdfsServerConstants.INVALID_TXID);
230    return txid;
231  }
232
233  public String getTransactionIdStr() {
234    return (txid == HdfsServerConstants.INVALID_TXID) ? "(none)" : "" + txid;
235  }
236  
237  public boolean hasTransactionId() {
238    return (txid != HdfsServerConstants.INVALID_TXID);
239  }
240
241  public void setTransactionId(long txid) {
242    this.txid = txid;
243  }
244  
245  public boolean hasRpcIds() {
246    return rpcClientId != RpcConstants.DUMMY_CLIENT_ID
247        && rpcCallId != RpcConstants.INVALID_CALL_ID;
248  }
249  
250  /** this has to be called after calling {@link #hasRpcIds()} */
251  public byte[] getClientId() {
252    Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID);
253    return rpcClientId;
254  }
255  
256  public void setRpcClientId(byte[] clientId) {
257    this.rpcClientId = clientId;
258  }
259  
260  /** this has to be called after calling {@link #hasRpcIds()} */
261  public int getCallId() {
262    Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID);
263    return rpcCallId;
264  }
265  
266  public void setRpcCallId(int callId) {
267    this.rpcCallId = callId;
268  }
269
  /**
   * Populates this op's fields from its binary form.
   *
   * @param in stream positioned at the start of the op's fields
   * @param logVersion layout version of the edit log being read; the
   *        implementations visible in this file use it to decide which
   *        fields are present and how they are encoded
   * @throws IOException if the fields cannot be read
   */
  abstract void readFields(DataInputStream in, int logVersion)
      throws IOException;

  /**
   * Writes this op's fields in binary form.
   *
   * @param out stream to write to
   * @throws IOException if writing fails
   */
  public abstract void writeFields(DataOutputStream out)
      throws IOException;
275
276  static interface BlockListUpdatingOp {
277    Block[] getBlocks();
278    String getPath();
279    boolean shouldCompleteLastBlock();
280  }
281  
  /**
   * Writes the RPC ids to the stream: first the client id bytes, then the
   * call id. {@link #readRpcIds} reads them back in the same order.
   *
   * @param clientId RPC client id bytes
   * @param callId RPC call id
   * @param out stream to write to
   * @throws IOException if writing fails
   */
  private static void writeRpcIds(final byte[] clientId, final int callId,
      DataOutputStream out) throws IOException {
    FSImageSerialization.writeBytes(clientId, out);
    FSImageSerialization.writeInt(callId, out);
  }
287  
288  void readRpcIds(DataInputStream in, int logVersion)
289      throws IOException {
290    if (NameNodeLayoutVersion.supports(
291        LayoutVersion.Feature.EDITLOG_SUPPORT_RETRYCACHE, logVersion)) {
292      this.rpcClientId = FSImageSerialization.readBytes(in);
293      this.rpcCallId = FSImageSerialization.readInt(in);
294    }
295  }
296  
297  void readRpcIdsFromXml(Stanza st) {
298    this.rpcClientId = st.hasChildren("RPC_CLIENTID") ? 
299        ClientId.toBytes(st.getValue("RPC_CLIENTID"))
300        : RpcConstants.DUMMY_CLIENT_ID;
301    this.rpcCallId = st.hasChildren("RPC_CALLID") ? 
302        Integer.parseInt(st.getValue("RPC_CALLID"))
303        : RpcConstants.INVALID_CALL_ID;
304  }
305  
306  private static void appendRpcIdsToString(final StringBuilder builder,
307      final byte[] clientId, final int callId) {
308    builder.append(", RpcClientId=");
309    builder.append(ClientId.toString(clientId));
310    builder.append(", RpcCallId=");
311    builder.append(callId);
312  }
313  
314  private static void appendRpcIdsToXml(ContentHandler contentHandler,
315      final byte[] clientId, final int callId) throws SAXException {
316    XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID",
317        ClientId.toString(clientId));
318    XMLUtils.addSaxString(contentHandler, "RPC_CALLID", 
319        Integer.toString(callId));
320  }
321
322  private static final class AclEditLogUtil {
323    private static final int ACL_EDITLOG_ENTRY_HAS_NAME_OFFSET = 6;
324    private static final int ACL_EDITLOG_ENTRY_TYPE_OFFSET = 3;
325    private static final int ACL_EDITLOG_ENTRY_SCOPE_OFFSET = 5;
326    private static final int ACL_EDITLOG_PERM_MASK = 7;
327    private static final int ACL_EDITLOG_ENTRY_TYPE_MASK = 3;
328    private static final int ACL_EDITLOG_ENTRY_SCOPE_MASK = 1;
329
330    private static final FsAction[] FSACTION_VALUES = FsAction.values();
331    private static final AclEntryScope[] ACL_ENTRY_SCOPE_VALUES = AclEntryScope
332        .values();
333    private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType
334        .values();
335
336    private static List<AclEntry> read(DataInputStream in, int logVersion)
337        throws IOException {
338      if (!NameNodeLayoutVersion.supports(Feature.EXTENDED_ACL, logVersion)) {
339        return null;
340      }
341
342      int size = in.readInt();
343      if (size == 0) {
344        return null;
345      }
346
347      List<AclEntry> aclEntries = Lists.newArrayListWithCapacity(size);
348      for (int i = 0; i < size; ++i) {
349        int v = in.read();
350        int p = v & ACL_EDITLOG_PERM_MASK;
351        int t = (v >> ACL_EDITLOG_ENTRY_TYPE_OFFSET)
352            & ACL_EDITLOG_ENTRY_TYPE_MASK;
353        int s = (v >> ACL_EDITLOG_ENTRY_SCOPE_OFFSET)
354            & ACL_EDITLOG_ENTRY_SCOPE_MASK;
355        boolean hasName = ((v >> ACL_EDITLOG_ENTRY_HAS_NAME_OFFSET) & 1) == 1;
356        String name = hasName ? FSImageSerialization.readString(in) : null;
357        aclEntries.add(new AclEntry.Builder().setName(name)
358            .setPermission(FSACTION_VALUES[p])
359            .setScope(ACL_ENTRY_SCOPE_VALUES[s])
360            .setType(ACL_ENTRY_TYPE_VALUES[t]).build());
361      }
362
363      return aclEntries;
364    }
365
366    private static void write(List<AclEntry> aclEntries, DataOutputStream out)
367        throws IOException {
368      if (aclEntries == null) {
369        out.writeInt(0);
370        return;
371      }
372
373      out.writeInt(aclEntries.size());
374      for (AclEntry e : aclEntries) {
375        boolean hasName = e.getName() != null;
376        int v = (e.getScope().ordinal() << ACL_EDITLOG_ENTRY_SCOPE_OFFSET)
377            | (e.getType().ordinal() << ACL_EDITLOG_ENTRY_TYPE_OFFSET)
378            | e.getPermission().ordinal();
379
380        if (hasName) {
381          v |= 1 << ACL_EDITLOG_ENTRY_HAS_NAME_OFFSET;
382        }
383        out.write(v);
384        if (hasName) {
385          FSImageSerialization.writeString(e.getName(), out);
386        }
387      }
388    }
389  }
390
391  private static List<XAttr> readXAttrsFromEditLog(DataInputStream in,
392      int logVersion) throws IOException {
393    if (!NameNodeLayoutVersion.supports(NameNodeLayoutVersion.Feature.XATTRS,
394        logVersion)) {
395      return null;
396    }
397    XAttrEditLogProto proto = XAttrEditLogProto.parseDelimitedFrom(in);
398    return PBHelperClient.convertXAttrs(proto.getXAttrsList());
399  }
400
  /**
   * Shared implementation for ops that describe a file together with its
   * block list. The constructor asserts that the op code is one of
   * {@code OP_ADD}, {@code OP_CLOSE} or {@code OP_APPEND}.
   * <p>
   * The binary and XML forms share a common set of fields (inode id, path,
   * replication, times, block size, blocks, permissions). ACL entries,
   * extended attributes, client name/machine, the overwrite flag, the storage
   * policy id and the RPC ids are read and written in binary form only when
   * the op code is {@code OP_ADD}.
   * <p>
   * The class-level unchecked suppression covers the {@code (T)this} casts
   * in the fluent setters.
   */
  @SuppressWarnings("unchecked")
  static abstract class AddCloseOp
         extends FSEditLogOp
          implements BlockListUpdatingOp {
    // Field count of the legacy record; only read from logs that predate
    // EDITLOG_OP_OPTIMIZATION (see readFields).
    int length;
    long inodeId;
    String path;
    short replication;
    long mtime;
    long atime;
    long blockSize;
    Block[] blocks;
    PermissionStatus permissions;
    List<AclEntry> aclEntries;
    List<XAttr> xAttrs;
    String clientName;
    String clientMachine;
    boolean overwrite;
    byte storagePolicyId;
    
    /**
     * @param opCode expected to be OP_ADD, OP_CLOSE or OP_APPEND (asserted)
     */
    private AddCloseOp(FSEditLogOpCodes opCode) {
      super(opCode);
      // super(opCode) has already run reset()/resetSubFields(); the storage
      // policy id is then set to the "unspecified" constant.
      storagePolicyId = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
      assert(opCode == OP_ADD || opCode == OP_CLOSE || opCode == OP_APPEND);
    }

    /** Clears every field declared by this class to its zero/null value. */
    @Override
    void resetSubFields() {
      length = 0;
      inodeId = 0L;
      path = null;
      replication = 0;
      mtime = 0L;
      atime = 0L;
      blockSize = 0L;
      blocks = null;
      permissions = null;
      aclEntries = null;
      xAttrs = null;
      clientName = null;
      clientMachine = null;
      overwrite = false;
      // NOTE(review): literal 0 here, while the constructor uses
      // HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED - presumably the
      // same value; confirm.
      storagePolicyId = 0;
    }

    // Fluent setters: each stores its argument and returns this op, cast
    // (unchecked) to the subtype the caller expects.

    <T extends AddCloseOp> T setInodeId(long inodeId) {
      this.inodeId = inodeId;
      return (T)this;
    }

    <T extends AddCloseOp> T setPath(String path) {
      this.path = path;
      return (T)this;
    }
    
    @Override
    public String getPath() {
      return path;
    }

    <T extends AddCloseOp> T setReplication(short replication) {
      this.replication = replication;
      return (T)this;
    }

    <T extends AddCloseOp> T setModificationTime(long mtime) {
      this.mtime = mtime;
      return (T)this;
    }

    <T extends AddCloseOp> T setAccessTime(long atime) {
      this.atime = atime;
      return (T)this;
    }

    <T extends AddCloseOp> T setBlockSize(long blockSize) {
      this.blockSize = blockSize;
      return (T)this;
    }

    /**
     * Stores the block array (not copied).
     *
     * @throws RuntimeException if the array holds more than
     *         {@link #MAX_BLOCKS} blocks
     */
    <T extends AddCloseOp> T setBlocks(Block[] blocks) {
      if (blocks.length > MAX_BLOCKS) {
        throw new RuntimeException("Can't have more than " + MAX_BLOCKS +
            " in an AddCloseOp.");
      }
      this.blocks = blocks;
      return (T)this;
    }
    
    @Override
    public Block[] getBlocks() {
      return blocks;
    }

    <T extends AddCloseOp> T setPermissionStatus(PermissionStatus permissions) {
      this.permissions = permissions;
      return (T)this;
    }

    <T extends AddCloseOp> T setAclEntries(List<AclEntry> aclEntries) {
      this.aclEntries = aclEntries;
      return (T)this;
    }

    <T extends AddCloseOp> T setXAttrs(List<XAttr> xAttrs) {
      this.xAttrs = xAttrs;
      return (T)this;
    }

    <T extends AddCloseOp> T setClientName(String clientName) {
      this.clientName = clientName;
      return (T)this;
    }

    <T extends AddCloseOp> T setClientMachine(String clientMachine) {
      this.clientMachine = clientMachine;
      return (T)this;
    }
    
    <T extends AddCloseOp> T setOverwrite(boolean overwrite) {
      this.overwrite = overwrite;
      return (T)this;
    }

    <T extends AddCloseOp> T setStoragePolicyId(byte storagePolicyId) {
      this.storagePolicyId = storagePolicyId;
      return (T)this;
    }

    /**
     * Writes the op in the current binary format. The common fields are
     * always written; the ACL/XAttr/client/overwrite/storage-policy/RPC-id
     * tail is written only for OP_ADD. The field order here must match
     * {@link #readFields}.
     */
    @Override
    public void writeFields(DataOutputStream out) throws IOException {
      FSImageSerialization.writeLong(inodeId, out);
      FSImageSerialization.writeString(path, out);
      FSImageSerialization.writeShort(replication, out);
      FSImageSerialization.writeLong(mtime, out);
      FSImageSerialization.writeLong(atime, out);
      FSImageSerialization.writeLong(blockSize, out);
      new ArrayWritable(Block.class, blocks).write(out);
      permissions.write(out);

      if (this.opCode == OP_ADD) {
        AclEditLogUtil.write(aclEntries, out);
        XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
        b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
        b.build().writeDelimitedTo(out);
        FSImageSerialization.writeString(clientName,out);
        FSImageSerialization.writeString(clientMachine,out);
        FSImageSerialization.writeBoolean(overwrite, out);
        FSImageSerialization.writeByte(storagePolicyId, out);
        // write clientId and callId
        writeRpcIds(rpcClientId, rpcCallId, out);
      }
    }

    /**
     * Reads the op from binary form, adapting to the log's layout version:
     * fields introduced by a later feature are read only when the version
     * supports that feature and otherwise get a default value.
     *
     * @throws IOException if the legacy field count does not match the
     *         layout version, or reading fails
     */
    @Override
    void readFields(DataInputStream in, int logVersion)
        throws IOException {
      // Logs without EDITLOG_OP_OPTIMIZATION start with a field count.
      if (!NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        this.length = in.readInt();
      }
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
        this.inodeId = in.readLong();
      } else {
        // The inodeId should be updated when this editLogOp is applied
        this.inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
      }
      // Sanity check of the field count: versions above -17 must report 4;
      // versions at or below -17 that lack EDITLOG_OP_OPTIMIZATION must
      // report 5.
      if ((-17 < logVersion && length != 4) ||
          (logVersion <= -17 && length != 5 && !NameNodeLayoutVersion.supports(
              LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {
        throw new IOException("Incorrect data format."  +
                              " logVersion is " + logVersion +
                              " but writables.length is " +
                              length + ". ");
      }
      this.path = FSImageSerialization.readString(in);

      // With EDITLOG_OP_OPTIMIZATION the numeric fields are read through
      // FSImageSerialization; otherwise through the unqualified
      // readShort/readLong helpers, which are declared outside this class.
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        this.replication = FSImageSerialization.readShort(in);
        this.mtime = FSImageSerialization.readLong(in);
      } else {
        this.replication = readShort(in);
        this.mtime = readLong(in);
      }

      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {
        if (NameNodeLayoutVersion.supports(
            LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
          this.atime = FSImageSerialization.readLong(in);
        } else {
          this.atime = readLong(in);
        }
      } else {
        this.atime = 0;
      }

      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        this.blockSize = FSImageSerialization.readLong(in);
      } else {
        this.blockSize = readLong(in);
      }

      this.blocks = readBlocks(in, logVersion);
      this.permissions = PermissionStatus.read(in);

      // Only OP_ADD carries the ACL/XAttr/client/overwrite/policy/RPC tail.
      if (this.opCode == OP_ADD) {
        aclEntries = AclEditLogUtil.read(in, logVersion);
        this.xAttrs = readXAttrsFromEditLog(in, logVersion);
        this.clientName = FSImageSerialization.readString(in);
        this.clientMachine = FSImageSerialization.readString(in);
        if (NameNodeLayoutVersion.supports(
            NameNodeLayoutVersion.Feature.CREATE_OVERWRITE, logVersion)) {
          this.overwrite = FSImageSerialization.readBoolean(in);
        } else {
          this.overwrite = false;
        }
        if (NameNodeLayoutVersion.supports(
            NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
          this.storagePolicyId = FSImageSerialization.readByte(in);
        } else {
          this.storagePolicyId =
              HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
        }
        // read clientId and callId
        readRpcIds(in, logVersion);
      } else {
        this.clientName = "";
        this.clientMachine = "";
      }
    }

    /** Upper bound on the number of blocks accepted for a single op. */
    static final public int MAX_BLOCKS = 1024 * 1024 * 64;
    
    /**
     * Reads a block count followed by that many blocks.
     *
     * @param logVersion not used by this method
     * @throws IOException if the count is negative or exceeds
     *         {@link #MAX_BLOCKS}, or reading fails
     */
    private static Block[] readBlocks(
        DataInputStream in,
        int logVersion) throws IOException {
      int numBlocks = in.readInt();
      if (numBlocks < 0) {
        throw new IOException("invalid negative number of blocks");
      } else if (numBlocks > MAX_BLOCKS) {
        throw new IOException("invalid number of blocks: " + numBlocks +
            ".  The maximum number of blocks per file is " + MAX_BLOCKS);
      }
      Block[] blocks = new Block[numBlocks];
      for (int i = 0; i < numBlocks; i++) {
        Block blk = new Block();
        blk.readFields(in);
        blocks[i] = blk;
      }
      return blocks;
    }

    /**
     * Renders the fields as a bracketed, comma-separated list for use by the
     * subclasses' toString(). RPC ids are included only for OP_ADD; xAttrs
     * are not included.
     */
    public String stringifyMembers() {
      StringBuilder builder = new StringBuilder();
      builder.append("[length=");
      builder.append(length);
      builder.append(", inodeId=");
      builder.append(inodeId);
      builder.append(", path=");
      builder.append(path);
      builder.append(", replication=");
      builder.append(replication);
      builder.append(", mtime=");
      builder.append(mtime);
      builder.append(", atime=");
      builder.append(atime);
      builder.append(", blockSize=");
      builder.append(blockSize);
      builder.append(", blocks=");
      builder.append(Arrays.toString(blocks));
      builder.append(", permissions=");
      builder.append(permissions);
      builder.append(", aclEntries=");
      builder.append(aclEntries);
      builder.append(", clientName=");
      builder.append(clientName);
      builder.append(", clientMachine=");
      builder.append(clientMachine);
      builder.append(", overwrite=");
      builder.append(overwrite);
      if (this.opCode == OP_ADD) {
        appendRpcIdsToString(builder, rpcClientId, rpcCallId);
      }
      builder.append(", storagePolicyId=");
      builder.append(storagePolicyId);
      builder.append(", opCode=");
      builder.append(opCode);
      builder.append(", txid=");
      builder.append(txid);
      builder.append("]");
      return builder.toString();
    }
    
    /**
     * Emits the op's fields as XML elements. ACL entries (when non-null) and
     * RPC ids are emitted only for OP_ADD.
     * <p>
     * NOTE(review): xAttrs and storagePolicyId are not emitted here and are
     * not read back in fromXml - confirm whether that is intended.
     */
    @Override
    protected void toXml(ContentHandler contentHandler) throws SAXException {
      XMLUtils.addSaxString(contentHandler, "LENGTH",
          Integer.toString(length));
      XMLUtils.addSaxString(contentHandler, "INODEID",
          Long.toString(inodeId));
      XMLUtils.addSaxString(contentHandler, "PATH", path);
      XMLUtils.addSaxString(contentHandler, "REPLICATION",
          Short.toString(replication));
      XMLUtils.addSaxString(contentHandler, "MTIME",
          Long.toString(mtime));
      XMLUtils.addSaxString(contentHandler, "ATIME",
          Long.toString(atime));
      XMLUtils.addSaxString(contentHandler, "BLOCKSIZE",
          Long.toString(blockSize));
      XMLUtils.addSaxString(contentHandler, "CLIENT_NAME", clientName);
      XMLUtils.addSaxString(contentHandler, "CLIENT_MACHINE", clientMachine);
      XMLUtils.addSaxString(contentHandler, "OVERWRITE", 
          Boolean.toString(overwrite));
      for (Block b : blocks) {
        FSEditLogOp.blockToXml(contentHandler, b);
      }
      FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
      if (this.opCode == OP_ADD) {
        if (aclEntries != null) {
          appendAclEntriesToXml(contentHandler, aclEntries);
        }
        appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
      }
    }

    /**
     * Loads the op's fields from an XML stanza. A stanza without BLOCK
     * children yields an empty block array. Unlike toXml, the ACL entries
     * and RPC ids are read regardless of the op code.
     */
    @Override 
    void fromXml(Stanza st) throws InvalidXmlException {
      this.length = Integer.parseInt(st.getValue("LENGTH"));
      this.inodeId = Long.parseLong(st.getValue("INODEID"));
      this.path = st.getValue("PATH");
      this.replication = Short.parseShort(st.getValue("REPLICATION"));
      this.mtime = Long.parseLong(st.getValue("MTIME"));
      this.atime = Long.parseLong(st.getValue("ATIME"));
      this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));

      this.clientName = st.getValue("CLIENT_NAME");
      this.clientMachine = st.getValue("CLIENT_MACHINE");
      // Boolean.parseBoolean(null) is false, so a missing OVERWRITE element
      // reads as "no overwrite".
      this.overwrite = Boolean.parseBoolean(st.getValueOrNull("OVERWRITE"));
      if (st.hasChildren("BLOCK")) {
        List<Stanza> blocks = st.getChildren("BLOCK");
        this.blocks = new Block[blocks.size()];
        for (int i = 0; i < blocks.size(); i++) {
          this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
        }
      } else {
        this.blocks = new Block[0];
      }
      this.permissions = permissionStatusFromXml(st);
      aclEntries = readAclEntriesFromXml(st);
      readRpcIdsFromXml(st);
    }
  }
756
757  /**
758   * {@literal @AtMostOnce} for {@link ClientProtocol#create} and
759   * {@link ClientProtocol#append}
760   */
761  static class AddOp extends AddCloseOp {
762    AddOp() {
763      super(OP_ADD);
764    }
765
766    static AddOp getInstance(OpInstanceCache cache) {
767      return (AddOp) cache.get(OP_ADD);
768    }
769
770    @Override
771    public boolean shouldCompleteLastBlock() {
772      return false;
773    }
774
775    @Override
776    public String toString() {
777      StringBuilder builder = new StringBuilder();
778      builder.append("AddOp ");
779      builder.append(stringifyMembers());
780      return builder.toString();
781    }
782  }
783
784  /**
785   * Although {@link ClientProtocol#append} may also log a close op, we do
786   * not need to record the rpc ids here since a successful appendFile op will
787   * finally log an AddOp.
788   */
789  static class CloseOp extends AddCloseOp {
790    CloseOp() {
791      super(OP_CLOSE);
792    }
793
794    static CloseOp getInstance(OpInstanceCache cache) {
795      return (CloseOp)cache.get(OP_CLOSE);
796    }
797
798    @Override
799    public boolean shouldCompleteLastBlock() {
800      return true;
801    }
802
803    @Override
804    public String toString() {
805      StringBuilder builder = new StringBuilder();
806      builder.append("CloseOp ");
807      builder.append(stringifyMembers());
808      return builder.toString();
809    }
810  }
811
812  static class AppendOp extends FSEditLogOp {
813    String path;
814    String clientName;
815    String clientMachine;
816    boolean newBlock;
817
818    AppendOp() {
819      super(OP_APPEND);
820    }
821
822    static AppendOp getInstance(OpInstanceCache cache) {
823      return (AppendOp) cache.get(OP_APPEND);
824    }
825
826    AppendOp setPath(String path) {
827      this.path = path;
828      return this;
829    }
830
831    AppendOp setClientName(String clientName) {
832      this.clientName = clientName;
833      return this;
834    }
835
836    AppendOp setClientMachine(String clientMachine) {
837      this.clientMachine = clientMachine;
838      return this;
839    }
840
841    AppendOp setNewBlock(boolean newBlock) {
842      this.newBlock = newBlock;
843      return this;
844    }
845
846    @Override
847    public String toString() {
848      StringBuilder builder = new StringBuilder();
849      builder.append("AppendOp ");
850      builder.append("[path=").append(path);
851      builder.append(", clientName=").append(clientName);
852      builder.append(", clientMachine=").append(clientMachine);
853      builder.append(", newBlock=").append(newBlock).append("]");
854      return builder.toString();
855    }
856
857    @Override
858    void resetSubFields() {
859      this.path = null;
860      this.clientName = null;
861      this.clientMachine = null;
862      this.newBlock = false;
863    }
864
865    @Override
866    void readFields(DataInputStream in, int logVersion) throws IOException {
867      this.path = FSImageSerialization.readString(in);
868      this.clientName = FSImageSerialization.readString(in);
869      this.clientMachine = FSImageSerialization.readString(in);
870      this.newBlock = FSImageSerialization.readBoolean(in);
871      readRpcIds(in, logVersion);
872    }
873
874    @Override
875    public void writeFields(DataOutputStream out) throws IOException {
876      FSImageSerialization.writeString(path, out);
877      FSImageSerialization.writeString(clientName, out);
878      FSImageSerialization.writeString(clientMachine, out);
879      FSImageSerialization.writeBoolean(newBlock, out);
880      writeRpcIds(rpcClientId, rpcCallId, out);
881    }
882
883    @Override
884    protected void toXml(ContentHandler contentHandler) throws SAXException {
885      XMLUtils.addSaxString(contentHandler, "PATH", path);
886      XMLUtils.addSaxString(contentHandler, "CLIENT_NAME", clientName);
887      XMLUtils.addSaxString(contentHandler, "CLIENT_MACHINE", clientMachine);
888      XMLUtils.addSaxString(contentHandler, "NEWBLOCK",
889          Boolean.toString(newBlock));
890      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
891    }
892
893    @Override
894    void fromXml(Stanza st) throws InvalidXmlException {
895      this.path = st.getValue("PATH");
896      this.clientName = st.getValue("CLIENT_NAME");
897      this.clientMachine = st.getValue("CLIENT_MACHINE");
898      this.newBlock = Boolean.parseBoolean(st.getValue("NEWBLOCK"));
899      readRpcIdsFromXml(st);
900    }
901  }
902  
903  static class AddBlockOp extends FSEditLogOp {
904    private String path;
905    private Block penultimateBlock;
906    private Block lastBlock;
907    
908    AddBlockOp() {
909      super(OP_ADD_BLOCK);
910    }
911    
912    static AddBlockOp getInstance(OpInstanceCache cache) {
913      return (AddBlockOp) cache.get(OP_ADD_BLOCK);
914    }
915
916    @Override
917    void resetSubFields() {
918      path = null;
919      penultimateBlock = null;
920      lastBlock = null;
921    }
922    
923    AddBlockOp setPath(String path) {
924      this.path = path;
925      return this;
926    }
927    
928    public String getPath() {
929      return path;
930    }
931
932    AddBlockOp setPenultimateBlock(Block pBlock) {
933      this.penultimateBlock = pBlock;
934      return this;
935    }
936    
937    Block getPenultimateBlock() {
938      return penultimateBlock;
939    }
940    
941    AddBlockOp setLastBlock(Block lastBlock) {
942      this.lastBlock = lastBlock;
943      return this;
944    }
945    
946    Block getLastBlock() {
947      return lastBlock;
948    }
949
950    @Override
951    public void writeFields(DataOutputStream out) throws IOException {
952      FSImageSerialization.writeString(path, out);
953      int size = penultimateBlock != null ? 2 : 1;
954      Block[] blocks = new Block[size];
955      if (penultimateBlock != null) {
956        blocks[0] = penultimateBlock;
957      }
958      blocks[size - 1] = lastBlock;
959      FSImageSerialization.writeCompactBlockArray(blocks, out);
960      // clientId and callId
961      writeRpcIds(rpcClientId, rpcCallId, out);
962    }
963
964    @Override
965    void readFields(DataInputStream in, int logVersion) throws IOException {
966      path = FSImageSerialization.readString(in);
967      Block[] blocks = FSImageSerialization.readCompactBlockArray(in,
968          logVersion);
969      Preconditions.checkState(blocks.length == 2 || blocks.length == 1);
970      penultimateBlock = blocks.length == 1 ? null : blocks[0];
971      lastBlock = blocks[blocks.length - 1];
972      readRpcIds(in, logVersion);
973    }
974
975    @Override
976    public String toString() {
977      StringBuilder sb = new StringBuilder();
978      sb.append("AddBlockOp [path=")
979        .append(path)
980        .append(", penultimateBlock=")
981        .append(penultimateBlock == null ? "NULL" : penultimateBlock)
982        .append(", lastBlock=")
983        .append(lastBlock);
984      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
985      sb.append("]");
986      return sb.toString();
987    }
988    
989    @Override
990    protected void toXml(ContentHandler contentHandler) throws SAXException {
991      XMLUtils.addSaxString(contentHandler, "PATH", path);
992      if (penultimateBlock != null) {
993        FSEditLogOp.blockToXml(contentHandler, penultimateBlock);
994      }
995      FSEditLogOp.blockToXml(contentHandler, lastBlock);
996      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
997    }
998    
999    @Override 
1000    void fromXml(Stanza st) throws InvalidXmlException {
1001      this.path = st.getValue("PATH");
1002      List<Stanza> blocks = st.getChildren("BLOCK");
1003      int size = blocks.size();
1004      Preconditions.checkState(size == 1 || size == 2);
1005      this.penultimateBlock = size == 2 ? 
1006          FSEditLogOp.blockFromXml(blocks.get(0)) : null;
1007      this.lastBlock = FSEditLogOp.blockFromXml(blocks.get(size - 1));
1008      readRpcIdsFromXml(st);
1009    }
1010  }
1011  
1012  /**
1013   * {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but 
1014   * {@literal @Idempotent} for some other ops.
1015   */
1016  static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
1017    String path;
1018    Block[] blocks;
1019    
1020    UpdateBlocksOp() {
1021      super(OP_UPDATE_BLOCKS);
1022    }
1023    
1024    static UpdateBlocksOp getInstance(OpInstanceCache cache) {
1025      return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
1026    }
1027
1028    @Override
1029    void resetSubFields() {
1030      path = null;
1031      blocks = null;
1032    }
1033    
1034    UpdateBlocksOp setPath(String path) {
1035      this.path = path;
1036      return this;
1037    }
1038    
1039    @Override
1040    public String getPath() {
1041      return path;
1042    }
1043
1044    UpdateBlocksOp setBlocks(Block[] blocks) {
1045      this.blocks = blocks;
1046      return this;
1047    }
1048    
1049    @Override
1050    public Block[] getBlocks() {
1051      return blocks;
1052    }
1053
1054    @Override
1055    public
1056    void writeFields(DataOutputStream out) throws IOException {
1057      FSImageSerialization.writeString(path, out);
1058      FSImageSerialization.writeCompactBlockArray(blocks, out);
1059      // clientId and callId
1060      writeRpcIds(rpcClientId, rpcCallId, out);
1061    }
1062    
1063    @Override
1064    void readFields(DataInputStream in, int logVersion) throws IOException {
1065      path = FSImageSerialization.readString(in);
1066      this.blocks = FSImageSerialization.readCompactBlockArray(
1067          in, logVersion);
1068      readRpcIds(in, logVersion);
1069    }
1070
1071    @Override
1072    public boolean shouldCompleteLastBlock() {
1073      return false;
1074    }
1075
1076    @Override
1077    public String toString() {
1078      StringBuilder sb = new StringBuilder();
1079      sb.append("UpdateBlocksOp [path=")
1080        .append(path)
1081        .append(", blocks=")
1082        .append(Arrays.toString(blocks));
1083      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
1084      sb.append("]");
1085      return sb.toString();
1086    }
1087    
1088    @Override
1089    protected void toXml(ContentHandler contentHandler) throws SAXException {
1090      XMLUtils.addSaxString(contentHandler, "PATH", path);
1091      for (Block b : blocks) {
1092        FSEditLogOp.blockToXml(contentHandler, b);
1093      }
1094      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
1095    }
1096    
1097    @Override void fromXml(Stanza st) throws InvalidXmlException {
1098      this.path = st.getValue("PATH");
1099      List<Stanza> blocks = st.hasChildren("BLOCK") ?
1100          st.getChildren("BLOCK") : new ArrayList<Stanza>();
1101      this.blocks = new Block[blocks.size()];
1102      for (int i = 0; i < blocks.size(); i++) {
1103        this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
1104      }
1105      readRpcIdsFromXml(st);
1106    }
1107  }
1108
1109  /** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */
1110  static class SetReplicationOp extends FSEditLogOp {
1111    String path;
1112    short replication;
1113
1114    SetReplicationOp() {
1115      super(OP_SET_REPLICATION);
1116    }
1117
1118    static SetReplicationOp getInstance(OpInstanceCache cache) {
1119      return (SetReplicationOp)cache.get(OP_SET_REPLICATION);
1120    }
1121
1122    @Override
1123    void resetSubFields() {
1124      path = null;
1125      replication = 0;
1126    }
1127
1128    SetReplicationOp setPath(String path) {
1129      this.path = path;
1130      return this;
1131    }
1132
1133    SetReplicationOp setReplication(short replication) {
1134      this.replication = replication;
1135      return this;
1136    }
1137
1138    @Override
1139    public 
1140    void writeFields(DataOutputStream out) throws IOException {
1141      FSImageSerialization.writeString(path, out);
1142      FSImageSerialization.writeShort(replication, out);
1143    }
1144    
1145    @Override
1146    void readFields(DataInputStream in, int logVersion)
1147        throws IOException {
1148      this.path = FSImageSerialization.readString(in);
1149      if (NameNodeLayoutVersion.supports(
1150          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1151        this.replication = FSImageSerialization.readShort(in);
1152      } else {
1153        this.replication = readShort(in);
1154      }
1155    }
1156
1157    @Override
1158    public String toString() {
1159      StringBuilder builder = new StringBuilder();
1160      builder.append("SetReplicationOp [path=");
1161      builder.append(path);
1162      builder.append(", replication=");
1163      builder.append(replication);
1164      builder.append(", opCode=");
1165      builder.append(opCode);
1166      builder.append(", txid=");
1167      builder.append(txid);
1168      builder.append("]");
1169      return builder.toString();
1170    }
1171    
1172    @Override
1173    protected void toXml(ContentHandler contentHandler) throws SAXException {
1174      XMLUtils.addSaxString(contentHandler, "PATH", path);
1175      XMLUtils.addSaxString(contentHandler, "REPLICATION",
1176          Short.toString(replication));
1177    }
1178    
1179    @Override void fromXml(Stanza st) throws InvalidXmlException {
1180      this.path = st.getValue("PATH");
1181      this.replication = Short.parseShort(st.getValue("REPLICATION"));
1182    }
1183  }
1184
1185  /** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */
1186  static class ConcatDeleteOp extends FSEditLogOp {
1187    int length;
1188    String trg;
1189    String[] srcs;
1190    long timestamp;
1191    final static public int MAX_CONCAT_SRC = 1024 * 1024;
1192
1193    ConcatDeleteOp() {
1194      super(OP_CONCAT_DELETE);
1195    }
1196
1197    static ConcatDeleteOp getInstance(OpInstanceCache cache) {
1198      return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE);
1199    }
1200
1201    @Override
1202    void resetSubFields() {
1203      length = 0;
1204      trg = null;
1205      srcs = null;
1206      timestamp = 0L;
1207    }
1208
1209    ConcatDeleteOp setTarget(String trg) {
1210      this.trg = trg;
1211      return this;
1212    }
1213
1214    ConcatDeleteOp setSources(String[] srcs) {
1215      if (srcs.length > MAX_CONCAT_SRC) {
1216        throw new RuntimeException("ConcatDeleteOp can only have " +
1217            MAX_CONCAT_SRC + " sources at most.");
1218      }
1219      this.srcs = srcs;
1220
1221      return this;
1222    }
1223
1224    ConcatDeleteOp setTimestamp(long timestamp) {
1225      this.timestamp = timestamp;
1226      return this;
1227    }
1228
1229    @Override
1230    public void writeFields(DataOutputStream out) throws IOException {
1231      FSImageSerialization.writeString(trg, out);
1232            
1233      DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
1234      int idx = 0;
1235      for(int i=0; i<srcs.length; i++) {
1236        info[idx++] = new DeprecatedUTF8(srcs[i]);
1237      }
1238      new ArrayWritable(DeprecatedUTF8.class, info).write(out);
1239
1240      FSImageSerialization.writeLong(timestamp, out);
1241      
1242      // rpc ids
1243      writeRpcIds(rpcClientId, rpcCallId, out);
1244    }
1245
1246    @Override
1247    void readFields(DataInputStream in, int logVersion)
1248        throws IOException {
1249      if (!NameNodeLayoutVersion.supports(
1250          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1251        this.length = in.readInt();
1252        if (length < 3) { // trg, srcs.., timestamp
1253          throw new IOException("Incorrect data format " +
1254              "for ConcatDeleteOp.");
1255        }
1256      }
1257      this.trg = FSImageSerialization.readString(in);
1258      int srcSize = 0;
1259      if (NameNodeLayoutVersion.supports(
1260          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1261        srcSize = in.readInt();
1262      } else {
1263        srcSize = this.length - 1 - 1; // trg and timestamp
1264      }
1265      if (srcSize < 0) {
1266          throw new IOException("Incorrect data format. "
1267              + "ConcatDeleteOp cannot have a negative number of data " +
1268              " sources.");
1269      } else if (srcSize > MAX_CONCAT_SRC) {
1270          throw new IOException("Incorrect data format. "
1271              + "ConcatDeleteOp can have at most " + MAX_CONCAT_SRC +
1272              " sources, but we tried to have " + (length - 3) + " sources.");
1273      }
1274      this.srcs = new String [srcSize];
1275      for(int i=0; i<srcSize;i++) {
1276        srcs[i]= FSImageSerialization.readString(in);
1277      }
1278      
1279      if (NameNodeLayoutVersion.supports(
1280          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1281        this.timestamp = FSImageSerialization.readLong(in);
1282      } else {
1283        this.timestamp = readLong(in);
1284      }
1285      // read RPC ids if necessary
1286      readRpcIds(in, logVersion);
1287    }
1288
1289    @Override
1290    public String toString() {
1291      StringBuilder builder = new StringBuilder();
1292      builder.append("ConcatDeleteOp [length=");
1293      builder.append(length);
1294      builder.append(", trg=");
1295      builder.append(trg);
1296      builder.append(", srcs=");
1297      builder.append(Arrays.toString(srcs));
1298      builder.append(", timestamp=");
1299      builder.append(timestamp);
1300      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
1301      builder.append(", opCode=");
1302      builder.append(opCode);
1303      builder.append(", txid=");
1304      builder.append(txid);
1305      builder.append("]");
1306      return builder.toString();
1307    }
1308    
1309    @Override
1310    protected void toXml(ContentHandler contentHandler) throws SAXException {
1311      XMLUtils.addSaxString(contentHandler, "LENGTH",
1312          Integer.toString(length));
1313      XMLUtils.addSaxString(contentHandler, "TRG", trg);
1314      XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
1315          Long.toString(timestamp));
1316      contentHandler.startElement("", "", "SOURCES", new AttributesImpl());
1317      for (int i = 0; i < srcs.length; ++i) {
1318        XMLUtils.addSaxString(contentHandler,
1319            "SOURCE" + (i + 1), srcs[i]);
1320      }
1321      contentHandler.endElement("", "", "SOURCES");
1322      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
1323    }
1324    
1325    @Override void fromXml(Stanza st) throws InvalidXmlException {
1326      this.length = Integer.parseInt(st.getValue("LENGTH"));
1327      this.trg = st.getValue("TRG");
1328      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
1329      List<Stanza> sources = st.getChildren("SOURCES");
1330      int i = 0;
1331      while (true) {
1332        if (!sources.get(0).hasChildren("SOURCE" + (i + 1)))
1333          break;
1334        i++;
1335      }
1336      srcs = new String[i];
1337      for (i = 0; i < srcs.length; i++) {
1338        srcs[i] = sources.get(0).getValue("SOURCE" + (i + 1));
1339      }
1340      readRpcIdsFromXml(st);
1341    }
1342  }
1343
1344  /** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */
1345  static class RenameOldOp extends FSEditLogOp {
1346    int length;
1347    String src;
1348    String dst;
1349    long timestamp;
1350
1351    RenameOldOp() {
1352      super(OP_RENAME_OLD);
1353    }
1354
1355    static RenameOldOp getInstance(OpInstanceCache cache) {
1356      return (RenameOldOp)cache.get(OP_RENAME_OLD);
1357    }
1358
1359    @Override
1360    void resetSubFields() {
1361      length = 0;
1362      src = null;
1363      dst = null;
1364      timestamp = 0L;
1365    }
1366
1367    RenameOldOp setSource(String src) {
1368      this.src = src;
1369      return this;
1370    }
1371
1372    RenameOldOp setDestination(String dst) {
1373      this.dst = dst;
1374      return this;
1375    }
1376
1377    RenameOldOp setTimestamp(long timestamp) {
1378      this.timestamp = timestamp;
1379      return this;
1380    }
1381
1382    @Override
1383    public 
1384    void writeFields(DataOutputStream out) throws IOException {
1385      FSImageSerialization.writeString(src, out);
1386      FSImageSerialization.writeString(dst, out);
1387      FSImageSerialization.writeLong(timestamp, out);
1388      writeRpcIds(rpcClientId, rpcCallId, out);
1389    }
1390
1391    @Override
1392    void readFields(DataInputStream in, int logVersion)
1393        throws IOException {
1394      if (!NameNodeLayoutVersion.supports(
1395          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1396        this.length = in.readInt();
1397        if (this.length != 3) {
1398          throw new IOException("Incorrect data format. "
1399              + "Old rename operation.");
1400        }
1401      }
1402      this.src = FSImageSerialization.readString(in);
1403      this.dst = FSImageSerialization.readString(in);
1404      if (NameNodeLayoutVersion.supports(
1405          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1406        this.timestamp = FSImageSerialization.readLong(in);
1407      } else {
1408        this.timestamp = readLong(in);
1409      }
1410      
1411      // read RPC ids if necessary
1412      readRpcIds(in, logVersion);
1413    }
1414
1415    @Override
1416    public String toString() {
1417      StringBuilder builder = new StringBuilder();
1418      builder.append("RenameOldOp [length=");
1419      builder.append(length);
1420      builder.append(", src=");
1421      builder.append(src);
1422      builder.append(", dst=");
1423      builder.append(dst);
1424      builder.append(", timestamp=");
1425      builder.append(timestamp);
1426      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
1427      builder.append(", opCode=");
1428      builder.append(opCode);
1429      builder.append(", txid=");
1430      builder.append(txid);
1431      builder.append("]");
1432      return builder.toString();
1433    }
1434    
1435    @Override
1436    protected void toXml(ContentHandler contentHandler) throws SAXException {
1437      XMLUtils.addSaxString(contentHandler, "LENGTH",
1438          Integer.toString(length));
1439      XMLUtils.addSaxString(contentHandler, "SRC", src);
1440      XMLUtils.addSaxString(contentHandler, "DST", dst);
1441      XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
1442          Long.toString(timestamp));
1443      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
1444    }
1445    
1446    @Override 
1447    void fromXml(Stanza st) throws InvalidXmlException {
1448      this.length = Integer.parseInt(st.getValue("LENGTH"));
1449      this.src = st.getValue("SRC");
1450      this.dst = st.getValue("DST");
1451      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
1452      
1453      readRpcIdsFromXml(st);
1454    }
1455  }
1456
1457  /** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */
1458  static class DeleteOp extends FSEditLogOp {
1459    int length;
1460    String path;
1461    long timestamp;
1462
1463    DeleteOp() {
1464      super(OP_DELETE);
1465    }
1466
1467    static DeleteOp getInstance(OpInstanceCache cache) {
1468      return (DeleteOp)cache.get(OP_DELETE);
1469    }
1470
1471    @Override
1472    void resetSubFields() {
1473      length = 0;
1474      path = null;
1475      timestamp = 0L;
1476    }
1477
1478    DeleteOp setPath(String path) {
1479      this.path = path;
1480      return this;
1481    }
1482
1483    DeleteOp setTimestamp(long timestamp) {
1484      this.timestamp = timestamp;
1485      return this;
1486    }
1487
1488    @Override
1489    public 
1490    void writeFields(DataOutputStream out) throws IOException {
1491      FSImageSerialization.writeString(path, out);
1492      FSImageSerialization.writeLong(timestamp, out);
1493      writeRpcIds(rpcClientId, rpcCallId, out);
1494    }
1495
1496    @Override
1497    void readFields(DataInputStream in, int logVersion)
1498        throws IOException {
1499      if (!NameNodeLayoutVersion.supports(
1500          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1501        this.length = in.readInt();
1502        if (this.length != 2) {
1503          throw new IOException("Incorrect data format. " + "delete operation.");
1504        }
1505      }
1506      this.path = FSImageSerialization.readString(in);
1507      if (NameNodeLayoutVersion.supports(
1508          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1509        this.timestamp = FSImageSerialization.readLong(in);
1510      } else {
1511        this.timestamp = readLong(in);
1512      }
1513      // read RPC ids if necessary
1514      readRpcIds(in, logVersion);
1515    }
1516
1517    @Override
1518    public String toString() {
1519      StringBuilder builder = new StringBuilder();
1520      builder.append("DeleteOp [length=");
1521      builder.append(length);
1522      builder.append(", path=");
1523      builder.append(path);
1524      builder.append(", timestamp=");
1525      builder.append(timestamp);
1526      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
1527      builder.append(", opCode=");
1528      builder.append(opCode);
1529      builder.append(", txid=");
1530      builder.append(txid);
1531      builder.append("]");
1532      return builder.toString();
1533    }
1534    
1535    @Override
1536    protected void toXml(ContentHandler contentHandler) throws SAXException {
1537      XMLUtils.addSaxString(contentHandler, "LENGTH",
1538          Integer.toString(length));
1539      XMLUtils.addSaxString(contentHandler, "PATH", path);
1540      XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
1541          Long.toString(timestamp));
1542      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
1543    }
1544    
1545    @Override void fromXml(Stanza st) throws InvalidXmlException {
1546      this.length = Integer.parseInt(st.getValue("LENGTH"));
1547      this.path = st.getValue("PATH");
1548      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
1549      
1550      readRpcIdsFromXml(st);
1551    }
1552  }
1553
1554  /** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */
1555  static class MkdirOp extends FSEditLogOp {
1556    int length;
1557    long inodeId;
1558    String path;
1559    long timestamp;
1560    PermissionStatus permissions;
1561    List<AclEntry> aclEntries;
1562    List<XAttr> xAttrs;
1563
1564    MkdirOp() {
1565      super(OP_MKDIR);
1566    }
1567    
1568    static MkdirOp getInstance(OpInstanceCache cache) {
1569      return (MkdirOp)cache.get(OP_MKDIR);
1570    }
1571
1572    @Override
1573    void resetSubFields() {
1574      length = 0;
1575      inodeId = 0L;
1576      path = null;
1577      timestamp = 0L;
1578      permissions = null;
1579      aclEntries = null;
1580      xAttrs = null;
1581    }
1582
1583    MkdirOp setInodeId(long inodeId) {
1584      this.inodeId = inodeId;
1585      return this;
1586    }
1587    
1588    MkdirOp setPath(String path) {
1589      this.path = path;
1590      return this;
1591    }
1592
1593    MkdirOp setTimestamp(long timestamp) {
1594      this.timestamp = timestamp;
1595      return this;
1596    }
1597
1598    MkdirOp setPermissionStatus(PermissionStatus permissions) {
1599      this.permissions = permissions;
1600      return this;
1601    }
1602
1603    MkdirOp setAclEntries(List<AclEntry> aclEntries) {
1604      this.aclEntries = aclEntries;
1605      return this;
1606    }
1607
1608    MkdirOp setXAttrs(List<XAttr> xAttrs) {
1609      this.xAttrs = xAttrs;
1610      return this;
1611    }
1612
1613    @Override
1614    public 
1615    void writeFields(DataOutputStream out) throws IOException {
1616      FSImageSerialization.writeLong(inodeId, out);
1617      FSImageSerialization.writeString(path, out);
1618      FSImageSerialization.writeLong(timestamp, out); // mtime
1619      FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
1620      permissions.write(out);
1621      AclEditLogUtil.write(aclEntries, out);
1622      XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
1623      b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
1624      b.build().writeDelimitedTo(out);
1625    }
1626    
1627    @Override
1628    void readFields(DataInputStream in, int logVersion) throws IOException {
1629      if (!NameNodeLayoutVersion.supports(
1630          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1631        this.length = in.readInt();
1632      }
1633      if (-17 < logVersion && length != 2 ||
1634          logVersion <= -17 && length != 3
1635          && !NameNodeLayoutVersion.supports(
1636              LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1637        throw new IOException("Incorrect data format. Mkdir operation.");
1638      }
1639      if (NameNodeLayoutVersion.supports(
1640          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
1641        this.inodeId = FSImageSerialization.readLong(in);
1642      } else {
1643        // This id should be updated when this editLogOp is applied
1644        this.inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
1645      }
1646      this.path = FSImageSerialization.readString(in);
1647      if (NameNodeLayoutVersion.supports(
1648          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1649        this.timestamp = FSImageSerialization.readLong(in);
1650      } else {
1651        this.timestamp = readLong(in);
1652      }
1653
1654      // The disk format stores atimes for directories as well.
1655      // However, currently this is not being updated/used because of
1656      // performance reasons.
1657      if (NameNodeLayoutVersion.supports(
1658          LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {
1659        if (NameNodeLayoutVersion.supports(
1660            LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
1661          FSImageSerialization.readLong(in);
1662        } else {
1663          readLong(in);
1664        }
1665      }
1666
1667      this.permissions = PermissionStatus.read(in);
1668      aclEntries = AclEditLogUtil.read(in, logVersion);
1669
1670      xAttrs = readXAttrsFromEditLog(in, logVersion);
1671    }
1672
1673    @Override
1674    public String toString() {
1675      StringBuilder builder = new StringBuilder();
1676      builder.append("MkdirOp [length=");
1677      builder.append(length);
1678      builder.append(", inodeId=");
1679      builder.append(inodeId);
1680      builder.append(", path=");
1681      builder.append(path);
1682      builder.append(", timestamp=");
1683      builder.append(timestamp);
1684      builder.append(", permissions=");
1685      builder.append(permissions);
1686      builder.append(", aclEntries=");
1687      builder.append(aclEntries);
1688      builder.append(", opCode=");
1689      builder.append(opCode);
1690      builder.append(", txid=");
1691      builder.append(txid);
1692      builder.append(", xAttrs=");
1693      builder.append(xAttrs);
1694      builder.append("]");
1695      return builder.toString();
1696    }
1697
1698    @Override
1699    protected void toXml(ContentHandler contentHandler) throws SAXException {
1700      XMLUtils.addSaxString(contentHandler, "LENGTH",
1701          Integer.toString(length));
1702      XMLUtils.addSaxString(contentHandler, "INODEID",
1703          Long.toString(inodeId));
1704      XMLUtils.addSaxString(contentHandler, "PATH", path);
1705      XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
1706          Long.toString(timestamp));
1707      FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
1708      if (aclEntries != null) {
1709        appendAclEntriesToXml(contentHandler, aclEntries);
1710      }
1711      if (xAttrs != null) {
1712        appendXAttrsToXml(contentHandler, xAttrs);
1713      }
1714    }
1715    
1716    @Override void fromXml(Stanza st) throws InvalidXmlException {
1717      this.length = Integer.parseInt(st.getValue("LENGTH"));
1718      this.inodeId = Long.parseLong(st.getValue("INODEID"));
1719      this.path = st.getValue("PATH");
1720      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
1721      this.permissions = permissionStatusFromXml(st);
1722      aclEntries = readAclEntriesFromXml(st);
1723      xAttrs = readXAttrsFromXml(st);
1724    }
1725  }
1726
1727  /**
1728   * The corresponding operations are either {@literal @Idempotent} (
1729   * {@link ClientProtocol#updateBlockForPipeline},
1730   * {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
1731   * already bound with other editlog op which records rpc ids (
1732   * {@link ClientProtocol#create}). Thus no need to record rpc ids here.
1733   */
1734  static class SetGenstampV1Op extends FSEditLogOp {
1735    long genStampV1;
1736
1737    SetGenstampV1Op() {
1738      super(OP_SET_GENSTAMP_V1);
1739    }
1740
1741    static SetGenstampV1Op getInstance(OpInstanceCache cache) {
1742      return (SetGenstampV1Op)cache.get(OP_SET_GENSTAMP_V1);
1743    }
1744
1745    @Override
1746    void resetSubFields() {
1747      genStampV1 = 0L;
1748    }
1749
1750    SetGenstampV1Op setGenerationStamp(long genStamp) {
1751      this.genStampV1 = genStamp;
1752      return this;
1753    }
1754
1755    @Override
1756    public
1757    void writeFields(DataOutputStream out) throws IOException {
1758      FSImageSerialization.writeLong(genStampV1, out);
1759    }
1760
1761    @Override
1762    void readFields(DataInputStream in, int logVersion)
1763        throws IOException {
1764      this.genStampV1 = FSImageSerialization.readLong(in);
1765    }
1766
1767    @Override
1768    public String toString() {
1769      StringBuilder builder = new StringBuilder();
1770      builder.append("SetGenstampOp [GenStamp=");
1771      builder.append(genStampV1);
1772      builder.append(", opCode=");
1773      builder.append(opCode);
1774      builder.append(", txid=");
1775      builder.append(txid);
1776      builder.append("]");
1777      return builder.toString();
1778    }
1779
1780    @Override
1781    protected void toXml(ContentHandler contentHandler) throws SAXException {
1782      XMLUtils.addSaxString(contentHandler, "GENSTAMP",
1783                            Long.toString(genStampV1));
1784    }
1785
1786    @Override void fromXml(Stanza st) throws InvalidXmlException {
1787      this.genStampV1 = Long.parseLong(st.getValue("GENSTAMP"));
1788    }
1789  }
1790
1791  /** Similar with {@link SetGenstampV1Op} */
1792  static class SetGenstampV2Op extends FSEditLogOp {
1793    long genStampV2;
1794
1795    SetGenstampV2Op() {
1796      super(OP_SET_GENSTAMP_V2);
1797    }
1798
1799    static SetGenstampV2Op getInstance(OpInstanceCache cache) {
1800      return (SetGenstampV2Op)cache.get(OP_SET_GENSTAMP_V2);
1801    }
1802
1803    @Override
1804    void resetSubFields() {
1805      genStampV2 = 0L;
1806    }
1807
1808    SetGenstampV2Op setGenerationStamp(long genStamp) {
1809      this.genStampV2 = genStamp;
1810      return this;
1811    }
1812
1813    @Override
1814    public
1815    void writeFields(DataOutputStream out) throws IOException {
1816      FSImageSerialization.writeLong(genStampV2, out);
1817    }
1818
1819    @Override
1820    void readFields(DataInputStream in, int logVersion)
1821        throws IOException {
1822      this.genStampV2 = FSImageSerialization.readLong(in);
1823    }
1824
1825    @Override
1826    public String toString() {
1827      StringBuilder builder = new StringBuilder();
1828      builder.append("SetGenstampV2Op [GenStampV2=");
1829      builder.append(genStampV2);
1830      builder.append(", opCode=");
1831      builder.append(opCode);
1832      builder.append(", txid=");
1833      builder.append(txid);
1834      builder.append("]");
1835      return builder.toString();
1836    }
1837
1838    @Override
1839    protected void toXml(ContentHandler contentHandler) throws SAXException {
1840      XMLUtils.addSaxString(contentHandler, "GENSTAMPV2",
1841                            Long.toString(genStampV2));
1842    }
1843
1844    @Override void fromXml(Stanza st) throws InvalidXmlException {
1845      this.genStampV2 = Long.parseLong(st.getValue("GENSTAMPV2"));
1846    }
1847  }
1848
1849  /** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */
1850  static class AllocateBlockIdOp extends FSEditLogOp {
1851    long blockId;
1852
1853    AllocateBlockIdOp() {
1854      super(OP_ALLOCATE_BLOCK_ID);
1855    }
1856
1857    static AllocateBlockIdOp getInstance(OpInstanceCache cache) {
1858      return (AllocateBlockIdOp)cache.get(OP_ALLOCATE_BLOCK_ID);
1859    }
1860
1861    @Override
1862    void resetSubFields() {
1863      blockId = 0L;
1864    }
1865
1866    AllocateBlockIdOp setBlockId(long blockId) {
1867      this.blockId = blockId;
1868      return this;
1869    }
1870
1871    @Override
1872    public
1873    void writeFields(DataOutputStream out) throws IOException {
1874      FSImageSerialization.writeLong(blockId, out);
1875    }
1876
1877    @Override
1878    void readFields(DataInputStream in, int logVersion)
1879        throws IOException {
1880      this.blockId = FSImageSerialization.readLong(in);
1881    }
1882
1883    @Override
1884    public String toString() {
1885      StringBuilder builder = new StringBuilder();
1886      builder.append("AllocateBlockIdOp [blockId=");
1887      builder.append(blockId);
1888      builder.append(", opCode=");
1889      builder.append(opCode);
1890      builder.append(", txid=");
1891      builder.append(txid);
1892      builder.append("]");
1893      return builder.toString();
1894    }
1895
1896    @Override
1897    protected void toXml(ContentHandler contentHandler) throws SAXException {
1898      XMLUtils.addSaxString(contentHandler, "BLOCK_ID",
1899                            Long.toString(blockId));
1900    }
1901
1902    @Override void fromXml(Stanza st) throws InvalidXmlException {
1903      this.blockId = Long.parseLong(st.getValue("BLOCK_ID"));
1904    }
1905  }
1906
1907  /** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */
1908  static class SetPermissionsOp extends FSEditLogOp {
1909    String src;
1910    FsPermission permissions;
1911
1912    SetPermissionsOp() {
1913      super(OP_SET_PERMISSIONS);
1914    }
1915
1916    static SetPermissionsOp getInstance(OpInstanceCache cache) {
1917      return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS);
1918    }
1919
1920    @Override
1921    void resetSubFields() {
1922      src = null;
1923      permissions = null;
1924    }
1925
1926    SetPermissionsOp setSource(String src) {
1927      this.src = src;
1928      return this;
1929    }
1930
1931    SetPermissionsOp setPermissions(FsPermission permissions) {
1932      this.permissions = permissions;
1933      return this;
1934    }
1935
1936    @Override
1937    public 
1938    void writeFields(DataOutputStream out) throws IOException {
1939      FSImageSerialization.writeString(src, out);
1940      permissions.write(out);
1941     }
1942 
1943    @Override
1944    void readFields(DataInputStream in, int logVersion)
1945        throws IOException {
1946      this.src = FSImageSerialization.readString(in);
1947      this.permissions = FsPermission.read(in);
1948    }
1949
1950    @Override
1951    public String toString() {
1952      StringBuilder builder = new StringBuilder();
1953      builder.append("SetPermissionsOp [src=");
1954      builder.append(src);
1955      builder.append(", permissions=");
1956      builder.append(permissions);
1957      builder.append(", opCode=");
1958      builder.append(opCode);
1959      builder.append(", txid=");
1960      builder.append(txid);
1961      builder.append("]");
1962      return builder.toString();
1963    }
1964    
1965    @Override
1966    protected void toXml(ContentHandler contentHandler) throws SAXException {
1967      XMLUtils.addSaxString(contentHandler, "SRC", src);
1968      XMLUtils.addSaxString(contentHandler, "MODE",
1969          Short.toString(permissions.toShort()));
1970    }
1971    
1972    @Override void fromXml(Stanza st) throws InvalidXmlException {
1973      this.src = st.getValue("SRC");
1974      this.permissions = new FsPermission(
1975          Short.parseShort(st.getValue("MODE")));
1976    }
1977  }
1978
1979  /** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */
1980  static class SetOwnerOp extends FSEditLogOp {
1981    String src;
1982    String username;
1983    String groupname;
1984
1985    SetOwnerOp() {
1986      super(OP_SET_OWNER);
1987    }
1988
1989    static SetOwnerOp getInstance(OpInstanceCache cache) {
1990      return (SetOwnerOp)cache.get(OP_SET_OWNER);
1991    }
1992
1993    @Override
1994    void resetSubFields() {
1995      src = null;
1996      username = null;
1997      groupname = null;
1998    }
1999
2000    SetOwnerOp setSource(String src) {
2001      this.src = src;
2002      return this;
2003    }
2004
2005    SetOwnerOp setUser(String username) {
2006      this.username = username;
2007      return this;
2008    }
2009
2010    SetOwnerOp setGroup(String groupname) {
2011      this.groupname = groupname;
2012      return this;
2013    }
2014
2015    @Override
2016    public 
2017    void writeFields(DataOutputStream out) throws IOException {
2018      FSImageSerialization.writeString(src, out);
2019      FSImageSerialization.writeString(username == null ? "" : username, out);
2020      FSImageSerialization.writeString(groupname == null ? "" : groupname, out);
2021    }
2022
2023    @Override
2024    void readFields(DataInputStream in, int logVersion)
2025        throws IOException {
2026      this.src = FSImageSerialization.readString(in);
2027      this.username = FSImageSerialization.readString_EmptyAsNull(in);
2028      this.groupname = FSImageSerialization.readString_EmptyAsNull(in);
2029    }
2030
2031    @Override
2032    public String toString() {
2033      StringBuilder builder = new StringBuilder();
2034      builder.append("SetOwnerOp [src=");
2035      builder.append(src);
2036      builder.append(", username=");
2037      builder.append(username);
2038      builder.append(", groupname=");
2039      builder.append(groupname);
2040      builder.append(", opCode=");
2041      builder.append(opCode);
2042      builder.append(", txid=");
2043      builder.append(txid);
2044      builder.append("]");
2045      return builder.toString();
2046    }
2047    
2048    @Override
2049    protected void toXml(ContentHandler contentHandler) throws SAXException {
2050      XMLUtils.addSaxString(contentHandler, "SRC", src);
2051      if (username != null) {
2052        XMLUtils.addSaxString(contentHandler, "USERNAME", username);
2053      }
2054      if (groupname != null) {
2055        XMLUtils.addSaxString(contentHandler, "GROUPNAME", groupname);
2056      }
2057    }
2058    
2059    @Override void fromXml(Stanza st) throws InvalidXmlException {
2060      this.src = st.getValue("SRC");
2061      this.username = (st.hasChildren("USERNAME")) ? 
2062          st.getValue("USERNAME") : null;
2063      this.groupname = (st.hasChildren("GROUPNAME")) ? 
2064          st.getValue("GROUPNAME") : null;
2065    }
2066  }
2067  
2068  static class SetNSQuotaOp extends FSEditLogOp {
2069    String src;
2070    long nsQuota;
2071
2072    SetNSQuotaOp() {
2073      super(OP_SET_NS_QUOTA);
2074    }
2075
2076    static SetNSQuotaOp getInstance(OpInstanceCache cache) {
2077      return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA);
2078    }
2079
2080    @Override
2081    void resetSubFields() {
2082      src = null;
2083      nsQuota = 0L;
2084    }
2085
2086    @Override
2087    public 
2088    void writeFields(DataOutputStream out) throws IOException {
2089      throw new IOException("Deprecated");      
2090    }
2091
2092    @Override
2093    void readFields(DataInputStream in, int logVersion)
2094        throws IOException {
2095      this.src = FSImageSerialization.readString(in);
2096      this.nsQuota = FSImageSerialization.readLong(in);
2097    }
2098
2099    @Override
2100    public String toString() {
2101      StringBuilder builder = new StringBuilder();
2102      builder.append("SetNSQuotaOp [src=");
2103      builder.append(src);
2104      builder.append(", nsQuota=");
2105      builder.append(nsQuota);
2106      builder.append(", opCode=");
2107      builder.append(opCode);
2108      builder.append(", txid=");
2109      builder.append(txid);
2110      builder.append("]");
2111      return builder.toString();
2112    }
2113    
2114    @Override
2115    protected void toXml(ContentHandler contentHandler) throws SAXException {
2116      XMLUtils.addSaxString(contentHandler, "SRC", src);
2117      XMLUtils.addSaxString(contentHandler, "NSQUOTA",
2118          Long.toString(nsQuota));
2119    }
2120    
2121    @Override void fromXml(Stanza st) throws InvalidXmlException {
2122      this.src = st.getValue("SRC");
2123      this.nsQuota = Long.parseLong(st.getValue("NSQUOTA"));
2124    }
2125  }
2126
2127  static class ClearNSQuotaOp extends FSEditLogOp {
2128    String src;
2129
2130    ClearNSQuotaOp() {
2131      super(OP_CLEAR_NS_QUOTA);
2132    }
2133
2134    static ClearNSQuotaOp getInstance(OpInstanceCache cache) {
2135      return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA);
2136    }
2137
2138    @Override
2139    void resetSubFields() {
2140      src = null;
2141    }
2142
2143    @Override
2144    public 
2145    void writeFields(DataOutputStream out) throws IOException {
2146      throw new IOException("Deprecated");      
2147    }
2148
2149    @Override
2150    void readFields(DataInputStream in, int logVersion)
2151        throws IOException {
2152      this.src = FSImageSerialization.readString(in);
2153    }
2154
2155    @Override
2156    public String toString() {
2157      StringBuilder builder = new StringBuilder();
2158      builder.append("ClearNSQuotaOp [src=");
2159      builder.append(src);
2160      builder.append(", opCode=");
2161      builder.append(opCode);
2162      builder.append(", txid=");
2163      builder.append(txid);
2164      builder.append("]");
2165      return builder.toString();
2166    }
2167    
2168    @Override
2169    protected void toXml(ContentHandler contentHandler) throws SAXException {
2170      XMLUtils.addSaxString(contentHandler, "SRC", src);
2171    }
2172    
2173    @Override void fromXml(Stanza st) throws InvalidXmlException {
2174      this.src = st.getValue("SRC");
2175    }
2176  }
2177
2178  /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
2179  static class SetQuotaOp extends FSEditLogOp {
2180    String src;
2181    long nsQuota;
2182    long dsQuota;
2183
2184    SetQuotaOp() {
2185      super(OP_SET_QUOTA);
2186    }
2187
2188    static SetQuotaOp getInstance(OpInstanceCache cache) {
2189      return (SetQuotaOp)cache.get(OP_SET_QUOTA);
2190    }
2191
2192    @Override
2193    void resetSubFields() {
2194      src = null;
2195      nsQuota = 0L;
2196      dsQuota = 0L;
2197    }
2198
2199    SetQuotaOp setSource(String src) {
2200      this.src = src;
2201      return this;
2202    }
2203
2204    SetQuotaOp setNSQuota(long nsQuota) {
2205      this.nsQuota = nsQuota;
2206      return this;
2207    }
2208
2209    SetQuotaOp setDSQuota(long dsQuota) {
2210      this.dsQuota = dsQuota;
2211      return this;
2212    }
2213
2214    @Override
2215    public 
2216    void writeFields(DataOutputStream out) throws IOException {
2217      FSImageSerialization.writeString(src, out);
2218      FSImageSerialization.writeLong(nsQuota, out);
2219      FSImageSerialization.writeLong(dsQuota, out);
2220    }
2221
2222    @Override
2223    void readFields(DataInputStream in, int logVersion)
2224        throws IOException {
2225      this.src = FSImageSerialization.readString(in);
2226      this.nsQuota = FSImageSerialization.readLong(in);
2227      this.dsQuota = FSImageSerialization.readLong(in);
2228    }
2229
2230    @Override
2231    public String toString() {
2232      StringBuilder builder = new StringBuilder();
2233      builder.append("SetQuotaOp [src=");
2234      builder.append(src);
2235      builder.append(", nsQuota=");
2236      builder.append(nsQuota);
2237      builder.append(", dsQuota=");
2238      builder.append(dsQuota);
2239      builder.append(", opCode=");
2240      builder.append(opCode);
2241      builder.append(", txid=");
2242      builder.append(txid);
2243      builder.append("]");
2244      return builder.toString();
2245    }
2246    
2247    @Override
2248    protected void toXml(ContentHandler contentHandler) throws SAXException {
2249      XMLUtils.addSaxString(contentHandler, "SRC", src);
2250      XMLUtils.addSaxString(contentHandler, "NSQUOTA",
2251          Long.toString(nsQuota));
2252      XMLUtils.addSaxString(contentHandler, "DSQUOTA",
2253          Long.toString(dsQuota));
2254    }
2255    
2256    @Override void fromXml(Stanza st) throws InvalidXmlException {
2257      this.src = st.getValue("SRC");
2258      this.nsQuota = Long.parseLong(st.getValue("NSQUOTA"));
2259      this.dsQuota = Long.parseLong(st.getValue("DSQUOTA"));
2260    }
2261  }
2262
2263  /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
2264  static class SetQuotaByStorageTypeOp extends FSEditLogOp {
2265    String src;
2266    long dsQuota;
2267    StorageType type;
2268
2269    SetQuotaByStorageTypeOp() {
2270      super(OP_SET_QUOTA_BY_STORAGETYPE);
2271    }
2272
2273    static SetQuotaByStorageTypeOp getInstance(OpInstanceCache cache) {
2274      return (SetQuotaByStorageTypeOp)cache.get(OP_SET_QUOTA_BY_STORAGETYPE);
2275    }
2276
2277    @Override
2278    void resetSubFields() {
2279      src = null;
2280      dsQuota = -1L;
2281      type = StorageType.DEFAULT;
2282    }
2283
2284    SetQuotaByStorageTypeOp setSource(String src) {
2285      this.src = src;
2286      return this;
2287    }
2288
2289    SetQuotaByStorageTypeOp setQuotaByStorageType(long dsQuota, StorageType type) {
2290      this.type = type;
2291      this.dsQuota = dsQuota;
2292      return this;
2293    }
2294
2295    @Override
2296    public
2297    void writeFields(DataOutputStream out) throws IOException {
2298      FSImageSerialization.writeString(src, out);
2299      FSImageSerialization.writeInt(type.ordinal(), out);
2300      FSImageSerialization.writeLong(dsQuota, out);
2301    }
2302
2303    @Override
2304    void readFields(DataInputStream in, int logVersion)
2305      throws IOException {
2306      this.src = FSImageSerialization.readString(in);
2307      this.type = StorageType.parseStorageType(FSImageSerialization.readInt(in));
2308      this.dsQuota = FSImageSerialization.readLong(in);
2309    }
2310
2311    @Override
2312    public String toString() {
2313      StringBuilder builder = new StringBuilder();
2314      builder.append("SetTypeQuotaOp [src=");
2315      builder.append(src);
2316      builder.append(", storageType=");
2317      builder.append(type);
2318      builder.append(", dsQuota=");
2319      builder.append(dsQuota);
2320      builder.append(", opCode=");
2321      builder.append(opCode);
2322      builder.append(", txid=");
2323      builder.append(txid);
2324      builder.append("]");
2325      return builder.toString();
2326    }
2327
2328    @Override
2329    protected void toXml(ContentHandler contentHandler) throws SAXException {
2330      XMLUtils.addSaxString(contentHandler, "SRC", src);
2331      XMLUtils.addSaxString(contentHandler, "STORAGETYPE",
2332        Integer.toString(type.ordinal()));
2333      XMLUtils.addSaxString(contentHandler, "DSQUOTA",
2334        Long.toString(dsQuota));
2335    }
2336
2337    @Override void fromXml(Stanza st) throws InvalidXmlException {
2338      this.src = st.getValue("SRC");
2339      this.type = StorageType.parseStorageType(
2340          Integer.parseInt(st.getValue("STORAGETYPE")));
2341      this.dsQuota = Long.parseLong(st.getValue("DSQUOTA"));
2342    }
2343  }
2344
2345  /** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
2346  static class TimesOp extends FSEditLogOp {
2347    int length;
2348    String path;
2349    long mtime;
2350    long atime;
2351
2352    TimesOp() {
2353      super(OP_TIMES);
2354    }
2355
2356    static TimesOp getInstance(OpInstanceCache cache) {
2357      return (TimesOp)cache.get(OP_TIMES);
2358    }
2359
2360    @Override
2361    void resetSubFields() {
2362      length = 0;
2363      path = null;
2364      mtime = 0L;
2365      atime = 0L;
2366    }
2367
2368    TimesOp setPath(String path) {
2369      this.path = path;
2370      return this;
2371    }
2372
2373    TimesOp setModificationTime(long mtime) {
2374      this.mtime = mtime;
2375      return this;
2376    }
2377
2378    TimesOp setAccessTime(long atime) {
2379      this.atime = atime;
2380      return this;
2381    }
2382
2383    @Override
2384    public 
2385    void writeFields(DataOutputStream out) throws IOException {
2386      FSImageSerialization.writeString(path, out);
2387      FSImageSerialization.writeLong(mtime, out);
2388      FSImageSerialization.writeLong(atime, out);
2389    }
2390
2391    @Override
2392    void readFields(DataInputStream in, int logVersion)
2393        throws IOException {
2394      if (!NameNodeLayoutVersion.supports(
2395          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
2396        this.length = in.readInt();
2397        if (length != 3) {
2398          throw new IOException("Incorrect data format. " + "times operation.");
2399        }
2400      }
2401      this.path = FSImageSerialization.readString(in);
2402
2403      if (NameNodeLayoutVersion.supports(
2404          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
2405        this.mtime = FSImageSerialization.readLong(in);
2406        this.atime = FSImageSerialization.readLong(in);
2407      } else {
2408        this.mtime = readLong(in);
2409        this.atime = readLong(in);
2410      }
2411    }
2412
2413    @Override
2414    public String toString() {
2415      StringBuilder builder = new StringBuilder();
2416      builder.append("TimesOp [length=");
2417      builder.append(length);
2418      builder.append(", path=");
2419      builder.append(path);
2420      builder.append(", mtime=");
2421      builder.append(mtime);
2422      builder.append(", atime=");
2423      builder.append(atime);
2424      builder.append(", opCode=");
2425      builder.append(opCode);
2426      builder.append(", txid=");
2427      builder.append(txid);
2428      builder.append("]");
2429      return builder.toString();
2430    }
2431    
2432    @Override
2433    protected void toXml(ContentHandler contentHandler) throws SAXException {
2434      XMLUtils.addSaxString(contentHandler, "LENGTH",
2435          Integer.toString(length));
2436      XMLUtils.addSaxString(contentHandler, "PATH", path);
2437      XMLUtils.addSaxString(contentHandler, "MTIME",
2438          Long.toString(mtime));
2439      XMLUtils.addSaxString(contentHandler, "ATIME",
2440          Long.toString(atime));
2441    }
2442    
2443    @Override void fromXml(Stanza st) throws InvalidXmlException {
2444      this.length = Integer.parseInt(st.getValue("LENGTH"));
2445      this.path = st.getValue("PATH");
2446      this.mtime = Long.parseLong(st.getValue("MTIME"));
2447      this.atime = Long.parseLong(st.getValue("ATIME"));
2448    }
2449  }
2450
2451  /** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */
2452  static class SymlinkOp extends FSEditLogOp {
2453    int length;
2454    long inodeId;
2455    String path;
2456    String value;
2457    long mtime;
2458    long atime;
2459    PermissionStatus permissionStatus;
2460
2461    SymlinkOp() {
2462      super(OP_SYMLINK);
2463    }
2464
2465    static SymlinkOp getInstance(OpInstanceCache cache) {
2466      return (SymlinkOp)cache.get(OP_SYMLINK);
2467    }
2468
2469    @Override
2470    void resetSubFields() {
2471      length = 0;
2472      inodeId = 0L;
2473      path = null;
2474      value = null;
2475      mtime = 0L;
2476      atime = 0L;
2477      permissionStatus = null;
2478    }
2479
2480    SymlinkOp setId(long inodeId) {
2481      this.inodeId = inodeId;
2482      return this;
2483    }
2484    
2485    SymlinkOp setPath(String path) {
2486      this.path = path;
2487      return this;
2488    }
2489
2490    SymlinkOp setValue(String value) {
2491      this.value = value;
2492      return this;
2493    }
2494
2495    SymlinkOp setModificationTime(long mtime) {
2496      this.mtime = mtime;
2497      return this;
2498    }
2499
2500    SymlinkOp setAccessTime(long atime) {
2501      this.atime = atime;
2502      return this;
2503    }
2504
2505    SymlinkOp setPermissionStatus(PermissionStatus permissionStatus) {
2506      this.permissionStatus = permissionStatus;
2507      return this;
2508    }
2509
2510    @Override
2511    public void writeFields(DataOutputStream out) throws IOException {
2512      FSImageSerialization.writeLong(inodeId, out);      
2513      FSImageSerialization.writeString(path, out);
2514      FSImageSerialization.writeString(value, out);
2515      FSImageSerialization.writeLong(mtime, out);
2516      FSImageSerialization.writeLong(atime, out);
2517      permissionStatus.write(out);
2518      writeRpcIds(rpcClientId, rpcCallId, out);
2519    }
2520
2521    @Override
2522    void readFields(DataInputStream in, int logVersion)
2523        throws IOException {
2524      if (!NameNodeLayoutVersion.supports(
2525          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
2526        this.length = in.readInt();
2527        if (this.length != 4) {
2528          throw new IOException("Incorrect data format. "
2529              + "symlink operation.");
2530        }
2531      }
2532      if (NameNodeLayoutVersion.supports(
2533          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
2534        this.inodeId = FSImageSerialization.readLong(in);
2535      } else {
2536        // This id should be updated when the editLogOp is applied
2537        this.inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
2538      }
2539      this.path = FSImageSerialization.readString(in);
2540      this.value = FSImageSerialization.readString(in);
2541
2542      if (NameNodeLayoutVersion.supports(
2543          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
2544        this.mtime = FSImageSerialization.readLong(in);
2545        this.atime = FSImageSerialization.readLong(in);
2546      } else {
2547        this.mtime = readLong(in);
2548        this.atime = readLong(in);
2549      }
2550      this.permissionStatus = PermissionStatus.read(in);
2551      
2552      // read RPC ids if necessary
2553      readRpcIds(in, logVersion);
2554    }
2555
2556    @Override
2557    public String toString() {
2558      StringBuilder builder = new StringBuilder();
2559      builder.append("SymlinkOp [length=");
2560      builder.append(length);
2561      builder.append(", inodeId=");
2562      builder.append(inodeId);
2563      builder.append(", path=");
2564      builder.append(path);
2565      builder.append(", value=");
2566      builder.append(value);
2567      builder.append(", mtime=");
2568      builder.append(mtime);
2569      builder.append(", atime=");
2570      builder.append(atime);
2571      builder.append(", permissionStatus=");
2572      builder.append(permissionStatus);
2573      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
2574      builder.append(", opCode=");
2575      builder.append(opCode);
2576      builder.append(", txid=");
2577      builder.append(txid);
2578      builder.append("]");
2579      return builder.toString();
2580    }
2581    
2582    @Override
2583    protected void toXml(ContentHandler contentHandler) throws SAXException {
2584      XMLUtils.addSaxString(contentHandler, "LENGTH",
2585          Integer.toString(length));
2586      XMLUtils.addSaxString(contentHandler, "INODEID",
2587          Long.toString(inodeId));
2588      XMLUtils.addSaxString(contentHandler, "PATH", path);
2589      XMLUtils.addSaxString(contentHandler, "VALUE", value);
2590      XMLUtils.addSaxString(contentHandler, "MTIME",
2591          Long.toString(mtime));
2592      XMLUtils.addSaxString(contentHandler, "ATIME",
2593          Long.toString(atime));
2594      FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus);
2595      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
2596    }
2597
2598    @Override 
2599    void fromXml(Stanza st) throws InvalidXmlException {
2600      this.length = Integer.parseInt(st.getValue("LENGTH"));
2601      this.inodeId = Long.parseLong(st.getValue("INODEID"));
2602      this.path = st.getValue("PATH");
2603      this.value = st.getValue("VALUE");
2604      this.mtime = Long.parseLong(st.getValue("MTIME"));
2605      this.atime = Long.parseLong(st.getValue("ATIME"));
2606      this.permissionStatus = permissionStatusFromXml(st);
2607      
2608      readRpcIdsFromXml(st);
2609    }
2610  }
2611
2612  /** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */
2613  static class RenameOp extends FSEditLogOp {
2614    int length;
2615    String src;
2616    String dst;
2617    long timestamp;
2618    Rename[] options;
2619
2620    RenameOp() {
2621      super(OP_RENAME);
2622    }
2623
2624    static RenameOp getInstance(OpInstanceCache cache) {
2625      return (RenameOp)cache.get(OP_RENAME);
2626    }
2627
2628    @Override
2629    void resetSubFields() {
2630      length = 0;
2631      src = null;
2632      dst = null;
2633      timestamp = 0L;
2634      options = null;
2635    }
2636
2637    RenameOp setSource(String src) {
2638      this.src = src;
2639      return this;
2640    }
2641
2642    RenameOp setDestination(String dst) {
2643      this.dst = dst;
2644      return this;
2645    }
2646    
2647    RenameOp setTimestamp(long timestamp) {
2648      this.timestamp = timestamp;
2649      return this;
2650    }
2651    
2652    RenameOp setOptions(Rename[] options) {
2653      this.options = options;
2654      return this;
2655    }
2656
2657    @Override
2658    public 
2659    void writeFields(DataOutputStream out) throws IOException {
2660      FSImageSerialization.writeString(src, out);
2661      FSImageSerialization.writeString(dst, out);
2662      FSImageSerialization.writeLong(timestamp, out);
2663      toBytesWritable(options).write(out);
2664      writeRpcIds(rpcClientId, rpcCallId, out);
2665    }
2666
2667    @Override
2668    void readFields(DataInputStream in, int logVersion)
2669        throws IOException {
2670      if (!NameNodeLayoutVersion.supports(
2671          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
2672        this.length = in.readInt();
2673        if (this.length != 3) {
2674          throw new IOException("Incorrect data format. " + "Rename operation.");
2675        }
2676      }
2677      this.src = FSImageSerialization.readString(in);
2678      this.dst = FSImageSerialization.readString(in);
2679
2680      if (NameNodeLayoutVersion.supports(
2681          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
2682        this.timestamp = FSImageSerialization.readLong(in);
2683      } else {
2684        this.timestamp = readLong(in);
2685      }
2686      this.options = readRenameOptions(in);
2687      
2688      // read RPC ids if necessary
2689      readRpcIds(in, logVersion);
2690    }
2691
2692    private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
2693      BytesWritable writable = new BytesWritable();
2694      writable.readFields(in);
2695
2696      byte[] bytes = writable.getBytes();
2697      int len = writable.getLength();
2698      Rename[] options = new Rename[len];
2699
2700      for (int i = 0; i < len; i++) {
2701        options[i] = Rename.valueOf(bytes[i]);
2702      }
2703      return options;
2704    }
2705
2706    static BytesWritable toBytesWritable(Rename... options) {
2707      byte[] bytes = new byte[options.length];
2708      for (int i = 0; i < options.length; i++) {
2709        bytes[i] = options[i].value();
2710      }
2711      return new BytesWritable(bytes);
2712    }
2713
2714    @Override
2715    public String toString() {
2716      StringBuilder builder = new StringBuilder();
2717      builder.append("RenameOp [length=");
2718      builder.append(length);
2719      builder.append(", src=");
2720      builder.append(src);
2721      builder.append(", dst=");
2722      builder.append(dst);
2723      builder.append(", timestamp=");
2724      builder.append(timestamp);
2725      builder.append(", options=");
2726      builder.append(Arrays.toString(options));
2727      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
2728      builder.append(", opCode=");
2729      builder.append(opCode);
2730      builder.append(", txid=");
2731      builder.append(txid);
2732      builder.append("]");
2733      return builder.toString();
2734    }
2735    
2736    @Override
2737    protected void toXml(ContentHandler contentHandler) throws SAXException {
2738      XMLUtils.addSaxString(contentHandler, "LENGTH",
2739          Integer.toString(length));
2740      XMLUtils.addSaxString(contentHandler, "SRC", src);
2741      XMLUtils.addSaxString(contentHandler, "DST", dst);
2742      XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
2743          Long.toString(timestamp));
2744      StringBuilder bld = new StringBuilder();
2745      String prefix = "";
2746      for (Rename r : options) {
2747        bld.append(prefix).append(r.toString());
2748        prefix = "|";
2749      }
2750      XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString());
2751      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
2752    }
2753    
2754    @Override void fromXml(Stanza st) throws InvalidXmlException {
2755      this.length = Integer.parseInt(st.getValue("LENGTH"));
2756      this.src = st.getValue("SRC");
2757      this.dst = st.getValue("DST");
2758      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
2759      String opts = st.getValue("OPTIONS");
2760      String o[] = opts.split("\\|");
2761      this.options = new Rename[o.length];
2762      for (int i = 0; i < o.length; i++) {
2763        if (o[i].equals(""))
2764          continue;
2765        try {
2766          this.options[i] = Rename.valueOf(o[i]);
2767        } finally {
2768          if (this.options[i] == null) {
2769            System.err.println("error parsing Rename value: \"" + o[i] + "\"");
2770          }
2771        }
2772      }
2773      readRpcIdsFromXml(st);
2774    }
2775  }
2776
2777  static class TruncateOp extends FSEditLogOp {
2778    String src;
2779    String clientName;
2780    String clientMachine;
2781    long newLength;
2782    long timestamp;
2783    Block truncateBlock;
2784
2785    TruncateOp() {
2786      super(OP_TRUNCATE);
2787    }
2788
2789    static TruncateOp getInstance(OpInstanceCache cache) {
2790      return (TruncateOp)cache.get(OP_TRUNCATE);
2791    }
2792
2793    @Override
2794    void resetSubFields() {
2795      src = null;
2796      clientName = null;
2797      clientMachine = null;
2798      newLength = 0L;
2799      timestamp = 0L;
2800    }
2801
2802    TruncateOp setPath(String src) {
2803      this.src = src;
2804      return this;
2805    }
2806
2807    TruncateOp setClientName(String clientName) {
2808      this.clientName = clientName;
2809      return this;
2810    }
2811
2812    TruncateOp setClientMachine(String clientMachine) {
2813      this.clientMachine = clientMachine;
2814      return this;
2815    }
2816
2817    TruncateOp setNewLength(long newLength) {
2818      this.newLength = newLength;
2819      return this;
2820    }
2821
2822    TruncateOp setTimestamp(long timestamp) {
2823      this.timestamp = timestamp;
2824      return this;
2825    }
2826
2827    TruncateOp setTruncateBlock(Block truncateBlock) {
2828      this.truncateBlock = truncateBlock;
2829      return this;
2830    }
2831
2832    @Override
2833    void readFields(DataInputStream in, int logVersion) throws IOException {
2834      src = FSImageSerialization.readString(in);
2835      clientName = FSImageSerialization.readString(in);
2836      clientMachine = FSImageSerialization.readString(in);
2837      newLength = FSImageSerialization.readLong(in);
2838      timestamp = FSImageSerialization.readLong(in);
2839      Block[] blocks =
2840          FSImageSerialization.readCompactBlockArray(in, logVersion);
2841      assert blocks.length <= 1 : "Truncate op should have 1 or 0 blocks";
2842      truncateBlock = (blocks.length == 0) ? null : blocks[0];
2843    }
2844
2845    @Override
2846    public void writeFields(DataOutputStream out) throws IOException {
2847      FSImageSerialization.writeString(src, out);
2848      FSImageSerialization.writeString(clientName, out);
2849      FSImageSerialization.writeString(clientMachine, out);
2850      FSImageSerialization.writeLong(newLength, out);
2851      FSImageSerialization.writeLong(timestamp, out);
2852      int size = truncateBlock != null ? 1 : 0;
2853      Block[] blocks = new Block[size];
2854      if (truncateBlock != null) {
2855        blocks[0] = truncateBlock;
2856      }
2857      FSImageSerialization.writeCompactBlockArray(blocks, out);
2858    }
2859
2860    @Override
2861    protected void toXml(ContentHandler contentHandler) throws SAXException {
2862      XMLUtils.addSaxString(contentHandler, "SRC", src);
2863      XMLUtils.addSaxString(contentHandler, "CLIENTNAME", clientName);
2864      XMLUtils.addSaxString(contentHandler, "CLIENTMACHINE", clientMachine);
2865      XMLUtils.addSaxString(contentHandler, "NEWLENGTH",
2866          Long.toString(newLength));
2867      XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
2868          Long.toString(timestamp));
2869      if(truncateBlock != null)
2870        FSEditLogOp.blockToXml(contentHandler, truncateBlock);
2871    }
2872
2873    @Override
2874    void fromXml(Stanza st) throws InvalidXmlException {
2875      this.src = st.getValue("SRC");
2876      this.clientName = st.getValue("CLIENTNAME");
2877      this.clientMachine = st.getValue("CLIENTMACHINE");
2878      this.newLength = Long.parseLong(st.getValue("NEWLENGTH"));
2879      this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
2880      if (st.hasChildren("BLOCK"))
2881        this.truncateBlock = FSEditLogOp.blockFromXml(st);
2882    }
2883
2884    @Override
2885    public String toString() {
2886      StringBuilder builder = new StringBuilder();
2887      builder.append("TruncateOp [src=");
2888      builder.append(src);
2889      builder.append(", clientName=");
2890      builder.append(clientName);
2891      builder.append(", clientMachine=");
2892      builder.append(clientMachine);
2893      builder.append(", newLength=");
2894      builder.append(newLength);
2895      builder.append(", timestamp=");
2896      builder.append(timestamp);
2897      builder.append(", truncateBlock=");
2898      builder.append(truncateBlock);
2899      builder.append(", opCode=");
2900      builder.append(opCode);
2901      builder.append(", txid=");
2902      builder.append(txid);
2903      builder.append("]");
2904      return builder.toString();
2905    }
2906  }
2907 
2908  /**
2909   * {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. In the
2910   * meanwhile, startFile and appendFile both have their own corresponding
2911   * editlog op.
2912   */
2913  static class ReassignLeaseOp extends FSEditLogOp {
2914    String leaseHolder;
2915    String path;
2916    String newHolder;
2917
2918    ReassignLeaseOp() {
2919      super(OP_REASSIGN_LEASE);
2920    }
2921
2922    static ReassignLeaseOp getInstance(OpInstanceCache cache) {
2923      return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE);
2924    }
2925
2926    @Override
2927    void resetSubFields() {
2928      leaseHolder = null;
2929      path = null;
2930      newHolder = null;
2931    }
2932
2933    ReassignLeaseOp setLeaseHolder(String leaseHolder) {
2934      this.leaseHolder = leaseHolder;
2935      return this;
2936    }
2937
2938    ReassignLeaseOp setPath(String path) {
2939      this.path = path;
2940      return this;
2941    }
2942
2943    ReassignLeaseOp setNewHolder(String newHolder) {
2944      this.newHolder = newHolder;
2945      return this;
2946    }
2947
2948    @Override
2949    public 
2950    void writeFields(DataOutputStream out) throws IOException {
2951      FSImageSerialization.writeString(leaseHolder, out);
2952      FSImageSerialization.writeString(path, out);
2953      FSImageSerialization.writeString(newHolder, out);
2954    }
2955
2956    @Override
2957    void readFields(DataInputStream in, int logVersion)
2958        throws IOException {
2959      this.leaseHolder = FSImageSerialization.readString(in);
2960      this.path = FSImageSerialization.readString(in);
2961      this.newHolder = FSImageSerialization.readString(in);
2962    }
2963
2964    @Override
2965    public String toString() {
2966      StringBuilder builder = new StringBuilder();
2967      builder.append("ReassignLeaseOp [leaseHolder=");
2968      builder.append(leaseHolder);
2969      builder.append(", path=");
2970      builder.append(path);
2971      builder.append(", newHolder=");
2972      builder.append(newHolder);
2973      builder.append(", opCode=");
2974      builder.append(opCode);
2975      builder.append(", txid=");
2976      builder.append(txid);
2977      builder.append("]");
2978      return builder.toString();
2979    }
2980    
2981    @Override
2982    protected void toXml(ContentHandler contentHandler) throws SAXException {
2983      XMLUtils.addSaxString(contentHandler, "LEASEHOLDER", leaseHolder);
2984      XMLUtils.addSaxString(contentHandler, "PATH", path);
2985      XMLUtils.addSaxString(contentHandler, "NEWHOLDER", newHolder);
2986    }
2987    
2988    @Override void fromXml(Stanza st) throws InvalidXmlException {
2989      this.leaseHolder = st.getValue("LEASEHOLDER");
2990      this.path = st.getValue("PATH");
2991      this.newHolder = st.getValue("NEWHOLDER");
2992    }
2993  }
2994
2995  /** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */
2996  static class GetDelegationTokenOp extends FSEditLogOp {
2997    DelegationTokenIdentifier token;
2998    long expiryTime;
2999
3000    GetDelegationTokenOp() {
3001      super(OP_GET_DELEGATION_TOKEN);
3002    }
3003
3004    static GetDelegationTokenOp getInstance(OpInstanceCache cache) {
3005      return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN);
3006    }
3007
3008    @Override
3009    void resetSubFields() {
3010      token = null;
3011      expiryTime = 0L;
3012    }
3013
3014    GetDelegationTokenOp setDelegationTokenIdentifier(
3015        DelegationTokenIdentifier token) {
3016      this.token = token;
3017      return this;
3018    }
3019
3020    GetDelegationTokenOp setExpiryTime(long expiryTime) {
3021      this.expiryTime = expiryTime;
3022      return this;
3023    }
3024
3025    @Override
3026    public 
3027    void writeFields(DataOutputStream out) throws IOException {
3028      token.write(out);
3029      FSImageSerialization.writeLong(expiryTime, out);
3030    }
3031
3032    @Override
3033    void readFields(DataInputStream in, int logVersion)
3034        throws IOException {
3035      this.token = new DelegationTokenIdentifier();
3036      this.token.readFields(in);
3037      if (NameNodeLayoutVersion.supports(
3038          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
3039        this.expiryTime = FSImageSerialization.readLong(in);
3040      } else {
3041        this.expiryTime = readLong(in);
3042      }
3043    }
3044
3045    @Override
3046    public String toString() {
3047      StringBuilder builder = new StringBuilder();
3048      builder.append("GetDelegationTokenOp [token=");
3049      builder.append(token);
3050      builder.append(", expiryTime=");
3051      builder.append(expiryTime);
3052      builder.append(", opCode=");
3053      builder.append(opCode);
3054      builder.append(", txid=");
3055      builder.append(txid);
3056      builder.append("]");
3057      return builder.toString();
3058    }
3059    
3060    @Override
3061    protected void toXml(ContentHandler contentHandler) throws SAXException {
3062      FSEditLogOp.delegationTokenToXml(contentHandler, token);
3063      XMLUtils.addSaxString(contentHandler, "EXPIRY_TIME",
3064          Long.toString(expiryTime));
3065    }
3066    
3067    @Override void fromXml(Stanza st) throws InvalidXmlException {
3068      this.token = delegationTokenFromXml(st.getChildren(
3069          "DELEGATION_TOKEN_IDENTIFIER").get(0));
3070      this.expiryTime = Long.parseLong(st.getValue("EXPIRY_TIME"));
3071    }
3072  }
3073
3074  /** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */
3075  static class RenewDelegationTokenOp extends FSEditLogOp {
3076    DelegationTokenIdentifier token;
3077    long expiryTime;
3078
3079    RenewDelegationTokenOp() {
3080      super(OP_RENEW_DELEGATION_TOKEN);
3081    }
3082
3083    static RenewDelegationTokenOp getInstance(OpInstanceCache cache) {
3084      return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN);
3085    }
3086
3087    @Override
3088    void resetSubFields() {
3089      token = null;
3090      expiryTime = 0L;
3091    }
3092
3093    RenewDelegationTokenOp setDelegationTokenIdentifier(
3094        DelegationTokenIdentifier token) {
3095      this.token = token;
3096      return this;
3097    }
3098
3099    RenewDelegationTokenOp setExpiryTime(long expiryTime) {
3100      this.expiryTime = expiryTime;
3101      return this;
3102    }
3103
3104    @Override
3105    public 
3106    void writeFields(DataOutputStream out) throws IOException {
3107      token.write(out);
3108      FSImageSerialization.writeLong(expiryTime, out);
3109    }
3110
3111    @Override
3112    void readFields(DataInputStream in, int logVersion)
3113        throws IOException {
3114      this.token = new DelegationTokenIdentifier();
3115      this.token.readFields(in);
3116      if (NameNodeLayoutVersion.supports(
3117          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
3118        this.expiryTime = FSImageSerialization.readLong(in);
3119      } else {
3120        this.expiryTime = readLong(in);
3121      }
3122    }
3123
3124    @Override
3125    public String toString() {
3126      StringBuilder builder = new StringBuilder();
3127      builder.append("RenewDelegationTokenOp [token=");
3128      builder.append(token);
3129      builder.append(", expiryTime=");
3130      builder.append(expiryTime);
3131      builder.append(", opCode=");
3132      builder.append(opCode);
3133      builder.append(", txid=");
3134      builder.append(txid);
3135      builder.append("]");
3136      return builder.toString();
3137    }
3138    
3139    @Override
3140    protected void toXml(ContentHandler contentHandler) throws SAXException {
3141      FSEditLogOp.delegationTokenToXml(contentHandler, token);
3142      XMLUtils.addSaxString(contentHandler, "EXPIRY_TIME",
3143          Long.toString(expiryTime));
3144    }
3145    
3146    @Override void fromXml(Stanza st) throws InvalidXmlException {
3147      this.token = delegationTokenFromXml(st.getChildren(
3148          "DELEGATION_TOKEN_IDENTIFIER").get(0));
3149      this.expiryTime = Long.parseLong(st.getValue("EXPIRY_TIME"));
3150    }
3151  }
3152
3153  /** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */
3154  static class CancelDelegationTokenOp extends FSEditLogOp {
3155    DelegationTokenIdentifier token;
3156
3157    CancelDelegationTokenOp() {
3158      super(OP_CANCEL_DELEGATION_TOKEN);
3159    }
3160
3161    static CancelDelegationTokenOp getInstance(OpInstanceCache cache) {
3162      return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN);
3163    }
3164
3165    @Override
3166    void resetSubFields() {
3167      token = null;
3168    }
3169
3170    CancelDelegationTokenOp setDelegationTokenIdentifier(
3171        DelegationTokenIdentifier token) {
3172      this.token = token;
3173      return this;
3174    }
3175
3176    @Override
3177    public 
3178    void writeFields(DataOutputStream out) throws IOException {
3179      token.write(out);
3180    }
3181
3182    @Override
3183    void readFields(DataInputStream in, int logVersion)
3184        throws IOException {
3185      this.token = new DelegationTokenIdentifier();
3186      this.token.readFields(in);
3187    }
3188
3189    @Override
3190    public String toString() {
3191      StringBuilder builder = new StringBuilder();
3192      builder.append("CancelDelegationTokenOp [token=");
3193      builder.append(token);
3194      builder.append(", opCode=");
3195      builder.append(opCode);
3196      builder.append(", txid=");
3197      builder.append(txid);
3198      builder.append("]");
3199      return builder.toString();
3200    }
3201    
3202    @Override
3203    protected void toXml(ContentHandler contentHandler) throws SAXException {
3204      FSEditLogOp.delegationTokenToXml(contentHandler, token);
3205    }
3206    
3207    @Override void fromXml(Stanza st) throws InvalidXmlException {
3208      this.token = delegationTokenFromXml(st.getChildren(
3209          "DELEGATION_TOKEN_IDENTIFIER").get(0));
3210    }
3211  }
3212
3213  static class UpdateMasterKeyOp extends FSEditLogOp {
3214    DelegationKey key;
3215
3216    UpdateMasterKeyOp() {
3217      super(OP_UPDATE_MASTER_KEY);
3218    }
3219
3220    static UpdateMasterKeyOp getInstance(OpInstanceCache cache) {
3221      return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY);
3222    }
3223
3224    @Override
3225    void resetSubFields() {
3226      key = null;
3227    }
3228
3229    UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
3230      this.key = key;
3231      return this;
3232    }
3233    
3234    @Override
3235    public 
3236    void writeFields(DataOutputStream out) throws IOException {
3237      key.write(out);
3238    }
3239
3240    @Override
3241    void readFields(DataInputStream in, int logVersion)
3242        throws IOException {
3243      this.key = new DelegationKey();
3244      this.key.readFields(in);
3245    }
3246
3247    @Override
3248    public String toString() {
3249      StringBuilder builder = new StringBuilder();
3250      builder.append("UpdateMasterKeyOp [key=");
3251      builder.append(key);
3252      builder.append(", opCode=");
3253      builder.append(opCode);
3254      builder.append(", txid=");
3255      builder.append(txid);
3256      builder.append("]");
3257      return builder.toString();
3258    }
3259    
3260    @Override
3261    protected void toXml(ContentHandler contentHandler) throws SAXException {
3262      FSEditLogOp.delegationKeyToXml(contentHandler, key);
3263    }
3264    
3265    @Override void fromXml(Stanza st) throws InvalidXmlException {
3266      this.key = delegationKeyFromXml(st.getChildren(
3267          "DELEGATION_KEY").get(0));
3268    }
3269  }
3270  
3271  static class LogSegmentOp extends FSEditLogOp {
3272    private LogSegmentOp(FSEditLogOpCodes code) {
3273      super(code);
3274      assert code == OP_START_LOG_SEGMENT ||
3275             code == OP_END_LOG_SEGMENT : "Bad op: " + code;
3276    }
3277
3278    static LogSegmentOp getInstance(OpInstanceCache cache,
3279        FSEditLogOpCodes code) {
3280      return (LogSegmentOp)cache.get(code);
3281    }
3282
3283    @Override
3284    void resetSubFields() {
3285      // no data stored in these ops yet
3286    }
3287
3288    @Override
3289    public void readFields(DataInputStream in, int logVersion)
3290        throws IOException {
3291      // no data stored in these ops yet
3292    }
3293
3294    @Override
3295    public
3296    void writeFields(DataOutputStream out) throws IOException {
3297      // no data stored
3298    }
3299
3300    @Override
3301    public String toString() {
3302      StringBuilder builder = new StringBuilder();
3303      builder.append("LogSegmentOp [opCode=");
3304      builder.append(opCode);
3305      builder.append(", txid=");
3306      builder.append(txid);
3307      builder.append("]");
3308      return builder.toString();
3309    }
3310
3311    @Override
3312    protected void toXml(ContentHandler contentHandler) throws SAXException {
3313      // no data stored
3314    }
3315    
3316    @Override void fromXml(Stanza st) throws InvalidXmlException {
3317      // do nothing
3318    }
3319  }
3320
  /** A {@link LogSegmentOp} fixed to the {@code OP_START_LOG_SEGMENT} opcode. */
  static class StartLogSegmentOp extends LogSegmentOp {
    StartLogSegmentOp() {
      super(OP_START_LOG_SEGMENT);
    }
  }
3326
  /** A {@link LogSegmentOp} fixed to the {@code OP_END_LOG_SEGMENT} opcode. */
  static class EndLogSegmentOp extends LogSegmentOp {
    EndLogSegmentOp() {
      super(OP_END_LOG_SEGMENT);
    }
  }
3332
3333  static class InvalidOp extends FSEditLogOp {
3334    InvalidOp() {
3335      super(OP_INVALID);
3336    }
3337
3338    static InvalidOp getInstance(OpInstanceCache cache) {
3339      return (InvalidOp)cache.get(OP_INVALID);
3340    }
3341
3342    @Override
3343    void resetSubFields() {
3344    }
3345
3346    @Override
3347    public 
3348    void writeFields(DataOutputStream out) throws IOException {
3349    }
3350    
3351    @Override
3352    void readFields(DataInputStream in, int logVersion)
3353        throws IOException {
3354      // nothing to read
3355    }
3356
3357    @Override
3358    public String toString() {
3359      StringBuilder builder = new StringBuilder();
3360      builder.append("InvalidOp [opCode=");
3361      builder.append(opCode);
3362      builder.append(", txid=");
3363      builder.append(txid);
3364      builder.append("]");
3365      return builder.toString();
3366    }
3367    @Override
3368    protected void toXml(ContentHandler contentHandler) throws SAXException {
3369      // no data stored
3370    }
3371    
3372    @Override void fromXml(Stanza st) throws InvalidXmlException {
3373      // do nothing
3374    }
3375  }
3376
3377  /**
3378   * Operation corresponding to creating a snapshot.
3379   * {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}.
3380   */
3381  static class CreateSnapshotOp extends FSEditLogOp {
3382    String snapshotRoot;
3383    String snapshotName;
3384    
3385    public CreateSnapshotOp() {
3386      super(OP_CREATE_SNAPSHOT);
3387    }
3388    
3389    static CreateSnapshotOp getInstance(OpInstanceCache cache) {
3390      return (CreateSnapshotOp)cache.get(OP_CREATE_SNAPSHOT);
3391    }
3392
3393    @Override
3394    void resetSubFields() {
3395      snapshotRoot = null;
3396      snapshotName = null;
3397    }
3398
3399    CreateSnapshotOp setSnapshotName(String snapName) {
3400      this.snapshotName = snapName;
3401      return this;
3402    }
3403
3404    public CreateSnapshotOp setSnapshotRoot(String snapRoot) {
3405      snapshotRoot = snapRoot;
3406      return this;
3407    }
3408    
3409    @Override
3410    void readFields(DataInputStream in, int logVersion) throws IOException {
3411      snapshotRoot = FSImageSerialization.readString(in);
3412      snapshotName = FSImageSerialization.readString(in);
3413      
3414      // read RPC ids if necessary
3415      readRpcIds(in, logVersion);
3416    }
3417
3418    @Override
3419    public void writeFields(DataOutputStream out) throws IOException {
3420      FSImageSerialization.writeString(snapshotRoot, out);
3421      FSImageSerialization.writeString(snapshotName, out);
3422      writeRpcIds(rpcClientId, rpcCallId, out);
3423    }
3424
3425    @Override
3426    protected void toXml(ContentHandler contentHandler) throws SAXException {
3427      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
3428      XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
3429      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3430    }
3431
3432    @Override
3433    void fromXml(Stanza st) throws InvalidXmlException {
3434      snapshotRoot = st.getValue("SNAPSHOTROOT");
3435      snapshotName = st.getValue("SNAPSHOTNAME");
3436      
3437      readRpcIdsFromXml(st);
3438    }
3439    
3440    @Override
3441    public String toString() {
3442      StringBuilder builder = new StringBuilder();
3443      builder.append("CreateSnapshotOp [snapshotRoot=");
3444      builder.append(snapshotRoot);
3445      builder.append(", snapshotName=");
3446      builder.append(snapshotName);
3447      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
3448      builder.append("]");
3449      return builder.toString();
3450    }
3451  }
3452  
3453  /**
3454   * Operation corresponding to delete a snapshot.
3455   * {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}.
3456   */
3457  static class DeleteSnapshotOp extends FSEditLogOp {
3458    String snapshotRoot;
3459    String snapshotName;
3460    
3461    DeleteSnapshotOp() {
3462      super(OP_DELETE_SNAPSHOT);
3463    }
3464    
3465    static DeleteSnapshotOp getInstance(OpInstanceCache cache) {
3466      return (DeleteSnapshotOp)cache.get(OP_DELETE_SNAPSHOT);
3467    }
3468
3469    @Override
3470    void resetSubFields() {
3471      snapshotRoot = null;
3472      snapshotName = null;
3473    }
3474    
3475    DeleteSnapshotOp setSnapshotName(String snapName) {
3476      this.snapshotName = snapName;
3477      return this;
3478    }
3479
3480    DeleteSnapshotOp setSnapshotRoot(String snapRoot) {
3481      snapshotRoot = snapRoot;
3482      return this;
3483    }
3484    
3485    @Override
3486    void readFields(DataInputStream in, int logVersion) throws IOException {
3487      snapshotRoot = FSImageSerialization.readString(in);
3488      snapshotName = FSImageSerialization.readString(in);
3489      
3490      // read RPC ids if necessary
3491      readRpcIds(in, logVersion);
3492    }
3493
3494    @Override
3495    public void writeFields(DataOutputStream out) throws IOException {
3496      FSImageSerialization.writeString(snapshotRoot, out);
3497      FSImageSerialization.writeString(snapshotName, out);
3498      writeRpcIds(rpcClientId, rpcCallId, out);
3499    }
3500
3501    @Override
3502    protected void toXml(ContentHandler contentHandler) throws SAXException {
3503      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
3504      XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
3505      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3506    }
3507
3508    @Override
3509    void fromXml(Stanza st) throws InvalidXmlException {
3510      snapshotRoot = st.getValue("SNAPSHOTROOT");
3511      snapshotName = st.getValue("SNAPSHOTNAME");
3512      
3513      readRpcIdsFromXml(st);
3514    }
3515    
3516    @Override
3517    public String toString() {
3518      StringBuilder builder = new StringBuilder();
3519      builder.append("DeleteSnapshotOp [snapshotRoot=");
3520      builder.append(snapshotRoot);
3521      builder.append(", snapshotName=");
3522      builder.append(snapshotName);
3523      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
3524      builder.append("]");
3525      return builder.toString();
3526    }
3527  }
3528  
3529  /**
3530   * Operation corresponding to rename a snapshot.
3531   * {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}.
3532   */
3533  static class RenameSnapshotOp extends FSEditLogOp {
3534    String snapshotRoot;
3535    String snapshotOldName;
3536    String snapshotNewName;
3537    
3538    RenameSnapshotOp() {
3539      super(OP_RENAME_SNAPSHOT);
3540    }
3541    
3542    static RenameSnapshotOp getInstance(OpInstanceCache cache) {
3543      return (RenameSnapshotOp) cache.get(OP_RENAME_SNAPSHOT);
3544    }
3545
3546    @Override
3547    void resetSubFields() {
3548      snapshotRoot = null;
3549      snapshotOldName = null;
3550      snapshotNewName = null;
3551    }
3552    
3553    RenameSnapshotOp setSnapshotOldName(String snapshotOldName) {
3554      this.snapshotOldName = snapshotOldName;
3555      return this;
3556    }
3557
3558    RenameSnapshotOp setSnapshotNewName(String snapshotNewName) {
3559      this.snapshotNewName = snapshotNewName;
3560      return this;
3561    }
3562    
3563    RenameSnapshotOp setSnapshotRoot(String snapshotRoot) {
3564      this.snapshotRoot = snapshotRoot;
3565      return this;
3566    }
3567    
3568    @Override
3569    void readFields(DataInputStream in, int logVersion) throws IOException {
3570      snapshotRoot = FSImageSerialization.readString(in);
3571      snapshotOldName = FSImageSerialization.readString(in);
3572      snapshotNewName = FSImageSerialization.readString(in);
3573      
3574      // read RPC ids if necessary
3575      readRpcIds(in, logVersion);
3576    }
3577
3578    @Override
3579    public void writeFields(DataOutputStream out) throws IOException {
3580      FSImageSerialization.writeString(snapshotRoot, out);
3581      FSImageSerialization.writeString(snapshotOldName, out);
3582      FSImageSerialization.writeString(snapshotNewName, out);
3583      
3584      writeRpcIds(rpcClientId, rpcCallId, out);
3585    }
3586
3587    @Override
3588    protected void toXml(ContentHandler contentHandler) throws SAXException {
3589      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
3590      XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
3591      XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
3592      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3593    }
3594
3595    @Override
3596    void fromXml(Stanza st) throws InvalidXmlException {
3597      snapshotRoot = st.getValue("SNAPSHOTROOT");
3598      snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
3599      snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
3600      
3601      readRpcIdsFromXml(st);
3602    }
3603    
3604    @Override
3605    public String toString() {
3606      StringBuilder builder = new StringBuilder();
3607      builder.append("RenameSnapshotOp [snapshotRoot=");
3608      builder.append(snapshotRoot);
3609      builder.append(", snapshotOldName=");
3610      builder.append(snapshotOldName);
3611      builder.append(", snapshotNewName=");
3612      builder.append(snapshotNewName);
3613      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
3614      builder.append("]");
3615      return builder.toString();
3616    }
3617  }
3618
3619  /**
3620   * Operation corresponding to allow creating snapshot on a directory
3621   */
3622  static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent
3623    String snapshotRoot;
3624
3625    public AllowSnapshotOp() {
3626      super(OP_ALLOW_SNAPSHOT);
3627    }
3628
3629    public AllowSnapshotOp(String snapRoot) {
3630      super(OP_ALLOW_SNAPSHOT);
3631      snapshotRoot = snapRoot;
3632    }
3633
3634    static AllowSnapshotOp getInstance(OpInstanceCache cache) {
3635      return (AllowSnapshotOp) cache.get(OP_ALLOW_SNAPSHOT);
3636    }
3637
3638    @Override
3639    void resetSubFields() {
3640      snapshotRoot = null;
3641    }
3642
3643    public AllowSnapshotOp setSnapshotRoot(String snapRoot) {
3644      snapshotRoot = snapRoot;
3645      return this;
3646    }
3647
3648    @Override
3649    void readFields(DataInputStream in, int logVersion) throws IOException {
3650      snapshotRoot = FSImageSerialization.readString(in);
3651    }
3652
3653    @Override
3654    public void writeFields(DataOutputStream out) throws IOException {
3655      FSImageSerialization.writeString(snapshotRoot, out);
3656    }
3657
3658    @Override
3659    protected void toXml(ContentHandler contentHandler) throws SAXException {
3660      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
3661    }
3662
3663    @Override
3664    void fromXml(Stanza st) throws InvalidXmlException {
3665      snapshotRoot = st.getValue("SNAPSHOTROOT");
3666    }
3667
3668    @Override
3669    public String toString() {
3670      StringBuilder builder = new StringBuilder();
3671      builder.append("AllowSnapshotOp [snapshotRoot=");
3672      builder.append(snapshotRoot);
3673      builder.append("]");
3674      return builder.toString();
3675    }
3676  }
3677
3678  /**
3679   * Operation corresponding to disallow creating snapshot on a directory
3680   */
3681  static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent
3682    String snapshotRoot;
3683
3684    public DisallowSnapshotOp() {
3685      super(OP_DISALLOW_SNAPSHOT);
3686    }
3687
3688    public DisallowSnapshotOp(String snapRoot) {
3689      super(OP_DISALLOW_SNAPSHOT);
3690      snapshotRoot = snapRoot;
3691    }
3692
3693    static DisallowSnapshotOp getInstance(OpInstanceCache cache) {
3694      return (DisallowSnapshotOp) cache.get(OP_DISALLOW_SNAPSHOT);
3695    }
3696
3697    void resetSubFields() {
3698      snapshotRoot = null;
3699    }
3700
3701    public DisallowSnapshotOp setSnapshotRoot(String snapRoot) {
3702      snapshotRoot = snapRoot;
3703      return this;
3704    }
3705
3706    @Override
3707    void readFields(DataInputStream in, int logVersion) throws IOException {
3708      snapshotRoot = FSImageSerialization.readString(in);
3709    }
3710
3711    @Override
3712    public void writeFields(DataOutputStream out) throws IOException {
3713      FSImageSerialization.writeString(snapshotRoot, out);
3714    }
3715
3716    @Override
3717    protected void toXml(ContentHandler contentHandler) throws SAXException {
3718      XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
3719    }
3720
3721    @Override
3722    void fromXml(Stanza st) throws InvalidXmlException {
3723      snapshotRoot = st.getValue("SNAPSHOTROOT");
3724    }
3725
3726    @Override
3727    public String toString() {
3728      StringBuilder builder = new StringBuilder();
3729      builder.append("DisallowSnapshotOp [snapshotRoot=");
3730      builder.append(snapshotRoot);
3731      builder.append("]");
3732      return builder.toString();
3733    }
3734  }
3735
3736  /**
3737   * {@literal @AtMostOnce} for
3738   * {@link ClientProtocol#addCacheDirective}
3739   */
3740  static class AddCacheDirectiveInfoOp extends FSEditLogOp {
3741    CacheDirectiveInfo directive;
3742
3743    public AddCacheDirectiveInfoOp() {
3744      super(OP_ADD_CACHE_DIRECTIVE);
3745    }
3746
3747    static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
3748      return (AddCacheDirectiveInfoOp) cache
3749          .get(OP_ADD_CACHE_DIRECTIVE);
3750    }
3751
3752    @Override
3753    void resetSubFields() {
3754      directive = null;
3755    }
3756
3757    public AddCacheDirectiveInfoOp setDirective(
3758        CacheDirectiveInfo directive) {
3759      this.directive = directive;
3760      assert(directive.getId() != null);
3761      assert(directive.getPath() != null);
3762      assert(directive.getReplication() != null);
3763      assert(directive.getPool() != null);
3764      assert(directive.getExpiration() != null);
3765      return this;
3766    }
3767
3768    @Override
3769    void readFields(DataInputStream in, int logVersion) throws IOException {
3770      directive = FSImageSerialization.readCacheDirectiveInfo(in);
3771      readRpcIds(in, logVersion);
3772    }
3773
3774    @Override
3775    public void writeFields(DataOutputStream out) throws IOException {
3776      FSImageSerialization.writeCacheDirectiveInfo(out, directive);
3777      writeRpcIds(rpcClientId, rpcCallId, out);
3778    }
3779
3780    @Override
3781    protected void toXml(ContentHandler contentHandler) throws SAXException {
3782      FSImageSerialization.writeCacheDirectiveInfo(contentHandler, directive);
3783      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3784    }
3785
3786    @Override
3787    void fromXml(Stanza st) throws InvalidXmlException {
3788      directive = FSImageSerialization.readCacheDirectiveInfo(st);
3789      readRpcIdsFromXml(st);
3790    }
3791
3792    @Override
3793    public String toString() {
3794      StringBuilder builder = new StringBuilder();
3795      builder.append("AddCacheDirectiveInfo [");
3796      builder.append("id=" + directive.getId() + ",");
3797      builder.append("path=" + directive.getPath().toUri().getPath() + ",");
3798      builder.append("replication=" + directive.getReplication() + ",");
3799      builder.append("pool=" + directive.getPool() + ",");
3800      builder.append("expiration=" + directive.getExpiration().getMillis());
3801      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
3802      builder.append("]");
3803      return builder.toString();
3804    }
3805  }
3806
3807  /**
3808   * {@literal @AtMostOnce} for
3809   * {@link ClientProtocol#modifyCacheDirective}
3810   */
3811  static class ModifyCacheDirectiveInfoOp extends FSEditLogOp {
3812    CacheDirectiveInfo directive;
3813
3814    public ModifyCacheDirectiveInfoOp() {
3815      super(OP_MODIFY_CACHE_DIRECTIVE);
3816    }
3817
3818    static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
3819      return (ModifyCacheDirectiveInfoOp) cache
3820          .get(OP_MODIFY_CACHE_DIRECTIVE);
3821    }
3822
3823    @Override
3824    void resetSubFields() {
3825      directive = null;
3826    }
3827
3828    public ModifyCacheDirectiveInfoOp setDirective(
3829        CacheDirectiveInfo directive) {
3830      this.directive = directive;
3831      assert(directive.getId() != null);
3832      return this;
3833    }
3834
3835    @Override
3836    void readFields(DataInputStream in, int logVersion) throws IOException {
3837      this.directive = FSImageSerialization.readCacheDirectiveInfo(in);
3838      readRpcIds(in, logVersion);
3839    }
3840
3841    @Override
3842    public void writeFields(DataOutputStream out) throws IOException {
3843      FSImageSerialization.writeCacheDirectiveInfo(out, directive);
3844      writeRpcIds(rpcClientId, rpcCallId, out);
3845    }
3846
3847    @Override
3848    protected void toXml(ContentHandler contentHandler) throws SAXException {
3849      FSImageSerialization.writeCacheDirectiveInfo(contentHandler, directive);
3850      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3851    }
3852
3853    @Override
3854    void fromXml(Stanza st) throws InvalidXmlException {
3855      this.directive = FSImageSerialization.readCacheDirectiveInfo(st);
3856      readRpcIdsFromXml(st);
3857    }
3858
3859    @Override
3860    public String toString() {
3861      StringBuilder builder = new StringBuilder();
3862      builder.append("ModifyCacheDirectiveInfoOp[");
3863      builder.append("id=").append(directive.getId());
3864      if (directive.getPath() != null) {
3865        builder.append(",").append("path=").append(directive.getPath());
3866      }
3867      if (directive.getReplication() != null) {
3868        builder.append(",").append("replication=").
3869            append(directive.getReplication());
3870      }
3871      if (directive.getPool() != null) {
3872        builder.append(",").append("pool=").append(directive.getPool());
3873      }
3874      if (directive.getExpiration() != null) {
3875        builder.append(",").append("expiration=").
3876            append(directive.getExpiration().getMillis());
3877      }
3878      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
3879      builder.append("]");
3880      return builder.toString();
3881    }
3882  }
3883
3884  /**
3885   * {@literal @AtMostOnce} for
3886   * {@link ClientProtocol#removeCacheDirective}
3887   */
3888  static class RemoveCacheDirectiveInfoOp extends FSEditLogOp {
3889    long id;
3890
3891    public RemoveCacheDirectiveInfoOp() {
3892      super(OP_REMOVE_CACHE_DIRECTIVE);
3893    }
3894
3895    static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
3896      return (RemoveCacheDirectiveInfoOp) cache
3897          .get(OP_REMOVE_CACHE_DIRECTIVE);
3898    }
3899
3900    @Override
3901    void resetSubFields() {
3902      id = 0L;
3903    }
3904
3905    public RemoveCacheDirectiveInfoOp setId(long id) {
3906      this.id = id;
3907      return this;
3908    }
3909
3910    @Override
3911    void readFields(DataInputStream in, int logVersion) throws IOException {
3912      this.id = FSImageSerialization.readLong(in);
3913      readRpcIds(in, logVersion);
3914    }
3915
3916    @Override
3917    public void writeFields(DataOutputStream out) throws IOException {
3918      FSImageSerialization.writeLong(id, out);
3919      writeRpcIds(rpcClientId, rpcCallId, out);
3920    }
3921
3922    @Override
3923    protected void toXml(ContentHandler contentHandler) throws SAXException {
3924      XMLUtils.addSaxString(contentHandler, "ID", Long.toString(id));
3925      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3926    }
3927
3928    @Override
3929    void fromXml(Stanza st) throws InvalidXmlException {
3930      this.id = Long.parseLong(st.getValue("ID"));
3931      readRpcIdsFromXml(st);
3932    }
3933
3934    @Override
3935    public String toString() {
3936      StringBuilder builder = new StringBuilder();
3937      builder.append("RemoveCacheDirectiveInfo [");
3938      builder.append("id=" + Long.toString(id));
3939      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
3940      builder.append("]");
3941      return builder.toString();
3942    }
3943  }
3944
3945  /** {@literal @AtMostOnce} for {@link ClientProtocol#addCachePool} */
3946  static class AddCachePoolOp extends FSEditLogOp {
3947    CachePoolInfo info;
3948
3949    public AddCachePoolOp() {
3950      super(OP_ADD_CACHE_POOL);
3951    }
3952
3953    static AddCachePoolOp getInstance(OpInstanceCache cache) {
3954      return (AddCachePoolOp) cache.get(OP_ADD_CACHE_POOL);
3955    }
3956
3957    @Override
3958    void resetSubFields() {
3959      info = null;
3960    }
3961
3962    public AddCachePoolOp setPool(CachePoolInfo info) {
3963      this.info = info;
3964      assert(info.getPoolName() != null);
3965      assert(info.getOwnerName() != null);
3966      assert(info.getGroupName() != null);
3967      assert(info.getMode() != null);
3968      assert(info.getLimit() != null);
3969      return this;
3970    }
3971
3972    @Override
3973    void readFields(DataInputStream in, int logVersion) throws IOException {
3974      info = FSImageSerialization.readCachePoolInfo(in);
3975      readRpcIds(in, logVersion);
3976    }
3977
3978    @Override
3979    public void writeFields(DataOutputStream out) throws IOException {
3980      FSImageSerialization.writeCachePoolInfo(out, info);
3981      writeRpcIds(rpcClientId, rpcCallId, out);
3982    }
3983
3984    @Override
3985    protected void toXml(ContentHandler contentHandler) throws SAXException {
3986      FSImageSerialization.writeCachePoolInfo(contentHandler, info);
3987      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
3988    }
3989
3990    @Override
3991    void fromXml(Stanza st) throws InvalidXmlException {
3992      this.info = FSImageSerialization.readCachePoolInfo(st);
3993      readRpcIdsFromXml(st);
3994    }
3995
3996    @Override
3997    public String toString() {
3998      StringBuilder builder = new StringBuilder();
3999      builder.append("AddCachePoolOp [");
4000      builder.append("poolName=" + info.getPoolName() + ",");
4001      builder.append("ownerName=" + info.getOwnerName() + ",");
4002      builder.append("groupName=" + info.getGroupName() + ",");
4003      builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
4004      builder.append("limit=" + Long.toString(info.getLimit()));
4005      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
4006      builder.append("]");
4007      return builder.toString();
4008    }
4009  }
4010
4011  /** {@literal @AtMostOnce} for {@link ClientProtocol#modifyCachePool} */
4012  static class ModifyCachePoolOp extends FSEditLogOp {
4013    CachePoolInfo info;
4014
4015    public ModifyCachePoolOp() {
4016      super(OP_MODIFY_CACHE_POOL);
4017    }
4018
4019    static ModifyCachePoolOp getInstance(OpInstanceCache cache) {
4020      return (ModifyCachePoolOp) cache.get(OP_MODIFY_CACHE_POOL);
4021    }
4022
4023    @Override
4024    void resetSubFields() {
4025      info = null;
4026    }
4027
4028    public ModifyCachePoolOp setInfo(CachePoolInfo info) {
4029      this.info = info;
4030      return this;
4031    }
4032
4033    @Override
4034    void readFields(DataInputStream in, int logVersion) throws IOException {
4035      info = FSImageSerialization.readCachePoolInfo(in);
4036      readRpcIds(in, logVersion);
4037    }
4038
4039    @Override
4040    public void writeFields(DataOutputStream out) throws IOException {
4041      FSImageSerialization.writeCachePoolInfo(out, info);
4042      writeRpcIds(rpcClientId, rpcCallId, out);
4043    }
4044
4045    @Override
4046    protected void toXml(ContentHandler contentHandler) throws SAXException {
4047      FSImageSerialization.writeCachePoolInfo(contentHandler, info);
4048      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
4049    }
4050
4051    @Override
4052    void fromXml(Stanza st) throws InvalidXmlException {
4053      this.info = FSImageSerialization.readCachePoolInfo(st);
4054      readRpcIdsFromXml(st);
4055    }
4056
4057    @Override
4058    public String toString() {
4059      StringBuilder builder = new StringBuilder();
4060      builder.append("ModifyCachePoolOp [");
4061      ArrayList<String> fields = new ArrayList<String>(5);
4062      if (info.getPoolName() != null) {
4063        fields.add("poolName=" + info.getPoolName());
4064      }
4065      if (info.getOwnerName() != null) {
4066        fields.add("ownerName=" + info.getOwnerName());
4067      }
4068      if (info.getGroupName() != null) {
4069        fields.add("groupName=" + info.getGroupName());
4070      }
4071      if (info.getMode() != null) {
4072        fields.add("mode=" + info.getMode().toString());
4073      }
4074      if (info.getLimit() != null) {
4075        fields.add("limit=" + info.getLimit());
4076      }
4077      builder.append(Joiner.on(",").join(fields));
4078      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
4079      builder.append("]");
4080      return builder.toString();
4081    }
4082  }
4083
4084  /** {@literal @AtMostOnce} for {@link ClientProtocol#removeCachePool} */
4085  static class RemoveCachePoolOp extends FSEditLogOp {
4086    String poolName;
4087
4088    public RemoveCachePoolOp() {
4089      super(OP_REMOVE_CACHE_POOL);
4090    }
4091
4092    static RemoveCachePoolOp getInstance(OpInstanceCache cache) {
4093      return (RemoveCachePoolOp) cache.get(OP_REMOVE_CACHE_POOL);
4094    }
4095
4096    @Override
4097    void resetSubFields() {
4098      poolName = null;
4099    }
4100
4101    public RemoveCachePoolOp setPoolName(String poolName) {
4102      this.poolName = poolName;
4103      return this;
4104    }
4105
4106    @Override
4107    void readFields(DataInputStream in, int logVersion) throws IOException {
4108      poolName = FSImageSerialization.readString(in);
4109      readRpcIds(in, logVersion);
4110    }
4111
4112    @Override
4113    public void writeFields(DataOutputStream out) throws IOException {
4114      FSImageSerialization.writeString(poolName, out);
4115      writeRpcIds(rpcClientId, rpcCallId, out);
4116    }
4117
4118    @Override
4119    protected void toXml(ContentHandler contentHandler) throws SAXException {
4120      XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
4121      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
4122    }
4123
4124    @Override
4125    void fromXml(Stanza st) throws InvalidXmlException {
4126      this.poolName = st.getValue("POOLNAME");
4127      readRpcIdsFromXml(st);
4128    }
4129
4130    @Override
4131    public String toString() {
4132      StringBuilder builder = new StringBuilder();
4133      builder.append("RemoveCachePoolOp [");
4134      builder.append("poolName=" + poolName);
4135      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
4136      builder.append("]");
4137      return builder.toString();
4138    }
4139  }
4140  
4141  static class RemoveXAttrOp extends FSEditLogOp {
4142    List<XAttr> xAttrs;
4143    String src;
4144    
4145    RemoveXAttrOp() {
4146      super(OP_REMOVE_XATTR);
4147    }
4148    
4149    static RemoveXAttrOp getInstance() {
4150      return new RemoveXAttrOp();
4151    }
4152
4153    @Override
4154    void resetSubFields() {
4155      xAttrs = null;
4156      src = null;
4157    }
4158
4159    @Override
4160    void readFields(DataInputStream in, int logVersion) throws IOException {
4161      XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
4162      src = p.getSrc();
4163      xAttrs = PBHelperClient.convertXAttrs(p.getXAttrsList());
4164      readRpcIds(in, logVersion);
4165    }
4166
4167    @Override
4168    public void writeFields(DataOutputStream out) throws IOException {
4169      XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
4170      if (src != null) {
4171        b.setSrc(src);
4172      }
4173      b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
4174      b.build().writeDelimitedTo(out);
4175      // clientId and callId
4176      writeRpcIds(rpcClientId, rpcCallId, out);
4177    }
4178
4179    @Override
4180    protected void toXml(ContentHandler contentHandler) throws SAXException {
4181      XMLUtils.addSaxString(contentHandler, "SRC", src);
4182      appendXAttrsToXml(contentHandler, xAttrs);
4183      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
4184    }
4185
4186    @Override
4187    void fromXml(Stanza st) throws InvalidXmlException {
4188      src = st.getValue("SRC");
4189      xAttrs = readXAttrsFromXml(st);
4190      readRpcIdsFromXml(st);
4191    }
4192  }
4193  
4194  static class SetXAttrOp extends FSEditLogOp {
4195    List<XAttr> xAttrs;
4196    String src;
4197    
4198    SetXAttrOp() {
4199      super(OP_SET_XATTR);
4200    }
4201    
4202    static SetXAttrOp getInstance() {
4203      return new SetXAttrOp();
4204    }
4205
4206    @Override
4207    void resetSubFields() {
4208      xAttrs = null;
4209      src = null;
4210    }
4211
4212    @Override
4213    void readFields(DataInputStream in, int logVersion) throws IOException {
4214      XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
4215      src = p.getSrc();
4216      xAttrs = PBHelperClient.convertXAttrs(p.getXAttrsList());
4217      readRpcIds(in, logVersion);
4218    }
4219
4220    @Override
4221    public void writeFields(DataOutputStream out) throws IOException {
4222      XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
4223      if (src != null) {
4224        b.setSrc(src);
4225      }
4226      b.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
4227      b.build().writeDelimitedTo(out);
4228      // clientId and callId
4229      writeRpcIds(rpcClientId, rpcCallId, out);
4230    }
4231
4232    @Override
4233    protected void toXml(ContentHandler contentHandler) throws SAXException {
4234      XMLUtils.addSaxString(contentHandler, "SRC", src);
4235      appendXAttrsToXml(contentHandler, xAttrs);
4236      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
4237    }
4238
4239    @Override
4240    void fromXml(Stanza st) throws InvalidXmlException {
4241      src = st.getValue("SRC");
4242      xAttrs = readXAttrsFromXml(st);
4243      readRpcIdsFromXml(st);
4244    }
4245  }
4246
4247  static class SetAclOp extends FSEditLogOp {
4248    List<AclEntry> aclEntries = Lists.newArrayList();
4249    String src;
4250
4251    SetAclOp() {
4252      super(OP_SET_ACL);
4253    }
4254
4255    static SetAclOp getInstance() {
4256      return new SetAclOp();
4257    }
4258
4259    @Override
4260    void resetSubFields() {
4261      aclEntries = null;
4262      src = null;
4263    }
4264
4265    @Override
4266    void readFields(DataInputStream in, int logVersion) throws IOException {
4267      AclEditLogProto p = AclEditLogProto.parseDelimitedFrom(in);
4268      if (p == null) {
4269        throw new IOException("Failed to read fields from SetAclOp");
4270      }
4271      src = p.getSrc();
4272      aclEntries = PBHelperClient.convertAclEntry(p.getEntriesList());
4273    }
4274
4275    @Override
4276    public void writeFields(DataOutputStream out) throws IOException {
4277      AclEditLogProto.Builder b = AclEditLogProto.newBuilder();
4278      if (src != null)
4279        b.setSrc(src);
4280      b.addAllEntries(PBHelperClient.convertAclEntryProto(aclEntries));
4281      b.build().writeDelimitedTo(out);
4282    }
4283
4284    @Override
4285    protected void toXml(ContentHandler contentHandler) throws SAXException {
4286      XMLUtils.addSaxString(contentHandler, "SRC", src);
4287      appendAclEntriesToXml(contentHandler, aclEntries);
4288    }
4289
4290    @Override
4291    void fromXml(Stanza st) throws InvalidXmlException {
4292      src = st.getValue("SRC");
4293      aclEntries = readAclEntriesFromXml(st);
4294      if (aclEntries == null) {
4295        aclEntries = Lists.newArrayList();
4296      }
4297    }
4298  }
4299
4300  static private short readShort(DataInputStream in) throws IOException {
4301    return Short.parseShort(FSImageSerialization.readString(in));
4302  }
4303
4304  static private long readLong(DataInputStream in) throws IOException {
4305    return Long.parseLong(FSImageSerialization.readString(in));
4306  }
4307
4308  /**
4309   * A class to read in blocks stored in the old format. The only two
4310   * fields in the block were blockid and length.
4311   */
4312  static class BlockTwo implements Writable {
4313    long blkid;
4314    long len;
4315
4316    static {                                      // register a ctor
4317      WritableFactories.setFactory
4318        (BlockTwo.class,
4319         new WritableFactory() {
4320           @Override
4321           public Writable newInstance() { return new BlockTwo(); }
4322         });
4323    }
4324
4325
4326    BlockTwo() {
4327      blkid = 0;
4328      len = 0;
4329    }
4330    /////////////////////////////////////
4331    // Writable
4332    /////////////////////////////////////
4333    @Override
4334    public void write(DataOutput out) throws IOException {
4335      out.writeLong(blkid);
4336      out.writeLong(len);
4337    }
4338
4339    @Override
4340    public void readFields(DataInput in) throws IOException {
4341      this.blkid = in.readLong();
4342      this.len = in.readLong();
4343    }
4344  }
4345  /**
4346   * Operation corresponding to upgrade
4347   */
4348  abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
4349    private final String name;
4350    private long time;
4351
4352    public RollingUpgradeOp(FSEditLogOpCodes code, String name) {
4353      super(code);
4354      this.name = StringUtils.toUpperCase(name);
4355    }
4356
4357    static RollingUpgradeOp getStartInstance(OpInstanceCache cache) {
4358      return (RollingUpgradeOp) cache.get(OP_ROLLING_UPGRADE_START);
4359    }
4360
4361    static RollingUpgradeOp getFinalizeInstance(OpInstanceCache cache) {
4362      return (RollingUpgradeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
4363    }
4364
4365    @Override
4366    void resetSubFields() {
4367      time = 0L;
4368    }
4369
4370    long getTime() {
4371      return time;
4372    }
4373
4374    void setTime(long time) {
4375      this.time = time;
4376    }
4377
4378    @Override
4379    void readFields(DataInputStream in, int logVersion) throws IOException {
4380      time = in.readLong();
4381    }
4382
4383    @Override
4384    public void writeFields(DataOutputStream out) throws IOException {
4385      FSImageSerialization.writeLong(time, out);
4386    }
4387
4388    @Override
4389    protected void toXml(ContentHandler contentHandler) throws SAXException {
4390      XMLUtils.addSaxString(contentHandler, name + "TIME",
4391          Long.toString(time));
4392    }
4393
4394    @Override
4395    void fromXml(Stanza st) throws InvalidXmlException {
4396      this.time = Long.parseLong(st.getValue(name + "TIME"));
4397    }
4398
4399    @Override
4400    public String toString() {
4401      return new StringBuilder().append("RollingUpgradeOp [").append(name)
4402          .append(", time=").append(time).append("]").toString();
4403    }
4404    
4405    static class RollbackException extends IOException {
4406      private static final long serialVersionUID = 1L;
4407    }
4408  }
4409
4410  /** {@literal @Idempotent} for {@link ClientProtocol#setStoragePolicy} */
4411  static class SetStoragePolicyOp extends FSEditLogOp {
4412    String path;
4413    byte policyId;
4414
4415    SetStoragePolicyOp() {
4416      super(OP_SET_STORAGE_POLICY);
4417    }
4418
4419    static SetStoragePolicyOp getInstance(OpInstanceCache cache) {
4420      return (SetStoragePolicyOp) cache.get(OP_SET_STORAGE_POLICY);
4421    }
4422
4423    @Override
4424    void resetSubFields() {
4425      path = null;
4426      policyId = 0;
4427    }
4428
4429    SetStoragePolicyOp setPath(String path) {
4430      this.path = path;
4431      return this;
4432    }
4433
4434    SetStoragePolicyOp setPolicyId(byte policyId) {
4435      this.policyId = policyId;
4436      return this;
4437    }
4438
4439    @Override
4440    public void writeFields(DataOutputStream out) throws IOException {
4441      FSImageSerialization.writeString(path, out);
4442      out.writeByte(policyId);
4443    }
4444
4445    @Override
4446    void readFields(DataInputStream in, int logVersion)
4447        throws IOException {
4448      this.path = FSImageSerialization.readString(in);
4449      this.policyId = in.readByte();
4450    }
4451
4452    @Override
4453    public String toString() {
4454      StringBuilder builder = new StringBuilder();
4455      builder.append("SetStoragePolicyOp [path=");
4456      builder.append(path);
4457      builder.append(", policyId=");
4458      builder.append(policyId);
4459      builder.append(", opCode=");
4460      builder.append(opCode);
4461      builder.append(", txid=");
4462      builder.append(txid);
4463      builder.append("]");
4464      return builder.toString();
4465    }
4466
4467    @Override
4468    protected void toXml(ContentHandler contentHandler) throws SAXException {
4469      XMLUtils.addSaxString(contentHandler, "PATH", path);
4470      XMLUtils.addSaxString(contentHandler, "POLICYID",
4471          Byte.toString(policyId));
4472    }
4473
4474    @Override
4475    void fromXml(Stanza st) throws InvalidXmlException {
4476      this.path = st.getValue("PATH");
4477      this.policyId = Byte.parseByte(st.getValue("POLICYID"));
4478    }
4479  }  
4480
4481  static class RollingUpgradeStartOp extends RollingUpgradeOp {
4482    RollingUpgradeStartOp() {
4483      super(OP_ROLLING_UPGRADE_START, "start");
4484    }
4485
4486    static RollingUpgradeStartOp getInstance(OpInstanceCache cache) {
4487      return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START);
4488    }
4489  }
4490
4491  static class RollingUpgradeFinalizeOp extends RollingUpgradeOp {
4492    RollingUpgradeFinalizeOp() {
4493      super(OP_ROLLING_UPGRADE_FINALIZE, "finalize");
4494    }
4495
4496    static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
4497      return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
4498    }
4499  }
4500
4501  /**
4502   * Class for writing editlog ops
4503   */
4504  public static class Writer {
4505    private final DataOutputBuffer buf;
4506    private final Checksum checksum;
4507
4508    public Writer(DataOutputBuffer out) {
4509      this.buf = out;
4510      this.checksum = DataChecksum.newCrc32();
4511    }
4512
4513    /**
4514     * Write an operation to the output stream
4515     * 
4516     * @param op The operation to write
4517     * @throws IOException if an error occurs during writing.
4518     */
4519    public void writeOp(FSEditLogOp op) throws IOException {
4520      int start = buf.getLength();
4521      // write the op code first to make padding and terminator verification
4522      // work
4523      buf.writeByte(op.opCode.getOpCode());
4524      buf.writeInt(0); // write 0 for the length first
4525      buf.writeLong(op.txid);
4526      op.writeFields(buf);
4527      int end = buf.getLength();
4528      
4529      // write the length back: content of the op + 4 bytes checksum - op_code
4530      int length = end - start - 1;
4531      buf.writeInt(length, start + 1);
4532
4533      checksum.reset();
4534      checksum.update(buf.getData(), start, end-start);
4535      int sum = (int)checksum.getValue();
4536      buf.writeInt(sum);
4537    }
4538  }
4539
4540  /**
4541   * Class for reading editlog ops from a stream
4542   */
4543  public abstract static class Reader {
4544    final DataInputStream in;
4545    final StreamLimiter limiter;
4546    final OpInstanceCache cache;
4547    final byte[] temp = new byte[4096];
4548    final int logVersion;
4549    int maxOpSize;
4550
4551    public static Reader create(DataInputStream in, StreamLimiter limiter,
4552                                int logVersion) {
4553      if (logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
4554        // Use the LengthPrefixedReader on edit logs which are newer than what
4555        // we can parse.  (Newer layout versions are represented by smaller
4556        // negative integers, for historical reasons.) Even though we can't
4557        // parse the Ops contained in them, we should still be able to call
4558        // scanOp on them.  This is important for the JournalNode during rolling
4559        // upgrade.
4560        return new LengthPrefixedReader(in, limiter, logVersion);
4561      } else if (NameNodeLayoutVersion.supports(
4562              NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)) {
4563        return new LengthPrefixedReader(in, limiter, logVersion);
4564      } else if (NameNodeLayoutVersion.supports(
4565          LayoutVersion.Feature.EDITS_CHECKSUM, logVersion)) {
4566        Checksum checksum = DataChecksum.newCrc32();
4567        return new ChecksummedReader(checksum, in, limiter, logVersion);
4568      } else {
4569        return new LegacyReader(in, limiter, logVersion);
4570      }
4571    }
4572
4573    /**
4574     * Construct the reader
4575     * @param in            The stream to read from.
4576     * @param limiter       The limiter for this stream.
4577     * @param logVersion    The version of the data coming from the stream.
4578     */
4579    Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
4580      this.in = in;
4581      this.limiter = limiter;
4582      this.logVersion = logVersion;
4583      this.cache = new OpInstanceCache();
4584      this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
4585    }
4586
4587    public void setMaxOpSize(int maxOpSize) {
4588      this.maxOpSize = maxOpSize;
4589    }
4590
4591    /**
4592     * Read an operation from the input stream.
4593     * 
4594     * Note that the objects returned from this method may be re-used by future
4595     * calls to the same method.
4596     * 
4597     * @param skipBrokenEdits    If true, attempt to skip over damaged parts of
4598     * the input stream, rather than throwing an IOException
4599     * @return the operation read from the stream, or null at the end of the 
4600     *         file
4601     * @throws IOException on error.  This function should only throw an
4602     *         exception when skipBrokenEdits is false.
4603     */
4604    public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
4605      while (true) {
4606        try {
4607          return decodeOp();
4608        } catch (IOException e) {
4609          in.reset();
4610          if (!skipBrokenEdits) {
4611            throw e;
4612          }
4613        } catch (RuntimeException e) {
4614          // FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
4615          // However, we handle it here for recovery mode, just to be more
4616          // robust.
4617          in.reset();
4618          if (!skipBrokenEdits) {
4619            throw e;
4620          }
4621        } catch (Throwable e) {
4622          in.reset();
4623          if (!skipBrokenEdits) {
4624            throw new IOException("got unexpected exception " +
4625                e.getMessage(), e);
4626          }
4627        }
4628        // Move ahead one byte and re-try the decode process.
4629        if (in.skip(1) < 1) {
4630          return null;
4631        }
4632      }
4633    }
4634
    /**
     * Check that the remainder of the stream consists solely of padding.
     *
     * The end of the edit log should contain only 0x00 or 0xff bytes.
     * If it contains other bytes, the log itself may be corrupt.
     * It is important to check this; if we don't, a stray OP_INVALID byte
     * could make us stop reading the edit log halfway through, and we'd never
     * know that we had lost data.
     *
     * The stream is read in chunks of up to temp.length bytes until read()
     * reports end of stream.
     *
     * @throws IOException if a byte other than 0x00 or 0xff is found, or if
     *                     reading from the stream fails.
     */
    void verifyTerminator() throws IOException {
      // NOTE(review): presumably this lifts the per-op limit installed via
      // limiter.setLimit(), since the padding is read up to end of stream --
      // confirm against StreamLimiter.
      limiter.clearLimit();
      int numRead = -1, idx = 0;
      while (true) {
        try {
          // Reset both counters before reading, so that a read() which
          // throws leaves numRead == -1 and the finally block does nothing.
          numRead = -1;
          idx = 0;
          numRead = in.read(temp);
          if (numRead == -1) {
            // End of stream: every byte seen so far was valid padding.
            return;
          }
          while (idx < numRead) {
            // (byte)-1 is 0xff; anything other than 0x00/0xff is not padding.
            if ((temp[idx] != (byte)0) && (temp[idx] != (byte)-1)) {
              throw new IOException("Read extra bytes after " +
                "the terminator!");
            }
            idx++;
          }
        } finally {
          // After reading each group of bytes, we reposition the mark one
          // byte before the next group.  Similarly, if there is an error, we
          // want to reposition the mark one byte before the error.
          // At this point idx is the number of bytes of this group that were
          // checked and accepted: all of them on success, or the count
          // preceding the offending byte on failure.
          if (numRead != -1) { 
            // Return to the current mark, advance it by idx bytes, then step
            // one byte past the new mark to continue reading from there.
            in.reset();
            IOUtils.skipFully(in, idx);
            in.mark(temp.length + 1);
            IOUtils.skipFully(in, 1);
          }
        }
      }
    }
4672
4673    /**
4674     * Read an opcode from the input stream.
4675     *
4676     * @return   the opcode, or null on EOF.
4677     *
4678     * If an exception is thrown, the stream's mark will be set to the first
4679     * problematic byte.  This usually means the beginning of the opcode.
4680     */
4681    public abstract FSEditLogOp decodeOp() throws IOException;
4682
4683    /**
4684     * Similar to decodeOp(), but we only retrieve the transaction ID of the
4685     * Op rather than reading it.  If the edit log format supports length
4686     * prefixing, this can be much faster than full decoding.
4687     *
4688     * @return the last txid of the segment, or INVALID_TXID on EOF.
4689     */
4690    public abstract long scanOp() throws IOException;
4691  }
4692
4693  /**
4694   * Reads edit logs which are prefixed with a length.  These edit logs also
4695   * include a checksum and transaction ID.
4696   */
4697  private static class LengthPrefixedReader extends Reader {
4698    /**
4699     * The minimum length of a length-prefixed Op.
4700     *
4701     * The minimum Op has:
4702     * 1-byte opcode
4703     * 4-byte length
4704     * 8-byte txid
4705     * 0-byte body
4706     * 4-byte checksum
4707     */
4708    private static final int MIN_OP_LENGTH = 17;
4709
4710    /**
4711     * The op id length.
4712     *
4713     * Not included in the stored length.
4714     */
4715    private static final int OP_ID_LENGTH = 1;
4716
4717    /**
4718     * The checksum length.
4719     *
4720     * Not included in the stored length.
4721     */
4722    private static final int CHECKSUM_LENGTH = 4;
4723
4724    private final Checksum checksum;
4725
4726    LengthPrefixedReader(DataInputStream in, StreamLimiter limiter,
4727                         int logVersion) {
4728      super(in, limiter, logVersion);
4729      this.checksum = DataChecksum.newCrc32();
4730    }
4731
4732    @Override
4733    public FSEditLogOp decodeOp() throws IOException {
4734      long txid = decodeOpFrame();
4735      if (txid == HdfsServerConstants.INVALID_TXID) {
4736        return null;
4737      }
4738      in.reset();
4739      in.mark(maxOpSize);
4740      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
4741      FSEditLogOp op = cache.get(opCode);
4742      if (op == null) {
4743        throw new IOException("Read invalid opcode " + opCode);
4744      }
4745      op.setTransactionId(txid);
4746      IOUtils.skipFully(in, 4 + 8); // skip length and txid
4747      op.readFields(in, logVersion);
4748      // skip over the checksum, which we validated above.
4749      IOUtils.skipFully(in, CHECKSUM_LENGTH);
4750      return op;
4751    }
4752
4753    @Override
4754    public long scanOp() throws IOException {
4755      return decodeOpFrame();
4756    }
4757
4758    /**
4759     * Decode the opcode "frame".  This includes reading the opcode and
4760     * transaction ID, and validating the checksum and length.  It does not
4761     * include reading the opcode-specific fields.
4762     * The input stream will be advanced to the end of the op at the end of this
4763     * function.
4764     *
4765     * @return        An op with the txid set, but none of the other fields
4766     *                  filled in, or null if we hit EOF.
4767     */
4768    private long decodeOpFrame() throws IOException {
4769      limiter.setLimit(maxOpSize);
4770      in.mark(maxOpSize);
4771      byte opCodeByte;
4772      try {
4773        opCodeByte = in.readByte();
4774      } catch (EOFException eof) {
4775        // EOF at an opcode boundary is expected.
4776        return HdfsServerConstants.INVALID_TXID;
4777      }
4778      if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
4779        verifyTerminator();
4780        return HdfsServerConstants.INVALID_TXID;
4781      }
4782      // Here, we verify that the Op size makes sense and that the
4783      // data matches its checksum before attempting to construct an Op.
4784      // This is important because otherwise we may encounter an
4785      // OutOfMemoryException which could bring down the NameNode or
4786      // JournalNode when reading garbage data.
4787      int opLength =  in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
4788      if (opLength > maxOpSize) {
4789        throw new IOException("Op " + (int)opCodeByte + " has size " +
4790            opLength + ", but maxOpSize = " + maxOpSize);
4791      } else  if (opLength < MIN_OP_LENGTH) {
4792        throw new IOException("Op " + (int)opCodeByte + " has size " +
4793            opLength + ", but the minimum op size is " + MIN_OP_LENGTH);
4794      }
4795      long txid = in.readLong();
4796      // Verify checksum
4797      in.reset();
4798      in.mark(maxOpSize);
4799      checksum.reset();
4800      for (int rem = opLength - CHECKSUM_LENGTH; rem > 0;) {
4801        int toRead = Math.min(temp.length, rem);
4802        IOUtils.readFully(in, temp, 0, toRead);
4803        checksum.update(temp, 0, toRead);
4804        rem -= toRead;
4805      }
4806      int expectedChecksum = in.readInt();
4807      int calculatedChecksum = (int)checksum.getValue();
4808      if (expectedChecksum != calculatedChecksum) {
4809        throw new ChecksumException(
4810            "Transaction is corrupt. Calculated checksum is " +
4811            calculatedChecksum + " but read checksum " +
4812            expectedChecksum, txid);
4813      }
4814      return txid;
4815    }
4816  }
4817
4818  /**
4819   * Read edit logs which have a checksum and a transaction ID, but not a
4820   * length.
4821   */
4822  private static class ChecksummedReader extends Reader {
4823    private final Checksum checksum;
4824
4825    ChecksummedReader(Checksum checksum, DataInputStream in,
4826                      StreamLimiter limiter, int logVersion) {
4827      super(new DataInputStream(
4828          new CheckedInputStream(in, checksum)), limiter, logVersion);
4829      this.checksum = checksum;
4830    }
4831
4832    @Override
4833    public FSEditLogOp decodeOp() throws IOException {
4834      limiter.setLimit(maxOpSize);
4835      in.mark(maxOpSize);
4836      // Reset the checksum.  Since we are using a CheckedInputStream, each
4837      // subsequent read from the  stream will update the checksum.
4838      checksum.reset();
4839      byte opCodeByte;
4840      try {
4841        opCodeByte = in.readByte();
4842      } catch (EOFException eof) {
4843        // EOF at an opcode boundary is expected.
4844        return null;
4845      }
4846      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
4847      if (opCode == OP_INVALID) {
4848        verifyTerminator();
4849        return null;
4850      }
4851      FSEditLogOp op = cache.get(opCode);
4852      if (op == null) {
4853        throw new IOException("Read invalid opcode " + opCode);
4854      }
4855      op.setTransactionId(in.readLong());
4856      op.readFields(in, logVersion);
4857      // Verify checksum
4858      int calculatedChecksum = (int)checksum.getValue();
4859      int expectedChecksum = in.readInt();
4860      if (expectedChecksum != calculatedChecksum) {
4861        throw new ChecksumException(
4862            "Transaction is corrupt. Calculated checksum is " +
4863                calculatedChecksum + " but read checksum " +
4864                expectedChecksum, op.txid);
4865      }
4866      return op;
4867    }
4868
4869    @Override
4870    public long scanOp() throws IOException {
4871      // Edit logs of this age don't have any length prefix, so we just have
4872      // to read the entire Op.
4873      FSEditLogOp op = decodeOp();
4874      return op == null ?
4875          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
4876    }
4877  }
4878
4879  /**
4880   * Read older edit logs which may or may not have transaction IDs and other
4881   * features.  This code is used during upgrades and to allow HDFS INotify to
4882   * read older edit log files.
4883   */
4884  private static class LegacyReader extends Reader {
4885    LegacyReader(DataInputStream in,
4886                      StreamLimiter limiter, int logVersion) {
4887      super(in, limiter, logVersion);
4888    }
4889
4890    @Override
4891    public FSEditLogOp decodeOp() throws IOException {
4892      limiter.setLimit(maxOpSize);
4893      in.mark(maxOpSize);
4894      byte opCodeByte;
4895      try {
4896        opCodeByte = in.readByte();
4897      } catch (EOFException eof) {
4898        // EOF at an opcode boundary is expected.
4899        return null;
4900      }
4901      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
4902      if (opCode == OP_INVALID) {
4903        verifyTerminator();
4904        return null;
4905      }
4906      FSEditLogOp op = cache.get(opCode);
4907      if (op == null) {
4908        throw new IOException("Read invalid opcode " + opCode);
4909      }
4910      if (NameNodeLayoutVersion.supports(
4911            LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
4912        op.setTransactionId(in.readLong());
4913      } else {
4914        op.setTransactionId(HdfsServerConstants.INVALID_TXID);
4915      }
4916      op.readFields(in, logVersion);
4917      return op;
4918    }
4919
4920    @Override
4921    public long scanOp() throws IOException {
4922      if (!NameNodeLayoutVersion.supports(
4923          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
4924        throw new IOException("Can't scan a pre-transactional edit log.");
4925      }
4926      FSEditLogOp op = decodeOp();
4927      return op == null ?
4928          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
4929    }
4930  }
4931
4932  public void outputToXml(ContentHandler contentHandler) throws SAXException {
4933    contentHandler.startElement("", "", "RECORD", new AttributesImpl());
4934    XMLUtils.addSaxString(contentHandler, "OPCODE", opCode.toString());
4935    contentHandler.startElement("", "", "DATA", new AttributesImpl());
4936    XMLUtils.addSaxString(contentHandler, "TXID", "" + txid);
4937    toXml(contentHandler);
4938    contentHandler.endElement("", "", "DATA");
4939    contentHandler.endElement("", "", "RECORD");
4940  }
4941
  /**
   * Write the op-specific fields of this op as XML.
   *
   * Called by outputToXml(ContentHandler) inside the DATA element, after the
   * TXID has been written.
   *
   * @param contentHandler the SAX handler that receives the events.
   * @throws SAXException if emitting the XML fails.
   */
  protected abstract void toXml(ContentHandler contentHandler)
      throws SAXException;
4944  
  /**
   * Populate the op-specific fields of this op from an XML stanza.
   *
   * Called by decodeXml(Stanza) after the transaction ID has been parsed.
   *
   * @param st the stanza to read the fields from.
   * @throws InvalidXmlException if the stanza cannot be decoded.
   */
  abstract void fromXml(Stanza st) throws InvalidXmlException;
4946  
4947  public void decodeXml(Stanza st) throws InvalidXmlException {
4948    this.txid = Long.parseLong(st.getValue("TXID"));
4949    fromXml(st);
4950  }
4951  
4952  public static void blockToXml(ContentHandler contentHandler, Block block) 
4953      throws SAXException {
4954    contentHandler.startElement("", "", "BLOCK", new AttributesImpl());
4955    XMLUtils.addSaxString(contentHandler, "BLOCK_ID",
4956        Long.toString(block.getBlockId()));
4957    XMLUtils.addSaxString(contentHandler, "NUM_BYTES",
4958        Long.toString(block.getNumBytes()));
4959    XMLUtils.addSaxString(contentHandler, "GENSTAMP",
4960        Long.toString(block.getGenerationStamp()));
4961    contentHandler.endElement("", "", "BLOCK");
4962  }
4963
4964  public static Block blockFromXml(Stanza st)
4965      throws InvalidXmlException {
4966    long blockId = Long.parseLong(st.getValue("BLOCK_ID"));
4967    long numBytes = Long.parseLong(st.getValue("NUM_BYTES"));
4968    long generationStamp = Long.parseLong(st.getValue("GENSTAMP"));
4969    return new Block(blockId, numBytes, generationStamp);
4970  }
4971
4972  public static void delegationTokenToXml(ContentHandler contentHandler,
4973      DelegationTokenIdentifier token) throws SAXException {
4974    contentHandler.startElement(
4975        "", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl());
4976    XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString());
4977    XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER",
4978        Integer.toString(token.getSequenceNumber()));
4979    XMLUtils.addSaxString(contentHandler, "OWNER",
4980        token.getOwner().toString());
4981    XMLUtils.addSaxString(contentHandler, "RENEWER",
4982        token.getRenewer().toString());
4983    XMLUtils.addSaxString(contentHandler, "REALUSER",
4984        token.getRealUser().toString());
4985    XMLUtils.addSaxString(contentHandler, "ISSUE_DATE",
4986        Long.toString(token.getIssueDate()));
4987    XMLUtils.addSaxString(contentHandler, "MAX_DATE",
4988        Long.toString(token.getMaxDate()));
4989    XMLUtils.addSaxString(contentHandler, "MASTER_KEY_ID",
4990        Integer.toString(token.getMasterKeyId()));
4991    contentHandler.endElement("", "", "DELEGATION_TOKEN_IDENTIFIER");
4992  }
4993
4994  public static DelegationTokenIdentifier delegationTokenFromXml(Stanza st)
4995      throws InvalidXmlException {
4996    String kind = st.getValue("KIND");
4997    if (!kind.equals(DelegationTokenIdentifier.
4998        HDFS_DELEGATION_KIND.toString())) {
4999      throw new InvalidXmlException("can't understand " +
5000        "DelegationTokenIdentifier KIND " + kind);
5001    }
5002    int seqNum = Integer.parseInt(st.getValue("SEQUENCE_NUMBER"));
5003    String owner = st.getValue("OWNER");
5004    String renewer = st.getValue("RENEWER");
5005    String realuser = st.getValue("REALUSER");
5006    long issueDate = Long.parseLong(st.getValue("ISSUE_DATE"));
5007    long maxDate = Long.parseLong(st.getValue("MAX_DATE"));
5008    int masterKeyId = Integer.parseInt(st.getValue("MASTER_KEY_ID"));
5009    DelegationTokenIdentifier token =
5010        new DelegationTokenIdentifier(new Text(owner),
5011            new Text(renewer), new Text(realuser));
5012    token.setSequenceNumber(seqNum);
5013    token.setIssueDate(issueDate);
5014    token.setMaxDate(maxDate);
5015    token.setMasterKeyId(masterKeyId);
5016    return token;
5017  }
5018
5019  public static void delegationKeyToXml(ContentHandler contentHandler,
5020      DelegationKey key) throws SAXException {
5021    contentHandler.startElement(
5022        "", "", "DELEGATION_KEY", new AttributesImpl());
5023    XMLUtils.addSaxString(contentHandler, "KEY_ID",
5024        Integer.toString(key.getKeyId()));
5025    XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE",
5026        Long.toString(key.getExpiryDate()));
5027    if (key.getEncodedKey() != null) {
5028      XMLUtils.addSaxString(contentHandler, "KEY",
5029          Hex.encodeHexString(key.getEncodedKey()));
5030    }
5031    contentHandler.endElement("", "", "DELEGATION_KEY");
5032  }
5033  
5034  public static DelegationKey delegationKeyFromXml(Stanza st)
5035      throws InvalidXmlException {
5036    int keyId = Integer.parseInt(st.getValue("KEY_ID"));
5037    long expiryDate = Long.parseLong(st.getValue("EXPIRY_DATE"));
5038    byte key[] = null;
5039    try {
5040      key = Hex.decodeHex(st.getValue("KEY").toCharArray());
5041    } catch (DecoderException e) {
5042      throw new InvalidXmlException(e.toString());
5043    } catch (InvalidXmlException e) {
5044    }
5045    return new DelegationKey(keyId, expiryDate, key);
5046  }
5047
5048  public static void permissionStatusToXml(ContentHandler contentHandler,
5049      PermissionStatus perm) throws SAXException {
5050    contentHandler.startElement(
5051        "", "", "PERMISSION_STATUS", new AttributesImpl());
5052    XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
5053    XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
5054    fsPermissionToXml(contentHandler, perm.getPermission());
5055    contentHandler.endElement("", "", "PERMISSION_STATUS");
5056  }
5057
5058  public static PermissionStatus permissionStatusFromXml(Stanza st)
5059      throws InvalidXmlException {
5060    Stanza status = st.getChildren("PERMISSION_STATUS").get(0);
5061    String username = status.getValue("USERNAME");
5062    String groupname = status.getValue("GROUPNAME");
5063    FsPermission mode = fsPermissionFromXml(status);
5064    return new PermissionStatus(username, groupname, mode);
5065  }
5066
5067  public static void fsPermissionToXml(ContentHandler contentHandler,
5068      FsPermission mode) throws SAXException {
5069    XMLUtils.addSaxString(contentHandler, "MODE",
5070        Short.toString(mode.toShort()));
5071  }
5072
5073  public static FsPermission fsPermissionFromXml(Stanza st)
5074      throws InvalidXmlException {
5075    short mode = Short.parseShort(st.getValue("MODE"));
5076    return new FsPermission(mode);
5077  }
5078
5079  private static void fsActionToXml(ContentHandler contentHandler, FsAction v)
5080      throws SAXException {
5081    XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL);
5082  }
5083
5084  private static FsAction fsActionFromXml(Stanza st)
5085      throws InvalidXmlException {
5086    FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM"));
5087    if (v == null)
5088      throw new InvalidXmlException("Invalid value for FsAction");
5089    return v;
5090  }
5091
5092  private static void appendAclEntriesToXml(ContentHandler contentHandler,
5093      List<AclEntry> aclEntries) throws SAXException {
5094    for (AclEntry e : aclEntries) {
5095      contentHandler.startElement("", "", "ENTRY", new AttributesImpl());
5096      XMLUtils.addSaxString(contentHandler, "SCOPE", e.getScope().name());
5097      XMLUtils.addSaxString(contentHandler, "TYPE", e.getType().name());
5098      if (e.getName() != null) {
5099        XMLUtils.addSaxString(contentHandler, "NAME", e.getName());
5100      }
5101      fsActionToXml(contentHandler, e.getPermission());
5102      contentHandler.endElement("", "", "ENTRY");
5103    }
5104  }
5105
5106  private static List<AclEntry> readAclEntriesFromXml(Stanza st) {
5107    List<AclEntry> aclEntries = Lists.newArrayList();
5108    if (!st.hasChildren("ENTRY"))
5109      return null;
5110
5111    List<Stanza> stanzas = st.getChildren("ENTRY");
5112    for (Stanza s : stanzas) {
5113      AclEntry e = new AclEntry.Builder()
5114        .setScope(AclEntryScope.valueOf(s.getValue("SCOPE")))
5115        .setType(AclEntryType.valueOf(s.getValue("TYPE")))
5116        .setName(s.getValueOrNull("NAME"))
5117        .setPermission(fsActionFromXml(s)).build();
5118      aclEntries.add(e);
5119    }
5120    return aclEntries;
5121  }
5122
5123  private static void appendXAttrsToXml(ContentHandler contentHandler,
5124      List<XAttr> xAttrs) throws SAXException {
5125    for (XAttr xAttr: xAttrs) {
5126      contentHandler.startElement("", "", "XATTR", new AttributesImpl());
5127      XMLUtils.addSaxString(contentHandler, "NAMESPACE",
5128          xAttr.getNameSpace().toString());
5129      XMLUtils.addSaxString(contentHandler, "NAME", xAttr.getName());
5130      if (xAttr.getValue() != null) {
5131        try {
5132          XMLUtils.addSaxString(contentHandler, "VALUE",
5133              XAttrCodec.encodeValue(xAttr.getValue(), XAttrCodec.HEX));
5134        } catch (IOException e) {
5135          throw new SAXException(e);
5136        }
5137      }
5138      contentHandler.endElement("", "", "XATTR");
5139    }
5140  }
5141
5142  private static List<XAttr> readXAttrsFromXml(Stanza st)
5143      throws InvalidXmlException {
5144    if (!st.hasChildren("XATTR")) {
5145      return null;
5146    }
5147
5148    List<Stanza> stanzas = st.getChildren("XATTR");
5149    List<XAttr> xattrs = Lists.newArrayListWithCapacity(stanzas.size());
5150    for (Stanza a: stanzas) {
5151      XAttr.Builder builder = new XAttr.Builder();
5152      builder.setNameSpace(XAttr.NameSpace.valueOf(a.getValue("NAMESPACE"))).
5153          setName(a.getValue("NAME"));
5154      String v = a.getValueOrNull("VALUE");
5155      if (v != null) {
5156        try {
5157          builder.setValue(XAttrCodec.decodeValue(v));
5158        } catch (IOException e) {
5159          throw new InvalidXmlException(e.toString());
5160        }
5161      }
5162      xattrs.add(builder.build());
5163    }
5164    return xattrs;
5165  }
5166}