实现一个 Redis 分布式锁

实现一个 Redis 分布式锁插图

前言

在我们日常开发中,难免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操作明显不符合原子性,如果代码块不加锁,很容易因为并发导致超卖问题。咱们的系统如果是单体架构,那我们使用本地锁就可以解决问题。如果是分布式架构,就需要使用分布式锁。

方案

使用 SETNX 和 EXPIRE 命令

|

1

2

3

4

5

6

7

8

9

10

11

12

13

|

SETNX key value

EXPIRE key seconds

DEL key

if (setnx(“item_1_lock”, 1)) {

    expire(“item_1_lock”, 30);

    try {

        … 逻辑

    } catch {

        …

    } finally {

        del(“item_1_lock”);

    }

}

|

这种方法看起来可以解决问题,但是有一定的风险,因为 SETNX 和 EXPIRE 这波操作是非原子性的,如果 SETNX 成功之后,出现错误,导致 EXPIRE 没有执行,导致锁没有设置超时时间形成死锁。

针对这种情况,我们可以使用 lua 脚本来保持操作原子性,保证 SETNX 和 EXPIRE 两个操作要么都成功,要么都不成功。

|

1

2

3

4

5

|

if (redis.call(‘setnx’, KEYS[1], ARGV[1]) < 1)

then return 0;

end;

redis.call(‘expire’, KEYS[1], tonumber(ARGV[2]));

return 1;

|

通过这样的方法,我们初步解决了竞争锁的原子性问题,虽然其他功能还未实现,但是应该不会造成死锁????????????。

Redis 2.6.12 以上可灵活使用 SET 命令

|

1

2

3

4

5

6

7

8

9

10

11

|

SET key value NX EX 30

DEL key

if (set(“item_1_lock”, 1, “NX”, “EX”, 30)) {

    try {

        … 逻辑

    } catch {

        …

    } finally {

        del(“item_1_lock”);

    }

}

|

改进后的方法不需要借助 lua 脚本就解决了 SETNX 和 EXPIRE 的原子性问题。现在我们再仔细琢磨琢磨,如果 A 拿到了锁顺利进入代码块执行逻辑,但是由于各种原因导致超时自动释放锁。在这之后 B 成功拿到了锁进入代码块执行逻辑,但此时如果 A 执行逻辑完毕再来释放锁,就会把 B 刚获得的锁释放了。就好比用自己家的钥匙开了别家的门,这是不可接受的。

为了解决这个问题我们可以尝试在 SET 的时候设置一个锁标识,然后在 DEL 的时候验证当前锁是否为自己的锁。

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

|

String value = UUID.randomUUID().toString().replaceAll(“-”, "");

if (set(“item_1_lock”, value, “NX”, “EX”, 30)) {

    try {

        … 逻辑

    } catch {

        …

    } finally {

        … lua 脚本保证原子性

    }

}

if (redis.call(‘get’, KEYS[1]) == ARGV[1])

then return redis.call(‘del’, KEYS[1])

else return 0

end

|

到这里,我们终于解决了竞争锁的原子性问题和误删锁问题。但是锁一般还需要支持可重入、循环等待和超时自动续约等功能点。下面我们学习使用一个非常好用的包来解决这些问题。

入门 Redisson

Redission 的锁,实现了可重入和超时自动续约功能,它都帮我们封装好了,我们只要按照自己的需求调用它的 API 就可以轻松实现上面所提到的几个功能点。详细功能可以查看 Redisson 文档

在项目中安装 Redisson

|

1

2

3

4

5

|

    org.redisson

    redisson

    3.13.2

|

|

1

|

implementation ‘org.redisson:redisson:3.13.2’

|

用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也可以在这里 Redisson 找到你需要的版本。

简单尝试

|

1

2

3

4

5

6

7

8

9

10

|

RedissonClient redissonClient = Redisson.create();

RLock lock = redissonClient.getLock(“lock”);

boolean res = lock.lock();

if (res) {

try {

… 逻辑

} finally {

lock.unlock();

}

}

|

Redisson 将底层逻辑全部做了一个封装,我们无需关心具体实现,几行代码就能使用一把完美的锁。下面我们简单折腾折腾源码。

加锁

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

|

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {

    long threadId = Thread.currentThread().getId();

    Long ttl = tryAcquire(leaseTime, unit, threadId);

    if (ttl == null) {

        return;

    }

    RFuture future = subscribe(threadId);

    if (interruptibly) {

        commandExecutor.syncSubscriptionInterrupted(future);

    } else {

        commandExecutor.syncSubscription(future);

    }

    try {

        while (true) {

            ttl = tryAcquire(leaseTime, unit, threadId);

            if (ttl == null) {

                break;

            }

            if (ttl >= 0) {

                try {

                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

                } catch (InterruptedException e) {

                    if (interruptibly) {

                        throw e;

                    }

                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

                }

            } else {

                if (interruptibly) {

                    future.getNow().getLatch().acquire();

                } else {

                    future.getNow().getLatch().acquireUninterruptibly();

                }

            }

        }

    } finally {

        unsubscribe(future, threadId);

    }

}

|

获取锁

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

|

private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {

    if (leaseTime != -1) {

        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);

    }

    RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {

        if (e != null) {

            return;

        }

        if (ttlRemaining == null) {

            scheduleExpirationRenewal(threadId);

        }

    });

    return ttlRemainingFuture;

}

RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {

    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,

            "if (redis.call(‘exists’, KEYS[1]) == 0) then" +

                    "redis.call(‘hincrby’, KEYS[1], ARGV[2], 1);" +

                    "redis.call(‘pexpire’, KEYS[1], ARGV[1]);" +

                    "return nil;" +

                    "end;" +

                    "if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) then" +

                    "redis.call(‘hincrby’, KEYS[1], ARGV[2], 1);" +

                    "redis.call(‘pexpire’, KEYS[1], ARGV[1]);" +

                    "return nil;" +

                    "end;" +

                    “return redis.call(‘pttl’, KEYS[1]);”,

            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

}

|

删除锁

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

|

public RFuture unlockAsync(long threadId) {

    RPromise result = new RedissonPromise();

    RFuture future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {

        cancelExpirationRenewal(threadId);

        if (e != null) {

            result.tryFailure(e);

            return;

        }

        if (opStatus == null) {

            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id:"

                    + id + "thread-id:" + threadId);

            result.tryFailure(cause);

            return;

        }

        result.trySuccess(null);

    });

    return result;

}

protected RFuture unlockInnerAsync(long threadId) {

    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

            "if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) then" +

                    “return nil;” +

                    "end;" +

                    "local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1);" +

                    "if (counter > 0) then" +

                    "redis.call(‘pexpire’, KEYS[1], ARGV[2]);" +

                    "return 0;" +

                    "else" +

                    "redis.call(‘del’, KEYS[1]);" +

                    "redis.call(‘publish’, KEYS[2], ARGV[1]);" +

                    "return 1;" +

                    "end;" +

                    “return nil;”,

            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

|

总结

使用 Redis 做分布式锁来解决并发问题仍存在一些困难,也有很多需要注意的点,我们应该正确评估系统的体量,不能为了使用某项技术而用。要完全解决并发问题,仍需要在数据库层面做功夫。