Python并发编程

python基础

浏览数:381

2019-5-20

本文只是介绍Python并发编程的各种实现, 关于并发问题更深入讨论可以参考进程与线程.

进程是正在运行程序的抽象, 每个进程中可以包含多个线程. 进程拥有独立的内存空间, 同进程下的线程共享内存空间.

在单核系统上, 进程或者线程通过交替执行来实现并发, 但并没有真正地并行.

在多核系统的每个核心上, 线程同样采用交替执行的方式. 但因为多核的存在, 不同核心上的线程可以真正的并行.

协程是在单线程中, 通过控制不同任务执行顺序进行协作的机制.协程实现多任务的协作,而非多任务的并发执行.

异步调用将耗时操作交由他人完成保证自己不被阻塞, 并行仍由多线程或内核的IO机制实现.

进程

multiprocessing模块提供了一个Process类来代表一个进程对象:

from multiprocessing import Process
import os

def run(name):
    print('sub process %s : %d' % (name, os.getpid()) )

if __name__ == '__main__':
    pro = Process(target = run, args = ('name',))
    pro.start()
    pro.join()
    print('main process: %d' % os.getpid() )

在创建线程对象时,将函数对象与target参数绑定作为新进程的主函数.调用进程对象的start方法,start方法将创建新进程并调用其target函数开始运行.

调用进程对象pro的join()方法,将使调用进程(主进程)等待进程pro运行完成后继续运行.

进程池

进程池Pool可以批量管理进程,Pool的构造函数接受一个int值作为最大进程数,默认为计算机逻辑核心数.

from multiprocessing import Process
from multiprocessing import Pool
import os

def run():
    print('sub process: %d' % os.getpid() )

if __name__ == '__main__':
    pool = Pool(4);
    for i in range(0,4):
        pool.apply_async(run)
    #向进程池中添加函数对象作为子进程的主函数

    pool.close()
    #close()之后进程池中不能添加新的进程
    #只有close()之后才能调用join()方法
    pool.join()
    #join()使得在进程池中所有进程都执行完毕后,主调进程继续执行
    print('process: %d' % os.getpid() )

fork

fork命令是UNIX系列操作系统中的一个重要命令, 用它在当前进程下创建一个子进程作为当前进程的副本.若成功则父进程返回子进程pid, 子进程返回0.

一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID.

Python的os模块封装了fork调用,可以在Python程序中轻松创建子进程.由于Windows没有fork调用,该方法无法在Windows上运行.

subprocess

subprocess模块用于调用外部进程:

import subprocess
r = subprocess.call(['ping','www.python.org'])

subprocess.call方法接受一个列表作为参数,并将其作为命令行发送给OS并返回外部进程的返回值.上述调用,等价于在终端中输入ping www.python.org.

进程间通信

Python提供了多种进程间通信机制,最常用的是Queue和Pipe:

from multiprocessing import Process,Pipe
import os

def fun(conn):
    conn.send(['msg:',os.getpid()]);
    conn.close();

if __name__ == '__main__':
    (sendConn,recvConn) = Pipe();
    pro = Process(target=fun,args=(sendConn,))
    pro.start();
    pro.join();
    print(recvConn.recv());

Pipe()返回一对相互连接的read-write PipeConnection对象,每个对象都具有send()和recv()方法,一个对象通过send()发送字符串,与其关联的对象使用recv()方法可以获得该消息.

Queue是python中的队列,为了在进程通信中使用一般采用其派生类JoinableQueue.

import multiprocessing

q = multiprocessing.Queue()

def readerProc():
    print(q.get())

if __name__ == '__main__':
    reader = multiprocessing.Process(target=readerProc)
    reader.start()
    q.put(100)
    reader.join()

线程

Python标准库提供了_thread模块作为线程的底层实现,并提供了高级模块threading_thread进行了封装.

通常情况下可以使用高级模块threading,其语法与Process类似:

import threading

def run():
    print('current thread:%s' % threading.current_thread().name);

t = threading.Thread(target=run,name='new_thread');
t.start();
t.join();
print('current thread:%s' % threading.current_thread().name);

输出:

current thread:new_thread
current thread:MainThread

每个进程至少拥有一个线程,Python主进程的默认线程称为MainThread,threading模块的current_thread()方法将返回当前线程对象.

Mutex

线程与进程的区别在于进程拥有自己的内存空间,而同一个进程下的线程需要共享内存空间.

对一个对象的修改一般是由多条机器指令完成的,多线程模式下各线程的指令交替执行,可能出现一个线程对对象的修改没有完成,而另一个线程却在此时访问该对象造成错误.

为了防止这种错误, 可以使用锁机制:

import threading

shared = 0;
lock = threading.Lock();

def vary():
    global shared
    # Declare "shared" is a global object instead of a local object
    shared = shared + 1;

def run():
    lock.acquire();
    try:
        vary();
        print('new_thread %d\n' % shared);
    except Exception, e:
        raise
    finally:
        lock.release();

