001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hdfs.util; 019 020import static org.apache.hadoop.util.Time.monotonicNow; 021 022/** 023 * a class to throttle the data transfers. 024 * This class is thread safe. It can be shared by multiple threads. 025 * The parameter bandwidthPerSec specifies the total bandwidth shared by 026 * threads. 027 */ 028public class DataTransferThrottler { 029 private final long period; // period over which bw is imposed 030 private final long periodExtension; // Max period over which bw accumulates. 031 private long bytesPerPeriod; // total number of bytes can be sent in each period 032 private long curPeriodStart; // current period starting time 033 private long curReserve; // remaining bytes can be sent in the period 034 private long bytesAlreadyUsed; 035 036 /** Constructor 037 * @param bandwidthPerSec bandwidth allowed in bytes per second. 038 */ 039 public DataTransferThrottler(long bandwidthPerSec) { 040 this(500, bandwidthPerSec); // by default throttling period is 500ms 041 } 042 043 /** 044 * Constructor 045 * @param period in milliseconds. Bandwidth is enforced over this 046 * period. 047 * @param bandwidthPerSec bandwidth allowed in bytes per second. 048 */ 049 public DataTransferThrottler(long period, long bandwidthPerSec) { 050 this.curPeriodStart = monotonicNow(); 051 this.period = period; 052 this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000; 053 this.periodExtension = period*3; 054 } 055 056 /** 057 * @return current throttle bandwidth in bytes per second. 058 */ 059 public synchronized long getBandwidth() { 060 return bytesPerPeriod*1000/period; 061 } 062 063 /** 064 * Sets throttle bandwidth. This takes affect latest by the end of current 065 * period. 066 */ 067 public synchronized void setBandwidth(long bytesPerSecond) { 068 if ( bytesPerSecond <= 0 ) { 069 throw new IllegalArgumentException("" + bytesPerSecond); 070 } 071 bytesPerPeriod = bytesPerSecond*period/1000; 072 } 073 074 /** Given the numOfBytes sent/received since last time throttle was called, 075 * make the current thread sleep if I/O rate is too fast 076 * compared to the given bandwidth. 077 * 078 * @param numOfBytes 079 * number of bytes sent/received since last time throttle was called 080 */ 081 public synchronized void throttle(long numOfBytes) { 082 throttle(numOfBytes, null); 083 } 084 085 /** Given the numOfBytes sent/received since last time throttle was called, 086 * make the current thread sleep if I/O rate is too fast 087 * compared to the given bandwidth. Allows for optional external cancelation. 088 * 089 * @param numOfBytes 090 * number of bytes sent/received since last time throttle was called 091 * @param canceler 092 * optional canceler to check for abort of throttle 093 */ 094 public synchronized void throttle(long numOfBytes, Canceler canceler) { 095 if ( numOfBytes <= 0 ) { 096 return; 097 } 098 099 curReserve -= numOfBytes; 100 bytesAlreadyUsed += numOfBytes; 101 102 while (curReserve <= 0) { 103 if (canceler != null && canceler.isCancelled()) { 104 return; 105 } 106 long now = monotonicNow(); 107 long curPeriodEnd = curPeriodStart + period; 108 109 if ( now < curPeriodEnd ) { 110 // Wait for next period so that curReserve can be increased. 111 try { 112 wait( curPeriodEnd - now ); 113 } catch (InterruptedException e) { 114 // Abort throttle and reset interrupted status to make sure other 115 // interrupt handling higher in the call stack executes. 116 Thread.currentThread().interrupt(); 117 break; 118 } 119 } else if ( now < (curPeriodStart + periodExtension)) { 120 curPeriodStart = curPeriodEnd; 121 curReserve += bytesPerPeriod; 122 } else { 123 // discard the prev period. Throttler might not have 124 // been used for a long time. 125 curPeriodStart = now; 126 curReserve = bytesPerPeriod - bytesAlreadyUsed; 127 } 128 } 129 130 bytesAlreadyUsed -= numOfBytes; 131 } 132}