Redis在SpringBoot中的一些特殊用法

Redis在SpringBoot中的一些特殊用法

Redis除了操作那些数据类型的功能外,还能支持事务、流水线、发布订阅和Lua脚本等功能,这也是Redis常用的功能。在高并发的场景中,往往我们需要保证数据的一致性,这时考虑使用Redis务或者利用Redis执行Lua的原子性来达到数据一致性的目的,所以这里让我们对它们展开讨论。需要大批量执行Redis命令的时候,我们可以使用流水线来执行命令,这样可以极大地提Redis执行的速度。

Redis事务

首先Redis是支持一定事务能力的NoSQL,在Redis中使用事务,通常的命令组合是watch…multi…exec,也就是要在一个Redis连接中执行多个命令,这时我们可以考虑使用SessionCallback接口来达到这个目的。

其中,watch命令是可以监控Redis的一些键multi命令是开始事务,开始事务,该客户端的命令不会马上被执行,而是存放在一个队列里,这点是需要注意的地方,也就是在这时我们执行一些堪回数据的命令,Redis也是不会马上执行的,而是把命令放到一个队列里,所以时调用Redis的命令,结果都是返回null,这是初学者容易犯的错误:exe命令的意义在于执行事,只是它在队列命令执行前会判断被watch监控的Redis的键的数据是否发生过变化(即使赋予与前相同的值也会被认为是变化过〉,如果它认为发生了变化,那么Redis就会取消事务,否则就会行事务,Redis在执行事务时,要么全部执行,要么全部不执行,而且不会被其他客户端打断,这就保证了Redis事务下数据的一致性。

如图,这就是Redis事务的执行流程:

下面我们来测试一下这个过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
void test1(){

redisTemplate.opsForValue().set("key1","value1");
List list = (List) redisTemplate.execute((RedisOperations operations) -> {
// 设置要监控key1
operations.watch("key1");
// 开启事务,在exec命令执行前,全部都只是进入队列
operations.multi();
operations.opsForValue().set("key2", "value2");
operations.opsForValue().increment("key1", 1);// ①
// 获取值将为null,因为redis只是把命令放入队列,
Object value2 = operations.opsForValue().get("key2");
System.out.println("命令在队列,所以value为null【" + value2 + "】");
operations.opsForValue().set("key3", "value3");
Object value3 = operations.opsForValue().get("key3");
System.out.println("命令在队列,所以value为null【" + value3 + "】");
// 执行exec命令,将先判别key1是否在监控后被修改过,如果是不执行事务,否则执行事务
return operations.exec();// ②
});
System.out.println(list);
}

我们来做两种测试:

1:在Redis客户端清空key2和key3两个键的数据,然后在②处设置断点,在调试的环境下让请求达到断点,此时在Redis上修改keyl的值,然后再跳过断点,在请求完成后在Redis上查询key2和key3值,可以发现key2、key3返回的值都为空(nil),因为程序中先使得Redis的watch命令监控了keyl的值,而后的multi让之后的命令进入队列,而在exec方法运行前我们修改了keyl,根据Redis事务的规则,它在exec方法后会探测keyl是否被修改过,如果没有则会执行事务,否则就取消事务,所以key2和key3没有被保存到Redis服务器中。

测试结果:

2:续把key2和key3两个值清空,把①处的注释取消,让代码可以运行,因为key1是一个字符串,所以这里的代码是对字符串加一,这显然是不能运算的。同样地,我们运行这段代码后,可以看到服务器抛出了异常,然后我们去Redis服务器查询key2和key3,可以看到它们已经有了值。注意,这就是Redis事务和数据库事务的不一样,对于Redis事务是先让命令进入队列,所以一开始它并没有检测这个加一命令是否能够成功,只有在exec命令执行的时候,才能发现错误,对于出错的命令Redis只是报出错误,而错误后面的命令依旧被执行,所以key2和key3都存在数据,这就是Redis事务的特点,也是使用Redis事务需要特别注意的地方。为了克服这个问题,一般我们要在执行Redis事务前,严格地检查数据,以避免这样的情况发生。

测试结果:

使用Redis流水线

默认的情况下,Redis客户端是一条条命令发送给Redis服务器的,这样显然性能不高。在关系数据库中我们可以使用批量,也就是只有需要执行SQL时,才一次性地发送所有的SQL去执行,这样性能就提高了许多。对于Redis也是可以的,这便是流水线(pipline)技术,在很多情况下并不是Redis性能不佳,而是网络传输的速度造成瓶颈,使用流水线后就可以大幅度地在需要执行很多命令时提升Redis的性能。

接下来我们就来试试使用了SessionCallBack接口的,单流水线性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
void test2(){
Long start = System.currentTimeMillis();
List list = (List) redisTemplate.executePipelined((RedisOperations operations) -> {
for (int i = 1; i <= 100000; i++) {
operations.opsForValue().set("pipeline_" + i, "value_" + i);
String value = (String) operations.opsForValue().get("pipeline_" + i);
if (i == 100000) {
System.out.println("命令只是进入队列,所以值为空【" + value + "】");
}
}
return null;
});
Long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start) + "毫秒。");
}

