Download Install Tutorial Docs FAQ Tools WikiLicense Team IRC Planet Involvement Shop Book

Ticket #539: tpool.py

Line 
1 import Queue
2 import time
3 from threading import Semaphore
4
5 class StopThread(Exception): pass
6
7 class ThreadPool:
8     def __init__(self, max, mkthread_callable, spare=6, shrinkratio=2, shrink_every=10):
9         '''Create a new thread pool.
10
11         max should be the absolute maximum number of threads spawned
12         mkthread_callable is some callable that takes a function
13             which should be called to retrieve each "job", and that
14             returns a threading.Thread-like object with a non-blocking
15             start() method.  The thread should exit if get_job() returns
16             StopThread.
17         spare should be the number of "idle" connections to keep open
18         shrinkratio should be the factor (current_idle - spare) gets
19             divided by to gradually shrink the thread pool
20         shrink_every is the number of seconds between each shrink()
21             run
22         '''
23         spare = min(spare, max)
24         self.max = max
25         self.mkthread_callable = mkthread_callable
26         self.spare = spare
27         self.shrinkratio = shrinkratio
28         self.shrink_every = shrink_every
29
30         self._queue = Queue.Queue()
31         self._threadcount = 0
32         self._idlecount = 0
33         self._lastcheck = 0
34         self._inclock = Semaphore()
35    
36     def _get_job(self):
37         '''Returns a new job (or StopThread) from the shared Queue'''
38         self._inclock.acquire()
39         self._idlecount += 1
40         self._inclock.release()
41
42         r = self._queue.get()
43
44         self._inclock.acquire()
45         self._idlecount -= 1
46         self._inclock.release()
47
48         return r
49
50     def _spawnthread(self):
51         '''Creates a new thread'''
52         th = self.mkthread_callable(self._get_job)
53         th.start()
54         self._threadcount += 1
55
56     def _killthread(self):
57         '''Tells one of the idle threads to die'''
58         th = self.mkthread_callable(self._get_job)
59         self._queue.put(StopThread)
60         self._threadcount -= 1
61
62     def tick(self):
63         '''Determines whether or not enough time has elapsed
64         to call shrink() again.
65         '''
66         if time.time() - self._lastcheck >= self.shrink_every:
67             self.shrink()
68
69     def shrink(self):
70         '''Find out of a surge in demand has created more
71         threads than we need (more idle threads than self.spare),
72         and scale back the number if need be.
73         '''
74         self._lastcheck = time.time()
75         surplus = self._idlecount - self.spare
76         if surplus > 0:
77             remove = max(1, surplus / self.shrinkratio)
78             for x in xrange(remove):
79                 self._killthread()
80        
81     def run_job(self, job):
82         '''Add some object to the job queue to be handled by
83         some thread in the pool.
84
85         job should be a unit of work that the threads understand
86         '''
87         self.tick()
88         if self._idlecount == 0 and self.max - self._threadcount:
89             self._spawnthread()
90         self._queue.put(job)
91
92     def shutdown(self):
93         '''Kill all threads'''
94         def _dead_st(*args, **kw):
95             pass
96         self._spawnthread = _dead_st
97         for x in xrange(self._threadcount):
98             self._killthread()
99
100 if __name__ == '__main__':
101     # Example
102     from threading import Thread
103     import random
104     class Sleeper(Thread):
105         def __init__(self, get_job):
106             self.get_job = get_job
107             Thread.__init__(self)
108
109         def run(self):
110             while True:
111                 job = self.get_job()
112                 if job == StopThread:
113                     print 'thread done'
114                     return
115                 time.sleep(job)
116     pool = ThreadPool(40, Sleeper)
117
118     for x in xrange(0, 80):
119         pool.run_job(random.random())
120     for x in xrange(21):
121         time.sleep(1)
122         pool.tick()
123     print 'total shutdown...'
124     pool.shutdown()
125     time.sleep(1.5)
126     print 'should not sleep'
127     pool.run_job(15)
128     print 'exit'

Hosted by WebFaction

Log in as guest/cpguest to create tickets