Skip to main content
 首页 » 程序教程

Python编程如何实现简单的线程池

2016年07月24日42420

目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。

传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。

我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3:

T1:线程创建时间

T2:线程执行时间,包括线程的同步等时间

T3:线程销毁时间

那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。

除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。

因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。

基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。

线程池的注意事项

虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。

(1)线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的 话,线程数目与CPU 数量相适合即可。如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。

(2)并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。

(3)线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。

简单线程池的设计

一个典型的线程池,应该包括如下几个部分:

1、线程池管理器(ThreadPool),用于启动、停用,管理线程池

2、工作线程(WorkThread),线程池中的线程

3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行

4、请求队列(RequestQueue),用于存放和提取请求

5、结果队列(ResultQueue),用于存储请求执行后返回的结果

线程池管理器,通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得人物,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。在本文中,将会用python语言实现,python的Queue,就是很好的实现了对线程同步机制。

Python编程如何实现简单的线程池

这就是一般的线程池实现的原理,下面看一个实际的代码:

#!/usr/local/env python
# coding:utf-8
__author__ = "风轻清淡"

import queue
import threading
import time


# 线程池管理
class WorkManager(object):
    def __init__(self, work_num=1000, thread_num=4):
        self.work_queue = queue.Queue()
        self.threads = []
        self.__init_work_queue(work_num)
        self.__init_thread_pool(thread_num)

    def __init_thread_pool(self, thread_num):
        """
        初始化线程池
        """
        for i in range(thread_num):
            self.threads.append(WorkThread(self.work_queue))

    def __init_work_queue(self, jobs_num):
        """
        初始化工作队列
        """
        for i in range(jobs_num):
            self.add_job(do_job, i)

    def add_job(self, func, *args):
        """
        添加一项工作入队
        """
        self.work_queue.put((func, list(args)))  # 任务入队,Queue内部实现了同步机制

    def wait_complete(self):
        """
        等待所有线程运行完毕
        """
        for item in self.threads:
            if item.isAlive():
                item.join()
                # 工作线程


class WorkThread(threading.Thread):
    def __init__(self, work_queue):
        threading.Thread.__init__(self)
        self.work_queue = work_queue
        self.start()

    def run(self):
        # 死循环,从而让创建的线程在一定条件下关闭退出
        while True:
            try:
                do, args = self.work_queue.get(block=False)  # 任务异步出队,Queue内部实现了同步机制
                do(args)
                self.work_queue.task_done() # 通知系统任务完成
            except:
                break

                # 具体任务实现


def do_job(args):
    time.sleep(0.1)  # 模拟处理时间
    print(threading.current_thread(), list(args))


# 测试
if __name__ == '__main__':
    start = time.time()
    # work_manager = WorkManager(10000, 10)       # 100.469903946
    work_manager = WorkManager(10000, 20)  # 50.2791779041
    work_manager.wait_complete()
    end = time.time()
    print("花费的时间: %s" % (end - start))

线程池的python实现代码:

Work类是一个Python线程池,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中。这里的workQueue和resultQueue都是线程安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。

WorkerManager负责初始化Python线程池,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。

在 Python 中使用线程时,这个模式是一种很常见的并且推荐使用的方式。具体工作步骤描述如下:

  1. 创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。

  2. 将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。

  3. 生成守护线程池。

  4. 每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。

  5. 在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。

  6. 对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。

在使用这个模式时需要注意一点:通过将守护线程设置为 true,将允许主线程或者程序仅在守护线程处于活动状态时才能够退出。这种方式创建了一种简单的方式以控制程序流程,因为在退出之前,您可以对队列执行 join 操作、或者等到队列为空。

评论列表暂无评论
发表评论