Saturday, June 1, 2013

What do you understand by Token Bucket Algorithm. What are its applications ?

Token Bucket Algorithm


Token bucket algorithm is used to define the upper limits on bandwidth and burstiness on the data transmission in a software application. The token bucket algorithm is based on an analogy of a fixed capacity bucket into which tokens, normally representing a unit of bytes or a single packet of predetermined size, are added at a fixed rate.

Applications

1.) To provide download bandwidth limits in software applications like torrent & download managers.

2.) To control the download speed on 3G network by our cellular provider.


Implementation

Lets try to create an implementation for this algorithm. We will choose a Leaky Bucket Implementation, where a fixed amount of tokens are filled after a predefined interval into the bucket. If no one utilizes those token, then they do not get accumulated over time, they just over flow after the capacity of bucket is reached. Let's name this strategy as FixedIntervalRefillStrategy.


Our TokenBucket Class will have following properties


1. ) Refill Strategy

2. ) Maximum Capacity of Tokens - this is the maximum amount of tokens that a client can ask for, otherwise an exception is thrown.

3.) Size - it is the current size of the bucket which will keep on changing as it is refilled after specific interval and emptied by the clients.



TokenBucket's consume() method accepts the number of tokens to consume. This method will then remove those number of Tokens from the bucket, refilling the bucket if required. This method utilizes CAS (CompareAndSet) operation of AtomicLong to make the resize operation atomic so that no-locking is required. This will make the class thread-safe when multiple threads will simultaneously demand for the tokens.



public class TokenBucket {
    private final RefillStrategy refillStrategy;
    private final long capacity;
    private AtomicLong size;

    public TokenBucket(long capacity, RefillStrategy refillStrategy) {
        this.refillStrategy = refillStrategy;
        this.capacity = capacity;
        this.size = new AtomicLong(0L);
    }

    public void consume(long numTokens) throws InterruptedException {
        if (numTokens < 0)
            throw new RuntimeException("Number of tokens to consume must be positive");
        if (numTokens >= capacity)
            throw new RuntimeException("Number of tokens to consume must be less than the capacity of the bucket");

        long newTokens = Math.max(0, refillStrategy.refill());
        while (!Thread.currentThread().isInterrupted()) {
            long existingSize = size.get();
            long newValue = Math.max(0, Math.min(existingSize + newTokens, capacity));
            if (numTokens <= newValue) {
                newValue -= numTokens;
                if (size.compareAndSet(existingSize, newValue))
                    break;
            } else {
                Thread.sleep(refillStrategy.getIntervalInMillis());
                newTokens = Math.max(0, refillStrategy.refill());
            }
        }
    }


@Override
 public String toString() {
     return "Capacity : " + capacity + ", Size : " + size;
 }
}


public static interface RefillStrategy {
     long refill();
     long getIntervalInMillis();
 }


public final class TokenBuckets {

    private TokenBuckets() {}

    public static TokenBucket newFixedIntervalRefill(long capacityTokens, long refillTokens, long period, TimeUnit unit)
    {
        TokenBucket.RefillStrategy strategy = new FixedIntervalRefillStrategy(refillTokens, period, unit);
        return new TokenBucket(capacityTokens, strategy);
    }

}

public class FixedIntervalRefillStrategy implements TokenBucket.RefillStrategy {
    private final long numTokens;
    private final long intervalInMillis;
    private AtomicLong nextRefillTime;

   public FixedIntervalRefillStrategy(long numTokens, long interval, TimeUnit unit) {
        this.numTokens = numTokens;
        this.intervalInMillis = unit.toMillis(interval);
        this.nextRefillTime = new AtomicLong(-1L);
    }

    public long refill() {
        final long now = System.currentTimeMillis();
        final long refillTime = nextRefillTime.get();
        if (now < refillTime) {
            return 0;
        }

        return nextRefillTime.compareAndSet(refillTime, now + intervalInMillis) ? numTokens : 0;
    }

    public long getIntervalInMillis() {
        return intervalInMillis;
    }

}

API Client User

TokenBucket bucket = TokenBuckets.newFixedIntervalRefill(1024 * 10, speedLimitKBps, 1, TimeUnit.SECONDS); 


 
References
http://en.wikipedia.org/wiki/Token_bucket

No comments:

Post a Comment

Your comment will be published after review from moderator