您的位置:  首页 > 技术 > 中间件 > 正文

jedis 连接池 jedisPool 的实现原理浅析

2022-04-27 17:00 https://my.oschina.net/klblog/blog/5520235 KL博主 次阅读 条评论

 前言

jedisPool 是基于 Apache-commons-pool2 的抽象对象池(objectPool)技术实现的,在 jedisPool 中,一个连接即一个 jedisObject 对象实例。jedisPool 的核心也就是:根据设置的 poolConfig ,围绕连接池容器 LinkedBlockingDeque ,在相关动作中,做增删的逻辑。所以下面主要阐述关键配置和关键方法的行为,基本可以窥探 jedisPool 的实现全貌。

使用示例

/**

 * @author : kl (http://kailing.pub)

 * @since : 2022-04-26 12:37

 */

public class KLMain1 {

    public static void main(String[] args) {

        JedisPoolConfig config = new JedisPoolConfig();

        config.setMinIdle(10);



        JedisPool pool = new JedisPool(config,Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);

        try (Jedis jedis = pool.getResource()) {

            jedis.set("foo", "kl");

            System.out.println(jedis.get("foo"));

        }

    }

}

关键 config 概述

jedis 连接池配置类为 GenericObjectPoolConfig ,以及子类 JedisPoolConfig,一般用子类 JedisPoolConfig 构造连接池参数,它会默认初始化管理 redis 连接生命周期的一些配置。

池子容量

控制池子大小,以及配置获取连接时的策略

名称
默认值
     
概要说明
maxTotal8最大连接数
maxIdle8最大空闲连接数
minIdle0最小空闲连接数

lifo

true归还连接时的策略,默认放回队列头部
blockWhenExhaustedtrue获取连接时,当连接池空了,是否阻塞等待连接
maxWaitDuration-1只有 blockWhenExhausted = true 时才生效,表示阻塞等待多久。-1 表示一直阻塞,直到获取连接
testOnCreatefalse创建连接时是否检测连接有效性,连接无效返回 null
testOnBorrowfalse租借连接时是否检测连接有效性,无效连接会直接销魂,并尝试继续获取
testOnReturnfalse归还连接时是否检测连接有效性,无效链接会直接销毁,并创建新的连接归还

生命周期

控制是否启用名称为 commons-pool-evictor 的线程,用于空闲连接、无效连接的检测。以及配置连接检测的策略

名称
GenericObjectPoolConfig默认值
jedisPoolConfig 默认值
概要说明

testWhileIdle

falsetrue开启驱逐线时,是否检测无效链接

durationBetweenEvictionRuns

-130s驱逐线程每次检测时间间隔,单位毫秒。-1表示不开启驱逐线程

numTestsPerEvictionRun

3-1每次检测多少链接,-1 表示检测所有

minEvictableIdleDuration

30min60s驱逐连接时,连接最小空闲存活时间

softMinEvictableIdleDuration

-1-1软驱逐时最小空闲存活时间,-1 表示 intMax

关键逻辑

下面代码都在 GenericObjectPool 类中,这里例举关键的方法定义,以及每个方法直接使用到的配置项,看完这部分能够知道平时配置的参数到底是怎么工作的。以下贴的源码因篇幅原因,部分源码有所删减,只保留并突出关键逻辑部分。

方法定义
使用到的配置项
概要说明
PooledObject<T> create()maxTotal、testOnCreate创建连接
T borrowObject(final long borrowMaxWaitMillis)blockWhenExhausted、maxWaitDuration、testOnBorrow租借连接
void returnObject(final T obj)maxIdle、lifo、testOnReturn返还连接
void evict()testWhileIdle、durationBetweenEvictionRuns、numTestsPerEvictionRun、minEvictableIdleDuration、softMinEvictableIdleDuration、minIdle驱逐连接
void ensureMinIdle()lifo、minIdle保持最小空闲连接
void addObject()lifo添加一个连接到连接池

