spring-data-redis的事务操作深度解析–原来客户端库还可以攒够了事务命令再发?

Java框架

浏览数:216

2019-6-26

AD:资源代下载服务

一、官方文档

简单介绍下redis的几个事务命令:

redis事务四大指令: MULTI、EXEC、DISCARD、WATCH。

这四个指令构成了redis事务处理的基础。

1.MULTI用来组装一个事务;
2.EXEC用来执行一个事务;
3.DISCARD用来取消一个事务;

4.WATCH类似于乐观锁机制里的版本号。

被WATCH的key如果在事务执行过程中被并发修改,则事务失败。需要重试或取消。

以后单独介绍。

下面是最新版本的spring-data-redis(2.1.3)的官方手册。

https://docs.spring.io/spring-data/redis/docs/2.1.3.RELEASE/reference/html/#tx

这里,我们注意这么一句话:

Redis provides support for transactions through the multiexec, and discard commands. These operations are available on RedisTemplate. However, RedisTemplate is not guaranteed to execute all operations in the transaction with the same connection. 

意思是redis服务器通过multi,exec,discard提供事务支持。这些操作在RedisTemplate中已经实现。然而,RedisTemplate不保证在同一个连接中执行所有的这些一个事务中的操作。

另外一句话:

Spring Data Redis provides the SessionCallback interface for use when multiple operations need to be performed with the same connection, such as when using Redis transactions. The following example uses the multi method:

意思是:spring-data-redis也提供另外一种方式,这种方式可以保证多个操作(比如使用redis事务)可以在同一个连接中进行。示例如下:

//execute a transaction
List<Object> txResults = redisTemplate.execute(new SessionCallback<List<Object>>() {
  public List<Object> execute(RedisOperations operations) throws DataAccessException {
    operations.multi();
    operations.opsForSet().add("key", "value1");

    // This will contain the results of all operations in the transaction
    return operations.exec();
  }
});
System.out.println("Number of items added to set: " + txResults.get(0));

二、实现事务的方式–RedisTemplate直接操作

在前言中我们说,通过RedisTemplate直接调用multi,exec,discard,不能保证在同一个连接中进行。

这几个操作都会调用RedisTemplate#execute(RedisCallback<T>, boolean),比如multi:

    public void multi() {
        execute(connection -> {
            connection.multi();
            return null;
        }, true);
    }

我们看看RedisTemplate的execute方法的源码:

 1 public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
 2 
 3         Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
 4         Assert.notNull(action, "Callback object must not be null");
 5 
 6         RedisConnectionFactory factory = getRequiredConnectionFactory();
 7         RedisConnection conn = null;
 8         try {
 9             --开启了enableTransactionSupport选项,则会将获取到的连接绑定到当前线程
10             if (enableTransactionSupport) {
11                 // only bind resources in case of potential transaction synchronization
12                 conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
13             } else {
-- 未开启,就会去获取新的连接 14 conn = RedisConnectionUtils.getConnection(factory); 15 } 16 17 boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); 18 19 RedisConnection connToUse = preProcessConnection(conn, existingConnection);
。。。忽略无关代码。。。
26 RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 27 T result = action.doInRedis(connToExpose); -- 使用获取到的连接,执行定义在业务回调中的代码 28 。。。忽略无关代码。。。 33 34 // TODO: any other connection processing? 35 return postProcessResult(result, connToUse, existingConnection); 36 } finally { 37 RedisConnectionUtils.releaseConnection(conn, factory); 38 } 39 }

查看以上源码,我们发现,

  • 不启用enableTransactionSupport,默认每次获取新连接,代码如下:
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.multi();

template.opsForValue().set("test_long", 1);

template.opsForValue().increment("test_long", 1);

template.exec();
  • 启用enableTransactionSupport,每次获取与当前线程绑定的连接,代码如下:
RedisTemplate<String, Object> template = new RedisTemplate<>();

template.setEnableTransactionSupport(true);

template.multi();

template.opsForValue().set("test_long", 1);

template.opsForValue().increment("test_long", 1);

template.exec();  

三、实现事务的方式–SessionCallback

 采用这种方式,默认就会将所有操作放在同一个连接,因为在execute(SessionCallback<T> session)(注意,这里是重载函数,参数和上面不一样)源码中:

	public <T> T execute(SessionCallback<T> session) {

		Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
		Assert.notNull(session, "Callback object must not be null");

		RedisConnectionFactory factory = getRequiredConnectionFactory();
		//在执行业务回调前,手动进行了绑定
		RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
		try {   // 业务回调
			return session.execute(this);
		} finally {
			RedisConnectionUtils.unbindConnection(factory);
		}
	}

  

