type
status
date
slug
summary
tags
category
icon
password
Redis 在 Python 中的实际应用场景总结
Redis 是一种高性能的键值对存储数据库,常用于缓存、分布式锁、消息队列等场景。以下是 Redis 在 Python 中的一些典型应用场景及其实现示例。
(1)KV 缓存
应用场景: 缓存用户信息、会话信息、商品信息等,可以显著提高系统性能,减轻数据库的负载。
示例代码:
说明:
- 通过
get_user
函数首先尝试从 Redis 缓存中获取用户信息,如果不存在则从数据库中查询,并将查询结果缓存到 Redis 中,设置有效期为 3600 秒。
(2)分布式锁
应用场景: 在分布式系统中,用于确保同一时刻只有一个进程能访问共享资源,以避免并发问题。
实现方案:
方案1:SETNX
+ EXPIRE
方案2:SETNX
+ 带过期时间的 value
方案3:原子性操作
说明:
SETNX
+EXPIRE
组合可以用来实现分布式锁,但必须注意原子性。方案3使用set
方法,将SETNX
和EXPIRE
合并为一个原子操作,避免锁得不到释放的问题。
(3)延迟队列
应用场景: 实现任务的延迟处理,如订单超时未支付自动取消、定时消息推送等。
实现方法:
- 使用 Redis 的有序集合(ZSet),利用元素的
score
表示任务的执行时间,通过轮询检查并处理过期任务。
这段代码使用 Redis 和 Python 来实现一个简单的延迟任务队列。每个任务被添加到一个有序集合中,并在指定的时间后执行。
逐行解释代码
import time
: 引入 Python 标准库中的time
模块,用于处理时间相关的功能,比如获取当前时间、让线程休眠等。
import uuid
: 引入 Python 标准库中的uuid
模块,用于生成全局唯一标识符(UUID),确保每个任务有唯一的标识。
import redis
: 引入redis-py
库,用于连接和操作 Redis 数据库。
pool = redis.ConnectionPool(...)
: 创建一个 Redis 连接池,配置 Redis 服务器的地址为127.0.0.1
(本地主机)和端口6379
,并启用decode_responses
(自动解码 Redis 响应)。
r = redis.Redis(connection_pool=pool)
: 使用连接池创建一个 Redis 客户端对象r
,通过它可以进行各种 Redis 操作。
def delay_task(task_name, delay_time):
: 定义一个名为delay_task
的函数,用于添加延迟任务。
task_id = task_name + str(uuid.uuid4())
: 生成任务的唯一标识符task_id
,由任务名称和一个 UUID 组成,确保每个任务都有独立的标识。
retry_ts = time.time() + delay_time
: 计算任务的执行时间retry_ts
,通过获取当前时间并加上延迟时间(delay_time
)。
r.zadd("delay-queue", {task_id: retry_ts})
: 将任务添加到 Redis 的有序集合delay-queue
中,task_id
作为成员,retry_ts
作为评分(score),用于控制任务的执行顺序。
def loop():
: 定义一个名为loop
的函数,用于不断检查并执行已到期的任务。
while True:
: 启动一个无限循环,使程序持续运行,不断检查是否有到期任务。
task_list = r.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
: 从 Redis 的有序集合delay-queue
中,按评分从小到大获取分数在[0, 当前时间]
区间内的任务,最多获取一个(num=1
)。
if not task_list:
: 如果没有任务到期(task_list
为空),则:time.sleep(1)
: 让当前线程休眠 1 秒钟,然后继续循环。continue
: 结束本次循环,回到循环的开头。
task_id = task_list[0]
: 获取任务列表中的第一个任务的 ID。
success = r.zrem("delay-queue", task_id)
: 从有序集合中移除该任务,表示任务已被获取并处理。zrem
返回 1 表示成功,0 表示任务已经被删除。
if success:
: 如果成功删除任务(即该任务没有被其他线程获取并处理),则调用handle_msg(task_id)
处理该任务。
def handle_msg(msg):
: 定义一个名为handle_msg
的函数,用于处理任务。
print(f"消息 {msg} 已经被处理完成!")
: 输出任务已经处理完成的消息,显示任务的 ID。
import threading
: 引入 Python 的threading
模块,用于多线程操作。
t = threading.Thread(target=loop)
: 创建一个线程对象t
,该线程将运行loop
函数。
t.start()
: 启动线程t
,让loop
函数在后台持续运行。
delay_task("任务1延迟5", 5)
: 向延迟队列中添加一个名为 "任务1延迟5" 的任务,延迟时间为 5 秒。
delay_task("任务2延迟2", 2)
: 向延迟队列中添加一个名为 "任务2延迟2" 的任务,延迟时间为 2 秒。
总结
这段代码使用 Redis 实现了一个延迟任务队列。任务被添加到 Redis 的有序集合中,并由一个后台线程定期检查并执行到期任务。这种模式在需要处理定时任务、延迟执行的场景中非常实用。
(4)发布订阅
应用场景: 实现消息的发布与订阅,适用于聊天室、通知系统等场景。
示例代码:
说明:
- 使用 Redis 的发布订阅功能,允许多个客户端订阅同一频道,并接收从其他客户端发布的消息。
(5)定时任务
应用场景: 实现诸如订单超时自动取消等功能,通常使用 Redis 的键空间通知(Keyspace Notification)。
示例代码:
说明:
- Redis 的键空间通知可以监听某个键的过期事件,当订单超时后自动触发相应的处理逻辑。
总结
Redis 在 Python 中的应用场景非常广泛,从最基本的 KV 缓存,到分布式锁、延迟队列、发布订阅、定时任务等应用,都可以利用 Redis 提供的高性能数据结构实现。通过这些案例,可以看到 Redis 的灵活性和强大功能在实际项目中的重要作用。
打赏
如果您觉得我的内容对你有所帮助,不要吝啬你的一键三连!如果你有能力的话也可以通过下面请我喝杯咖啡~金额您随意~如果对文章内容有任何疑问,欢迎加入群组联系我~
- 作者:Don Mark
- 链接:null/article/8c1a0acf-d44e-4324-97bf-25b1274ea600
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。