create

    private PooledObject<T> create() throws Exception {

        int localMaxTotal = getMaxTotal();

        if (localMaxTotal < 0) {

            localMaxTotal = Integer.MAX_VALUE;

        }

        Boolean create = null;

        while (create == null) {

            synchronized (makeObjectCountLock) {

                final long newCreateCount = createCount.incrementAndGet();

                if (newCreateCount > localMaxTotal) {

                    // 池当前处于满负荷状态或正在制造足够的新对象以使其达到满负荷状态。

                    createCount.decrementAndGet();

                    if (makeObjectCount == 0) {

                        // 没有正在进行的 makeObject() 调用,因此池已满。不要尝试创建新对象。

                        create = Boolean.FALSE;

                    } else {

                        // 正在进行的 makeObject() 调用可能会使池达到容量。这些调用也可能会失败,因此请等待它们完成,然后重新测试池是否已满负荷

                        makeObjectCountLock.wait(localMaxWaitTimeMillis);

                    }

                } else {

                    // The pool is not at capacity. Create a new object.

                    makeObjectCount++;

                    create = Boolean.TRUE;

                }

            }



            // Do not block more if maxWaitTimeMillis is set.

            if (create == null &&

                    (localMaxWaitTimeMillis > 0 &&

                            System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {

                create = Boolean.FALSE;

            }

        }



        if (!create.booleanValue()) {

            return null;

        }



        final PooledObject<T> p;

        try {

            p = factory.makeObject();

            if (getTestOnCreate() && !factory.validateObject(p)) {

                createCount.decrementAndGet();

                return null;

            }

        }

        //...省略

        return p;

    }

创建连接是一个最底层的方法,在租借连接、归还连接、保持空闲连接都会用到。创建连接前,会判断当前连接是否到达 maxTotal ,如果到达 maxTotal 并且此时没有正在创建连接,则表示池子已满,返回 null。如果成功创建了连接,则根据 testOnCreate 的配置,如果 testOnCreate= true 则断连接是否可用,如果连接无效,依旧返回 null。

borrowObject

   public T borrowObject(final Duration borrowMaxWaitDuration) throws Exception {

        assertOpen();



        final AbandonedConfig ac = this.abandonedConfig;

        if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) &&

                (getNumActive() > getMaxTotal() - 3)) {

            removeAbandoned(ac);

        }



        PooledObject<T> p = null;



        // Get local copy of current config so it is consistent for entire

        // method execution

        final boolean blockWhenExhausted = getBlockWhenExhausted();



        boolean create;

        final long waitTimeMillis = System.currentTimeMillis();



        while (p == null) {

            create = false;

            p = idleObjects.pollFirst();

            if (p == null) {

                p = create();

                if (p != null) {

                    create = true;

                }

            }

            if (blockWhenExhausted) {

                if (p == null) {

                    if (borrowMaxWaitDuration.isNegative()) {

                        p = idleObjects.takeFirst();

                    } else {

                        p = idleObjects.pollFirst(borrowMaxWaitDuration);

                    }

                }

                if (p == null) {

                    throw new NoSuchElementException(appendStats(

                            "Timeout waiting for idle object, borrowMaxWaitDuration=" + borrowMaxWaitDuration));

                }

            } else if (p == null) {

                throw new NoSuchElementException(appendStats("Pool exhausted"));

            }

            if (!p.allocate()) {

                p = null;

            }



            if (p != null) {

                try {

                    factory.activateObject(p);

                } catch (final Exception e) {

                    try {

                        destroy(p, DestroyMode.NORMAL);

                    } catch (final Exception e1) {

                        // Ignore - activation failure is more important

                    }

                    p = null;

                    if (create) {

                        final NoSuchElementException nsee = new NoSuchElementException(

                                appendStats("Unable to activate object"));

                        nsee.initCause(e);

                        throw nsee;

                    }

                }

                if (p != null && getTestOnBorrow()) {

                    boolean validate = false;

                    Throwable validationThrowable = null;

                    try {

                        validate = factory.validateObject(p);

                    } catch (final Throwable t) {

                        PoolUtils.checkRethrow(t);

                        validationThrowable = t;

                    }

                    if (!validate) {

                        try {

                            destroy(p, DestroyMode.NORMAL);

                            destroyedByBorrowValidationCount.incrementAndGet();

                        } catch (final Exception e) {

                            // Ignore - validation failure is more important

                        }

                        p = null;

                        if (create) {

                            final NoSuchElementException nsee = new NoSuchElementException(

                                    appendStats("Unable to validate object"));

                            nsee.initCause(validationThrowable);

                            throw nsee;

                        }

                    }

                }

            }

        }



        updateStatsBorrow(p, Duration.ofMillis(System.currentTimeMillis() - waitTimeMillis));



        return p.getObject();

    }

租借连接是从连接池容器链表的头部获取连接,如果为 null ,则调用 create() 创建连接,如果还是 null ,则根据 blockWhenExhausted=true 的配置决定是否等待连接, maxWaitDuration=-1 则一直等待,否则等待 maxWaitDuration 的时间,等待完还没获取到连接,则抛 【Timeout waiting for idle object】异常。如果 blockWhenExhausted=false ,当连接池无可用连接时立马抛连接池已满【Pool exhausted】的异常。成功拿到连接后,如果 testOnBorrow=true ,则继续判断连接是否有效,无效链接直接销毁,并重复上面的操作直到获取可用的连接。

