redis客户端jedis&spring-data-redis源码赏析

Java框架

浏览数:25

2020-6-1

AD:资源代下载服务

背景

本文主要对当下开源流行的redis客户端jedis和spring-data-redis的部分核心源码进行剖析,记录一下怎么去实现一个redis的java客户端以及在使用redis集群时客户端的操作需要注意的要点。

版本

jedis:v2.9.0、

spring-data-redis:v2.0.8.RELEASE

源代码的分析

  • 先看最核心的入口类:org.springframework.data.redis.core.RedisTemplate

它继承于org.springframework.data.redis.core.RedisAccessor,主要是设置org.springframework.data.redis.connection.RedisConnectionFactory,并在spring bean初始化完毕对connectionFactory进行为空校验;

实现的核心接口org.springframework.data.redis.core.RedisOperations主要提供了一些基础操作,但这个接口并不经常使用,因为redis的数据结构比较复杂,更多具体的操作都封装在了ValueOperations、ListOperations等,以及BoundValueOperations、BoundListOperations等这两类操作中,这两类操作的主要区别在于是否绑定key了,BoundXX接口继承了BoundKeyOperations这个对key基础操作的接口。

与redis服务端的交互所有操作几乎都调用了如下方法:

/**
	 * Executes the given action object within a connection that can be exposed or not. Additionally, the connection can
	 * be pipelined. Note the results of the pipeline are discarded (making it suitable for write-only scenarios).
	 *
	 * @param <T> return type
	 * @param action callback object to execute
	 * @param exposeConnection whether to enforce exposure of the native Redis Connection to callback code
	 * @param pipeline whether to pipeline or not the connection for the execution
	 * @return object returned by the action
	 */
	@Nullable
	public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

		Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
		Assert.notNull(action, "Callback object must not be null");

		RedisConnectionFactory factory = getRequiredConnectionFactory();
		RedisConnection conn = null;
		try {

			if (enableTransactionSupport) {
				// only bind resources in case of potential transaction synchronization
				conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
			} else {
				conn = RedisConnectionUtils.getConnection(factory);
			}

			boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

			RedisConnection connToUse = preProcessConnection(conn, existingConnection);

			boolean pipelineStatus = connToUse.isPipelined();
			if (pipeline && !pipelineStatus) {
				connToUse.openPipeline();
			}

			RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
			T result = action.doInRedis(connToExpose);

			// close pipeline
			if (pipeline && !pipelineStatus) {
				connToUse.closePipeline();
			}

			// TODO: any other connection processing?
			return postProcessResult(result, connToUse, existingConnection);
		} finally {
			RedisConnectionUtils.releaseConnection(conn, factory);
		}
	}

我们可以看到:

  1. 对事务、管道单独做了一些程序逻辑处理;
  2. 很多初始化的操作都在spring容器启动完成时执行的;
  3. 有对连接选择是否进行生成代理连接的操作(一般,直接调用redisTemplate使用代理;使用各种operations间接处理各自的数据结构时并不使用代理连接进行操作);
  4. 利用匿名回调函数作为传参进行切面操作(连接的建立、数据处理、连接的释放):
Connection conn = null;
try {
   conn = connectionFactory.getConnection();   
   // handle data
   // ...

} finally {
   if (conn != null)
      conn.close();// or release, or disconnect
}
  • 连接的建立和释放(jedis、jedisSentinel、jedisCluster的比较)
/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.connection.RedisConnectionFactory#getConnection()
	 */
	public RedisConnection getConnection() {

		if (isRedisClusterAware()) {// 如果clusterConfiguration不为空,那么使用集群模式连接
			return getClusterConnection();
		}

		Jedis jedis = fetchJedisConnector();// 否则,用Jedis单个节点进行连接(是否用连接池技术可以进行设置)
		String clientName = clientConfiguration.getClientName().orElse(null);
		JedisConnection connection = (getUsePool() ? new JedisConnection(jedis, pool, getDatabase(), clientName)
				: new JedisConnection(jedis, null, getDatabase(), clientName));
		connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
		return postProcessConnection(connection);
	}

集群模式的连接建立:

/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.connection.RedisConnectionFactory#getClusterConnection()
	 */
	@Override
	public RedisClusterConnection getClusterConnection() {

		if (!isRedisClusterAware()) {
			throw new InvalidDataAccessApiUsageException("Cluster is not configured!");
		}
		return new JedisClusterConnection(cluster, clusterCommandExecutor);
	}// 将当前类的JedisCluster对象传递给一个新的JedisClusterConnection
// 而我们又发现:JedisConnectionFactory implements InitializingBean

/*
	 * (non-Javadoc)
	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
	 */
	public void afterPropertiesSet() {

...............
..............



		if (isRedisClusterAware()) {
			this.cluster = createCluster();
		}
	}