测试结果:

这里还是沿用SessionCallback接口执行写入和读出各10万次Redis命令,只是修改为了Lambda表达式而己如果你的JDK达不到8的版本以上,那么只能采用匿名类的形式来改写这段代码了。

为了测试性能,这里记录了开始执行时间和结束执行时间,并且打出了耗时。在我的测试中,这10万次读写基本在300~600ms,大约平均值在400~500ms,也就是不到ls就能执行10万次读和写命令,这个速度还是十分快的。在使用非流水线的情况下,我的测试大约每秒只能执行2~3万条命令,可见使用流水线后可以提升大约10倍的速度,它十分适合大数据量的执行。

但是需要注意以下两点:

  • 此代码只是运用于测试,在运行如此多的命令时,需要考虑的另外一个问题是内存空间的消耗,因对于程序而言,它最终会返一个List象,如果过的命令执行返回的结果都保存到这个List中,显然会造成内存消耗过大,尤其在那些高并发的网站中就很容易造成川、内存溢出的异常,这个时候应该考虑使用迭代的方法执行Redis命令。
  • 与事务一样,使用流水线的过程中,有的命令也只是进入队列而没有执行,所以执行的命令返回值也为空,这也是需要注意的地方。

使用Redis发布订阅

发布订阅是消息的一种常用模式。例如,在企业分配任务之后,可以通过邮件、短信或者微信通知到相关的责任人,这就是一种典型的发布订阅模式。首先是Redis提供一个渠道,让消息能够发送到这个渠道上,而多个系统可以监听这个渠道,如短信、微信和邮件系统都可以监昕这个渠道,当一条消息发送到渠道,渠道就会通知它的监昕者,这样短信、微信邮件系统就能够得到这个渠道给它们的消息了,这些监听者会根据自己的需要去处理这个消息,于是我们就可以得到各种各样的通知了。

为了接收到Redis发送过来的消息,我们需要定义一个消息监听器:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
// 消息体
String body = new String(message.getBody());
// 渠道名称
String topic = new String(pattern);
System.out.println(body);
System.out.println(topic);
}
}

这里的onMessage方法是得到消息后的处理方法,其中message参数代表Redis发送过来的消息,pattern是渠道名称,。且也在Message方法里打印了它们的内容。这里因为标注了@Component注解,所以在SpringBoot扫描后,会把它自动装配到IoC容器中。