returnObject

   public void returnObject(final T obj) {

        final PooledObject<T> p = getPooledObject(obj);



        if (p == null) {

            if (!isAbandonedConfig()) {

                throw new IllegalStateException(

                        "Returned object not currently part of this pool");

            }

            return; // Object was abandoned and removed

        }



        markReturningState(p);



        final Duration activeTime = p.getActiveDuration();



        if (getTestOnReturn() && !factory.validateObject(p)) {

            try {

                destroy(p, DestroyMode.NORMAL);

            } catch (final Exception e) {

                swallowException(e);

            }

            try {

                ensureIdle(1, false);

            } catch (final Exception e) {

                swallowException(e);

            }

            updateStatsReturn(activeTime);

            return;

        }



        try {

            factory.passivateObject(p);

        } catch (final Exception e1) {

            swallowException(e1);

            try {

                destroy(p, DestroyMode.NORMAL);

            } catch (final Exception e) {

                swallowException(e);

            }

            try {

                ensureIdle(1, false);

            } catch (final Exception e) {

                swallowException(e);

            }

            updateStatsReturn(activeTime);

            return;

        }



        if (!p.deallocate()) {

            throw new IllegalStateException(

                    "Object has already been returned to this pool or is invalid");

        }



        final int maxIdleSave = getMaxIdle();

        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {

            try {

                destroy(p, DestroyMode.NORMAL);

            } catch (final Exception e) {

                swallowException(e);

            }

            try {

                ensureIdle(1, false);

            } catch (final Exception e) {

                swallowException(e);

            }

        } else {

            if (getLifo()) {

                idleObjects.addFirst(p);

            } else {

                idleObjects.addLast(p);

            }

            if (isClosed()) {

                // Pool closed while object was being added to idle objects.

                // Make sure the returned object is destroyed rather than left

                // in the idle object pool (which would effectively be a leak)

                clear();

            }

        }

        updateStatsReturn(activeTime);

    }

归还连接是把连接对象重新放回连接池容器。归还连接时,如果 testOnReturn=true ,则校验连接有效性,无效链接直接销毁并确保至少有一个空闲连接。继而判断 maxIdle 是否已满,已满则销毁当前连接,并确保至少有一个空闲连接,否则根据 lifo=true 将连接归还到链表的头部,否则归还到链表的尾部。这里用 lifo 来控制租借的连接是最新的连接还是最旧的连接,这里一般默认拿最新的连接就好。

evict

    public void evict() throws Exception {

        assertOpen();



        if (!idleObjects.isEmpty()) {



            PooledObject<T> underTest = null;

            final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();



            synchronized (evictionLock) {

                final EvictionConfig evictionConfig = new EvictionConfig(

                        getMinEvictableIdleDuration(),

                        getSoftMinEvictableIdleDuration(),

                        getMinIdle());



                final boolean testWhileIdle = getTestWhileIdle();



                for (int i = 0, m = getNumTests(); i < m; i++) {

                    if (evictionIterator == null || !evictionIterator.hasNext()) {

                        evictionIterator = new EvictionIterator(idleObjects);

                    }

                    if (!evictionIterator.hasNext()) {

                        // Pool exhausted, nothing to do here

                        return;

                    }



                    try {

                        underTest = evictionIterator.next();

                    } catch (final NoSuchElementException nsee) {

                        // Object was borrowed in another thread

                        // Don't count this as an eviction test so reduce i;

                        i--;

                        evictionIterator = null;

                        continue;

                    }



                    if (!underTest.startEvictionTest()) {

                        // Object was borrowed in another thread

                        // Don't count this as an eviction test so reduce i;

                        i--;

                        continue;

                    }



                    // User provided eviction policy could throw all sorts of

                    // crazy exceptions. Protect against such an exception

                    // killing the eviction thread.

                    boolean evict;

                    try {

                        evict = evictionPolicy.evict(evictionConfig, underTest,

                                idleObjects.size());

                    } catch (final Throwable t) {

                        // Slightly convoluted as SwallowedExceptionListener

                        // uses Exception rather than Throwable

                        PoolUtils.checkRethrow(t);

                        swallowException(new Exception(t));

                        // Don't evict on error conditions

                        evict = false;

                    }



                    if (evict) {

                        destroy(underTest, DestroyMode.NORMAL);

                        destroyedByEvictorCount.incrementAndGet();

                    } else {

                        if (testWhileIdle) {

                            boolean active = false;

                            try {

                                factory.activateObject(underTest);

                                active = true;

                            } catch (final Exception e) {

                                destroy(underTest, DestroyMode.NORMAL);

                                destroyedByEvictorCount.incrementAndGet();

                            }

                            if (active) {

                                boolean validate = false;

                                Throwable validationThrowable = null;

                                try {

                                    validate = factory.validateObject(underTest);

                                } catch (final Throwable t) {

                                    PoolUtils.checkRethrow(t);

                                    validationThrowable = t;

                                }

                                if (!validate) {

                                    destroy(underTest, DestroyMode.NORMAL);

                                    destroyedByEvictorCount.incrementAndGet();

                                    if (validationThrowable != null) {

                                        if (validationThrowable instanceof RuntimeException) {

                                            throw (RuntimeException) validationThrowable;

                                        }

                                        throw (Error) validationThrowable;

                                    }

                                } else {

                                    try {

                                        factory.passivateObject(underTest);

                                    } catch (final Exception e) {

                                        destroy(underTest, DestroyMode.NORMAL);

                                        destroyedByEvictorCount.incrementAndGet();

                                    }

                                }

                            }

                        }

                        if (!underTest.endEvictionTest(idleObjects)) {

                            // TODO - May need to add code here once additional

                            // states are used

                        }

                    }

                }

            }

        }

        final AbandonedConfig ac = this.abandonedConfig;

        if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {

            removeAbandoned(ac);

        }

    }