// 再看createCluster():
...
private JedisCluster createCluster() {

		JedisCluster cluster = createCluster(this.clusterConfig, getPoolConfig());
		JedisClusterTopologyProvider topologyProvider = new JedisClusterTopologyProvider(cluster);
		this.clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider,
				new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider), EXCEPTION_TRANSLATION);
		return cluster;
	}

	/**
	 * Creates {@link JedisCluster} for given {@link RedisClusterConfiguration} and {@link GenericObjectPoolConfig}.
	 *
	 * @param clusterConfig must not be {@literal null}.
	 * @param poolConfig can be {@literal null}.
	 * @return the actual {@link JedisCluster}.
	 * @since 1.7
	 */
	protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig, GenericObjectPoolConfig poolConfig) {

		Assert.notNull(clusterConfig, "Cluster configuration must not be null!");

		Set<HostAndPort> hostAndPort = new HashSet<>();
		for (RedisNode node : clusterConfig.getClusterNodes()) {
			hostAndPort.add(new HostAndPort(node.getHost(), node.getPort()));
		}

		int redirects = clusterConfig.getMaxRedirects() != null ? clusterConfig.getMaxRedirects() : 5;

		int connectTimeout = getConnectTimeout();
		int readTimeout = getReadTimeout();

		return StringUtils.hasText(getPassword())
				? new JedisCluster(hostAndPort, connectTimeout, readTimeout, redirects, getPassword(), poolConfig)
				: new JedisCluster(hostAndPort, connectTimeout, readTimeout, redirects, poolConfig);
	}

// 可以看出在spring容器初始化完成之后执行了当前类中JedisCluster对象的实例化

单节点模式的连接建立:

/**
	 * Returns a Jedis instance to be used as a Redis connection. The instance can be newly created or retrieved from a
	 * pool.
	 *
	 * @return Jedis instance ready for wrapping into a {@link RedisConnection}.
	 */
	protected Jedis fetchJedisConnector() {
		try {

			if (getUsePool() && pool != null) {
				return pool.getResource();
			}

			Jedis jedis = createJedis();
			// force initialization (see Jedis issue #82)
			jedis.connect();

			potentiallySetClientName(jedis);
			return jedis;
		} catch (Exception ex) {
			throw new RedisConnectionFailureException("Cannot get Jedis connection", ex);
		}
	}

	private Jedis createJedis() {

		if (providedShardInfo) {
			return new Jedis(getShardInfo());
		}
        // 可以看出,每次都重新建立新的客户端连接
		Jedis jedis = new Jedis(getHostName(), getPort(), getConnectTimeout(), getReadTimeout(), isUseSsl(),
				clientConfiguration.getSslSocketFactory().orElse(null), //
				clientConfiguration.getSslParameters().orElse(null), //
				clientConfiguration.getHostnameVerifier().orElse(null));

		Client client = jedis.getClient();

		getRedisPassword().map(String::new).ifPresent(client::setPassword);
		client.setDb(getDatabase());

		return jedis;
	}

再看两种模式下连接的关闭:

/*
	 * (non-Javadoc)
	 * @see org.springframework.beans.factory.DisposableBean#destroy()
	 */
	public void destroy() {
        // spring容器销毁时,连接池和jediscluster进行销毁或关闭(如果有的话)
		if (getUsePool() && pool != null) {

			try {
				pool.destroy();
			} catch (Exception ex) {
				log.warn("Cannot properly close Jedis pool", ex);
			}
			pool = null;
		}

		if (cluster != null) {

			try {
				cluster.close();
			} catch (Exception ex) {
				log.warn("Cannot properly close Jedis cluster", ex);
			}

			try {
				clusterCommandExecutor.destroy();
			} catch (Exception ex) {
				log.warn("Cannot properly close cluster command executor", ex);
			}
		}
	}

除此之外,单节点模式下的关闭:org.springframework.data.redis.connection.jedis.JedisConnection#close;

public void close() throws DataAccessException {
		super.close();
		// return the connection to the pool
		if (pool != null) {
			if (!broken) {
				// reset the connection
				try {
					if (dbIndex > 0) {
						jedis.select(0);
					}
					pool.returnResource(jedis);
					return;
				} catch (Exception ex) {
					DataAccessException dae = convertJedisAccessException(ex);
					if (broken) {
						pool.returnBrokenResource(jedis);
					} else {
						pool.returnResource(jedis);
					}
					throw dae;
				}
			} else {
				pool.returnBrokenResource(jedis);
				return;
			}
		}
		// else close the connection normally (doing the try/catch dance)
		Exception exc = null;
		if (isQueueing()) {
			try {
				client.quit();
			} catch (Exception ex) {
				exc = ex;
			}
			try {
				client.disconnect();
			} catch (Exception ex) {
				exc = ex;
			}
			return;
		}
		try {
			jedis.quit();
		} catch (Exception ex) {
			exc = ex;
		}
		try {
			jedis.disconnect();
		} catch (Exception ex) {
			exc = ex;
		}
		if (exc != null)
			throw convertJedisAccessException(exc);
	}

集群模式下的关闭:org.springframework.data.redis.connection.jedis.JedisClusterConnection#close

