| 1 |
import Queue |
|---|
| 2 |
import time |
|---|
| 3 |
from threading import Semaphore |
|---|
| 4 |
|
|---|
| 5 |
class StopThread(Exception):
    '''Sentinel placed on the job queue to tell a worker thread to exit.

    The pool enqueues the class object itself (not an instance), so
    workers receive ``StopThread`` as the value of their next "job".
    '''
|---|
| 6 |
|
|---|
| 7 |
class ThreadPool:
    '''A demand-driven pool of worker threads fed from one shared queue.

    Workers are created lazily by run_job() whenever no worker is idle,
    up to ``max`` of them.  Surplus idle workers are retired gradually
    by shrink(), which tick() triggers at most once every
    ``shrink_every`` seconds.  A worker is retired by queueing the
    StopThread class as if it were a job.

    NOTE(review): only _idlecount is guarded by a lock; _threadcount is
    updated without one, so run_job()/tick()/shrink()/shutdown() are
    presumably meant to be called from a single controlling thread --
    confirm against callers.
    '''

    def __init__(self, max, mkthread_callable, spare=6, shrinkratio=2, shrink_every=10):
        '''Create a new thread pool.

        max should be the absolute maximum number of threads spawned
        mkthread_callable is some callable that takes a function
            which should be called to retrieve each "job", and that
            returns a threading.Thread-like object with a non-blocking
            start() method.  The thread should exit if get_job() returns
            StopThread.
        spare should be the number of "idle" connections to keep open
            (it is capped at max)
        shrinkratio should be the factor (current_idle - spare) gets
            divided by to gradually shrink the thread pool
        shrink_every is the number of seconds between each shrink()
            run
        '''
        # ``max`` shadows the builtin inside this method only; the name
        # is part of the public signature, so it is kept.
        spare = min(spare, max)
        self.max = max
        self.mkthread_callable = mkthread_callable
        self.spare = spare
        self.shrinkratio = shrinkratio
        self.shrink_every = shrink_every

        self._queue = Queue.Queue()
        self._threadcount = 0      # workers spawned and not yet told to stop
        self._idlecount = 0        # workers currently blocked in _get_job()
        self._lastcheck = 0        # time.time() of the last shrink() run
        self._inclock = Semaphore()  # used as a mutex around _idlecount
        self._closed = False       # set by shutdown(); disables spawning

    def _get_job(self):
        '''Returns a new job (or StopThread) from the shared Queue.

        Blocks until something is available.  The calling worker is
        counted as idle for exactly the duration of the wait.
        '''
        with self._inclock:
            self._idlecount += 1
        try:
            return self._queue.get()
        finally:
            # Always undo the idle mark, even if get() raises, so the
            # idle counter cannot drift upwards.
            with self._inclock:
                self._idlecount -= 1

    def _spawnthread(self):
        '''Creates and starts a new worker thread.

        Does nothing once shutdown() has been called.
        '''
        if self._closed:
            return
        th = self.mkthread_callable(self._get_job)
        th.start()
        self._threadcount += 1

    def _killthread(self):
        '''Tells one of the idle threads to die.

        The StopThread sentinel is queued like any other job, so it is
        picked up by whichever worker next calls _get_job().  No new
        thread object is built here: constructing one only to discard
        it would leak whatever mkthread_callable allocates.
        '''
        self._queue.put(StopThread)
        self._threadcount -= 1

    def tick(self):
        '''Determines whether or not enough time has elapsed
        to call shrink() again, and calls it if so.
        '''
        if time.time() - self._lastcheck >= self.shrink_every:
            self.shrink()

    def shrink(self):
        '''Find out if a surge in demand has created more
        threads than we need (more idle threads than self.spare),
        and scale back the number if need be.

        Retires max(1, surplus // shrinkratio) workers per call, never
        more than the surplus itself.
        '''
        self._lastcheck = time.time()
        surplus = self._idlecount - self.spare
        if surplus > 0:
            # Floor-divide and coerce to int so a float shrinkratio
            # still yields a usable loop count; clamp to the surplus so
            # a ratio below 1 cannot push the pool under ``spare``.
            remove = min(surplus, max(1, int(surplus // self.shrinkratio)))
            for _ in range(remove):
                self._killthread()

    def run_job(self, job):
        '''Add some object to the job queue to be handled by
        some thread in the pool.

        job should be a unit of work that the threads understand

        A new worker is spawned first when none is idle and the pool
        is still below its maximum size.
        '''
        self.tick()
        # Explicit "<" rather than a truthiness test on the difference,
        # which would also be true when the count exceeds the cap.
        if self._idlecount == 0 and self._threadcount < self.max:
            self._spawnthread()
        self._queue.put(job)

    def shutdown(self):
        '''Kill all threads.

        Spawning is disabled permanently and one StopThread is queued
        per live worker.  Jobs submitted afterwards are still queued
        but no worker is created to run them.
        '''
        self._closed = True
        # range() is evaluated once, so the decrement performed by each
        # _killthread() call does not shorten the loop.
        for _ in range(self._threadcount):
            self._killthread()
|---|
| 99 |
|
|---|
| 100 |
if __name__ == '__main__':
    # Manual smoke test: flood the pool with short sleep jobs, let it
    # tick for a while so shrink() can run, then shut it down and check
    # that a job submitted afterwards is not executed.
    from threading import Thread
    import random

    class Sleeper(Thread):
        '''Worker thread that treats every job as a number of seconds to sleep.'''

        def __init__(self, get_job):
            self.get_job = get_job
            Thread.__init__(self)

        def run(self):
            while True:
                job = self.get_job()
                # StopThread is a sentinel object: test identity, so a
                # job with a permissive __eq__ can never be mistaken
                # for the stop signal.
                if job is StopThread:
                    print('thread done')
                    return
                time.sleep(job)

    pool = ThreadPool(40, Sleeper)

    # 80 jobs of under one second each, against a cap of 40 threads.
    for _ in range(80):
        pool.run_job(random.random())
    # Tick once a second for 21 seconds; tick() calls shrink() whenever
    # shrink_every (default 10) seconds have passed since the last run.
    for _ in range(21):
        time.sleep(1)
        pool.tick()
    print('total shutdown...')
    pool.shutdown()
    time.sleep(1.5)
    # After shutdown no worker is spawned, so this 15 second job is
    # queued but never slept on.
    print('should not sleep')
    pool.run_job(15)
    print('exit')
|---|