执行shell命令

# get stdout, stderr, returncode

>>> from subprocess import Popen, PIPE
>>> args = ['time', 'echo', 'hello python']
>>> ret = Popen(args, stdout=PIPE, stderr=PIPE)
>>> out, err = ret.communicate()
>>> out
b'hello python\n'
>>> err
b'        0.00 real         0.00 user         0.00 sys\n'
>>> ret.returncode
0

通过“threading”创建一个线程

>>> from threading import Thread
>>> class Worker(Thread):
...   def __init__(self, id):
...     super(Worker, self).__init__()
...     self._id = id
...   def run(self):
...     print "I am worker %d" % self._id
...
>>> t1 = Worker(1)
>>> t2 = Worker(2)
>>> t1.start(); t2.start()
I am worker 1
I am worker 2

# using function could be more flexible
>>> def Worker(worker_id):
...   print "I am worker %d" % worker_id
...
>>> from threading import Thread
>>> t1 = Thread(target=Worker, args=(1,))
>>> t2 = Thread(target=Worker, args=(2,))
>>> t1.start()
I am worker 1
I am worker 2

性能问题 - GIL

# GIL - Global Interpreter Lock
# see: Understanding the Python GIL
>>> from threading import Thread
>>> def profile(func):
...   def wrapper(*args, **kwargs):
...     import time
...     start = time.time()
...     func(*args, **kwargs)
...     end   = time.time()
...     print end - start
...   return wrapper
...
>>> @profile
... def nothread():
...   fib(35)
...   fib(35)
...
>>> @profile
... def hasthread():
...   t1=Thread(target=fib, args=(35,))
...   t2=Thread(target=fib, args=(35,))
...   t1.start(); t2.start()
...   t1.join(); t2.join()
...
>>> nothread()
9.51164007187
>>> hasthread()
11.3131771088
# !Thread get bad Performance
# since cost on context switch

生产者和消费者

# This architecture make concurrency easy
>>> from threading import Thread
>>> from Queue import Queue
>>> from random import random
>>> import time
>>> q = Queue()
>>> def fib(n):
...   if n<=2:
...     return 1
...   return fib(n-1)+fib(n-2)
...
>>> def producer():
...   while True:
...     wt = random()*5
...     time.sleep(wt)
...     q.put((fib,35))
...
>>> def consumer():
...   while True:
...     task,arg = q.get()
...     print task(arg)
...     q.task_done()
...
>>> t1 = Thread(target=producer)
>>> t2 = Thread(target=consumer)
>>> t1.start();t2.start()

 

线程池模板

# producer and consumer architecture
from Queue import Queue
from threading import Thread

class Worker(Thread):
   def __init__(self,queue):
      super(Worker, self).__init__()
      self._q = queue
      self.daemon = True
      self.start()
   def run(self):
      while True:
         f,args,kwargs = self._q.get()
         try:
            print f(*args, **kwargs)
         except Exception as e:
            print e
         self._q.task_done()

class ThreadPool(object):
   def __init__(self, num_t=5):
      self._q = Queue(num_t)
      # Create Worker Thread
      for _ in range(num_t):
         Worker(self._q)
   def add_task(self,f,*args,**kwargs):
      self._q.put((f, args, kwargs))
   def wait_complete(self):
      self._q.join()

def fib(n):
   if n <= 2:
      return 1
   return fib(n-1)+fib(n-2)

if __name__ == '__main__':
   pool = ThreadPool()
   for _ in range(3):
      pool.add_task(fib,35)
   pool.wait_complete()

 

使用多处理ThreadPool

# ThreadPool is not in python doc
>>> from multiprocessing.pool import ThreadPool
>>> pool = ThreadPool(5)
>>> pool.map(lambda x: x**2, range(5))
[0, 1, 4, 9, 16]

与“map” 性能比较

# pool will get bad result since GIL
import time
from multiprocessing.pool import \
     ThreadPool

