001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hdfs.util;
019
020import static org.apache.hadoop.util.Time.monotonicNow;
021
022/** 
023 * a class to throttle the data transfers.
024 * This class is thread safe. It can be shared by multiple threads.
025 * The parameter bandwidthPerSec specifies the total bandwidth shared by
026 * threads.
027 */
028public class DataTransferThrottler {
029  private final long period;          // period over which bw is imposed
030  private final long periodExtension; // Max period over which bw accumulates.
031  private long bytesPerPeriod;  // total number of bytes can be sent in each period
032  private long curPeriodStart;  // current period starting time
033  private long curReserve;      // remaining bytes can be sent in the period
034  private long bytesAlreadyUsed;
035
036  /** Constructor 
037   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
038   */
039  public DataTransferThrottler(long bandwidthPerSec) {
040    this(500, bandwidthPerSec);  // by default throttling period is 500ms 
041  }
042
043  /**
044   * Constructor
045   * @param period in milliseconds. Bandwidth is enforced over this
046   *        period.
047   * @param bandwidthPerSec bandwidth allowed in bytes per second. 
048   */
049  public DataTransferThrottler(long period, long bandwidthPerSec) {
050    this.curPeriodStart = monotonicNow();
051    this.period = period;
052    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
053    this.periodExtension = period*3;
054  }
055
056  /**
057   * @return current throttle bandwidth in bytes per second.
058   */
059  public synchronized long getBandwidth() {
060    return bytesPerPeriod*1000/period;
061  }
062  
063  /**
064   * Sets throttle bandwidth. This takes affect latest by the end of current
065   * period.
066   */
067  public synchronized void setBandwidth(long bytesPerSecond) {
068    if ( bytesPerSecond <= 0 ) {
069      throw new IllegalArgumentException("" + bytesPerSecond);
070    }
071    bytesPerPeriod = bytesPerSecond*period/1000;
072  }
073  
074  /** Given the numOfBytes sent/received since last time throttle was called,
075   * make the current thread sleep if I/O rate is too fast
076   * compared to the given bandwidth.
077   *
078   * @param numOfBytes
079   *     number of bytes sent/received since last time throttle was called
080   */
081  public synchronized void throttle(long numOfBytes) {
082    throttle(numOfBytes, null);
083  }
084
085  /** Given the numOfBytes sent/received since last time throttle was called,
086   * make the current thread sleep if I/O rate is too fast
087   * compared to the given bandwidth.  Allows for optional external cancelation.
088   *
089   * @param numOfBytes
090   *     number of bytes sent/received since last time throttle was called
091   * @param canceler
092   *     optional canceler to check for abort of throttle
093   */
094  public synchronized void throttle(long numOfBytes, Canceler canceler) {
095    if ( numOfBytes <= 0 ) {
096      return;
097    }
098
099    curReserve -= numOfBytes;
100    bytesAlreadyUsed += numOfBytes;
101
102    while (curReserve <= 0) {
103      if (canceler != null && canceler.isCancelled()) {
104        return;
105      }
106      long now = monotonicNow();
107      long curPeriodEnd = curPeriodStart + period;
108
109      if ( now < curPeriodEnd ) {
110        // Wait for next period so that curReserve can be increased.
111        try {
112          wait( curPeriodEnd - now );
113        } catch (InterruptedException e) {
114          // Abort throttle and reset interrupted status to make sure other
115          // interrupt handling higher in the call stack executes.
116          Thread.currentThread().interrupt();
117          break;
118        }
119      } else if ( now <  (curPeriodStart + periodExtension)) {
120        curPeriodStart = curPeriodEnd;
121        curReserve += bytesPerPeriod;
122      } else {
123        // discard the prev period. Throttler might not have
124        // been used for a long time.
125        curPeriodStart = now;
126        curReserve = bytesPerPeriod - bytesAlreadyUsed;
127      }
128    }
129
130    bytesAlreadyUsed -= numOfBytes;
131  }
132}