Spring整合Redis

官方文档地址:https://docs.spring.io/spring-data/data-redis/docs/current/reference/html/#why-spring-redis

Spring Redis 需要Redis 2.6或更高版本,Spring Data Redis与两个流行的Redis开源Java库LettuceJedis集成 ,本文使用Lettuce客户端

 

环境 Springboot 2.1.0.RELEASE

步骤:

1、添加jar包:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

  commons-pool2:

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.9.0</version>
        </dependency>

  使用连接池时需自行引入commons-pool2包:

 

  spring-boot-starter-data-redis中包含的依赖:

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>2.1.0.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-redis</artifactId>
      <version>2.1.2.RELEASE</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>jcl-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.lettuce</groupId>
      <artifactId>lettuce-core</artifactId>
      <version>5.1.2.RELEASE</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
View Code

 

 Spring默认为我们注入了RedisTemplateStringRedisTemplate ,如果我们没有手动注入相同名字的bean的话

  RedisTemplate默认的keyvaluehashKeyhashValue序列化方式都为JdkSerializationRedisSerializer,即二进制序列化方式

  StringRedisTemplate 所有的序列化方式都为RedisSerializer.string(),即String

@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(
            RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean
    public StringRedisTemplate stringRedisTemplate(
            RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

}

  Springboot 2.1.0.RELEASE 默认的Redis客户端为 Lettuce,默认的连接工厂为LettuceConnectionFactory:

  org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#redisConnectionFactory

    @Bean
    @ConditionalOnMissingBean(RedisConnectionFactory.class)
    public LettuceConnectionFactory redisConnectionFactory(
            ClientResources clientResources) throws UnknownHostException {
        LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(
                clientResources, this.properties.getLettuce().getPool());
        return createLettuceConnectionFactory(clientConfig);
    } 

  另外,Spring Data Redis提供了如下的ConnectionFactory:

JedisConnectionFactory 使用Jedis作为Redis的客户端
JredisConnectionFactory 使用Jredis作为Redis的客户端
LettuceConnectionFactory 使用Letture作为Redis的客户端
SrpConnectionFactory

使用Spullara/redis-protocol作为Redis的客户端

 

 Spring-data-redis提供的序列化方式

  

  对于字符串,我们希望key,value序列化方式都为String,但是对于Hash,key的序列化方式为String,但是value的序列化方式

我们希望为JSON。所以我们需要自己配置RedisTemplate并注入到Spring容器中:

 

一、单点方案

 

2、自定义配置 RedisTemplate

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;

    /**
     * Redis配置类
     *
     * @author yangyongjie
     * @date 2019/10/29
     * @desc
     */
    @Configuration
    public class RedisConfig {

        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            // 序列化方式全部为String
            redisTemplate.setKeySerializer(RedisSerializer.string());
            redisTemplate.setValueSerializer(RedisSerializer.string());
            redisTemplate.setHashKeySerializer(RedisSerializer.string());
            // hash value序列化方式为JSON
            // Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
            // redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
            redisTemplate.setHashValueSerializer(RedisSerializer.json());
            return redisTemplate;
        }
    }

 

3、redis.properties中Redis连接相关的配置

#redis config
# Redis服务器地址
redis.host=127.0.0.1
# Redis服务器连接端口
redis.port=6379
redis.password=
# 连接池最大连接数(使用负值表示没有限制)
redis.pool.maxActive=100
# 连接池没有空闲连接时最大阻塞等待时间(使用负值表示没有限制)
redis.pool.maxWait=30000
# 连接池中的最大空闲连接,达到后pool会开始回收空闲连接,直到空闲连接数达到Mindle个数。 主要避免空连接占用,资源浪费
redis.pool.maxIdle=30
# 连接池中的最小空闲连接,这部分不被回收。可防止流量增量时,连接创建不及时
redis.pool.minIdle=10
# 连接超时时间(毫秒)
redis.timeout=5000

#集群配置
redis.cluster.nodes=127.0.0.1:6379,127.0.0.1:6479
redis.cluster.maxRedirects=3

4、

  方式1:在application.properties中配置Redis连接信息

#Redis相关配置
# Redis服务器地址
spring.redis.host=${redis.host}
# Redis服务器连接端口
spring.redis.port=${redis.port}
# Redis服务器密码
spring.redis.password=${redis.password}
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=${redis.pool.max-active}
# 连接池没有空闲连接时最大阻塞等待时间(使用负值表示没有限制)
spring.redis.lettuce.pool.max-wait=${redis.pool.max-wait}
# 连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=${redis.pool.max-idle}
# 连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=${redis.pool.min-idle}
# 连接超时时间(毫秒)
spring.redis.timeout=${redis.timeout}

  这里的配置最终会加载到RedisProperties中。

  RedisProperties

@ConfigurationProperties(prefix = "spring.redis")
public class RedisProperties {
    private int database = 0; // Database index used by the connection factory
    private String url; // 连接URL,重写host,post和password,忽略用户名,如:redis://user:password@example.com:6379
    private String host = "localhost"; // Redis server host
    private String password; // Login password of the redis server
    private int port = 6379; // Redis server port.
    private boolean ssl; // Whether to enable SSL support.
    private Duration timeout; //Connection timeout.
    private Sentinel sentinel;
    private Cluster cluster;
    private final Jedis jedis = new Jedis();
    private final Lettuce lettuce = new Lettuce();
  // getter and setter...
    public static class Pool { // 连接池配置属性
        private int maxIdle = 8; //  连接池内空闲连接的最大数量,使用负值表示没有限制
        private int minIdle = 0; // 连接池内维护的最小空闲连接数,值为正时才有效
        private int maxActive = 8; // 给定时间连接池内最大连接数,使用负值表示没有限制
        private Duration maxWait = Duration.ofMillis(-1); // 当池耗尽时,在抛出异常之前,连接分配应阻塞的最长时间。使用负值可无限期阻止
     // getter and setter...
    }
    // Cluster properties.
    public static class Cluster {
        private List<String> nodes; // 逗号分隔的"host:port"列表,至少要有一个
        private Integer maxRedirects; // 在集群中执行命令时要遵循的最大重定向数
     // getter and setter... } // Redis sentinel properties. public static class Sentinel { private String master; // Name of the Redis server. private List<String> nodes; // Comma-separated list of "host:port" pairs.      // getter and setter... } // Jedis client properties. public static class Jedis { private Pool pool; // Jedis pool configuration. // getter and setter... } // Lettuce client properties. public static class Lettuce { private Duration shutdownTimeout = Duration.ofMillis(100); // Shutdown timeout. private Pool pool; // Lettuce pool configuration.     // getter and setter... } }

 

  方式2:不在application.properties中配置,使用redis.properties,自行配置连接工厂

