分布式学习-限流算法

限流的作用

由于API接口无法控制调用方的行为,因此当遇到瞬时请求量激增时,会导致接口占用过多服务器资源,使得其他请求响应速度降低或是超时,更有甚者可能导致服务器宕机。

限流(Rate limiting)指对应用服务的请求进行限制,例如某一接口的请求限制为100个每秒,对超过限制的请求则进行快速失败或丢弃。

限流可以应对:

  • 热点业务带来的突发请求;
  • 调用方bug导致的突发请求;
  • 恶意攻击请求。

因此,对于公开的接口最好采取限流措施。

为什么要分布式限流

单点应用

但线上业务出于各种原因考虑,多是分布式系统,单节点的限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制。

集群应用

而如果实现了分布式限流,那么就可以方便地控制整个服务集群的请求限制,且由于整个集群的请求数量得到了限制,因此服务依赖的各种资源也得到了限流的保护。

分布式限流

限流算法分类

1、计数器算法(固定时间窗口);
2、滑动时间窗口;
3、令牌桶算法;
4、漏桶算法;

计数器算法

算法解读

计数器算法是限流算法里最简单也是最容易实现的一种算法。计数器就是统计记录单位时间内进入系统或者某一接口的请求次数,在限定的次数内的请求则正常接收处理,超过次数的请求则拒绝掉,或者改为异步处理。

比如规定,对于A接口1分钟的访问次数不能超过100个。那么可以这么做:在一开始的时候,设置一个计数器counter,每当一个请求过来的时候,counter就加1,如果counter的值大于100并且该请求与第一个请求的间隔时间还在1分钟之内,那么说明请求数过多;如果该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter,具体算法的示意图如下:

计数器算法

算法示例

使用 AomicInteger 来进行统计当前正在并发执行的次数,如果超过域值就直接拒绝请求,提示系统繁忙…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.springboot.whb.study.currentLimiting;

import com.google.common.base.Joiner;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.time.LocalDateTime;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author: whb
* @description: 计数器限流算法
* 使用 AomicInteger 来进行统计当前正在并发执行的次数,如果超过域值就直接拒绝请求,提示系统繁忙
*/
public class CounterLimiter {
/**
* 计数器限流算法(比较暴力/超出直接拒绝)
* Atomic,限制总数
*/
private static final AtomicInteger atomic = new AtomicInteger(0);
/**
* 线程池
*/
public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join("client-thread-pool-", "%s")).build());

/**
* 限流
*/
private void atomicLimiter() {
// 最大支持 3 個
if (atomic.get() >= 3) {
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - " + "拒絕...");
} else {
try {
atomic.incrementAndGet();
//处理核心逻辑
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - " + "通过...");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
atomic.decrementAndGet();
}
}
}

/**
* 测试限流
*
* @throws InterruptedException
*/
public void testAtomicLimiter() throws InterruptedException {
for (int i = 0; i < 5; i++) {
threadPoolExecutor.execute(this::atomicLimiter);
}
TimeUnit.SECONDS.sleep(5);
}

/**
* main方法调用
*/
public static void main(String[] args) throws InterruptedException {
CounterLimiter counterLimiter = new CounterLimiter();
counterLimiter.testAtomicLimiter();
}
}

运行结果

计数器算法示例运行结果

计数器算法虽然简单,但比较粗放,存在一个十分致命的问题,那就是临界问题,如下图:

计数器临界问题

如上图所示,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求,可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。

其实这个问题的本质就是统计的精度太低,那如何降低临界问题的影响?看下面的滑动时间窗口算法。

滑动时间窗口

算法解读

滑动窗口,又称rolling window,跟TCP中的滑动窗口名称一致,但要区分开来。先用一张图解释下滑动时间窗口算法:

滑动时间窗口算法

如上图,整个红色的矩形框表示一个时间窗口,在计数器算法中,我们规定一个时间窗口就是一分钟。在滑动时间窗口算法中我们将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗口怎么解决刚才的临界问题的呢?看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在红色的格子中。当时间到达1:00时,窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。

再回顾一下刚才的计数器算法,可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

