🥷
🥷
文章目录
  1. 基本概念
  2. GIL
    1. Performance Notes
  3. 并行编程(Parallel Programing)
    1. 线程
      1. MultiThread
      2. Thread Pool
    2. 进程
      1. MultiProcess
      2. ProcessPool
    3. 协程
    4. 其他
      1. 队列:
  4. Other
  5. References

GIL和Python并行编程(一)

基本概念

  • 进程

    一个进程有多个线程,进程拥有独立的内存空间,进程间通信较为方便

  • 线程

    不可划分的最小单位

  • 协程

    由某个调度程序进行控制调度

  • 阻塞非阻塞

    关注点是程序在等待调用结果(消息,返回值)时的状态,处理IO时都是阻塞和非阻塞都是同步IO,只有使用了特殊的API才是异步IO

  • 异步与同步

    关注点是消息通信机制,同步则在发出调用之后即返回结果,或者等待到这个结果。异步是在调用之后不会得到结果,被调用函数计算出结果之后会通知调用者,或者通过回调函数处理。

GIL

pass

Performance Notes

  • 多线程,多进程(单机多进程,多机多进程)
  • 多进程可以直接用multiprocess或者从shell之类启动多个python进程
  • 分布式的方式
  • 直接采用gevent的monkey patch(协程)
  • 编写扩展,ctypes编写c库,由python调用,或者编写rust扩展也行,
  • 换解释器,换成pypy解释器,据说代码运行速度可以快6.3倍,但是并不实用,好多库的实现依赖不同

并行编程(Parallel Programing)

talk is cheap, show me the demo code…

线程

MultiThread

一般来讲,直接继承自threading.Thread,即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

import threading
import time

exitFlag = 0

class mThread( threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)

self.threadID = threadID
self.name = name
self.counter = counter

def run(self):
print("Starting "+ self.name)
print_time(self.name, self.counter, 5)
print("Esxiting " + self.name)

def print_time(threadName, delay, counter):
while counter:
if exitFlag:
thread.exit()
time.sleep(delay)

print("{} {}".format(threadName,time.ctime(time.time())))

counter -= 1

thread1 = mThread(1, "Thread-1", 1)
thread2 = mThread(2, "Thread-2", 2)

thread1.start()
thread2.start()

print("Exiting Main Thread")

Thread Pool

不建议使用,使用进程池吧

  • 写法一,使用concurrent模块的ThreadPool
1
2
3
4
5
6

from concurrent import ThreadPool

p = ThreadPool(4)
p.map(f,range(10))

  • 写法二,使用concurrent模块的ThreadPoolExecutor以及with
1
2
3
4
5
6
7
8
9

from concurrent import futures

def f(x):
return x*x

with futures.ThreadPoolExecutor(max_workers=4) as ex:
print(list(ex.map(f, range(10))))

  • 写法三,使用gevent库的ThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# code from https://github.com/gevent/gevent/blob/master/examples/threadpool.py

import time
import gevent
from gevent.threadpool import ThreadPool


pool = ThreadPool(3)
start = time.time()
for _ in range(4):
pool.spawn(time.sleep, 1)
gevent.wait()
delay = time.time() - start
print('Running "time.sleep(1)" 4 times with 3 threads. Should take about 2 seconds: %.3fs' % delay)

进程

MultiProcess

1
2
3
4
5
6
7
8
9
10
11
12
13

import multiprocessing

def worker(num):
print("Worker: {}".format(num))

jobs = []

for i in range(5):
p = multiprocessing.Process(target=worker,args=(i,))
jobs.append(p)
p.start()

ProcessPool

  • 写法一,使用multiprocessing模块的Pool
1
2
3
4
5
6
7
8
9

from multiprocessing import Pool
p = Pool(4)

def f(x):
return x*x

print(p.map(f, range(10)))

  • 写法二,使用multiprocessing模块的Pool以及上下文管理器With
1
2
3
4
5
6
7
8
9
10
11

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
return x*x

with Pool(processes=4) as pool:
print(pool.map(f,range(10)))

  • 写法三,使用concurrent模块的ProcessPoolExecutor以及with
1
2
3
4
5
6
7
8
9
10
11
12
13
14

from concurrent import futures

def f(x):
return x*x

with futures.ProcessPoolExecutor(max_workers=4) as ex:
print(list(ex.map(f, range(10))))

# Also you can use ex.submit, and get it's value by result() method
#for i in range(10):
# res = ex.submit(f,i)
# print(res.result())

协程

使用gevent的猴子补丁,尽量早,在代码的一开始就导入并使用。不用对其他代码进行修改就可以达到加速。

1
2
3
4

from gevent import monkey
monkey.patch_all()

其他的用的并不多,不是很清楚,还是等更熟悉了再添加。

其他

队列:

  • queue: A synchronized queue class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# demo code from https://docs.python.org/3/library/queue.html

def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)

for item in source():
q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

import multiprocessing
import random
import string

class Fun:
def __init__(self, name):
self.name = name

def do_it(self):
name = multiprocessing.current_process().name
print("{} Just Do it; Fun {}".format(name, self.name))


def worker(q):
obj = q.get()
obj.do_it()


queue = multiprocessing.Queue()

p = multiprocessing.Process(target=worker, args=(queue,))
p.start()

queue.put(Fun("{}".format(random.choice(string.ascii_uppercase))))

queue.close()
queue.join_thread()
p.join()

Other

下篇详细写GIL和并行编程中的通信,进程间通信等。

References