t1 = threading.Thread(target=run,name='new_thread',);
t1.start();
t1.join();
print('MainThread %d' % shared);

获取锁之后必须释放,否则会造成死锁.

当线程试图调用vary()方法时, 会先检查该线程是否获得了锁. 若线程获得了锁则继续执行, 否则线程将会被挂起直到得到锁才会继续执行.

ThreadLocal

ThreadLocal机制使得线程可以建立自己的局部变量不与其它线程共享,ThreadLocal对象是全局的(线程共享),其每一个属性都是一个词典以线程ID作为key进行取值.

import threading

def run(name):
    local_name.name = name;
    print('Hello, %s' % local_name.name);


local_name = threading.local();
t1 = threading.Thread(target=run,args=('World',));
t2 = threading.Thread(target=run,args=('Home',));
t1.start();
t2.start();

GIL

GIL(Global Interpreter Lock)是CPython的历史遗留问题. GIL的存在使得CPython上的多线程程序基本无法并行,不能真正利用多核的计算能力.

线程只有在获得GIL之后才可以执行. CPython会计算当前线程已执行的字节码数量(opcode计数),达到阈值后就强制释放GIL,同时触发一次线程切换.

这种模式在只有一个CPU核心的情况下毫无问题.任何一个线程被唤起时都能成功获得到GIL(因为只有释放了GIL才会引发线程调度).

因为GIL释放和获取间隔较短而唤醒线程间隔较长,在多核情况下,原来持有GIL的线程很可能再次获得GIL,运行在其它核心上的线程唤醒后却得不到GIL,白白浪费了CPU.

GIL使得CPython多线程程序效率下降严重但却确保了单线程程序的执行效率.如果确实需要多任务处理(如IO密集任务)可以考虑多进程与异步调用,如果需要多核的计算能力优势可以选择其它语言.

协程

协程(Coroutine),又称微线程,纤程.多线(进)程模型下, 各线程执行的先后顺序是不可知的, 协程可以控制它们之间执行的顺序实现可控的写作.

yield关键字是协程的核心功能, yield将一个值返回主调用函数, 然后下次调用时从yield下面的一条语句继续执行, 整个过程中函数作用域内的数据不变.

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待.如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产.

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('Consuming %s' % n)
        r = 'OK'

def producer(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('Producing %s' % n)
        r = c.send(n)
        print('Consumer return: %s' % r)
    c.close()

c = consumer()
producer(c)

输出:

Producing 1
Consuming 1
Consumer return: OK
Producing 2
Consuming 2
Consumer return: OK
Producing 3
Consuming 3
Consumer return: OK
Producing 4
Consuming 4
Consumer return: OK
Producing 5
Consuming 5
Consumer return: OK

注意到consumer函数是一个generator,把一个consumer传入produce后:

  1. 首先调用c.send(None)启动生成器;

  2. producer生产一个产品, 并调用c.send(n)把实参值赋给yield的左值n1并从下一行开始继续执行consumer.

  3. consumer通过yield拿到消息,处理,然后通过yield把结果传回;

  4. produce拿到consumer处理的结果,继续生产下一条消息;

  5. produce决定停止生产,通过c.close()关闭consumer,整个过程结束.

整个流程无锁,由一个线程执行,producer和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务.

异步调用

asyncio是Python3.4中引入的异步调用模块, async/await 是Python3.5中引入的新语法使得可读性更高.

asyncio内建一个事件循环(eventloop),事件循环监听事件, 并将事件发送给响应函数进行处理.

使用async关键字声明一个响应函数为异步函数, 当响应函数遇到耗时操作时, 通过await关键字通知事件循环.

事件循环暂停该事件的处理, 继续处理其它事件.当耗时操作完成后, 事件循环将继续执行刚才被挂起的异步函数.

耗时操作交由异步API的提供者在另外的线程中完成,主线程并未执行耗时操作, 使得主线程可以正常响应其它事件不至阻塞.

#!/usr/local/bin/python3.5
import threading
import asyncio

async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    await asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
# start event loop
loop.close()

输出:

Hello world! (<_MainThread(MainThread, started 7984)>)
Hello world! (<_MainThread(MainThread, started 7984)>)
Hello again! (<_MainThread(MainThread, started 7984)>)
Hello again! (<_MainThread(MainThread, started 7984)>)
({<Task finished coro=<hello() done, defined at <stdin>:1> result=None>, <Task finished coro=<hello() done, defined at <stdin>:1> result=None>}, set())

上述示例中包含两个耗时操作[hello(), hello()], 但是耗时操作sleep(1)异步执行主线程并未被阻塞.

本示例中事件循环和异步函数在同一个线程内执行即可避免阻塞.当然也可以在不同线程执行,异步机制将保证异步函数线程不被阻塞.

asyncio库可以在其它线程中完成耗时操作, 更可能的解决方案是采用操作系统内置的异步支持.

异步调用将耗时操作交由他人完成保证自己不被阻塞, 只是线程内部的一种策略.并发的实现仍由操作系统或者运行时环境考虑.

作者:-Finley-