pool = ThreadPool(10)
def profile(func):
    def wrapper(*args, **kwargs):
       print func.__name__
       s = time.time()
       func(*args, **kwargs)
       e = time.time()
       print "cost: {0}".format(e-s)
    return wrapper

@profile
def pool_map():
    res = pool.map(lambda x:x**2,
                   range(999999))

@profile
def ordinary_map():
    res = map(lambda x:x**2,
              range(999999))

pool_map()
ordinary_map()

输出:

$ python test_threadpool.py
pool_map
cost: 0.562669038773
ordinary_map
cost: 0.38525390625

 

互斥锁

最简单的同步原语锁

>>> from threading import Thread
>>> from threading import Lock
>>> lock = Lock()
>>> def getlock(id):
...   lock.acquire()
...   print "task{0} get".format(id)
...   lock.release()
...
>>> t1=Thread(target=getlock,args=(1,))
>>> t2=Thread(target=getlock,args=(2,))
>>> t1.start();t2.start()
task1 get
task2 get

# using lock manager
>>> def getlock(id):
...   with lock:
...     print "task%d get" % id
...
>>> t1=Thread(target=getlock,args=(1,))
>>> t2=Thread(target=getlock,args=(2,))
>>> t1.start();t2.start()
task1 get
task2 get

 

Deadlock

Happen when more than one mutex lock.

>>> import threading
>>> import time
>>> lock1 = threading.Lock()
>>> lock2 = threading.Lock()
>>> def task1():
...   with lock1:
...     print "get lock1"
...     time.sleep(3)
...     with lock2:
...       print "No deadlock"
...
>>> def task2():
...   with lock2:
...     print "get lock2"
...     with lock1:
...       print "No deadlock"
...
>>> t1=threading.Thread(target=task1)
>>> t2=threading.Thread(target=task2)
>>> t1.start();t2.start()
get lock1
 get lock2

>>> t1.isAlive()
True
>>> t2.isAlive()
True

实现“监控”

Using RLock

# ref: An introduction to Python Concurrency - David Beazley
from threading import Thread
from threading import RLock
import time

class monitor(object):
   lock = RLock()
   def foo(self,tid):
      with monitor.lock:
         print "%d in foo" % tid
         time.sleep(5)
         self.ker(tid)

   def ker(self,tid):
      with monitor.lock:
         print "%d in ker" % tid
m = monitor()
def task1(id):
   m.foo(id)

def task2(id):
   m.ker(id)