  1)自定义RedisProperties:

/**
 * redis.properties 属性配置映射
 *
 * @Value() 注解取不到值会报错,因此实际按照redis.properties配置的属性来定义 RedisProperties中的属性
 */
@Component
@PropertySource("classpath:redis.properties")
//@ConfigurationProperties(prefix = "redis", ignoreInvalidFields = true)  // 如果是springboot,可以此注解进行属性映射(@Value则可去掉)
public class RedisProperties {
    @Value("${redis.database}")
    private int database;
    @Value("${redis.host}")
    private String host;
    @Value("${redis.password}")
    private String password;
    @Value("${redis.port}")
    private int port;

    @Value("${redis.timeout}")
    private Long timeout;

    @Value("${redis.pool.maxIdle}")
    private int maxIdle;
    @Value("${redis.pool.minIdle}")
    private int minIdle;
    @Value("${redis.pool.maxActive}")
    private int maxActive;
    @Value("${redis.pool.maxWait}")
    private long maxWait;


    @Value("${redis.cluster.nodes}")
    private List<String> clusterNodes; // host:port,逗号隔开


    @Value("${redis.sentinel.master}")
    private String master;
    @Value("${redis.sentinel.nodes}")
    private List<String> sentinelNodes;

}

  或: 

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@PropertySource("classpath:redis.properties")
@ConfigurationProperties(prefix = "redis")
public class RedisProperties {

    private int database;
    private String host;
    private String password;
    private int port;
    private Long timeout;

    private final Pool pool = new Pool();
    private final Cluster cluster = new Cluster();


    /**
     * 连接池配置属性
     */
    public static class Pool {
        private int maxIdle = 8; //  连接池内空闲连接的最大数量,使用负值表示没有限制
        private int minIdle = 0; // 连接池内维护的最小空闲连接数,值为正时才有效
        private int maxActive = 8; // 给定时间连接池内最大连接数,使用负值表示没有限制
        private long maxWait = -1; // 当池耗尽时,在抛出异常之前,连接分配应阻塞的最长时间。使用负值可无限期阻止

        public int getMaxIdle() {
            return maxIdle;
        }

        public void setMaxIdle(int maxIdle) {
            this.maxIdle = maxIdle;
        }

        public int getMinIdle() {
            return minIdle;
        }

        public void setMinIdle(int minIdle) {
            this.minIdle = minIdle;
        }

        public int getMaxActive() {
            return maxActive;
        }

        public void setMaxActive(int maxActive) {
            this.maxActive = maxActive;
        }

        public long getMaxWait() {
            return maxWait;
        }

        public void setMaxWait(long maxWait) {
            this.maxWait = maxWait;
        }
    }

    /**
     * Cluster properties
     */
    public static class Cluster {
        private List<String> nodes; // 逗号分隔的"host:port"列表,至少要有一个

        private Integer maxRedirects; // 在集群中执行命令时要遵循的最大重定向数

        public List<String> getNodes() {
            return nodes;
        }

        public void setNodes(List<String> nodes) {
            this.nodes = nodes;
        }

        public Integer getMaxRedirects() {
            return maxRedirects;
        }

        public void setMaxRedirects(Integer maxRedirects) {
            this.maxRedirects = maxRedirects;
        }
    }

    public int getDatabase() {
        return database;
    }

    public void setDatabase(int database) {
        this.database = database;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Long getTimeout() {
        return timeout;
    }

    public void setTimeout(Long timeout) {
        this.timeout = timeout;
    }

    public Pool getPool() {
        return pool;
    }

    public Cluster getCluster() {
        return cluster;
    }
}
View Code

 

  2)RedisConfig 中客户端连接工厂配置:

  @Autowired
    private RedisProperties redisProperties;

    /**
     * redis 客户端连接工厂
     */
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // 单机服务配置
        RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration();
        // 主机、用户名、密码、端口、数据库
        serverConfig.setHostName(redisProperties.getHost());
        serverConfig.setPassword(redisProperties.getPassword());
        serverConfig.setPort(redisProperties.getPort());
        // 连接池信息
        GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxIdle(redisProperties.getMaxIdle());
        poolConfig.setMinIdle(redisProperties.getMinIdle());
        poolConfig.setMaxTotal(redisProperties.getMaxActive());
        poolConfig.setMaxWaitMillis(redisProperties.getMaxWait());

        // 连接池客户端配置建造者
        LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder lettucePoolClientConfBuilder = LettucePoolingClientConfiguration.builder();
        lettucePoolClientConfBuilder.commandTimeout(Duration.ofMillis(redisProperties.getTimeout()));
        lettucePoolClientConfBuilder.poolConfig(poolConfig);

        // 客户端配置
        LettuceClientConfiguration clientConfig = lettucePoolClientConfBuilder.build();

        return new LettuceConnectionFactory(serverConfig, clientConfig);
    } 

   

  注意:在使用RedisTemplate时,每次执行完毕会自动释放连接;当我们自己封装工具类使用连接池时,切记执行完毕手动释放连接,否则会造成连接池中没有空闲连接。

 

5、Redis工具类

主要的数据访问方法:

opsForValue() 操作只有简单属性的数据
opsForList() 操作含有list的数据
opsForSet() 操作含有set的数据
opsForZSet() 操作含有ZSet(有序集合)的数据
opsForHash() 操作含有hash的数据
opsForStream() 操作Stream

 

RedisUtil工具类的方法全部设为静态方法,这样直接在代码中使用RedisUtil.xxx调用即可,不需要注入依赖

RedisUtil:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.connection.stream.StringRecord;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.core.types.Expiration;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Redis工具类
 *
 * @author yangyongjie
 * @date 2020/2/15
 */
public class RedisUtil {

    private RedisUtil() {
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisUtil.class);

    private static RedisTemplate<String, Object> redisTemplate;

