执行shell命令
|
通过“threading”创建一个线程
|
性能问题 - GIL
|
生产者和消费者
|
线程池模板
|
使用 multiprocessing 的 ThreadPool
与“map” 性能比较
输出:
|
互斥锁
|
Deadlock
|
实现“监视器”（monitor）
# Implementing a "monitor": all methods synchronize on one shared lock.
# ref: An introduction to Python Concurrency - David Beazley
from threading import Thread
from threading import RLock
import time


class monitor(object):
    # A single reentrant lock shared by every method.  Because RLock is
    # reentrant, a thread that already holds it may call another locked
    # method (foo -> ker) without deadlocking itself.
    lock = RLock()

    def foo(self, tid):
        with monitor.lock:
            print("%d in foo" % tid)
            time.sleep(5)
            self.ker(tid)  # re-acquires the same RLock (allowed)

    def ker(self, tid):
        with monitor.lock:
            print("%d in ker" % tid)


m = monitor()


def task1(id):
    m.foo(id)


def task2(id):
    m.ker(id)


# Guarded so importing this module does not spawn threads / sleep 5s.
if __name__ == "__main__":
    t1 = Thread(target=task1, args=(1,))
    t2 = Thread(target=task2, args=(2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

# output:
# $ python monitor.py
# 1 in foo
# 1 in ker
# 2 in ker
控制primitive资源
# Limiting concurrent access to a resource with a Semaphore.
from threading import Thread
from threading import Semaphore
from random import random
import time

# At most 3 threads may hold the semaphore at the same time.
sema = Semaphore(3)


def foo(tid):
    """Hold the semaphore for a random interval (up to 5s)."""
    with sema:
        print("%d acquire sema" % tid)
        wt = random() * 5
        time.sleep(wt)
        print("%d release sema" % tid)


# Guarded so importing this module does not spawn threads.
if __name__ == "__main__":
    threads = []
    for _t in range(5):
        t = Thread(target=foo, args=(_t,))
        threads.append(t)
        t.start()
    for _t in threads:
        _t.join()

# 输出:
# $ python semaphore.py
# 0 acquire sema
# 1 acquire sema
# 2 acquire sema
# 0 release sema
# 3 acquire sema
# ...
Ensure tasks have completed
# Blocking several threads until a shared Event is set.
from threading import Thread
from threading import Event
import time

e = Event()


def worker(id):
    """Block on the shared event, then report that it was set."""
    print("%d wait event" % id)
    e.wait()  # blocks until e.set() is called
    print("%d get event set" % id)


# Guarded so importing this module does not spawn threads / sleep.
if __name__ == "__main__":
    t1 = Thread(target=worker, args=(1,))
    t2 = Thread(target=worker, args=(2,))
    t3 = Thread(target=worker, args=(3,))
    t1.start()
    t2.start()
    t3.start()
    # give the workers time to reach e.wait() before releasing them
    time.sleep(3)
    e.set()

# output:
# $ python event.py
# 1 wait event
# 2 wait event
# 3 wait event
# 2 get event set
# 3 get event set
# 1 get event set
线程安全优先队列
# A thread-safe priority queue built from heapq + threading.Condition.
import threading
import heapq
import time
import random


class PriorityQueue(object):
    """Max-priority queue: pop() returns the highest-priority item,
    blocking while the queue is empty."""

    def __init__(self):
        self._q = []        # heap of (-priority, count, item)
        self._count = 0     # tie-breaker: equal priorities pop FIFO
        self._cv = threading.Condition()

    def __str__(self):
        return str(self._q)

    def __repr__(self):
        # BUG FIX: __repr__ must return a str; the original returned
        # the underlying list, which raises TypeError when repr() is used.
        return repr(self._q)

    def put(self, item, priority):
        with self._cv:
            # negate priority: heapq is a min-heap, we want max-first
            heapq.heappush(self._q, (-priority, self._count, item))
            self._count += 1
            self._cv.notify()

    def pop(self):
        with self._cv:
            while len(self._q) == 0:
                print("wait...")
                self._cv.wait()
            return heapq.heappop(self._q)[-1]


priq = PriorityQueue()


# NOTE(review): in the original the two roles were swapped -- the
# "producer" popped and the "consumer" put.  Renamed so the producer
# puts values and the consumer pops them.
def producer():
    while True:
        time.sleep(3)
        print("producer put value")
        priority = random.random()
        priq.put(priority, priority * 10)


def consumer():
    while True:
        print(priq.pop())


# Guarded: both loops are infinite, so they must not run at import time.
if __name__ == "__main__":
    for _ in range(3):
        priority = random.random()
        priq.put(priority, priority * 10)
    t1 = threading.Thread(target=consumer)
    t2 = threading.Thread(target=producer)
    t1.start(); t2.start()
    t1.join(); t2.join()
Multiprocessing
# Working around the GIL for CPU-bound work via multiprocessing.Pool.
from multiprocessing import Pool
import functools
import time


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


def profile(func):
    """Decorator that prints the wall-clock time *func* takes."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        # BUG FIX: the original discarded func's return value.
        result = func(*args, **kwargs)
        print(time.time() - start)
        return result
    return wrapper


@profile
def nomultiprocess():
    # list() forces evaluation: in Python 3 map() is lazy, so the
    # original py2 transcript would otherwise measure nothing.
    return list(map(fib, [35] * 5))


@profile
def hasmultiprocess():
    with Pool(5) as pool:
        return pool.map(fib, [35] * 5)


if __name__ == "__main__":
    nomultiprocess()   # serialized behind the GIL (single process)
    hasmultiprocess()  # true parallelism across 5 processes

# output (py2 transcript):
# >>> nomultiprocess()
# 23.8454811573
# >>> hasmultiprocess()
# 13.2433719635
自定义 multiprocessing 的 map 实现
# A hand-rolled multiprocessing "map" using Process + Pipe.
from multiprocessing import Process, Pipe


def spawn(f):
    """Wrap *f* so its result is sent back through a pipe end."""
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun


def parmap(f, X):
    """Apply *f* to each element of *X*, one child process per element.

    NOTE(review): joining before recv() can deadlock if a result exceeds
    the pipe buffer -- fine for this demo's small values; verify before
    reusing with large payloads.
    """
    pipes = [Pipe() for _ in X]
    procs = [Process(target=spawn(f), args=(child, x))
             for x, (parent, child) in zip(X, pipes)]
    # plain loops instead of side-effect list comprehensions
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return [parent.recv() for (parent, child) in pipes]


# Guarded so importing this module does not fork children.
if __name__ == "__main__":
    print(parmap(lambda x: x ** x, range(1, 5)))
优雅的方法来杀死所有的子进程
from __future__ import print_function import signal import os import time from multiprocessing import Process, Pipe NUM_PROCESS = 10 def aurora(n): while True: time.sleep(n) if __name__ == "__main__": procs = [Process(target=aurora, args=(x,)) for x in range(NUM_PROCESS)] try: for p in procs: p.daemon = True p.start() [p.join() for p in procs] finally: for p in procs: if not p.is_alive(): continue os.kill(p.pid, signal.SIGKILL) |
简单的循环调度程序
# A simple round-robin scheduler over generator tasks.
from collections import deque


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


def gen_fib(n):
    """Yield fib(1) .. fib(n), one value per scheduler step."""
    for i in range(1, n + 1):
        yield fib(i)


def run(tasks):
    """Round-robin: pop a task, advance it one step, requeue it.
    A finished task (StopIteration) prints 'done' and is dropped."""
    while tasks:
        try:
            task = tasks.popleft()
            print(next(task))  # py3 fix: next(task), not task.next()
            tasks.append(task)
        except StopIteration:
            print("done")


if __name__ == "__main__":
    tasks = deque([gen_fib(5), gen_fib(3)])
    run(tasks)

# output:
# 1 1 1 1 2 2 3 done 5 done   (one value per line)
具有阻塞功能的调度器
# A generator scheduler that handles blocking socket I/O via select().
# Tasks yield ('recv'|'send', socket) to park until the socket is ready.
# ref: PyCon 2015 - David Beazley
import socket
from select import select
from collections import deque

tasks = deque()
r_wait = {}   # socket -> task waiting to receive
s_wait = {}   # socket -> task waiting to send


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


def run():
    while any([tasks, r_wait, s_wait]):
        while not tasks:
            # no runnable task: poll until some parked socket is ready
            rr, sr, _ = select(r_wait, s_wait, {})
            for sk in rr:
                tasks.append(r_wait.pop(sk))
            for sk in sr:
                tasks.append(s_wait.pop(sk))
        try:
            task = tasks.popleft()
            why, what = next(task)  # py3 fix: next(task), not task.next()
            if why == 'recv':
                r_wait[what] = task
            elif why == 'send':
                s_wait[what] = task
            else:
                raise RuntimeError
        except StopIteration:
            pass


def fib_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('localhost', 5566))
    sock.listen(5)
    while True:
        yield 'recv', sock
        c, a = sock.accept()
        tasks.append(fib_handler(c))


def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(1024)
        if not req:
            break
        resp = fib(int(req))
        yield 'send', client
        # py3 fix: sockets carry bytes, not str
        client.send(str(resp).encode() + b'\n')
    client.close()


# Guarded so importing this module does not bind port 5566 and block.
if __name__ == "__main__":
    tasks.append(fib_server())
    run()

# output: (bash 1)           output: (bash 2)
# $ nc localhost 5566        $ nc localhost 5566
# 20                         10
# 6765                       55
PoolExecutor
# concurrent.futures PoolExecutor: thread pool vs. process pool.
# python2.x: available as the 'futures' module on PyPI; new in Python 3.2.
# Note: executor.map() returns a lazy iterator (a generator), not a list.
from concurrent import futures
import time


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


def thread():
    """Two fib(35) in a thread pool: no speedup -- the GIL serializes them."""
    s = time.time()
    with futures.ThreadPoolExecutor(2) as executor:
        # (renamed from 'e', which the original later shadowed with a float)
        for r in executor.map(fib, [35] * 2):
            print(r)
    print("thread cost: {}".format(time.time() - s))


def process():
    """Two fib(35) in a process pool: real parallelism, roughly 2x faster."""
    s = time.time()
    with futures.ProcessPoolExecutor(2) as executor:
        for r in executor.map(fib, [35] * 2):
            print(r)
    # typo fix: the original printed "pocess cost"
    print("process cost: {}".format(time.time() - s))


# bash> python3 -i test.py
# >>> thread()
# 9227465
# 9227465
# thread cost: 12.550225019454956
# >>> process()
# 9227465
# 9227465
# process cost: 5.538189888000488
What “with ThreadPoolExecutor” doing?
# What "with ThreadPoolExecutor(...)" actually does:
# it is equivalent to creating the executor and calling shutdown(wait=True).
from concurrent import futures


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


# Guarded so importing this module does not run the fib(30) demos.
if __name__ == "__main__":
    # context-manager form
    with futures.ThreadPoolExecutor(3) as e:
        fut = e.submit(fib, 30)
        res = fut.result()
        print(res)

    # equivalent explicit form
    e = futures.ThreadPoolExecutor(3)
    fut = e.submit(fib, 30)
    # BUG FIX: the original discarded this result and re-printed the
    # stale 'res' left over from the first block.
    res = fut.result()
    e.shutdown(wait=True)
    print(res)

# output:
# $ python3 thread_pool_exec.py
# 832040
# 832040
Future Object
# future: a handle on a deferred computation.
# Demonstrates two ways to consume results: add_done_callback (v1)
# versus futures.as_completed (v2).
from concurrent import futures


def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)


def handler(future):
    """Completion callback: report the finished future's result."""
    print("res: {}".format(future.result()))


def thread_v1():
    # register a callback on each future; it fires as each completes
    with futures.ThreadPoolExecutor(3) as pool:
        for offset in range(3):
            pool.submit(fib, 30 + offset).add_done_callback(handler)
    print("end")


def thread_v2():
    # collect the futures, then iterate them in completion order
    with futures.ThreadPoolExecutor(3) as pool:
        pending = [pool.submit(fib, 30 + offset) for offset in range(3)]
        for done in futures.as_completed(pending):
            print("res: {}".format(done.result()))
    print("end")


# output:
# $ python3 -i fut.py
# >>> thread_v1()
# res: 832040
# res: 1346269
# res: 2178309
# end
# >>> thread_v2()
# res: 832040
# res: 1346269
# res: 2178309
# end
Future error handling
# Propagating a task's exception out of its Future.
# future.result() re-raises whatever the task raised, even inside a
# done-callback.
from concurrent import futures


def spam():
    raise RuntimeError


def handler(future):
    """Done-callback: result() re-raises the task's RuntimeError here."""
    print("callback handler")
    try:
        future.result()
    except RuntimeError:
        print("get RuntimeError")


def thread_spam():
    with futures.ThreadPoolExecutor(2) as pool:
        pool.submit(spam).add_done_callback(handler)


# output:
# $ python -i fut_err.py
# >>> thread_spam()
# callback handler
# get RuntimeError