此外,还需要监听容器的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Component
public class ListenerContainer {

// Redis连接工厂
@Autowired
private RedisConnectionFactory connectionFactory = null;

// Redis消息监听器
@Autowired
private MessageListener redisMsgListener = null;

// 任务池
private ThreadPoolTaskScheduler taskScheduler = null;

/**
* 创建任务池,运行线程等待处理Redis的消息
*
* @return
*/
@Bean
public ThreadPoolTaskScheduler initTaskScheduler() {
if (taskScheduler != null) {
return taskScheduler;
}
taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(20);
return taskScheduler;
}

/**
* 定义Redis的监听容器
*
* @return 监听容器
*/
@Bean
public RedisMessageListenerContainer initRedisContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// Redis连接工厂
container.setConnectionFactory(connectionFactory);
// 设置运行任务池
container.setTaskExecutor(initTaskScheduler());
// 定义监听渠道,名称为topic1
Topic topic = new ChannelTopic("topic1");
// 使用监听器监听Redis的消息
container.addMessageListener(redisMsgListener, topic);
return container;
}
}

这里RedisTemplate和RedisConnectionFactory象都是SpringBoot自动创建的,所以这里只是把它们注入进来,只需要使用@Autowired解即。然后定义了一个任务池,并设置了任务池大小为20,这样它将可以运行线程,井进行阻塞,等待Redis消息的传入。

接着再定义了一个Redis消息听的容器RedisMessageListenerContainer,并且往容器设置了Redis连接工厂和指定运行消息的线程池,定义了接收“topicl”渠道的消息,这样系统就可以监听Redis关于“topicl"渠道的消息了。

测试结果:

使用Lua脚本

Redis中有很多的命令,但是严格来说Redis提供的计算能力还是比较有限的。为了增强Redis计算能力,Redis在2.6版本后提供了Lua脚本的支持,而且执行Lua脚本在Redis中还具备原子性,所以在需要保证数据一致性的高并发环境中,我们也可以使用Redis的Lua语言来保证数据的一致性,且Lua脚本具备更加强大的运算功能,在高并发需要保证数据一致性时,Lua脚本方案比使用Redis自身提供的事务要更好一些。

在Redis中有两种运行Lua的方法,一种是直接发送Lua到Redis服务器去执行,另一种是先把Lua发送给Redis,Redis对Lua脚本进行缓存,然后返回一个SHAl的32位编码回来,之后只需要发送SHAl相关参数给Redis便可以执行了。这里需要解释的是为什么会存在通过32位编码执行的方法。如果Lua脚本很长,那么就需要通过网络传递脚本给Redis去执了,而现实的情况是网络的传递速度往往跟不上Redis的执行速度,所以网络就会成为Redis执行的瓶颈。如果只是传递32位编码参数,那么需要传递的消息就少了许多,这样就可以极大地减少网络传输的内容,从而提高系统的性能。

为了支持Redis的Lua脚本,Spring供了RedisScript接口,与此同时也有一个DefaultRedisScript实现类。让我们先来看看RedisScript接口的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface RedisScript<T> {

//获取脚本的sha1
String getSha1();

//获取脚本的返回值
@Nullable
Class<T> getResultType();

//获取脚本的字符串
String getScriptAsString();

default boolean returnsRawValue() {
return this.getResultType() == null;
}

static <T> RedisScript<T> of(String script) {
return new DefaultRedisScript(script);
}

static <T> RedisScript of(String script, Class<T> resultType) {
Assert.notNull(script, "Script must not be null!");
Assert.notNull(resultType, "ResultType must not be null!");
return new DefaultRedisScript(script, resultType);
}

static <T> RedisScript<T> of(Resource resource) {
Assert.notNull(resource, "Resource must not be null!");
DefaultRedisScript<T> script = new DefaultRedisScript();
script.setLocation(resource);
return script;
}

static <T> RedisScript<T> of(Resource resource, Class<T> resultType) {
Assert.notNull(resource, "Resource must not be null!");
Assert.notNull(resultType, "ResultType must not be null!");
DefaultRedisScript<T> script = new DefaultRedisScript();
script.setResultType(resultType);
script.setLocation(resource);
return script;
}
}