算法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.springboot.whb.study.currentLimiting;

/**
* @author: whb
* @description: 滑动时间窗口限流算法
*/

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class TimeWindowLimited {
/**
* 缓存请求的队列
*/
private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
/**
* 滑动时间窗口大小
*/
private int seconds;
/**
* 最大可接受请求
*/
private int max;

public TimeWindowLimited(int max, int timeWindowOfSeconds) {
this.seconds = timeWindowOfSeconds;
this.max = max;

new Thread(() -> {
for (; ; ) {
clean();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}

/**
* 获取令牌,并且添加时间
*/
public void take() {
long start = System.currentTimeMillis();
try {
int size = sizeOfValid();
if (size > max - 1) {
System.err.println(" 队列已满,队列容量:" + queue.size() + ",max:" + max + ",queue:" + printQueue());
throw new IllegalStateException("full");
}
synchronized (queue) {
if (sizeOfValid() > max - 1) {
System.err.println(" 队列已满: in synchronized,size:" + queue.size() + ",max:" + max + ",queue:" + printQueue());
throw new IllegalStateException("full");
}
this.queue.offer(System.currentTimeMillis());
}
System.out.println(" queue: d,size:" + queue.size() + ",max:" + max + ",queue:" + printQueue());
} finally {
System.out.println("耗时:" + (System.currentTimeMillis() - start) + " ms");
}
}

/**
* 打印队列内容
*
* @return
*/
private String printQueue() {
Iterator<Long> it = queue.iterator();
StringBuilder sb = new StringBuilder();
while (it.hasNext()) {
Long t = it.next();
sb.append(" ").append(t);
}
return sb.toString();
}

/**
* 有效请求
*
* @return
*/
public int sizeOfValid() {
Iterator<Long> it = queue.iterator();
Long ms = System.currentTimeMillis() - seconds * 1000;
int count = 0;
while (it.hasNext()) {
long t = it.next();
if (t > ms) {
count++;
}
}
return count;
}

/**
* 清理
*/
public void clean() {
Long c = System.currentTimeMillis() - seconds * 1000;
Long tl = null;
System.out.println("peek: " + queue.peek() + "c:" + c);
while ((tl = queue.peek()) != null && tl < c) {
System.out.println("peek: t:" + tl);
queue.poll();
}
}

public static void main(String[] args) {
final TimeWindowLimited timeWindow = new TimeWindowLimited(200, 2);
IntStream.range(0, 100).forEach((i) -> {
new Thread(() -> {
for (; ; ) {
System.out.println("before take i:" + i);
try {
Thread.sleep(new Random().nextInt(20) * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
timeWindow.take();
System.out.println("some option, i:" + i);
} catch (Exception e) {
System.err.println(" take i:" + i + ", encounter error:" + e.getMessage());
try {
Thread.sleep(10L);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
continue;
}
System.out.println("after take i:" + i);
}
}).start();
});
}
}

运行结果

滑动时间窗口算法示例运行结果

漏桶算法

算法解读

漏桶算法(Leaky Bucket):主要目的是控制数据注入到网络的速率,平滑网络上的突发流量,数据可以以任意速度流入到漏桶中。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。 漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶为空,则不需要流出水滴,如果漏桶(包缓存)溢出,那么水滴会被溢出丢弃。

用一个简单的例子描述就是注水漏水过程,往桶中以一定速率流出水,以任意速率流入水,当水超过桶流量则丢弃,因为桶容量是不变的,保证了整体的速率。

漏桶算法

算法示例

漏桶算法可以通过 信号量(Semaphore) 的方式实现,很好的达到削峰的目的,如下示例代码,队列中任务存活个数就如同是水桶最多能盛装的水量,当超出这个阀值就会丢弃任务….

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.springboot.whb.study.currentLimiting;

import com.google.common.base.Joiner;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.time.LocalDateTime;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @author: whb
* @description: 漏桶限流算法
*/
public class LeakyBucket {
/**
* 信号量,用来达到削峰的目的,平滑流量
*/
private static final Semaphore semaphore = new Semaphore(3);
/**
* 线程池
*/
public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join("client-thread-pool-", "%s")).build());

/**
* 限流算法
*/
private void semaphoreLimiter() {
// 队列中允许存活的任务个数不能超过 5 个
if (semaphore.getQueueLength() > 5) {
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - 拒絕...");
} else {
try {
semaphore.acquire();
System.out.println(LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - 通过...");
//处理核心逻辑
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}

/**
* 测试限流算法
*
* @throws InterruptedException
*/
public void testSemaphore() throws InterruptedException {
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(this::semaphoreLimiter);
}
TimeUnit.SECONDS.sleep(5);
}

/**
* main方法中执行测试方法
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
LeakyBucket leakyBucket = new LeakyBucket();
leakyBucket.testSemaphore();
}
}

运行结果

漏桶限流算法示例运行结果

一共 10 个线程同时请求,初始信号量为3,表示最多可以同时处理 3 个任务,超出进入缓冲区排队等待,当缓冲区满了后则拒绝接收新的请求…

令牌桶算法

算法解读

令牌桶算法是比较常见的限流算法之一,大概描述如下:

  1. 所有的请求在处理之前都需要拿到一个可用的令牌才会被处理;
  2. 根据限流大小,设置按照一定的速率往桶里添加令牌;
  3. 桶设置最大的放置令牌限制,当桶满时、新添加的令牌就被丢弃或者拒绝;
  4. 请求达到后首先要获取令牌桶中的令牌,拿着令牌才可以进行其他的业务逻辑,处理完业务逻辑之后,将令牌直接删除;
  5. 令牌桶有最低限额,当桶中的令牌达到最低限额的时候,请求处理完之后将不会删除令牌,以此保证足够的限流;

令牌桶算法

算法示例

Google Guava中提供了一个RateLimiter 工具类,就是基于令牌桶算法实现平滑突发的限流策略.

令牌桶的好处是可以方便的改变速度,一旦需要提高速率,则按需提高放入桶中的令牌的速率。一般会定时(比如1000毫秒)往桶中增加一定数量的令牌, 有些变种算法则可以实时的计算应该增加的令牌的数量。

在示例代码中为每秒中产生 2 个令牌,意味着每500毫秒会产生一个令牌。

limiter.acquire(num) 表示消费多少个令牌。当桶中有足够的令牌时,则直接返回0,否则阻塞,直到有可用的令牌数才返回,返回的值为阻塞的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.springboot.whb.study.currentLimiting;

import com.google.common.base.Joiner;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.time.LocalDateTime;
import java.util.concurrent.*;

/**
* @author: whb
* @description: 谷歌令牌桶算法测试
*/
public class TokenBucket {

/**
* 每秒生成2个令牌
*/
private static final RateLimiter limiter = RateLimiter.create(2);
/**
* 线程池
*/
public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join("client-thread-pool-", "%s")).build());

/**
* 限流
*/
private void rateLimiter() {
// 默认就是 1
final double acquire = limiter.acquire(1);
System.out.println("当前时间 - " + LocalDateTime.now() + " - " + Thread.currentThread().getName() + " - 阻塞 - " + acquire + " 通过...");
}

/**
* 限流算法测试接口
*
* @throws InterruptedException
*/
public void testRateLimiter() throws InterruptedException {
for (int i = 0; i < 5; i++) {
threadPoolExecutor.execute(this::rateLimiter);
}
TimeUnit.SECONDS.sleep(5);
}

/**
* main方法中调用
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
TokenBucket rateLimiterTest = new TokenBucket();
rateLimiterTest.testRateLimiter();
}
}

运行结果

令牌桶算法示例运行结果

通过控制台的输出可以很直观的看出每个令牌产生时间间隔大约在 500 毫秒左右。

本文标题:分布式学习-限流算法

文章作者:王洪博

发布时间:2019年08月06日 - 11:08

最后更新:2019年09月12日 - 10:09

原始链接:http://whb1990.github.io/posts/6e0ef647.html

▄︻┻═┳一如果你喜欢这篇文章,请点击下方"打赏"按钮请我喝杯 ☕
0%