001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.util; 019 020import java.nio.channels.AsynchronousCloseException; 021import java.nio.channels.ClosedChannelException; 022import java.util.concurrent.atomic.AtomicInteger; 023 024import com.google.common.base.Preconditions; 025 026/** 027 * A closeable object that maintains a reference count. 028 * 029 * Once the object is closed, attempting to take a new reference will throw 030 * ClosedChannelException. 031 */ 032public class CloseableReferenceCount { 033 /** 034 * Bit mask representing a closed domain socket. 035 */ 036 private static final int STATUS_CLOSED_MASK = 1 << 30; 037 038 /** 039 * The status bits. 040 * 041 * Bit 30: 0 = open, 1 = closed. 042 * Bits 29 to 0: the reference count. 043 */ 044 private final AtomicInteger status = new AtomicInteger(0); 045 046 public CloseableReferenceCount() { } 047 048 /** 049 * Increment the reference count. 050 * 051 * @throws ClosedChannelException If the status is closed. 052 */ 053 public void reference() throws ClosedChannelException { 054 int curBits = status.incrementAndGet(); 055 if ((curBits & STATUS_CLOSED_MASK) != 0) { 056 status.decrementAndGet(); 057 throw new ClosedChannelException(); 058 } 059 } 060 061 /** 062 * Decrement the reference count. 063 * 064 * @return True if the object is closed and has no outstanding 065 * references. 066 */ 067 public boolean unreference() { 068 int newVal = status.decrementAndGet(); 069 Preconditions.checkState(newVal != 0xffffffff, 070 "called unreference when the reference count was already at 0."); 071 return newVal == STATUS_CLOSED_MASK; 072 } 073 074 /** 075 * Decrement the reference count, checking to make sure that the 076 * CloseableReferenceCount is not closed. 077 * 078 * @throws AsynchronousCloseException If the status is closed. 079 */ 080 public void unreferenceCheckClosed() throws ClosedChannelException { 081 int newVal = status.decrementAndGet(); 082 if ((newVal & STATUS_CLOSED_MASK) != 0) { 083 throw new AsynchronousCloseException(); 084 } 085 } 086 087 /** 088 * Return true if the status is currently open. 089 * 090 * @return True if the status is currently open. 091 */ 092 public boolean isOpen() { 093 return ((status.get() & STATUS_CLOSED_MASK) == 0); 094 } 095 096 /** 097 * Mark the status as closed. 098 * 099 * Once the status is closed, it cannot be reopened. 100 * 101 * @return The current reference count. 102 * @throws ClosedChannelException If someone else closes the object 103 * before we do. 104 */ 105 public int setClosed() throws ClosedChannelException { 106 while (true) { 107 int curBits = status.get(); 108 if ((curBits & STATUS_CLOSED_MASK) != 0) { 109 throw new ClosedChannelException(); 110 } 111 if (status.compareAndSet(curBits, curBits | STATUS_CLOSED_MASK)) { 112 return curBits & (~STATUS_CLOSED_MASK); 113 } 114 } 115 } 116 117 /** 118 * Get the current reference count. 119 * 120 * @return The current reference count. 121 */ 122 public int getReferenceCount() { 123 return status.get() & (~STATUS_CLOSED_MASK); 124 } 125}