四、SessionCallback方式的示例代码:

 1         RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration("192.168.19.90");
 2         JedisConnectionFactory factory = new JedisConnectionFactory(configuration);
 3         factory.afterPropertiesSet();
 4 
 5         RedisTemplate<String, Object> template = new RedisTemplate<>();
 6         template.setConnectionFactory(factory);
 7         template.setDefaultSerializer(new GenericFastJsonRedisSerializer());
 8         StringRedisSerializer serializer = new StringRedisSerializer();
 9         template.setKeySerializer(serializer);
10         template.setHashKeySerializer(serializer);
11 
12         template.afterPropertiesSet();
14 
15         try {
16             List<Object> txResults = template.execute(new SessionCallback<List<Object>>() {
17                 @Override
18                 public List<Object> execute(RedisOperations operations) throws DataAccessException {
19 
20                     operations.multi();
21 
22                     operations.opsForValue().set("test_long", 1);
23                     int i = 1/0;
24                     operations.opsForValue().increment("test_long", 1);
25 
26                     // This will contain the results of all ops in the transaction
27                     return operations.exec();
28                 }
29             });
30 
31         } catch (Exception e) {
32             System.out.println("error");
33             e.printStackTrace();
34         }

有几个值得注意的点:

1、为什么加try catch

先说结论:只是为了防止调用的主线程失败。

因为事务里运行到23行,(int i = 1/0)时,会抛出异常。

但是在 template.execute(SessionCallback<T> session)中未对其进行捕获,只在finally块进行了连接释放。

所以会导致调用线程(这里是main线程)中断。

 2.try-catch了,事务到底得到保证了没

我们来测试下,测试需要,省略非关键代码

2.1 事务执行过程,抛出异常的情况:

            List<Object> txResults = template.execute(new SessionCallback<List<Object>>() {
                @Override
                public List<Object> execute(RedisOperations operations) throws DataAccessException {

                    operations.multi();

                    operations.opsForValue().set("test_long", 1);
                    int i = 1/0;
                    operations.opsForValue().increment("test_long", 1);

                    // This will contain the results of all ops in the transaction
                    return operations.exec();
                }
            });

  执行上述代码,执行到int i = 1/0时,会抛出异常。我们需要检查,抛出异常后,是否发送了“discard”命令给redis 服务器?

下面是我的执行结果,从最后的抓包可以看到,是发送了discard命令的:    

2.2 事务执行过程,不抛出异常的情况:

 这次我们注释了抛错的那行,可以看到“EXEC”命令已经发出去了:

3 抛出异常,不捕获异常的情况:

有些同学可能比较奇怪,为啥网上那么多教程,都是没有捕获异常的,我这里要捕获呢?

其实我也奇怪,但在我目前测试来看,不捕获的话,执行线程就中断了,因为template.execute是同步执行的。

来,看看:

从上图可以看到,主线程被未捕获的异常给中断了,但是,查看网络抓包,发现“DISCARD”命令还是发出去了的。

4.总结

从上面可以看出来,不管捕获异常没,事务都能得到保证。只是不捕获异常,会导致主线程中断。

不保证所有版本如此,在我这,spring-data-redis 2.1.3是这样的。

我跟了n趟代码,发现:

1、在执行sessionCallBack中的代码时,我们一般会先执行multi命令。

multi命令的代码如下:

    public void multi() {
        execute(connection -> {
            connection.multi();
            return null;
        }, true);
    }

即调用了当前线程绑定的connection的multi方法。

进入JedisConnection的multi方法,可以看到:

private @Nullable Transaction transaction;

public void multi() { if (isQueueing()) { return; } try { if (isPipelined()) { getRequiredPipeline().multi(); return; }
//赋值给了connection的实例变量 this.transaction = jedis.multi(); } catch (Exception ex) { throw convertJedisAccessException(ex); } }

2、在有异常抛出时,直接进入finally块,会去关闭connection,当然,这里的关闭只是还回到连接池。

大概的逻辑如下:

3.在没有异常抛出时,执行exec,在exec中会先将状态变量修改,后边进入finally的时候,就不会发送discard命令了。

 最后的结论就是:

所有这一切的前提是,共有同一个连接。(使用SessionCallBack的方式就能保证,总是共用同一个连接),否则multi用到的连接1里transcation是有值的,但是后面获取到的其他连接2,3,4,里面的transaction是空的,

还怎么保证事务呢?

五、思考

在不开启redisTemplate的enableTransactionSupport选项时,每执行一次redis操作,就会向服务器发送相应的命令。

但是,在开启了redisTemplate的enableTransactionSupport选项,或者使用SessionCallback方式时,会像下面这样发送命令:

 

 后来,我在《redis实战》这本书里的4.4节,Redis事务这一节里,找到了答案:

归根到底呢,因为重用同一个连接,所以可以延迟发;如果每次都不一样的连接,只能马上发了。

 这里另外说一句,不是所有客户端都这样,redis自带的redis-cli是不会延迟发送的。

六、源码

https://github.com/cctvckl/work_util/tree/master/spring-redis-template-2.1.3

 

作者:三国梦回