t1 = Thread(target=task1,args=(1,))
t2 = Thread(target=task2,args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()

output:

$ python monitor.py
1 in foo
1 in ker
2 in ker

控制primitive资源

使用信号量

from threading import Thread
from threading import Semaphore
from random    import random
import time

# limit resource to 3
sema = Semaphore(3)
def foo(tid):
    with sema:
        print "%d acquire sema" % tid
        wt = random()*5
        time.sleep(wt)
    print "%d release sema" % tid

threads = []
for _t in range(5):
    t = Thread(target=foo,args=(_t,))
    threads.append(t)
    t.start()
for _t in threads:
    _t.join()

输出:

python semaphore.py
0 acquire sema
1 acquire sema
2 acquire sema
0 release sema
 3 acquire sema
2 release sema
 4 acquire sema
1 release sema
4 release sema
3 release sema

Ensure tasks has done

Using ‘event’

from threading import Thread
from threading import Event
import time

e = Event()

def worker(id):
   print "%d wait event" % id
   e.wait()
   print "%d get event set" % id

t1=Thread(target=worker,args=(1,))
t2=Thread(target=worker,args=(2,))
t3=Thread(target=worker,args=(3,))
t1.start()
t2.start()
t3.start()

# wait sleep task(event) happen
time.sleep(3)
e.set()

output:

python event.py
1 wait event
2 wait event
3 wait event
2 get event set
 3 get event set
1 get event set

线程安全优先队列

Using ‘condition’

import threading
import heapq
import time
import random

class PriorityQueue(object):
    def __init__(self):
        self._q = []
        self._count = 0
        self._cv = threading.Condition()

    def __str__(self):
        return str(self._q)

    def __repr__(self):
        return self._q

    def put(self, item, priority):
        with self._cv:
            heapq.heappush(self._q, (-priority,self._count,item))
            self._count += 1
            self._cv.notify()

    def pop(self):
        with self._cv:
            while len(self._q) == 0:
                print("wait...")
                self._cv.wait()
            ret = heapq.heappop(self._q)[-1]
        return ret

priq = PriorityQueue()
def producer():
    while True:
        print(priq.pop())

def consumer():
    while True:
        time.sleep(3)
        print("consumer put value")
        priority = random.random()
        priq.put(priority,priority*10)

for _ in range(3):
    priority = random.random()
    priq.put(priority,priority*10)

t1=threading.Thread(target=producer)
t2=threading.Thread(target=consumer)
t1.start();t2.start()
t1.join();t2.join()

output:

python3 thread_safe.py
0.6657491871045683
0.5278797439991247
0.20990624606296315
wait...
consumer put value
0.09123101305407577
wait...

Multiprocessing

Solving GIL problem via processes

>>> from multiprocessing import Pool
>>> def fib(n):
...     if n <= 2:
...         return 1
...     return fib(n-1) + fib(n-2)
...
>>> def profile(func):
...     def wrapper(*args, **kwargs):
...         import time
...         start = time.time()
...         func(*args, **kwargs)
...         end   = time.time()
...         print end - start
...     return wrapper
...
>>> @profile
... def nomultiprocess():
...     map(fib,[35]*5)
...
>>> @profile
... def hasmultiprocess():
...     pool = Pool(5)
...     pool.map(fib,[35]*5)
...
>>> nomultiprocess()
23.8454811573
>>> hasmultiprocess()
13.2433719635

自定义多处理映射

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe,x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),
          args=(c,x))
          for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p,c) in pipe]

print parmap(lambda x:x**x,range(1,5))

优雅的方法来杀死所有的子进程

from __future__ import print_function

import signal
import os
import time

from multiprocessing import Process, Pipe

NUM_PROCESS = 10

def aurora(n):
    while True:
        time.sleep(n)

if __name__ == "__main__":
    procs = [Process(target=aurora, args=(x,))
                for x in range(NUM_PROCESS)]
    try:
        for p in procs:
            p.daemon = True
            p.start()
        [p.join() for p in procs]
    finally:
        for p in procs:
            if not p.is_alive(): continue
            os.kill(p.pid, signal.SIGKILL)

简单的循环调度程序

>>> def fib(n):
...   if n <= 2:
...     return 1
...   return fib(n-1)+fib(n-2)
...
>>> def gen_fib(n):
...   for _ in range(1,n+1):
...     yield fib(_)
...
>>> t=[gen_fib(5),gen_fib(3)]
>>> from collections import deque
>>> tasks = deque()
>>> tasks.extend(t)
>>> def run(tasks):
...   while tasks:
...     try:
...       task = tasks.popleft()
...       print task.next()
...       tasks.append(task)
...     except StopIteration:
...       print "done"
...
>>> run(tasks)
1
1
1
1
2
2
3
done
5
done

具有阻塞功能的调度器

# ref: PyCon 2015 - David Beazley
import socket
from select import select
from collections import deque

tasks  = deque()
r_wait = {}
s_wait = {}

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1)+fib(n-2)

def run():
    while any([tasks,r_wait,s_wait]):
        while not tasks:
            # polling
            rr, sr, _ = select(r_wait, s_wait, {})
            for _ in rr:
                tasks.append(r_wait.pop(_))
            for _ in sr:
                tasks.append(s_wait.pop(_))
        try:
            task = tasks.popleft()
            why,what = task.next()
            if why == 'recv':
                r_wait[what] = task
            elif why == 'send':
                s_wait[what] = task
            else:
                raise RuntimeError
        except StopIteration:
            pass

def fib_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
    sock.bind(('localhost',5566))
    sock.listen(5)
    while True:
        yield 'recv', sock
        c, a = sock.accept()
        tasks.append(fib_handler(c))

def fib_handler(client):
    while True:
        yield 'recv', client
        req  = client.recv(1024)
        if not req:
            break
        resp = fib(int(req))
        yield 'send', client
        client.send(str(resp)+'\n')
    client.close()

tasks.append(fib_server())
run()

output: (bash 1)

$ nc loalhost 5566
20
6765

output: (bash 2)

$ nc localhost 5566
10
55

PoolExecutor

# python2.x is module futures on PyPI
# new in Python3.2
>>> from concurrent.futures import \
...     ThreadPoolExecutor
>>> def fib(n):
...     if n<=2:
...         return 1
...     return fib(n-1) + fib(n-2)
...
>>> with ThreadPoolExecutor(3) as e:
...     res= e.map(fib,[1,2,3,4,5])
...     for _ in res:
...         print(_, end=' ')
...
1 1 2 3 5 >>>
# result is generator?!
>>> with ThreadPoolExecutor(3) as e:
...   res = e.map(fib, [1,2,3])
...   inspect.isgenerator(res)
...
True

# demo GIL
from concurrent import futures
import time

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def thread():
    s = time.time()
    with futures.ThreadPoolExecutor(2) as e:
        res = e.map(fib, [35]*2)
        for _ in res:
            print(_)
    e = time.time()
    print("thread cost: {}".format(e-s))

def process():
    s = time.time()
    with futures.ProcessPoolExecutor(2) as e:
        res = e.map(fib, [35]*2)
        for _ in res:
            print(_)
    e = time.time()
    print("pocess cost: {}".format(e-s))


# bash> python3 -i test.py
>>> thread()
9227465
9227465
thread cost: 12.550225019454956
>>> process()
9227465
9227465
pocess cost: 5.538189888000488

What “with ThreadPoolExecutor” doing?

from concurrent import futures

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

with futures.ThreadPoolExecutor(3) as e:
    fut = e.submit(fib, 30)
    res = fut.result()
    print(res)

# equal to
e = futures.ThreadPoolExecutor(3)
fut = e.submit(fib, 30)
fut.result()
e.shutdown(wait=True)
print(res)

output:

$ python3 thread_pool_exec.py
832040
832040

Future Object

# future: deferred computation
# add_done_callback
from concurrent import futures

def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)

def handler(future):
    res = future.result()
    print("res: {}".format(res))

def thread_v1():
    with futures.ThreadPoolExecutor(3) as e:
        for _ in range(3):
            f = e.submit(fib, 30+_)
            f.add_done_callback(handler)
    print("end")

def thread_v2():
    to_do = []
    with futures.ThreadPoolExecutor(3) as e:
        for _ in range(3):
            fut = e.submit(fib, 30+_)
            to_do.append(fut)
        for _f in futures.as_completed(to_do):
            res = _f.result()
            print("res: {}".format(res))
    print("end")

output:

$ python3 -i fut.py
>>> thread_v1()
res: 832040
res: 1346269
res: 2178309
end
>>> thread_v2()
res: 832040
res: 1346269
res: 2178309
end

Future error handling

from concurrent import futures

def spam():
    raise RuntimeError

def handler(future):
    print("callback handler")
    try:
        res = future.result()
    except RuntimeError:
        print("get RuntimeError")

def thread_spam():
    with futures.ThreadPoolExecutor(2) as e:
        f = e.submit(spam)
        f.add_done_callback(handler)

output:

$ python -i fut_err.py
>>> thread_spam()
callback handler
get RuntimeError