这里Spring会将Lua脚本发送到Redis服务器进行缓存,而此时Redis服务器会返回一个32位的SHAl编码,这时候通过getShal方法就可以得到Redis返回的这个编码了;getResultType方法是获取Lua脚本返回的Java类型getScriptAsString是返回脚本的字符串,以便我们观看脚本。

下面我们来测试一下吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void test(){
DefaultRedisScript<String> redisScript=new DefaultRedisScript<String >();
//设置脚本
redisScript.setScriptText("return 'i love you'");
//定义返回类型
redisScript.setResultType(String.class);
RedisSerializer<String > redisSerializer=redisTemplate.getStringSerializer();

//执行脚本
String s=(String)redisTemplate.execute
(redisScript,redisSerializer,redisSerializer,null);
System.out.println();
}

在最后一行打一个断点,开始测试:

这里的代码,首先Lua只是定义了一个简单的字符串,然后就返回了,而返回类型则定义为宇符串。这里必须定义返回类型,否则对于Spring不会把脚本执行的结果返回。接着获取了由RedisTemplate自动创建的字符串序列化器,而后使用RedisTemplate的execute方法执行了脚本。在RedisTemplate中,execute方法执行脚本的方法有两种,其定义如下:

1
2
3
4
5
6
7
public <T> T execute(RedisScript<T> script, List<K> keys, Object... args) {
return this.scriptExecutor.execute(script, keys, args);
}

public <T> T execute(RedisScript<T> script, RedisSerializer<?> argsSerializer, RedisSerializer<T> resultSerializer, List<K> keys, Object... args) {
return this.scriptExecutor.execute(script, argsSerializer, resultSerializer, keys, args);
}

在这两个方法中,从参数的名称可以知道,script就是我们定义的RedisScript接口对象,keys代表Redis的键,args是这段脚本的参数。两个方法最大区别是一个存在序列化器的参数,另外个不存在。对于不存在序列化参数的方法,Spring将采用RedisTemplate提供的valueSerializer序列化器对传递的键和参进行序列化。这里我们采用了第二个方法调度脚本,并且设置为字符串序列化器,其中第一个序列化器是键的序列化器,二个是参数序列化器,这样键和参数就在字符串序列化器下被序列化了。

下面我们再来测试一下带参数的Lua吧:

这里的脚本中使用了两个键去保存两个参数,然后对这两个参数进行比较,如果相等则返回],否则返回0。注意脚本中kYS[l]和kYS[2]的写法,它们表客户端传递的第一个键和第二键,而ARGV[1]和ARGV[2则表示客户端传递第一个和第二个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Test
void test2(){
String key1="this",key2="like";
String value1="not",value2="yourself";


String lua="redis.call('set',KEYS[1],ARGV[1]) \n"
+"redis.call('set',KEYS[2],ARGV[2]) \n"
+"local str1=redis.call('get',KEYS[1]) \n"
+"local str2=redis.call('get',KEYS[2]) \n"
+"if str1==str2 then \n"
+"return 1 \n"
+"end \n"
+"return 0 \n";
System.out.println(lua);

//返回结果为Long
DefaultRedisScript<Long> redisScript=new DefaultRedisScript<Long>();
redisScript.setScriptText(lua);
redisScript.setResultType(Long.class);
//采用字符串序列化器
RedisSerializer<String> stringRedisSerializer=redisTemplate.getStringSerializer();
//定义key参数
List<String> keyList =new ArrayList<>();
keyList.add(key1);
keyList.add(key2);
//传递两个参数,第一个序列化器是key的序列化器
//第二个是参数的序列化器
Long result=(Long) redisTemplate.execute
(redisScript,stringRedisSerializer,stringRedisSerializer,keyList,value1,value2);
}

这里使用keyList保存了各个键,然后通过Redis的execute方法传递,参数则可以使用可变化的方式传递,且设置了给键和参数的序列化器都为字符串序列化器,这样便能够运行这段脚本了。我们的脚本返回为一个数字,这里值得注意的是,因为Java会把整数当作长整型(Long),所以这里返回值设置为Long型。

测试结果: