# 简单网络通信任务队列系统实现


背景介绍

在现代应用中,任务调度和状态监控是实现高效资源管理的关键模块。本系统采用Python实现,结合优先级队列、线程管理与异步执行,支持任务添加、执行和状态跟踪,适用于本地运行环境。


思路分析

1. 优先级队列模型

使用heapq实现优先级队列,按任务执行时间排序。
– 任务按优先级(执行时间)加入队列,优先级越高越优先执行。
– 若有多线程竞争,优先级队列可自动处理,确保多线程间竞争的公平性。

2. 异步执行逻辑

通过asyncio处理异步任务,实现高并发执行。
– 使用asyncio创建事件循环,管理线程池的执行任务。
– 队列中任务执行时,触发事件循环,异步执行任务。

3. 状态监控机制

记录任务执行状态,支持实时监控。
– 使用字典保存任务执行状态(成功/失败)。
– 实时更新状态并通知用户。


代码实现

import heapq
import asyncio

# 任务队列管理类  
class TaskQueueManager:
    def __init__(self):
        self.heap = []  # 优先队列,按执行时间排序
        self.task_states = {}  # 存储任务执行状态

    def add_task(self, name, execution_time):
        """添加任务,优先按执行时间排序"""
        # 将任务放入最小堆(执行时间越小越优先)  
        heapq.heappush(self.heap, (name, execution_time))
        self.task_states[name] = "pending"

    def execute_task(self, task_id):
        """执行任务,异步执行并记录状态"""
        async with asyncio.runloop():
            # 触发事件循环,异步执行任务  
            status = await asyncio.sleep_promise(task_id)
            self.task_states[task_id] = status

    def get_task_status(self, task_id):
        """获取任务执行状态"""
        return self.task_states.get(task_id, "pending")

# 示例使用  
if __name__ == "__main__":
    manager = TaskQueueManager()

    # 添加任务  
    manager.add_task("计算平均值", 3)  # 执行时间3秒  
    manager.add_task("加法运算", 5)  

    # 执行任务  
    manager.execute_task(0)  # 异步执行第一个任务  
    manager.execute_task(1)  # 异步执行第二个任务  

    # 输出结果  
    print("任务执行状态:")
    for task_id, status in manager.task_states.items():
        print(f"任务 {task_id}: {status}")

总结

本实现通过优先队列、异步调度和状态记录,构建了一个支持任务调度与监控的网络通信任务队列系统。

  • 优先级队列确保任务按执行时间排序,提升并发处理能力。
  • 异步执行采用asyncio处理多任务,实现高并发执行。
  • 状态监控记录任务执行状态,支持实时反馈。

该系统在本地环境中运行,无需依赖框架或外部服务,适用于需要任务调度的任务管理场景。