池技术实现-commons-pool2

2020-07-06
pool 笔记

核心接口

PooledObjectFactory

顾名思义,这个接口是一个工厂,用于创建要池化的对象。在使用的时候一般都不直接实现,而是去继承它默认抽象实现类BasePooledObjectFactory

只需要实现这个抽象类中的两个抽象方法即可:

1
2
public abstract T create() throws Exception;
public abstract PooledObject<T> wrap(T obj);

ObjectPool

这个接口为对象池的抽象定义,定义了很多对池的操作方法。如addObject等。可以简单理解为一个装对象的容器像list之类的数据结构。默认的抽象实现为BaseGenericObjectPool具体实现有好几个,最常见的为GenericObjectPool.

GenericObjectPoolConfig

配置类对象,就是一个简单的实体,封装很多配置参数而已。

实现细节

先针对最基本的借出对象的操作实现

“借对象”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public T borrowObject() throws Exception {
return borrowObject(getMaxWaitMillis());
}
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
assertOpen();
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
removeAbandoned(ac);
}
PooledObject<T> p = null;
// Get local copy of current config so it is consistent for entire
// method execution
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
while (p == null) {
create = false;
p = idleObjects.pollFirst();
if (p == null) {
p = create();
if (p != null) {
create = true;
}
}
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst();
} else {
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
if (!p.allocate()) {
p = null;
}
if (p != null) {
try {
factory.activateObject(p);
} catch (final Exception e) {
try {
destroy(p);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
if (p != null && getTestOnBorrow()) {
boolean validate = false;
Throwable validationThrowable = null;
try {
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
if (!validate) {
try {
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}

如果没有给定borrowMaxWaitMillis参数,那就使用默认的最大等待时间(仅仅当getBlockWhenExhausted为true的时候),超出这个时间就直接抛异常。

首先需要根据配置参数中的getRemoveAbandonedOnBorrow来决定是不是在每次“借”的时候抛弃掉那些快“过气”的对象。当然还是得有条件的:闲置的对象数量小于2,且当前活跃的对象数量比最大数量的差值小于3,翻译成人话就是池子快装满了呗。

“借对象”之前的判断

看看移除掉“过气”对象的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void removeAbandoned(final AbandonedConfig ac) {
// Generate a list of abandoned objects to remove
final long now = System.currentTimeMillis();
final long timeout =
now - (ac.getRemoveAbandonedTimeout() * 1000L);
final ArrayList<PooledObject<T>> remove = new ArrayList<>();
final Iterator<PooledObject<T>> it = allObjects.values().iterator();
while (it.hasNext()) {
final PooledObject<T> pooledObject = it.next();
synchronized (pooledObject) {
// 上次使用的时间在五分钟之前 说明需要抛弃
if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
pooledObject.getLastUsedTime() <= timeout) {
pooledObject.markAbandoned();
remove.add(pooledObject);
}
}
}
// Now remove the abandoned objects
final Iterator<PooledObject<T>> itr = remove.iterator();
while (itr.hasNext()) {
final PooledObject<T> pooledObject = itr.next();
if (ac.getLogAbandoned()) {
pooledObject.printStackTrace(ac.getLogWriter());
}
try {
invalidateObject(pooledObject.getObject());
} catch (final Exception e) {
e.printStackTrace();
}
}
}

其中有个配置参数getRemoveAbandonedTimeout用于判断对象是否“过气”,这个值默认是30分钟。在对象池中使用allObjects变量来保存所有的被“池化”的对象,这是个map类型的。

1
2
3
4
5
6
7
8
9
/*
* All of the objects currently associated with this pool in any state. It
* excludes objects that have been destroyed. The size of
* {@link #allObjects} will always be less than or equal to {@link
* #_maxActive}. Map keys are pooled objects, values are the PooledObject
* wrappers used internally by the pool.
*/
private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
new ConcurrentHashMap<>();

然而,并不是将我们要创建的对象直接保存到这个map中,而是将对象包装成PooledObject放进去的,key为对象的hash值。这个对象用于池子的内部处理,比如加几个属性什么的。这种设计在很多库或者框架中很常见。为了将复杂性不直接暴露给用户,因此重新包装了一下。

这里的逻辑是将所有被“池化”的对象遍历出来,判断如果使用时间超过给定的时间,那就说明这个对象“过气”了,得移除掉。使用invalidateObject方法来销毁这个对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void invalidateObject(final T obj) throws Exception {
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
if (p == null) {
if (isAbandonedConfig()) {
return;
}
throw new IllegalStateException(
"Invalidated object not currently part of this pool");
}
synchronized (p) {
if (p.getState() != PooledObjectState.INVALID) {
destroy(p);
}
}
ensureIdle(1, false);
}
private void destroy(final PooledObject<T> toDestroy) throws Exception {
toDestroy.invalidate();
idleObjects.remove(toDestroy);
allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
try {
factory.destroyObject(toDestroy);
} finally {
destroyedCount.incrementAndGet();
createCount.decrementAndGet();
}
}

销毁逻辑很简单,从空闲对象列表中移除,在allObjects中移除,然后再更新次数,这些数据仅仅是用于对象池数据统计。

最核心的方法还是ensureIdle方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void ensureIdle(final int idleCount, final boolean always) throws Exception {
if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
return;
}
while (idleObjects.size() < idleCount) {
final PooledObject<T> p = create();
if (p == null) {
// Can't create objects, no reason to think another call to
// create will work. Give up.
break;
}
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}

顾名思义,这个方法保证对象池中有指定的空闲对象能用。不然你来借的时候对象池都空了,那多尴尬。

如果当前池中的空闲对象小于指定的值,那么就得去创建对象了,这里使用的create()方法就是在前文中说到的对象工厂抽象实现类中的create()做一样的事情,只不过这个方法创建的是一个PooledObject包装类型,以及一些其他的属性。然后将这个新创建的对象装到idleObjects容器中,这个容器类型为LinkedBlockingDeque<PooledObject<T>>链表结构,是这个库自己实现的。

“借对象”的细节

之前说到getBlockWhenExhausted属性用于判断当“借”的时候没有是否需要等待。如果给定超时时间那就按照超时时间去等,没等到那就返回空对象,如果没给超时时间,那就死等。

这个属性最终决定如果池子里没对象最终抛出的异常,如果这个属性为false,那就直接抛Pool exhausted异常,如果为true那就抛出"Timeout waiting for idle object"异常(给了超时时间)。

首先得从空闲的容器中取“空闲对象”,接着判断是否是有效的。然后使用之前提到的工厂方法“激活”这个对象,默认实现中是空实现,在使用过程中可以去重写这个方法实现自定义的需求。
updateStatsBorrow方法用来更新对象池的统计数据信息,不需要太关注。最后就直接返回我们需要的对象。

如果空闲的容器中没得可用的对象咋办?那只能去create一个了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
private PooledObject<T> create() throws Exception {
int localMaxTotal = getMaxTotal();
// This simplifies the code later in this method
if (localMaxTotal < 0) {
localMaxTotal = Integer.MAX_VALUE;
}
final long localStartTimeMillis = System.currentTimeMillis();
final long localMaxWaitTimeMillis = Math.max(getMaxWaitMillis(), 0);
// Flag that indicates if create should:
// - TRUE: call the factory to create an object
// - FALSE: return null
// - null: loop and re-test the condition that determines whether to
// call the factory
Boolean create = null;
while (create == null) {
synchronized (makeObjectCountLock) {
final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
// The pool is currently at capacity or in the process of
// making enough new objects to take it to capacity.
createCount.decrementAndGet();
if (makeObjectCount == 0) {
// There are no makeObject() calls in progress so the
// pool is at capacity. Do not attempt to create a new
// object. Return and wait for an object to be returned
create = Boolean.FALSE;
} else {
// There are makeObject() calls in progress that might
// bring the pool to capacity. Those calls might also
// fail so wait until they complete and then re-test if
// the pool is at capacity or not.
makeObjectCountLock.wait(localMaxWaitTimeMillis);
}
} else {
// The pool is not at capacity. Create a new object.
makeObjectCount++;
create = Boolean.TRUE;
}
}
// Do not block more if maxWaitTimeMillis is set.
if (create == null &&
(localMaxWaitTimeMillis > 0 &&
System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
create = Boolean.FALSE;
}
}
if (!create.booleanValue()) {
return null;
}
final PooledObject<T> p;
try {
p = factory.makeObject();
if (getTestOnCreate() && !factory.validateObject(p)) {
createCount.decrementAndGet();
return null;
}
} catch (final Throwable e) {
createCount.decrementAndGet();
throw e;
} finally {
synchronized (makeObjectCountLock) {
makeObjectCount--;
makeObjectCountLock.notifyAll();
}
}
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getLogAbandoned()) {
p.setLogAbandoned(true);
p.setRequireFullStackTrace(ac.getRequireFullStackTrace());
}
createdCount.incrementAndGet();
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
return p;
}

这个逻辑很简单,需要注意的是其中的while循环,如果池中正在创建的对象超过了最大数量,那就直接返回空。

至此,“借”对象的整个过程就结束了。

驱逐逻辑

在配置参数中有个属性timeBetweenEvictionRunsMillis,如果这个值为负数,就不会去运行驱逐线程,否则以配置的值作为时间间隔去扫描池中的对象,驱逐出“过气”对象。
具体的驱逐逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public void evict() throws Exception {
assertOpen();
if (idleObjects.size() > 0) {
PooledObject<T> underTest = null;
final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
synchronized (evictionLock) {
final EvictionConfig evictionConfig = new EvictionConfig(
getMinEvictableIdleTimeMillis(),
getSoftMinEvictableIdleTimeMillis(),
getMinIdle());
final boolean testWhileIdle = getTestWhileIdle();
for (int i = 0, m = getNumTests(); i < m; i++) {
if (evictionIterator == null || !evictionIterator.hasNext()) {
evictionIterator = new EvictionIterator(idleObjects);
}
if (!evictionIterator.hasNext()) {
// Pool exhausted, nothing to do here
return;
}
try {
underTest = evictionIterator.next();
} catch (final NoSuchElementException nsee) {
// Object was borrowed in another thread
// Don't count this as an eviction test so reduce i;
i--;
evictionIterator = null;
continue;
}
if (!underTest.startEvictionTest()) {
// Object was borrowed in another thread
// Don't count this as an eviction test so reduce i;
i--;
continue;
}
// User provided eviction policy could throw all sorts of
// crazy exceptions. Protect against such an exception
// killing the eviction thread.
boolean evict;
try {
evict = evictionPolicy.evict(evictionConfig, underTest,
idleObjects.size());
} catch (final Throwable t) {
// Slightly convoluted as SwallowedExceptionListener
// uses Exception rather than Throwable
PoolUtils.checkRethrow(t);
swallowException(new Exception(t));
// Don't evict on error conditions
evict = false;
}
if (evict) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
if (testWhileIdle) {
boolean active = false;
try {
factory.activateObject(underTest);
active = true;
} catch (final Exception e) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
if (active) {
if (!factory.validateObject(underTest)) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
try {
factory.passivateObject(underTest);
} catch (final Exception e) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
}
}
}
if (!underTest.endEvictionTest(idleObjects)) {
// TODO - May need to add code here once additional
// states are used
}
}
}
}
}
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
removeAbandoned(ac);
}
}

EvictionPolicy为驱逐策略接口,用于指定使用哪种策略把过气对象清理出对象池。默认实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class DefaultEvictionPolicy<T> implements EvictionPolicy<T> {
@Override
public boolean evict(final EvictionConfig config, final PooledObject<T> underTest,
final int idleCount) {
if ((config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() &&
config.getMinIdle() < idleCount) ||
config.getIdleEvictTime() < underTest.getIdleTimeMillis()) {
return true;
}
return false;
}
}

其策略为

  • 对象池中某个对象的空闲的时间长于getMinEvictableIdleTimeMillis
  • 在对象池中的空闲对象有超过getMinIdle且某个对象的空闲时间长于getSoftMinEvictableIdleTimeMillis

满足其中之一即返回真,执行驱逐操作。

而这个线程每次只会扫描getNumTests数量的对象。使用EvictionIterator将池中所有空闲对象包装起来,其实这里使用的是迭代器模式(处处都体现设计模式呀)。如果没取到,那就将这个空对象跳过去,反正必须得取getNumTests这么多个,空的不算数。最后就是使用destroy方法将其销毁掉了。

总结

本文介绍了commons-pool几个核心接口以及相关配置参数和代表的含义,针对核心API的实现做了源码解读。总的来说这个工具的实现其实很简单,源码读起来没有太大难度。当然除了文中提到的默认对象池实现之外,commons-pool还提供基于key的对象池KeyedObjectPool以及基于代理的对象池ProxiedObjectPool的实现。由于暂时没找到使用的场景,就没有继续深入解读。

看了对象池的实现后,觉得这些看似“高大上”的东西也不过如此。接着尝试去了解一下更加专业的“池”–连接池。


留言: