当前位置: 代码迷 >> 综合 >> vert.x源码解析之redis集成
  详细解决方案

vert.x源码解析之redis集成

热度:8   发布时间:2024-01-10 06:01:50.0

文章目录

  • 前言
  • vertx是什么
  • 为什么要用vertx
  • vertx-redis-client是什么
  • vertx-redis-client有哪些特性
  • 怎么使用vertx-redis-client
    • 添加vertx-redis-client依赖
    • RedisConnection
    • redisOptions
      • 各项主要配置的含义
        • type
        • masterName
        • role
        • slaves
        • poolCleanerInterval
        • poolRecycleTimeout
        • maxPoolSize
        • maxPoolWaiting
        • maxWaitingHandlers
    • RedisClient
      • STANDALONE
      • SENTINEL
      • CLUSTER
    • RedisApi
  • 总结

前言

bala bala…

vertx是什么

vert.x是一个全异步框架,通过内部的event bus跟event loop做到了处处皆异步。关于里面的细节,后面我会出一篇文章详细跟大家探讨…

为什么要用vertx

充分利用机器性能,提升应用的吞吐量上限。

vertx-redis-client是什么

通俗来讲,vertx-redis提供了一个全异步的可配置的redis客户端

vertx-redis-client有哪些特性

相对常规的jedis或者Lettuce来说,这里的特性其实就是vertx贯穿全局的特性:异步。当我们调用api发送一条redis指令的时候,可以不用等待redis的响应,只需要绑定一个回调函数,程序就可以继续往下执行。当拿到redis的响应之后,就会触发这个回调。当然,像这种编程方式,需要把业务依赖关系处理得很明确,因为我们必须把依赖这个redis响应的所有处理都移到这个回调函数里去。

怎么使用vertx-redis-client

添加vertx-redis-client依赖

//gradle
compile("io.vertx:vertx-redis-client:$version")
//maven
<dependency><groupId>io.vertx</groupId><artifactId>vertx-redis-client</artifactId><vertion></version>
</dependency>

RedisConnection

RedisConnection是vertx-redis-client里暴露出来可操作redis的最基本的一组api了,使用如下

  Redis.createClient(Vertx.vertx(), "redis://ip:port").connect(onConnect -> {
    //连接成功if (onConnect.succeeded()) {
    //获取连接实例RedisConnection conn = onConnect.result();//创建redis get指令. Request command = Request.cmd(Command.GET);//添加指令参数aa. 最后到达redis的指令就是get aacommand.arg("aa");//发送指令conn.send(command, resp -> {
    //执行成功if (resp.succeeded()) {
    //拿到指令执行结果Response result = resp.result();}});}});

这里就谈谈怎么使用吧,其实内部的实现就是通过netty包装的channel跟redis建立起连接,有兴趣的朋友可以点进去看看。

redisOptions

在介绍该redisClient各种模式之前,我们需要了解options是什么。
简单来说,redisOptions是我们用来定制该redisClient的一种方式,像之前我们通过Redis.createClient(Vertx.vertx(), “redis://ip:port”) 这种仅仅提供一个地址的方式就建立起了一个redisClient,但是如果我们想要定制更复杂的redisClient呢?这个时候就需要用到redisOptions了

各项主要配置的含义

先大概看下redisOptions都有哪些属性:

public class RedisOptions {
    /*** The default redis endpoint = {@code redis://localhost:6379}*/public static final String DEFAULT_ENDPOINT = "redis://localhost:6379";private RedisClientType type;private NetClientOptions netClientOptions;private List<String> endpoints;private int maxWaitingHandlers;private int maxNestedArrays;private String masterName;private RedisRole role;private RedisSlaves slaves;private String password;// pool related optionsprivate int poolCleanerInterval;private int maxPoolSize;private int maxPoolWaiting;private int poolRecycleTimeout;
}
type

该枚举用于指定redisClient以什么模式去连接server,这里的模式跟redis server的模式一一对应,我们在用的时候也必须注意,server是以什么模式部署的,client就以什么模式去配置。源码如下

public enum RedisClientType {
    /*** 默认值,单机模式*/STANDALONE,/*** 哨兵*/SENTINEL,/*** 集群*/CLUSTER
}

这些配置决定了redisClient与server交互的方式,不同的类型会用不同的策略来应对,在我们调用Redis#creatClient的时候就会作出处理,源码如下

static Redis createClient(Vertx vertx, RedisOptions options) {
    switch (options.getType()) {
    case STANDALONE:return new RedisClient(vertx, options);case SENTINEL:return new RedisSentinelClient(vertx, options);case CLUSTER:return new RedisClusterClient(vertx, options);default:throw new IllegalStateException("Unknown Redis Client type: " + options.getType());}}

至于不同Client内部是怎么处理的,后面会一一介绍

masterName

这个只在Sentinel模式下有用,跟redisServer的redis_sentinel.conf里配置的masterName是一个东西,用来指明连接的是被哨兵监控的哪一个主从集群

role

这项配置只用于clientType为Sentinel的情况,主要用来指定当前的客户端到底是跟master连接还是只跟slave连接,如果配置为slave,则通过该客户端发起的所有指令都会被路由到slave节点,这意味着这个client对redisServer的所有操作都是只读的。
还有一个sentinel,这个意味着这个client只操作server端的sentinel,不会主动去获取到master或者slave的连接。按我目前看来,这个枚举值其实是给程序内部使用的,除非我们想手动实现跟哨兵的通信以及维护相应主从节点的一个failover情况,那么就可以使用这个配置。核心源码在RedisSentinelClient中,如下所示

private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<RedisConnection>> onCreate) {
    switch (role) {
    case SENTINEL:resolveClient(this::isSentinelOk, options, createAndConnect);break;case MASTER:resolveClient(this::getMasterFromEndpoint, options, createAndConnect);break;case SLAVE:resolveClient(this::getSlaveFromEndpoint, options, createAndConnect);}}

这里可以看到,根据不同的role,获取了不同类型的redis连接地址

private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
    final RedisURI uri = new RedisURI(endpoint);connectionManager.getConnection(context, getSentinelEndpoint(uri), null, onCreate -> {
    final RedisConnection conn = onCreate.result();final String masterName = options.getMasterName();// 根据masterName从sentinel获取相应主从集群的master节点信息conn.send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName), getMasterAddrByName -> {
    //bala bala...});});
private void getSlaveFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
    final RedisURI uri = new RedisURI(endpoint);connectionManager.getConnection(context, getSentinelEndpoint(uri), null, onCreate -> {
    final RedisConnection conn = onCreate.result();final String masterName = options.getMasterName();// 获取masterName指定主从的slave节点conn.send(cmd(SENTINEL).arg("SLAVES").arg(masterName), sentinelSlaves -> {
    //...});});}
slaves

这项配置用来指定在集群模式下对server的读操作的行为,源码如下

public enum RedisSlaves {
    /*** 读操作只落在master*/NEVER,/*** 读操作会随机落在master跟slave*/SHARE,/*** 读操作只落在slave上*/ALWAYS
}

其核心原理是通过对slave节点执行readonly命令来开启slave的查询功能,因为默认集群模式下slave是不对外提供服务的。核心源码如下

private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
    //如果RedisSlaves的值不是NEVER,就执行readonlyconnectionManager.getConnection(context, endpoints.get(index), RedisSlaves.NEVER != options.getUseSlave() ? cmd(READONLY) : null, getConnection -> {
    }
}
poolCleanerInterval

连接池空闲连接清理间隔,每次扫描时会直接将闲置的连接关闭。-1表示不开启空闲连接清理,核心源码在RedisConnectionManager#start()

synchronized void start() {
    long period = options.getPoolCleanerInterval();//延迟period时间后执行checkExpired()进行空闲连接清理this.timerID = period > 0 ? vertx.setTimer(period, id -> checkExpired(period)) : -1;}
poolRecycleTimeout

这个是用来控制连接池中连接在执行完命令后还能存活的时间,超过这个时间连接就会被关闭,核心源码在RedisStandaloneConnection。如下

public boolean isValid() {
    return expirationTimestamp > 0 && System.currentTimeMillis() <= expirationTimestamp;}@Overridepublic void close() {
    // recycle this connection from the poolexpirationTimestamp = recycleTimeout > 0 ? System.currentTimeMillis() + recycleTimeout : 0L;listener.onRecycle();}
maxPoolSize

连接池最大大小。只有当等待获取连接的请求数量达到maxPoolWaiting时才会创建新的连接,可以类比ThreadPoolExecutormaximumPoolSize

maxPoolWaiting

连接池等待队列大小。当我们通过RedisClient或者RedisApi去操作reids时,如果当前已无空闲的redis连接,那么这个请求就会进入连接池的等待队列中。可以类比ThreadPoolExecutorworkQueue

maxWaitingHandlers

等待执行的handler数的最大值。当我们发送完redis指令后,一般是需要对这个响应作出处理,这个过程会被包装成一个task添加到eventLoopwaitQueue中,这项配置控制的就是这个queuesize

RedisClient

redisClient本质是对上述redisConnection的再封装,提供了针对redisServer不同模式的不同处理,分为3种:单机,哨兵以及集群模式。对应到具体的class:RedisClientRedisSentinelClientRedisSentinelClient
接下来来看一下如何配置这些客户端

STANDALONE

这个是redisClient默认的一种配置模式,现在我们用前面描述的redisOptions实践一下。

    RedisOptions options = new RedisOptions();options.setType(RedisClientType.STANDALONE)//指定server地址.setConnectionString("redis://ip:port").setMaxPoolSize(6).setMaxWaitingHandlers(1024).setPoolRecycleTimeout(15_000).setMaxPoolWaiting(2);Redis client = Redis.createClient(Vertx.vertx(), options);//创建redis 指令. getRequest command = Request.cmd(Command.GET);//添加指令参数aa. 最后到达redis的指令就是get aacommand.arg("aa");//发送指令并指定一个匿名handler处理结果client.send(command).onComplete(event -> {
    System.out.println("the value of redis key 'aa' is"+event.result().toString());});

SENTINEL

这里请原谅我秀了一下我那蹩脚的英语水平…

    RedisOptions options = new RedisOptions();options.setType(RedisClientType.SENTINEL)//the master name which are monitored by sentinels.setMasterName("myMaster").setRole(RedisRole.MASTER).setMaxPoolSize(6).setMaxWaitingHandlers(1024).setPoolRecycleTimeout(15_000).setMaxPoolWaiting(2);//这里只填哨兵的地址就行了//程序会自动从哨兵处获取节点信息List<String> sentinels = new ArrayList<>();sentinels.add("redis://ip:port");sentinels.add("redis://ip:port");options.setEndpoints(sentinels);...

这里可以只填一个哨兵地址,由于client不会主动去连接监控同一个master的哨兵节点,所以一旦这个唯一的哨兵挂了,那么server就会处于不可用的状态。

CLUSTER

    RedisOptions options = new RedisOptions();options.setType(RedisClientType.CLUSTER).setUseSlave(RedisSlaves.NEVER).setMaxPoolSize(6).setMaxWaitingHandlers(1024).setPoolRecycleTimeout(15_000).setMaxPoolWaiting(20);//添加集群节点,越多越好...List<String> clusters = new ArrayList<>();sentinels.add("redis://ip:port");sentinels.add("redis://ip:port");sentinels.add("redis://ip:port");options.setEndpoints(clusters);

这里也需要注意最好将cluster中所有的节点信息给添加进来。虽然连接任意一个节点,client都会通过执行cluster slots获取cluster中所有节点的信息,但是长久保持连接的只有我们在这里设置进去的节点。这意味着一旦我们手动设置的节点挂了,那么server对我们来说就不可用了

RedisApi

这个是基于RedisClient的封装,隐藏了Command的复杂细节,提供了一套可以直接调用的api。使用如下:

    Redis client = Redis.createClient(Vertx.vertx(), options);RedisAPI api = RedisAPI.api(client);api.get("aa").onComplete(event -> {
    System.out.println("the value of redis key 'aa' is" + event.result().toString());});

总结

全文描述了vertx-client是什么以及其相对某些redis-client所具备的特点。同时深挖了这个客户端各项配置的作用以及不同模式下的表现。
总的来说,这篇文章可以帮助大家去建立一个基于vertx的redis客户端,并且对这个客户端能够做到基本的知其然且知其所以然