redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官网:https://github.com/redisson/redisson
Wiki中文:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
分布式锁
进入子项止  redisson-spring-boot-starter
Readme.md
添加依赖
| 12
 3
 4
 5
 
 | <dependency><groupId>org.redisson</groupId>
 <artifactId>redisson-spring-boot-starter</artifactId>
 <version>3.17.5</version>
 </dependency>
 
 | 
yml配置
方法1:使用spring boot的配置
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | spring:redis:
 database:
 host:
 port:
 password:
 ssl:
 timeout:
 cluster:
 nodes:
 sentinel:
 master:
 nodes:
 
 | 
方法2: 使用redisson的配置
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 
 | spring:redis:
 redisson:
 file: classpath:redisson.yaml
 config: |
 clusterServersConfig:
 idleConnectionTimeout: 10000
 connectTimeout: 10000
 timeout: 3000
 retryAttempts: 3
 retryInterval: 1500
 failedSlaveReconnectionInterval: 3000
 failedSlaveCheckInterval: 60000
 password: null
 subscriptionsPerConnection: 5
 clientName: null
 loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
 subscriptionConnectionMinimumIdleSize: 1
 subscriptionConnectionPoolSize: 50
 slaveConnectionMinimumIdleSize: 24
 slaveConnectionPoolSize: 64
 masterConnectionMinimumIdleSize: 24
 masterConnectionPoolSize: 64
 readMode: "SLAVE"
 subscriptionMode: "SLAVE"
 nodeAddresses:
 - "redis://127.0.0.1:7004"
 - "redis://127.0.0.1:7001"
 - "redis://127.0.0.1:7000"
 scanInterval: 1000
 pingConnectionInterval: 0
 keepAlive: false
 tcpNoDelay: false
 threads: 16
 nettyThreads: 32
 codec: !<org.redisson.codec.MarshallingCodec> {}
 transportMode: "NIO"
 
 | 
可用的spring bean
- RedissonClient
- RedissonRxClient
- RedissonReactiveClient
- RedisTemplate
- ReactiveRedisTemplate
单机配置及使用
| 12
 3
 4
 5
 6
 7
 8
 
 | spring:redis:
 database: 0
 host: 192.168.158.137
 port: 6379
 password: 123456
 ssl: ture
 timeout: 3000
 
 | 
| 12
 3
 4
 5
 6
 7
 
 | @Autowiredprivate RedissonClient redissonClient;
 
 RLock rLock = redissonClient.getLock("lock1");
 rLock.lock();
 
 rLock.unlock();
 
 | 
源码
| 12
 
 | getLock时使用的是RedissonLockrLock.lock();
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 
 | @Overridepublic void lock() {
 try {
 lock(-1, null, false);
 } catch (InterruptedException e) {
 throw new IllegalStateException();
 }
 }
 
 | 
| 1
 | Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;
 if (leaseTime > 0) {
 ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 } else {
 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
 TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 }
 CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
 
 if (ttlRemaining == null) {
 if (leaseTime > 0) {
 internalLockLeaseTime = unit.toMillis(leaseTime);
 } else {
 
 scheduleExpirationRenewal(threadId);
 }
 }
 return ttlRemaining;
 });
 return new CompletableFutureWrapper<>(f);
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | protected void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();
 ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
 if (oldEntry != null) {
 oldEntry.addThreadId(threadId);
 } else {
 entry.addThreadId(threadId);
 try {
 renewExpiration();
 } finally {
 if (Thread.currentThread().isInterrupted()) {
 cancelExpirationRenewal(threadId);
 }
 }
 }
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 
 | private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
 if (ee == null) {
 return;
 }
 
 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
 @Override
 public void run(Timeout timeout) throws Exception {
 ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
 if (ent == null) {
 return;
 }
 Long threadId = ent.getFirstThreadId();
 if (threadId == null) {
 return;
 }
 
 CompletionStage<Boolean> future = renewExpirationAsync(threadId);
 future.whenComplete((res, e) -> {
 if (e != null) {
 log.error("Can't update lock " + getRawName() + " expiration", e);
 EXPIRATION_RENEWAL_MAP.remove(getEntryName());
 return;
 }
 
 if (res) {
 
 renewExpiration();
 } else {
 cancelExpirationRenewal(null);
 }
 });
 }
 }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
 
 ee.setTimeout(task);
 }
 
 |