/*
	 * (non-Javadoc)
	 * @see org.springframework.data.redis.connection.RedisConnection#close()
	 */
	@Override
	public void close() throws DataAccessException {

		if (!closed && disposeClusterCommandExecutorOnClose) {
			try {
				clusterCommandExecutor.destroy();
			} catch (Exception ex) {
				log.warn("Cannot properly close cluster command executor", ex);
			}
		}

		closed = true;// 仅仅是设置了一下状态,顶多销毁一下相关的bean,bean销毁的代码如下:
	}

...
...

org.springframework.data.redis.connection.ClusterCommandExecutor#destroy

/*
	 * (non-Javadoc)
	 * @see org.springframework.beans.factory.DisposableBean#destroy()
	 */
	@Override
	public void destroy() throws Exception {

		if (executor instanceof DisposableBean) {
			((DisposableBean) executor).destroy();
		}

		if (resourceProvider instanceof DisposableBean) {
			((DisposableBean) resourceProvider).destroy();
		}
	}
  • 继续追踪jediscluster对连接的建立和释放:
redis.clients.jedis.JedisClusterCommand#runWithRetries:
// 这里又看到了try..finally操作,对连接进行建立和释放
// JedisCluster继承于BinaryJedisCluster
// BinaryJedisCluster又使用抽象类JedisClusterCommand实现的

  private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    if (attempts <= 0) {
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {
        // TODO: Pipeline asking with the original command to make it
        // faster....
        connection = askConnection.get();
        connection.asking();

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
      }

      return execute(connection);

    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      if (attempts <= 1) {
        //We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
        //or we can stuck with cluster state without one node in opposite case.
        //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
        //TODO make tracking of successful/unsuccessful operations for node - do renewing only
        //if there were no successful responses from this node last few seconds
        this.connectionHandler.renewSlotCache();

        //no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation
        throw jce;
      }

      return runWithRetries(key, attempts - 1, tryRandomNode, asking);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache
        // recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      // release current connection before recursion or renewing
      releaseConnection(connection);
      connection = null;

      if (jre instanceof JedisAskDataException) {
        asking = true;
        askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {
      } else {
        throw new JedisClusterException(jre);
      }

      return runWithRetries(key, attempts - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }

补充

Jedis的实现:

// 由Jedis extends BinaryJedis查看一下redis.clients.jedis.BinaryJedis中引用了redis.clients.jedis.Client对象进行数据交互;
// 由Client extends BinaryClient,且BinaryClient extends Connection可以知道连接的开启和关闭:

// redis.clients.jedis.Connection使用Socket与服务端进行通信

public void connect() {
    if (!isConnected()) {
      try {
        socket = new Socket();
        // ->@wjw_add
        socket.setReuseAddress(true);
        socket.setKeepAlive(true); // Will monitor the TCP connection is
        // valid
        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
        // ensure timely delivery of data
        socket.setSoLinger(true, 0); // Control calls close () method,
        // the underlying socket is closed
        // immediately
        // <-@wjw_add

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);

        if (ssl) {
          if (null == sslSocketFactory) {
            sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
          }
          socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);
          if (null != sslParameters) {
            ((SSLSocket) socket).setSSLParameters(sslParameters);
          }
          if ((null != hostnameVerifier) &&
              (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {
            String message = String.format(
                "The connection to '%s' failed ssl/tls hostname verification.", host);
            throw new JedisConnectionException(message);
          }
        }

        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }

  @Override
  public void close() {
    disconnect();
  }

  public void disconnect() {
    if (isConnected()) {
      try {
        outputStream.flush();
        socket.close();
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      } finally {
        IOUtils.closeQuietly(socket);
      }
    }
  }

总结

  • RedisCluster的出现(主要是redis3.0及以后),实现了:自动分割数据到不同的节点,增强集群的可用性;但是同时它导致了Redis集群并不支持处理多个keys的命令,因为这需要在不同的节点间移动数据,从而达不到像Redis那样的性能,在高负载的情况下可能会导致不可预料的错误.
  • redis-cli 对集群的支持是非常基本的, 所以它总是依靠 Redis 集群节点来将它转向(redirect)至正确的节点。一个真正的(serious)集群客户端应该做得比这更好: 它应该用缓存记录起哈希槽与节点地址之间的映射(map), 从而直接将命令发送到正确的节点上面。这种映射只会在集群的配置出现某些修改时变化, 比如说, 在一次故障转移(failover)之后, 或者系统管理员通过添加节点或移除节点来修改了集群的布局(layout)之后, 诸如此类。
  • 基于前两点的考虑,我们可以将redis集群不支持的一些命令和功能如:mset、mget、rename、keys、scan、事务、管道、脚本等通过在客户端实现以hash槽和master数据节点进行分组,变相实现这些功能,当然这需要结合所处的业务去考虑,因为分组后的结果集可能由多个节点多步操作才能完成,导致与最初命令执行的原子性有所差异。
  • Jedis对rediscluster的封装还是略微有些粗糙的,JedisCluster内部必须使用连接池,而且每次调用都睡获取连接和释放连接,如果想要保持多步操作使用同一个连接(比如事务),还需要对连接和用户线程进行绑定,以防止:1、下次拿错连接(其他不同节点);2、连接频繁建立和关闭或取出和释放(从连接池)产生多余的资源消耗。

 

 

作者:菜蚜