🗒️redis数据库10— redis实战案例
00 分钟
2024-8-25
2024-8-25
type
status
date
slug
summary
tags
category
icon
password

Redis 在 Python 中的实际应用场景总结

 
 
Redis 是一种高性能的键值对存储数据库,常用于缓存、分布式锁、消息队列等场景。以下是 Redis 在 Python 中的一些典型应用场景及其实现示例。
 
 

(1)KV 缓存

notion image
应用场景: 缓存用户信息、会话信息、商品信息等,可以显著提高系统性能,减轻数据库的负载。
示例代码:
说明:
  • 通过 get_user 函数首先尝试从 Redis 缓存中获取用户信息,如果不存在则从数据库中查询,并将查询结果缓存到 Redis 中,设置有效期为 3600 秒。
 

 
 

(2)分布式锁

应用场景: 在分布式系统中,用于确保同一时刻只有一个进程能访问共享资源,以避免并发问题。
实现方案:

方案1:SETNX + EXPIRE


方案2:SETNX + 带过期时间的 value

 

方案3:原子性操作

说明:
  • SETNX + EXPIRE 组合可以用来实现分布式锁,但必须注意原子性。方案3使用 set 方法,将 SETNXEXPIRE 合并为一个原子操作,避免锁得不到释放的问题。

(3)延迟队列

应用场景: 实现任务的延迟处理,如订单超时未支付自动取消、定时消息推送等。
实现方法:
  • 使用 Redis 的有序集合(ZSet),利用元素的 score 表示任务的执行时间,通过轮询检查并处理过期任务。
这段代码使用 Redis 和 Python 来实现一个简单的延迟任务队列。每个任务被添加到一个有序集合中,并在指定的时间后执行。

逐行解释代码

  • import time: 引入 Python 标准库中的 time 模块,用于处理时间相关的功能,比如获取当前时间、让线程休眠等。
  • import uuid: 引入 Python 标准库中的 uuid 模块,用于生成全局唯一标识符(UUID),确保每个任务有唯一的标识。
  • import redis: 引入 redis-py 库,用于连接和操作 Redis 数据库。
  • pool = redis.ConnectionPool(...): 创建一个 Redis 连接池,配置 Redis 服务器的地址为 127.0.0.1(本地主机)和端口 6379,并启用 decode_responses(自动解码 Redis 响应)。
  • r = redis.Redis(connection_pool=pool): 使用连接池创建一个 Redis 客户端对象 r,通过它可以进行各种 Redis 操作。
  • def delay_task(task_name, delay_time):: 定义一个名为 delay_task 的函数,用于添加延迟任务。
  • task_id = task_name + str(uuid.uuid4()): 生成任务的唯一标识符 task_id,由任务名称和一个 UUID 组成,确保每个任务都有独立的标识。
  • retry_ts = time.time() + delay_time: 计算任务的执行时间 retry_ts,通过获取当前时间并加上延迟时间(delay_time)。
  • r.zadd("delay-queue", {task_id: retry_ts}): 将任务添加到 Redis 的有序集合 delay-queue 中,task_id 作为成员,retry_ts 作为评分(score),用于控制任务的执行顺序。
  • def loop():: 定义一个名为 loop 的函数,用于不断检查并执行已到期的任务。
  • while True:: 启动一个无限循环,使程序持续运行,不断检查是否有到期任务。
  • task_list = r.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1): 从 Redis 的有序集合 delay-queue 中,按评分从小到大获取分数在 [0, 当前时间] 区间内的任务,最多获取一个(num=1)。
  • if not task_list:: 如果没有任务到期(task_list 为空),则:
    • time.sleep(1): 让当前线程休眠 1 秒钟,然后继续循环。
    • continue: 结束本次循环,回到循环的开头。
  • task_id = task_list[0]: 获取任务列表中的第一个任务的 ID。
  • success = r.zrem("delay-queue", task_id): 从有序集合中移除该任务,表示任务已被获取并处理。zrem 返回 1 表示成功,0 表示任务已经被删除。
  • if success:: 如果成功删除任务(即该任务没有被其他线程获取并处理),则调用 handle_msg(task_id) 处理该任务。
  • def handle_msg(msg):: 定义一个名为 handle_msg 的函数,用于处理任务。
  • print(f"消息 {msg} 已经被处理完成!"): 输出任务已经处理完成的消息,显示任务的 ID。
  • import threading: 引入 Python 的 threading 模块,用于多线程操作。
  • t = threading.Thread(target=loop): 创建一个线程对象 t,该线程将运行 loop 函数。
  • t.start(): 启动线程 t,让 loop 函数在后台持续运行。
  • delay_task("任务1延迟5", 5): 向延迟队列中添加一个名为 "任务1延迟5" 的任务,延迟时间为 5 秒。
  • delay_task("任务2延迟2", 2): 向延迟队列中添加一个名为 "任务2延迟2" 的任务,延迟时间为 2 秒。

总结

这段代码使用 Redis 实现了一个延迟任务队列。任务被添加到 Redis 的有序集合中,并由一个后台线程定期检查并执行到期任务。这种模式在需要处理定时任务、延迟执行的场景中非常实用。
 

 
 

(4)发布订阅

应用场景: 实现消息的发布与订阅,适用于聊天室、通知系统等场景。
示例代码:
说明:
  • 使用 Redis 的发布订阅功能,允许多个客户端订阅同一频道,并接收从其他客户端发布的消息。
 

 

(5)定时任务

应用场景: 实现诸如订单超时自动取消等功能,通常使用 Redis 的键空间通知(Keyspace Notification)。
示例代码:
说明:
  • Redis 的键空间通知可以监听某个键的过期事件,当订单超时后自动触发相应的处理逻辑。
 

总结

Redis 在 Python 中的应用场景非常广泛,从最基本的 KV 缓存,到分布式锁、延迟队列、发布订阅、定时任务等应用,都可以利用 Redis 提供的高性能数据结构实现。通过这些案例,可以看到 Redis 的灵活性和强大功能在实际项目中的重要作用。
 
 
 

打赏

如果您觉得我的内容对你有所帮助,不要吝啬你的一键三连!如果你有能力的话也可以通过下面请我喝杯咖啡~金额您随意~如果对文章内容有任何疑问,欢迎加入群组联系我~
notion image
上一篇
redis数据库9— Python操作 Redis
下一篇
Python asyncio高性能异步编程1—协程