这里贴出的代码不是全貌,只截取了关键逻辑。jedisPool 初始化时,会根据 durationBetweenEvictionRuns 配置启动一个名称为 commons-pool-evictor 的线程调度,如果 durationBetweenEvictionRuns=-1,则不启用,evict 的所有逻辑都不会运行,否则每隔 durationBetweenEvictionRuns 时间运行一次驱逐逻辑。numTestsPerEvictionRun 控制每次检测多少个连接。每次检测当前连接是否该被驱逐,有如下两个策略,满足其一即可:

  • 策略A:配置的 softMinEvictableIdleDuration 小于当前连接的空闲时间【并且】配置的 minIdle 小于空闲连接总数
  • 策略B:配置的 minEvictableIdleDuration 小于当前连接的空闲时间

一般默认参数下,不会触发策略A。很容易触发策略B。如果当前连接被驱逐,则继续下个连接判断。否则如果 testWhileIdle=true ,继续判断当前连接的有效性,无效链接会销毁并剔除连接池。一轮检测结束后,会调用 ensureMinIdle(),确保连接池中的最小空闲连接。

ensureMinIdle

    private void ensureIdle(final int idleCount, final boolean always) throws Exception {

        if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {

            return;

        }



        while (idleObjects.size() < idleCount) {

            final PooledObject<T> p = create();

            if (p == null) {

                // Can't create objects, no reason to think another call to

                // create will work. Give up.

                break;

            }

            if (getLifo()) {

                idleObjects.addFirst(p);

            } else {

                idleObjects.addLast(p);

            }

        }

        if (isClosed()) {

            // Pool closed while object was being added to idle objects.

            // Make sure the returned object is destroyed rather than left

            // in the idle object pool (which would effectively be a leak)

            clear();

        }

    }

ensureMinIdle 最终会调用 ensureIdle() 方法,idleCount 入参即为配置的 minIdle 。这里会根据当前的空闲连接和传入的 idleCount 比较,直到连接连接池的空闲连接达到 idelCount 值。

addObject

    public void addObject() throws Exception {

        assertOpen();

        if (factory == null) {

            throw new IllegalStateException("Cannot add objects without a factory.");

        }

        addIdleObject(create());

    }

创建一个连接并添加到连接池,这个方法一般用于连接池初始化。比如连接池提前预热。

结语

  • 大多数人一般只关注连接池容量的配置,忽略了连接生命周期的配置

包括我,之前一直有一个错误的认知,认为连接池会确保 minIdle 值,只要当前申请的连接小于 minIdle 连接池就不会伸缩创建新连接。直到看了源码才发现,只要启用了驱逐连接线程,只要空闲存活时间大于配置的值 minEvictableIdleDuration 都会被驱逐,然后又会根据 minIdle 重建新的连接。又因为 jedisPoolConfig 的 minEvictableIdleDuration 默认为 60s,当 minIdle 配置的远大于实际需要时,因为默认连接获取策略为 lifo(总是拿最新的连接)会导致总是拿到新创建连接,又销毁又继续创建的循环中。当突然流量增大时,连接池中不一定保有 minIdle 的连接数,突然需要创建大量连接,势必会对应用造成一定影响。这里建议启用了驱逐线程时,minEvictableIdleDuration 的值稍微设置的大一点,或者直接 -1,让已经创建的连接一直保活。

 
展开阅读全文                                    
  • 0
    感动
  • 0
    路过
  • 0
    高兴
  • 0
    难过
  • 0
    搞笑
  • 0
    无聊
  • 0
    愤怒
  • 0
    同情
热度排行
友情链接