    public static void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }


    /**
     * 为给定 key 设置生存时间
     *
     * @param key  键
     * @param time 时间(秒)
     * @return 设置成功返回 1 ;当 key 不存在或者不能为 key 设置生存时间时返回 0
     */
    public static Boolean expire(String key, long time) {
        try {
            return redisTemplate.expire(key, time, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("expire error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 以秒为单位,返回给定 key 的剩余生存时间(TTL, time to live)
     * 当 key 不存在时,返回 -2 。
     * 当 key 存在但没有设置剩余生存时间时,返回 -1
     *
     * @param key 键 不能为null
     */
    public static Long ttl(String key) {
        try {
            return redisTemplate.getExpire(key, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("ttl error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 检查给定 key 是否存在
     * 时间复杂度:O(1)
     *
     * @param key 键
     * @return true 存在 false不存在
     */
    public static Boolean exists(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            LOGGER.error("key exists error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 删除给定的一个key
     * 不存在的 key 会被忽略
     * 时间复杂度:O(N), N 为被删除的 key 的数量
     *
     * @param key 要删除的key
     * @return 是否删除成功
     */
    public static Boolean del(String key) {
        try {
            return redisTemplate.delete(key);
        } catch (Exception e) {
            LOGGER.error("del single key error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 删除给定的一个或多个 key
     *
     * @param keys
     * @return 被删除 key 的数量
     */
    public static Long del(Collection<String> keys) {
        try {
            return redisTemplate.delete(keys);
        } catch (Exception e) {
            LOGGER.error("del multi key error: " + e.getMessage(), e);
            return 0L;
        }
    }


// ========================================== String ============================================

    /**
     * 返回 key 所关联的字符串值
     *
     * @param key 键
     * @return 当 key 不存在时,返回 nil ,否则,返回 key 的值
     */
    public static String get(String key) {
        if (key == null) {
            return null;
        }
        try {
            Object value = redisTemplate.opsForValue().get(key);
            return value == null ? null : String.valueOf(value);
        } catch (Exception e) {
            LOGGER.error("get error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将字符串值 value 关联到 key
     * 如果 key 已经持有其他值, SET 就覆写旧值,无视类型
     * 对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除
     *
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */
    public static boolean set(String key, String value) {
        try {
            return redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] byteKey = redisTemplate.getStringSerializer().serialize(key);
                    byte[] byteValue = redisTemplate.getStringSerializer().serialize(value);
                    return connection.set(byteKey, byteValue);
                }
            });
        } catch (Exception e) {
            LOGGER.error("set error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * set字符串值并设置有效期,具有原子性
     * 如果 key 已经存在, SETEX 命令将覆写旧值
     *
     * @param key
     * @param value
     * @param seconds
     */
    public static Boolean setex(String key, String value, long seconds) {
        try {
            return redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] byteKey = redisTemplate.getStringSerializer().serialize(key);
                    byte[] byteValue = redisTemplate.getStringSerializer().serialize(value);
                    return connection.setEx(byteKey, seconds, byteValue);
                }
            });
        } catch (Exception e) {
            LOGGER.error("setex error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 中储存的数字值增一
     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作
     *
     * @param key 键
     * @return 执行 INCR 命令之后 key 的值,如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误
     */
    public static Long incr(String key) {
        try {
            return redisTemplate.opsForValue().increment(key);
        } catch (Exception e) {
            LOGGER.error("incr error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将 key 所储存的值加上增量 delta
     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCRBY 命令
     *
     * @param key   键
     * @param delta 增量
     * @return 加上 increment 之后, key 的值
     */
    public static Long incrby(String key, long delta) {
        try {
            return redisTemplate.opsForValue().increment(key, delta);
        } catch (Exception e) {
            LOGGER.error("incrby error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将 key 中储存的数字值减一
     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 DECR 操作
     *
     * @param key 键
     * @return 执行 DECR 命令之后 key 的值
     */
    public static Long decr(String key) {
        try {
            return redisTemplate.opsForValue().decrement(key);
        } catch (Exception e) {
            LOGGER.error("decr error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将 key 所储存的值减去减量 delta
     * 如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 DECRBY 操作
     *
     * @param key   键
     * @param delta 减量
     * @return
     */
    public static Long decrby(String key, long delta) {
        try {
            return redisTemplate.opsForValue().decrement(key, delta);
        } catch (Exception e) {
            LOGGER.error("decrby error: " + e.getMessage(), e);
            return null;
        }
    }


// ========================================== Hash ============================================

    /**
     * 返回哈希表 key 中给定域 hashKey 的值
     *
     * @param key     保存Hash的key
     * @param hashKey Hash内的key
     * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil
     */
    public static Object hget(String key, String hashKey) {
        try {
            return redisTemplate.opsForHash().get(key, hashKey);
        } catch (Exception e) {
            LOGGER.error("hget error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回哈希表 key 中,一个或多个给定域hashKeys的值
     *
     * @param key      保存Hash的key
     * @param hashKeys Hash内的keys
     * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样
     */
    public static Object hmget(String key, Collection<Object> hashKeys) {
        try {
            return redisTemplate.opsForHash().multiGet(key, hashKeys);
        } catch (Exception e) {
            LOGGER.error("hmget error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回哈希表 key 中,所有的域和值
     *
     * @param key 保存Hash的key
     * @return 对应的多个键值
     */
    public static Map<Object, Object> hgetall(String key) {
        try {
            return redisTemplate.opsForHash().entries(key);
        } catch (Exception e) {
            LOGGER.error("hgetall error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将哈希表 key 中的域 hashKey 的值设为 value
     * 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
     * 如果域 field 已经存在于哈希表中,旧值将被覆盖
     *
     * @param key
     * @param hashKey
     * @param value
     * @return true 成功 false失败
     */
    public static Boolean hset(String key, String hashKey, Object value) {
        try {
            redisTemplate.opsForHash().put(key, hashKey, value);
            return true;
        } catch (Exception e) {
            LOGGER.error("hset error: " + e.getMessage(), e);
            return false;
        }
    }


    /**
     * 将哈希表 key 中的域 hashKey 的值设为 value
     * 如果 key 不存在,一个新的哈希表被创建并进行 HSET 操作。
     * 如果域 field 已经存在于哈希表中,旧值将被覆盖
     *
     * @param key     键
     * @param hashKey 项
     * @param value   值
     * @param time    时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public static Boolean hset(String key, String hashKey, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, hashKey, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            LOGGER.error("hset and expire error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 同时将多个 field-value (域-值)对设置到哈希表 key 中
     *
     * @param key 键
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public static Boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            LOGGER.error("hmset error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 同时将多个 field-value (域-值)对设置到哈希表 key 中,并为整个哈希表设置有效期,不具有原子性
     *
     * @param key  键
     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public static Boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            LOGGER.error("hmset and expire error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 删除哈希表 key 中的一个或多个指定域,不存在的域将被忽略
     *
     * @param key
     * @param hashKeys
     * @return 被成功移除的域的数量,不包括被忽略的域
     */
    public static Long hdel(String key, Object... hashKeys) {
        try {
            return redisTemplate.opsForHash().delete(key, hashKeys);
        } catch (Exception e) {
            LOGGER.error("hdel error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 查看哈希表 key 中,给定域 field 是否存在
     *
     * @param key     键 不能为null
     * @param hashKey 项 不能为null
     * @return 如果哈希表含有给定域,返回true ;如果哈希表不含有给定域,或 key 不存在,返回 false
     */
    public static Boolean hexists(String key, String hashKey) {
        try {
            return redisTemplate.opsForHash().hasKey(key, hashKey);
        } catch (Exception e) {
            LOGGER.error("hexists error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 为哈希表 key 中的域 hashKey 的值加上增量 delta
     * 增量也可以为负数,相当于对给定域进行减法操作
     * 如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令
     * 如果域 hashKey 不存在,那么在执行命令前,域的值被初始化为 0
     *
     * @param key
     * @param hashKey
     * @param delta
     * @return 哈希表 key 中域 hashKey 的值
     */
    public static Double hincrby(String key, String hashKey, double delta) {
        try {
            return redisTemplate.opsForHash().increment(key, hashKey, delta);
        } catch (Exception e) {
            LOGGER.error("hincrby error: " + e.getMessage(), e);
            return 0d;
        }
    }


    // ========================================== list ============================================

    /**
     * 返回列表 key 中指定区间内的元素,区间以偏移量 start 和 stop 指定
     *
     * @param key   列表键
     * @param start 开始 0代表第一个元素,1 表示列表的第二个元素,以此类推
     * @param end   结束 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推
     * @return 一个列表,包含指定区间内的元素
     */
    public static List<Object> lrange(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            LOGGER.error("lrange error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回列表 key 的长度
     * 如果 key 不存在,则 key 被解释为一个空列表,返回 0
     *
     * @param key 列表键
     * @return 列表 key 的长度
     */
    public static Long llen(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            LOGGER.error("llen error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 返回列表 key 中,下标为 index 的元素
     *
     * @param key   键
     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     * @return
     */
    public static Object lindex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            LOGGER.error("lindex error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将一个值 value 插入到列表 key 的表尾(最右边)
     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作
     *
     * @return 执行 RPUSH 操作后,表的长度
     */
    public static Long rpush(String key, Object value) {
        try {
            return redisTemplate.opsForList().rightPush(key, value);
        } catch (Exception e) {
            LOGGER.error("rpush error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 将一个或多个值 value 插入到列表 key 的表尾(最右边)
     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作
     *
     * @return 执行 RPUSH 操作后,表的长度
     */
    public static Long rpush(String key, List<Object> value) {
        try {
            return redisTemplate.opsForList().rightPushAll(key, value);
        } catch (Exception e) {
            LOGGER.error("rpush multi error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 将一个或多个值 value 插入到列表 key 的表尾(最右边)
     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作
     * 并为list设置有效期,非原子性
     *
     * @param time 时间(秒)
     * @return 执行 RPUSH 操作后,表的长度
     */
    public static Boolean rpush(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            LOGGER.error("rpush multi and expire error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 移除并返回列表 key 的头元素(左头元素)
     *
     * @param key
     * @return 列表的头元素,当 key 不存在时,返回 null
     */
    public static Object lpop(String key) {
        try {
            return redisTemplate.opsForList().leftPop(key);
        } catch (Exception e) {
            LOGGER.error("lpop error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 将一个值 value 插入到列表 key 的表头(最左边)
     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作
     *
     * @return 执行 RPUSH 操作后,表的长度
     */
    public static Long lpush(String key, Object value) {
        try {
            return redisTemplate.opsForList().leftPush(key, value);
        } catch (Exception e) {
            LOGGER.error("lpush error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 将一个或多个值 value 插入到列表 key 的表头(最左边)
     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作
     *
     * @return 执行 RPUSH 操作后,表的长度
     */
    public static Long lpush(String key, List<Object> value) {
        try {
            return redisTemplate.opsForList().leftPushAll(key, value);
        } catch (Exception e) {
            LOGGER.error("lpush multi error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 将一个或多个值 value 插入到列表 key 的表头(最左边)
     * 如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作
     * 并为list设置有效期,非原子性
     *
     * @param time 时间(秒)
     * @return 执行 RPUSH 操作后,表的长度
     */
    public static Boolean lpush(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().leftPushAll(key, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            LOGGER.error("lpush multi and expire error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 移除并返回列表 key 的尾元素(最右元素)
     *
     * @param key
     * @return 列表的头元素,当 key 不存在时,返回 null
     */
    public static Object rpop(String key) {
        try {
            return redisTemplate.opsForList().rightPop(key);
        } catch (Exception e) {
            LOGGER.error("rpop error: " + e.getMessage(), e);
            return null;
        }
    }


    /**
     * 将列表 key 下标为 index 的元素的值设置为 value
     *
     * @param key   键
     * @param index 索引
     * @param value 值
     * @return
     */
    public static Boolean lset(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            LOGGER.error("lset error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 根据参数 count 的值,移除列表中与参数 value 相等的元素
     * count > 0 : 从表头开始向表尾搜索,移除与 value 相等的元素,数量为 count
     * count < 0 : 从表尾开始向表头搜索,移除与 value 相等的元素,数量为 count 的绝对值
     * count = 0 : 移除表中所有与 value 相等的值
     *
     * @param key   键
     * @param count 移除多少个
     * @param value 移除的元素值
     * @return 被移除元素的数量, 因为不存在的 key 被视作空表(empty list),所以当 key 不存在时, LREM 命令总是返回 0
     */
    public static Long lrem(String key, long count, Object value) {
        try {
            return redisTemplate.opsForList().remove(key, count, value);
        } catch (Exception e) {
            LOGGER.error("lrem error: " + e.getMessage(), e);
            return 0L;
        }
    }


    // ========================================== set ============================================

    /**
     * 返回集合 key 中的所有成员
     *
     * @param key
     * @return 集合中的所有成员
     */
    public static Set<Object> smembers(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            LOGGER.error("smembers error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 判断 value 元素是否集合 key 的成员
     *
     * @param key   键
     * @param value 值
     * @return true 是 false不是或key不存在
     */
    public static Boolean sismember(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            LOGGER.error("sismember error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将一个或多个元素加入到集合 key 当中,已经存在于集合的元素将被忽略
     *
     * @param key
     * @param values
     * @return 被添加到集合中的新元素的数量,不包括被忽略的元素
     */
    public static Long sadd(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            LOGGER.error("sadd error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 将一个或多个元素加入到集合 key 当中,已经存在于集合的元素将被忽略
     * 并设置有效期
     *
     * @param key    键
     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 被添加到集合中的新元素的数量,不包括被忽略的元素
     */
    public static Long sadd(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0) {
                expire(key, time);
            }
            return count;
        } catch (Exception e) {
            LOGGER.error("sadd and expire error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 返回集合 key 的基数(集合中元素的数量)
     *
     * @param key
     * @return 集合的基数;当 key 不存在时,返回 0
     */
    public static Long scard(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            LOGGER.error("scard error: " + e.getMessage(), e);
            return 0L;
        }
    }

    /**
     * 移除集合 key 中的一个或多个 值为value的 元素,不存在的元素会被忽略
     *
     * @param key    键
     * @param values 值 可以是多个
     * @return 被成功移除的元素的数量,不包括被忽略的元素
     */
    public static Long srem(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().remove(key, values);
        } catch (Exception e) {
            LOGGER.error("srem error: " + e.getMessage(), e);
            return 0L;
        }
    }


    // ==================================== sorted set (zset) =====================================

    /**
     * 将一个元素及其 score 值加入到有序集 key 当中
     * 如果元素已经是有序集的成员,那么更新这个元素的score值,并通过重新插入这个元素,来保证该元素在正确的位置上
     * 如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作
     *
     * @param key
     * @param value
     * @param score score 值可以是整数值或双精度浮点数
     * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员
     */
    public static Boolean zadd(String key, String value, double score) {
        try {
            return redisTemplate.opsForZSet().add(key, value, score);
        } catch (Exception e) {
            LOGGER.error("zadd error: " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 返回有序集 key 中,指定区间内的成员
     * 其中成员的位置按 score 值递增(从小到大)来排序
     *
     * @param key
     * @param start 以 0 为底,0表示第一个元素,-1表示最后一个元素,-2表示倒数第二个元素
     * @param end   以 0 为底,0表示第一个元素,-1表示最后一个元素,-2表示倒数第二个元素
     * @return 指定区间内有序集成员的列表
     */
    public static Set<Object> zrange(String key, long start, long end) {
        try {
            return redisTemplate.opsForZSet().range(key, start, end);
        } catch (Exception e) {
            LOGGER.error("zrange error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回有序集 key 中,指定区间内的成员
     * 其中成员的位置按 score 值递减(从大到小)来排列
     *
     * @param key
     * @param start 以 0 为底,0表示第一个元素,-1表示最后一个元素,-2表示倒数第二个元素
     * @param end   以 0 为底,0表示第一个元素,-1表示最后一个元素,-2表示倒数第二个元素
     * @return 指定区间内,带有 score 值(可选)的有序集成员的列表
     */
    public static Set<Object> zrevrange(String key, long start, long end) {
        try {
            return redisTemplate.opsForZSet().reverseRange(key, start, end);
        } catch (Exception e) {
            LOGGER.error("zrevrange error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员
     * 有序集成员按 score 值递增(从小到大)次序排列
     *
     * @param key
     * @param min
     * @param max
     * @return 指定区间内的有序集成员的列表
     */
    public static Set<Object> zrangebyscore(String key, double min, double max) {
        try {
            return redisTemplate.opsForZSet().rangeByScore(key, min, max);
        } catch (Exception e) {
            LOGGER.error("zrangebyscore error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员
     * 有序集成员按 score 值递增(从小到大)次序排列
     *
     * @param key
     * @param min
     * @param max
     * @param offset 符合条件的初始偏移量
     * @param count  符合条件的列表数量
     * @return 指定区间内的有序集成员的列表
     */
    public static Set<Object> zrangebyscore(String key, double min, double max, long offset, long count) {
        try {
            return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);
        } catch (Exception e) {
            LOGGER.error("zrangebyscore limit error: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 返回有序集 key 中,成员 value 的 score 值
     *
     * @param key
     * @param value
     * @return 成员的 score 值
     */
    public static Double zscore(String key, String value) {
        try {
            return redisTemplate.opsForZSet().score(key, value);
        } catch (Exception e) {
            LOGGER.error("zscore error: " + e.getMessage(), e);
            return 0d;
        }
    }

    /**
     * 移除有序集 key 中的一个或多个成员,不存在的成员将被忽略
     *
     * @param key
     * @param values
     * @return 被成功移除的成员的数量,不包括被忽略的成员
     */
    public static Long zrem(String key, Object... values) {
        try {
            return redisTemplate.opsForZSet().remove(key, values);
        } catch (Exception e) {
            LOGGER.error("zrem error: " + e.getMessage(), e);
            return null;
        }
    }


    // ========================================== lock ============================================

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在
     * 同redisTemplate.opsForValue().setIfAbsent()
     *
     * @param key
     * @param value
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx(String key, String value) {
        try {
            return redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] keyBys = redisTemplate.getStringSerializer().serialize(key);
                    byte[] valBys = redisTemplate.getStringSerializer().serialize(value);
                    return connection.setNX(keyBys, valBys);
                }
            });
        } catch (Exception e) {
            LOGGER.error("setnx error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在
     *
     * @param key
     * @param value
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx2(String key, String value) {
        try {
            return redisTemplate.opsForValue().setIfAbsent(key, value);
        } catch (Exception e) {
            LOGGER.error("setnx error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在,并设置有效期,具有原子性
     *
     * @param key
     * @param value
     * @param seconds
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx(String key, String value, long seconds) {
        try {
            return redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] keyBys = redisTemplate.getStringSerializer().serialize(key);
                    byte[] valBys = redisTemplate.getStringSerializer().serialize(value);
                    return connection.set(keyBys, valBys, Expiration.seconds(seconds), RedisStringCommands.SetOption.SET_IF_ABSENT);
                }
            });
        } catch (Exception e) {
            LOGGER.error("setnx and expire error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在,并设置有效期,具有原子性
     *
     * @param key
     * @param value
     * @param seconds
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx2(String key, String value, long seconds) {
        try {
            return redisTemplate.opsForValue().setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("setnx and expire error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 释放Redis锁
     * 使用lua脚本,确保判断是否是加锁人与删除锁的原子性
     *
     * @param lockKey   分布式锁key
     * @param lockValue 分布式锁value
     * @return
     */
    public static Boolean unlock(String lockKey, String lockValue) {
        // 脚本,保证原子性,先判断分布式锁的值是否匹配,匹配再执行删除锁
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        try {
            RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
            Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), lockValue);
            return result == 1;
        } catch (Exception e) {
            LOGGER.error("unlock error:" + e.getMessage(), e);
            return false;
        }
    }


    // ========================================== pub/script/stream ============================================

    /**
     * 发布消息
     *
     * @param channel
     * @param message
     */
    public static void publish(String channel, String message) {
        try {
            redisTemplate.convertAndSend(channel, message);
        } catch (Exception e) {
            LOGGER.error("publish error:" + e.getMessage(), e);
        }
    }

    /**
     * 执行lua脚本
     *
     * @param script     要运行脚本
     * @param resultType 运行返回结果类型
     * @param keys       脚本的key列表参数
     * @param args       脚本的参数
     * @param <T>        返回类型泛型
     * @return
     */
    public static <T> T eval(String script, Class<T> resultType, List<String> keys, Object... args) {
        try {
            RedisScript<T> redisScript = RedisScript.of(script, resultType);
            return redisTemplate.execute(redisScript, keys, args);
        } catch (Exception e) {
            LOGGER.error("eval script error:" + e.getMessage(), e);
            return null;
        }
    }
    
    /**
     * 执行lua脚本(给定lua脚本文件)
     * new ClassPathResource("xxx.lua")
     *
     * @param scriptResource 要运行脚本资源文件
     * @param resultType     运行返回结果类型
     * @param keys           脚本的key列表参数
     * @param args           脚本的参数
     * @param <T>            返回类型泛型
     * @return
     */
    public static <T> T eval(Resource scriptResource, Class<T> resultType, List<String> keys, Object... args) {
        try {
            RedisScript<T> redisScript = RedisScript.of(scriptResource, resultType);
            return redisTemplate.execute(redisScript, keys, args);
        } catch (Exception e) {
            LOGGER.error("eval script error:" + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 向流中追加记录,若流不存在,则创建
     *
     * @param record
     * @param streamKey
     * @return 追加消息的RecordId
     */
    public static RecordId xadd(Map<String, String> record, String streamKey) {
        try {
            StringRecord stringRecord = StreamRecords.string(record).withStreamKey(streamKey);
            // 刚追加记录的记录ID
            RecordId recordId = redisTemplate.opsForStream().add(stringRecord);
            LOGGER.info(recordId.getValue());
            return recordId;
        } catch (Exception e) {
            LOGGER.error("xadd error:" + e.getMessage(), e);
            return null;
        }
    }

    /**
     * 流消息消费确认
     *
     * @param groupName
     * @param record
     * @return 成功确认的消息数
     */
    public static Long xack(String groupName, Record record) {
        try {
            return redisTemplate.opsForStream().acknowledge(groupName, record);
        } catch (Exception e) {
            LOGGER.error("xack error:" + e.getMessage(), e);
            return 0L;
        }
    }

}
RedisUtil:

 

6、为RedisUtil 注入 RedisTemplate依赖

由于RedisUtil中依赖Spring容器中的bean RedisTemplate,因此,需要为RedisUtil注入 RedisTemplate的依赖。

  ①:采用在第三方专门初始化bean的类中,从spring容器中获取 name 为 redisTemplate的bean,然后赋值,代码如下:

import com.xxx.common.utils.RedisUtil;
import com.xxx.common.utils.ZKListenerUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 获取 application.properties中配置的属性
 *
 * @author yangyongjie
 * @date 2019/9/25
 * @desc
 */
@Component
public class CustomPropertyConfig implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        context = applicationContext;
    }

    /**
     * 获取配置的属性
     *
     * @param key
     * @return
     */
    public static String getproperties(String key) {
        if (StringUtils.isEmpty(key)) {
            return null;
        }
        Environment environment = context.getEnvironment();
        return environment.getProperty(key);
    }

    /**
     * 初始化ZK配置的属性
     */
    @PostConstruct
    public void initZKConfig() {
        ZKListenerUtil.loadZKConfig();
    }

    /**
     * 初始化redisTemplate
     */
    @PostConstruct
    public void initRedisTemplate() {
        RedisUtil.setRedisTemplate((RedisTemplate<String, Object>) context.getBean("redisTemplate"));
    }

}
View Code

   ②:直接在RedisConfig中注入 

 

 

 

Redis的一些方法说明:

SETEX:设置字符串的value并设置有效期,具有原子性。等价于:SET mykey value  EXPIRE mykey seconds

/**
     * set字符串值并设置有效期,具有原子性
     *
     * @param key
     * @param value
     * @param seconds
     */
    public static Boolean setex(String key, String value, long seconds) {
        return redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                try {
                    byte[] keyBys = redisTemplate.getStringSerializer().serialize(key);
                    byte[] valBys = redisTemplate.getStringSerializer().serialize(value);
                    return connection.setEx(keyBys, seconds, valBys);
                } finally {
                    connection.close();
                }
            }
        });
    }

 

SETNX:SET if Not eXists。当key已经存在时,什么都不做。

SET key value [EX seconds|PX milliseconds|KEEPTTL] [NX|XX] [GET]

  Options

    EX seconds -- Set the specified expire time, in seconds.    
    PX milliseconds -- Set the specified expire time, in milliseconds.
    NX -- Only set the key if it does not already exist.
    XX -- Only set the key if it already exist.
    KEEPTTL -- Retain the time to live associated with the key.
    GET -- Return the old value stored at key, or nil when key did not exist.

官方文档:https://redis.io/commands/setnx 

 

redis分布式锁方法:

  1)加锁:

  /**
     * 将 key 的值设为 value ,当且仅当 key 不存在
     * 同redisTemplate.opsForValue().setIfAbsent()
     *
     * @param key
     * @param value 随机字符串或线程号等
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx(String key, String value) {
        try {
            return redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] keyBys = redisTemplate.getStringSerializer().serialize(key);
                    byte[] valBys = redisTemplate.getStringSerializer().serialize(value);
                    return connection.setNX(keyBys, valBys);
                }
            });
        } catch (Exception e) {
            LOGGER.error("setnx error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在
     *
     * @param key
     * @param value
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx2(String key, String value) {
        try {
            return redisTemplate.opsForValue().setIfAbsent(key, value);
        } catch (Exception e) {
            LOGGER.error("setnx error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在,并设置有效期,具有原子性
     *
     * @param key
     * @param value
     * @param seconds
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx(String key, String value, long seconds) {
        try {
            return redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] keyBys = redisTemplate.getStringSerializer().serialize(key);
                    byte[] valBys = redisTemplate.getStringSerializer().serialize(value);
                    return connection.set(keyBys, valBys, Expiration.seconds(seconds), RedisStringCommands.SetOption.SET_IF_ABSENT);
                }
            });
        } catch (Exception e) {
            LOGGER.error("setnx and expire error:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * 将 key 的值设为 value ,当且仅当 key 不存在,并设置有效期,具有原子性
     *
     * @param key
     * @param value
     * @param seconds
     * @return 拿到锁(设置key成功),返回true;否则,返回false
     */
    public static Boolean setnx2(String key, String value, long seconds) {
        try {
            return redisTemplate.opsForValue().setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("setnx and expire error:" + e.getMessage(), e);
            return false;
        }
    }

setIfAbsent 方法底层调用的分别是RedisConnection 的setNX 和set方法

    @Override
    public Boolean setIfAbsent(K key, V value) {

        byte[] rawKey = rawKey(key);
        byte[] rawValue = rawValue(value);
        return execute(connection -> connection.setNX(rawKey, rawValue), true);
    }

    /*
     * (non-Javadoc)
     * @see org.springframework.data.redis.core.ValueOperations#setIfAbsent(java.lang.Object, java.lang.Object, long, java.util.concurrent.TimeUnit)
     */
    @Override
    public Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit) {

        byte[] rawKey = rawKey(key);
        byte[] rawValue = rawValue(value);

        Expiration expiration = Expiration.from(timeout, unit);
        return execute(connection -> connection.set(rawKey, rawValue, expiration, SetOption.ifAbsent()), true);
    }

  

  2)解锁:

   使用lua脚本校验value确保加锁和解锁是同一线程操作(解铃还须系铃人)

  /**
     * 释放Redis锁
     * 使用lua脚本,确保判断是否是加锁人与删除锁的原子性
     *
     * @param lockKey   分布式锁key
     * @param lockValue 分布式锁value
     * @return
     */
    public static Boolean unlock(String lockKey, String lockValue) {
        // 脚本,保证原子性,先判断分布式锁的值是否匹配,匹配再执行删除锁
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        try {
            RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
            Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), lockValue);
            return result == 1;
        } catch (Exception e) {
            LOGGER.error("unlock error:" + e.getMessage(), e);
            return false;
        }
    }

 

 

 

二、Cluster 集群方案

  方式1、在上述的基础之上,在 application.properties 中配置集群信息:

# 集群信息,host:port,多个之间以逗号分隔
spring.redis.cluster.nodes=host:port,host:port

  此方式依赖Springboot提供的RedisAutoConfiguration类上Import的LettuceConnectionConfiguration来实现的。

 

  方式2、在Springboot的application.properties中配置集群信息,然后手动创建RedisConnectionFactory

但是下面这种方式不能配置 连接池等其他信息(若要配置连接池,请使用可配值连接池的构造)。

@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class ClusterConfigurationProperties {

    /*
     * spring.redis.cluster.nodes[0] = 127.0.0.1:7379
     * spring.redis.cluster.nodes[1] = 127.0.0.1:7380
     * ...
     */
    List<String> nodes;
  // getter and setter
}

@Configuration
public class RedisConfig {
    /**
     * Type safe representation of application.properties
     */
    @Autowired 
   ClusterConfigurationProperties clusterProperties;
public @Bean RedisConnectionFactory connectionFactory() { return new LettuceConnectionFactory( new RedisClusterConfiguration(clusterProperties.getNodes())); } }

补充:若要配置连接池信息,又不想使用自动配置,想手动配置RedisConnectionFactory ,可以参考LettuceConnectionConfiguration的代码,使用LettuceConnectionFactory的如下构造器

LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,LettuceClientConfiguration clientConfig)

  

  方式3、使用redis.properties的方式,在redis.properties中配置集群信息,然后手动创建RedisConnectionFactory(推荐使用,Spring/Springboot都支持)

     /**
     * redis 客户端连接工厂
     */
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // 集群redis连接工厂配置
        RedisClusterConfiguration serverConfig=new RedisClusterConfiguration(redisProperties.getCluster().getNodes());
        serverConfig.setPassword(redisProperties.getPassword());
        // 连接池信息
        GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxIdle(redisProperties.getPool().getMaxIdle());
        poolConfig.setMinIdle(redisProperties.getPool().getMinIdle());
        poolConfig.setMaxTotal(redisProperties.getPool().getMaxActive());
        poolConfig.setMaxWaitMillis(redisProperties.getPool().getMaxWait());

        // 连接池客户端配置建造者
        LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder lettucePoolClientConfBuilder = LettucePoolingClientConfiguration.builder();
        lettucePoolClientConfBuilder.commandTimeout(Duration.ofMillis(redisProperties.getTimeout()));
        lettucePoolClientConfBuilder.poolConfig(poolConfig);

        // 客户端配置
        LettuceClientConfiguration clientConfig = lettucePoolClientConfBuilder.build();

        return new LettuceConnectionFactory(serverConfig, clientConfig);
    } 

  补充:若集群连接测试,只启动一个redis实例时 ,当有如下报错:

    1)ERR This instance has cluster support disabled

      修复方式:修改redis.conf,找到配置项 cluster-enable yes,去掉注释;然后重启redis服务器,redis-server redis.conf路径

    2)Cannot determine a partition for slot 153.

      修复方式:./redis-cli --cluster fix 127.0.0.1:6379 -a 密码

        Fix these slots by covering with a random node? (type 'yes' to accept):输入yes重新分配slots即可

 

 

 

三、Sentinel 哨兵方案 

  Spring Data Redis通过使用RedisSentinelConfiguration来支持哨兵模式

  如:

/**
 * Jedis
 */
@Bean
public RedisConnectionFactory jedisConnectionFactory() {
  RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
  .master("mymaster")
  .sentinel("127.0.0.1", 26379)
  .sentinel("127.0.0.1", 26380);
  return new JedisConnectionFactory(sentinelConfig);
}

/**
 * Lettuce
 */
@Bean
public RedisConnectionFactory lettuceConnectionFactory() {
  RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
  .master("mymaster")
  .sentinel("127.0.0.1", 26379)
  .sentinel("127.0.0.1", 26380);
  return new LettuceConnectionFactory(sentinelConfig);
}

或者直接在SpringBoot的application.properties中定义:

spring.redis.sentinel.master: name of the master node.
spring.redis.sentinel.nodes: Comma delimited list of host:port pairs.
spring.redis.sentinel.password: The password to apply when authenticating with Redis Sentinel

 通过使用下面方式访问第一个活动的Sentinel

RedisConnectionFactory.getSentinelConnection() or RedisConnection.getSentinelCommands()

 

 

发布/订阅 Pub/Sub

  发布订阅 的特点是订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二进制字符串消息(binary string message)。每当有消息被发送至给定频道时,频道的所有订阅者都会收到消息。(订阅者可以订阅多个频道,发送者可以在任何频道发送消息)

  发布订阅 依赖于即时消息的广播(即,如果没有听,则错过一条消息),没有对消息持久化。

  与Springboot集成:https://www.cnblogs.com/yangyongjie/p/14355833.html

 

Stream

  与Springboot集成:https://www.cnblogs.com/yangyongjie/p/14347781.html

 

Scripting

   Redis 2.6及更高版本通过eval和evalsha命令提供了对运行Lua脚本的支持。Spring Data Redis为正在运行的脚本提供了高级抽象,该脚本处理序列化并自动使用Redis脚本缓存。

  脚本可以通过调用运行execute的方法RedisTemplate和ReactiveRedisTemplate。两者都使用可配置的ScriptExecutor(或ReactiveScriptExecutor)运行提供的脚本。默认情况下,ScriptExecutor(或ReactiveScriptExecutor)负责序列化提供的键和参数并反序列化脚本结果。这是通过模板的键和值序列化程序完成的。还有一个额外的重载,可让您传递脚本参数和结果的自定义序列化程序。

  缺省值ScriptExecutor通过检索脚本的SHA1并尝试首先运行来优化性能,如果脚本未在Redis脚本缓存中存在,则evalsha返回默认值eval。(理想的是DefaultRedisScript在您的应用程序上下文中配置一个实例,以避免在每次运行脚本时重新计算脚本的SHA1)

 

  eval命令语法:

  EVAL script numkeys key [key ...] arg [arg ...]

  如:

if redis.call('GET', KEYS[1]) == ARGV[1]
  then redis.call('SET', KEYS[1], ARGV[2])
  return true
end
return false

 

  Springboot RedisUtil相关工具方法:

    /**
     * 执行lua脚本
     *
     * @param script     要运行脚本
     * @param resultType 运行返回结果类型
     * @param keys       脚本的key列表参数
     * @param args       脚本的参数
     * @param <T>        返回类型泛型
     * @return
     */
    public static <T> T eval(String script, Class<T> resultType, List<String> keys, Object... args) {
        try {
            RedisScript<T> redisScript = RedisScript.of(script, resultType);
            return redisTemplate.execute(redisScript, keys, args);
        } catch (Exception e) {
            LOGGER.error("eval script error:" + e.getMessage(), e);
            return null;
        }
    }

 

 

 

附录:

Jedis参数配置:

Jedis参数 默认值(jedis,非comnonpoolv2) 建议值 备注
timeout 2000 200~1000 Jedis的socket timeout值,单位毫秒;
maxRedirections 5 5 最大重定向次数;超过设置后,此抛出异常
MaxTotal 50 50~100 当前pool可并发的最大连接数;即pool v1中的maxActive相同。不宜设置过大,能很好对Redis有过载保护作用.如果实例独享,评估部署的java进程数,尽量控制Redis实例连接在5000以内(见Redis吞吐量与连接数的相关性)
MaxIdle 10 30~50 pool中最大的空闲连接数;达到后pool会开始回收空闲连接,直到空闲连接数达到Mindle个数。 主要避免空连接占用,资源浪费
MinIdle 5 10~20 pool中保持最小的空闲可用连接数,这部分不被回收。可防止流量增量时,连接创建不及时
TestWhileIdle true true 打开空闲连接存活和回收,周期性检测
TimeBetweenEvictionRunsMillis 30000 30000 (30秒) 空闲连接检测的周期;用于检测并回收空闲连接。每次通过PING命令检查连接是否可用。 和Redis Server timeout参数配合使用,避免server堆积死连接。
testOnReturn false false 连接在被归还给pool前,会验证连接的有效性,通过ping命令来检测; 为避免Ping命令过多,带来资源消耗或过多验证处理,影响jedis pool性能
testOnBorrow false false 连接从pool中获取,使用前会被验证;通过ping命令检测 ; 为避免Ping命令过多,带来资源消耗或过多验证处理,影响jedis pool性能.

 

LettuceConnectionConfiguration

/**
 * Redis connection configuration using Lettuce.
 *
 * @author Mark Paluch
 * @author Andy Wilkinson
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RedisClient.class)
@ConditionalOnProperty(name = "spring.redis.client-type", havingValue = "lettuce", matchIfMissing = true)
class LettuceConnectionConfiguration extends RedisConnectionConfiguration {

    LettuceConnectionConfiguration(RedisProperties properties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider) {
        super(properties, sentinelConfigurationProvider, clusterConfigurationProvider);
    }

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(ClientResources.class)
    DefaultClientResources lettuceClientResources() {
        return DefaultClientResources.create();
    }

    @Bean
    @ConditionalOnMissingBean(RedisConnectionFactory.class)
    LettuceConnectionFactory redisConnectionFactory(
            ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
            ClientResources clientResources) {
        LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
                getProperties().getLettuce().getPool());
        return createLettuceConnectionFactory(clientConfig);
    }

    private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
        if (getSentinelConfig() != null) {
            return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
        }
        if (getClusterConfiguration() != null) {
            return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
        }
        return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
    }

    private LettuceClientConfiguration getLettuceClientConfiguration(
            ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
            ClientResources clientResources, Pool pool) {
        LettuceClientConfigurationBuilder builder = createBuilder(pool);
        applyProperties(builder);
        if (StringUtils.hasText(getProperties().getUrl())) {
            customizeConfigurationFromUrl(builder);
        }
        builder.clientOptions(createClientOptions());
        builder.clientResources(clientResources);
        builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
        return builder.build();
    }

    private LettuceClientConfigurationBuilder createBuilder(Pool pool) {
        if (pool == null) {
            return LettuceClientConfiguration.builder();
        }
        return new PoolBuilderFactory().createBuilder(pool);
    }

    private LettuceClientConfigurationBuilder applyProperties(
            LettuceClientConfiguration.LettuceClientConfigurationBuilder builder) {
        if (getProperties().isSsl()) {
            builder.useSsl();
        }
        if (getProperties().getTimeout() != null) {
            builder.commandTimeout(getProperties().getTimeout());
        }
        if (getProperties().getLettuce() != null) {
            RedisProperties.Lettuce lettuce = getProperties().getLettuce();
            if (lettuce.getShutdownTimeout() != null && !lettuce.getShutdownTimeout().isZero()) {
                builder.shutdownTimeout(getProperties().getLettuce().getShutdownTimeout());
            }
        }
        if (StringUtils.hasText(getProperties().getClientName())) {
            builder.clientName(getProperties().getClientName());
        }
        return builder;
    }

    private ClientOptions createClientOptions() {
        ClientOptions.Builder builder = initializeClientOptionsBuilder();
        Duration connectTimeout = getProperties().getConnectTimeout();
        if (connectTimeout != null) {
            builder.socketOptions(SocketOptions.builder().connectTimeout(connectTimeout).build());
        }
        return builder.timeoutOptions(TimeoutOptions.enabled()).build();
    }

    private ClientOptions.Builder initializeClientOptionsBuilder() {
        if (getProperties().getCluster() != null) {
            ClusterClientOptions.Builder builder = ClusterClientOptions.builder();
            Refresh refreshProperties = getProperties().getLettuce().getCluster().getRefresh();
            Builder refreshBuilder = ClusterTopologyRefreshOptions.builder()
                    .dynamicRefreshSources(refreshProperties.isDynamicRefreshSources());
            if (refreshProperties.getPeriod() != null) {
                refreshBuilder.enablePeriodicRefresh(refreshProperties.getPeriod());
            }
            if (refreshProperties.isAdaptive()) {
                refreshBuilder.enableAllAdaptiveRefreshTriggers();
            }
            return builder.topologyRefreshOptions(refreshBuilder.build());
        }
        return ClientOptions.builder();
    }

    private void customizeConfigurationFromUrl(LettuceClientConfiguration.LettuceClientConfigurationBuilder builder) {
        ConnectionInfo connectionInfo = parseUrl(getProperties().getUrl());
        if (connectionInfo.isUseSsl()) {
            builder.useSsl();
        }
    }

    /**
     * Inner class to allow optional commons-pool2 dependency.
     */
    private static class PoolBuilderFactory {

        LettuceClientConfigurationBuilder createBuilder(Pool properties) {
            return LettucePoolingClientConfiguration.builder().poolConfig(getPoolConfig(properties));
        }

        private GenericObjectPoolConfig<?> getPoolConfig(Pool properties) {
            GenericObjectPoolConfig<?> config = new GenericObjectPoolConfig<>();
            config.setMaxTotal(properties.getMaxActive());
            config.setMaxIdle(properties.getMaxIdle());
            config.setMinIdle(properties.getMinIdle());
            if (properties.getTimeBetweenEvictionRuns() != null) {
                config.setTimeBetweenEvictionRunsMillis(properties.getTimeBetweenEvictionRuns().toMillis());
            }
            if (properties.getMaxWait() != null) {
                config.setMaxWaitMillis(properties.getMaxWait().toMillis());
            }
            return config;
        }

    }

}
View Code

 

IBM SpringBoot Redis文档:https://developer.ibm.com/zh/languages/spring/articles/know-redis-and-use-it-in-springboot-projects/ 

 

END.

posted @ 2019-10-29 16:30  杨岂  阅读(9015)  评论(0编辑  收藏  举报