前言

在理解协程时要从根本上把yield视作控制流程的方式, 这样就好理解协程了

生成器是如何进化成协程的

协程的底层架构在PEP 342—Coroutines via Enhanced Generators中定义。 在Python 2.5(2006年)实现了。自此之后, yield关键字可以在表达式中使用,而且生成器API中增加了send(value)方法。生成器的调用方可以使用send(...)方法发送数据, 发送的数据会成为生成器函数中yield表达式的值。因此,生成器可以作为协程使用。协程是指一个过程,这个过程与调用方协作, 产出由调用方提供的值

除此之外,PEP 342还添加了throw(...)close()方法:前者的作用是让调用方抛出异常,在生成器中处理;后者的作用是终止生成器。

PEP 380对生成器函数的句法做了两处改动:

  • 生成器可以返回一个值;(在这之前,如果在生成器中给return语句提供值, 会抛出SyntaxError异常。
  • 新引入了yield from句法, 使用它可以把复杂的生成器重构成小型的嵌套生成器,省去了之前把生成器的工作委托给子生成器所需的大量样板代码

用作协程的生成器的基本行为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def simple_coroutine():
    print('-> coroutine started...')
    x = yield
    print('-> coroutine received...', x)

c = simple_coroutine()
print(c)
>>> <generator object simple_coroutine at 0x7f8d60c7d468>

next(c)
# c.send(None)
>>> -> coroutine started...

c.send(13)
>>> -> coroutine received... 13
>>> Traceback (most recent call last):
  File "/home/lrhaoo/桌面/notebook/fluent_python/tempCodeRunnerFile.md", line 13, in <module>
    c.send(13)
StopIteration
  • yield在表达式中使用;如果协程只需从客户那里接收数据,那么产出的值是None——这个值是隐式指定的, 因为yield关键字右边没有表达式。
  • 与创建生成器的方式一样,调用函数得到生成器对象。(注意是生成器对象)
  • 首先要调用next(...)函数启动生成器,执行到yield语句处暂停, 不启动的则无法发送数据。(也可以使用c.send(None)启动)
  • 调用send()方法后, 协程定义体中的yield表达式会计算出13; 现在, 协程会恢复, 一直运行到下一个yield表达式, 或者终止
  • 控制权流动到协程定义体的末尾,导致生成器像往常一样抛出StopIteration异常。
  • 协程的四个状态: GEN_CREATED, GEN_RUNNING, GEN_SUSPENDED, GEN_CLOSED
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def simple_coroutine(a):
    print('-> started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)

c = simple_coroutine(1)
print(c.send(None))
>>> -> started: a = 1
>>> 1

print(c.send(2))
>>> -> Received: b = 2
>>> 3

c.send(23333)
>>> -> Received: c = 23333
>>> Traceback (most recent call last):
        File "/home/lrhaoo/桌面/notebook/fluent_python/tempCodeRunnerFile.md", line 17, in <module>
            c.send(23333)
        StopIteration

b = yield a这样理解: 启动语句c.send(None)使得程序执行到yield a, 之后一直等待调用者输入,直到执行c.send(2)后,2被赋值给b,之后继续执行到c = yield a + b中的yield a + b,等待下一个输入。(举个例子类比后容易理解:我们写一个a+b的程序,运行后,程序等待我们输入a, 然后等待我们输入b,如果我们输入了a后不输入b,则程序会一直等待)(这样就容易理解激活语句到底使得程序运行到哪里->运行至用户输入)

使用协程计算平均值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        new_input = yield average
        total += new_input
        count += 1
        average = total / count

avg = averager()
avg.send(None)

print(avg.send(1))
>>> 1.0
print(avg.send(2))
>>> 1.5
print(avg.send(3))
>>> 2.0
print(avg.send(4))
>>> 2.5
print(avg.send(5))
>>> 3.0

与之前使用闭包实现的那个版本相比,使用协程的好处是, totalcount声明为局部变量即可, 无需使用实例属性或闭包在多次调用之间保持上下文。

预激协程的装饰器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer

@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        new_input = yield average
        total += new_input
        count += 1
        average = total / count

avg = averager()
print(avg.send(1))
>>> 1.0
print(avg.send(2))
>>> 1.5

终止协程和异常处理

协程中未处理的异常会向上冒泡,传给next函数或send方法的调用方(即触发协程的对象)
发生了异常的协程再次被调用会抛出StopIteration异常

Python2.5开始,客户代码可以在生成器对象上调用throw()close()。两个方法, 显式地把异常发给协程。

generator.throw(exc_type[, exc_value[, traceback]])

致使生成器在暂停的yield表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个 yield 表达式,而产出的值会成为调用generator.throw方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。

generator.close()

致使生成器在暂停的yield表达式处抛出GeneratorExit异常。如果生成器没有处理这个异常, 或者抛出StopIteration异常(通常是指运行到结尾), 调用方不会报错。如果收到GeneratorExit异常, 生成器一定不能产出值, 否则解释器会抛出RuntimeError异常。生成器抛出的其他异常会向上冒泡,传给调用方。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from inspect import getgeneratorstate

class MyException(Exception):
    pass

def demo_exc_handling():
    print('-> coroutine started...')
    while True:
        try:
            x = yield
        except MyException:
            print('*** MyException handled. Continuing...')
        else:
            print('-> coroutine received: {!r}'.format(x))
    raise RuntimeError('This line should never run.')

cor = demo_exc_handling()
cor.send(None)
>>> -> coroutine started...

cor.send(2333)
>>> -> coroutine received: 2333

cor.throw(MyException)
>>> *** MyException handled. Continuing...

print(getgeneratorstate(cor))
>>> GEN_SUSPENDED

cor.close()

print(getgeneratorstate(cor))
>>> GEN_CLOSED
  • 如果把MyException异常传入demo_exc_handling协程, 它会处理,(我们的程序在这里处理了)然后继续运行, 如果传入协程的异常没有处理, 协程会停止, 即状态变成GEN_CLOSED

Python3.3引入yield from结构的主要原因之一与把异常传入嵌套的协程有关。另一个原因是让协程更方便地返回值

让协程返回值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        new_input = yield
        if new_input is None:
            break
        total += new_input
        count += 1
        average = total / count
    return (count, average)

avg = averager()
next(avg)
avg.send(1)
avg.send(2)
avg.send(3)
print(avg.send(None))
>>> Traceback (most recent call last):
        File "/home/lrhaoo/桌面/notebook/fluent_python/tempCodeRunnerFile.md", line 19, in <module>
            print(avg.send(None))
        StopIteration: (3, 2.0)

发送None会终止循环, 导致协程结束, 返回结果。一如既往,生成器对象会抛出StopIteration异常。异常对象的value属性保存着返回的值。

使用try/exception改进

1
2
3
4
5
try:
    avg.send(None)
except StopIteration as err:
    result = err.value
print(result)

获取协程的返回值虽然要绕个圈子,但这是PEP 380定义的方式,当我们意识到这一点之后就说得通了: yield from结构会在内部自动捕获StopIteration异常。这种处理方式与for循环处理StopIteration异常的方式一样:循环机制使用用户易于理解的方式处理异常。对yield from结构来说, 解释器不仅会捕获StopIteration异常,还会把value属性的值变成yield from表达式的值。

yield from

首先要知道, yield from是全新的语言结构。它的作用比yield多很多,因此人们认为继续使用那个关键字多少会引起误解。在其他语言中,类似的结构使用await关键字, 这个名称好多了,因为它传达了至关重要的一点: 在生成器gen中使用 yield from subgen()时, subgen会获得控制权, 把产出的值传给gen的调用方, 即调用方可以直接控制subgen。与此同时, gen会阻塞, 等待subgen终止

ps: 写这本书的时候协程的概念还没有从生成器独立出来吗?

1
2
3
4
5
6
7
def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i
print(list(gen()))
>>> ['A', 'B', 1, 2]

改写成:

1
2
3
def gen():
    yield from 'AB'
    yield from range(1, 3)

yield from x表达式对x对象所做的第一件事是, 调用iter(x), 从中获取迭代器。因此, x可以是任何可迭代的对象

ps: 这个例子真的是太x了

一个有趣的的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from collections import Iterable

def flatten(items, ignore_types=(str, bytes)):
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            yield from flatten(x)
        else:
            yield x

items = [1, 2, [3, 4, [5, 6], 7], 8]

# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):
    print(x)

items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
    print(x)

yield from的主要功能是打开双向通道, 把最外层的调用方与最内层的子生成器连接起来, 这样二者可以直接发送和产出值, 还可以直接传入异常, 而不用在位于中间的协程中添加大量处理异常的样板代码。有了这个结构,协程可以通过以前不可能的方式委托职责。

PEP 380使用了一些专门的术语。

  • 委派生成器: 包含yield from <iterable>表达式的生成器函数。
  • 子生成器: 从yield from表达式中<iterable>部分获取的生成器. 子生成器可能是简单的迭代器,只实现了__next__方法
  • 调用方: PEP 380使用“调用方”这个术语指代调用委派生成器的客户端代码。

一个很好的例子

『图片』

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        new_input = yield 
        if new_input is None:
            break
        total += new_input
        count += 1
        average = total / count
    return (count, average)


def grouper(results, key):
    while True:
        results[key] = yield from averager()
    

def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(result[0], group, result[1], unit))

def main(data):
    results = dict()
    for key, values in data.items():
        group = grouper(results, key)
        next(group)
        for value in values:
            group.send(value)
        group.send(None)
    
    report(results)

data = {
    'girls;kg': [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m': [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg': [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m': [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

main(data)
>>> 9 boys  averaging 40.42kg
    9 boys  averaging 1.39m
    10 girls averaging 42.04kg
    10 girls averaging 1.43m
  • averager()在这里作为子生成器, grouper()委托生成器
  • grouper发送的每个值都会经由yield from处理, 通过管道传给averager实例。
  • 调用next(group), 预激委派生成器grouper, 此时进入while True循环,调用子生成器averager
  • 调用子生成器averager后, grouper会在yield from表达式处暂停, 等待averager实例处理客户端发来的值。averager实例运行完毕后, 返回的值绑定到results[key] 上。(划重点)
  • 如果外层for循环的末尾没有group.send(None), 那么averager子生成器永远不会终止, 委派生成器group永远不会再次激活, 因此永远不会为results[key]赋值。(即如果子生成器不终止,委派生成器会在yield from表达式处永远暂停。如果是这样,程序不会向前执行,因为yield from(与yield一样)把控制权转交给客户代码(即,委派生成器的调用方)了。

yield from的意义

  • 子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)。(ps:我觉得这个看情况吧,毕竟上面那个例子就不是这样的)
  • 使用send()方法发给委派生成器的值都直接传给子生成器。如果发送的值是None, 那么会调用子生成器的__next__()方法。如果发送的值不是None, 那么会调用子生成器的send()方法。如果调用的方法抛出StopIteration异常, 那么委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器。
  • 生成器退出时, 生成器(或子生成器)中的return expr表达式会触发StopIteration(expr)异常抛出。yield from表达式的值是子生成器终止时传给StopIteration异常的第一个参数(之前也提到过,return的值通过构造的异常类实例传递出去)

yield from结构的另外两个特性与异常和终止有关

  • 传入委派生成器的异常, 除了GeneratorExit之外都传给子生成器的throw()方法。如果调用throw()方法时抛出StopIteration异常, 委派生成器恢复运行。StopIteration 之外的异常会向上冒泡,传给委派生成器。
  • 如果把GeneratorExit异常传入委派生成器,或者在委派生成器上调用close()方法, 那么在子生成器上调用close()方法, 如果它有的话。如果调用close()方法导致异常抛出, 那么异常会向上冒泡, 传给委派生成器; 否则,委派生成器抛出GeneratorExit异常

ps:这两点把爷给看吐了

推荐直接去看PEP 380,里边有一段伪代码解释:

1
RESULT = yield from EXPR

等价于

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r

在生成器里使用return value,相当于raise StopIteration(value)

ps: 我看了一下,感觉还是那几个except不好理解,也就是异常了怎么处理, 主要是GeneratorExitBaseException这两个异常在什么情况下触发弄不清楚就不好理解. (先鸽了,以后需要深入的时候再来看) 这本书也有对那段代码的注解。PEP 380里没有注解,因为那是写给语言专家看的,不过有点基础的可以大致猜猜是什么意思

使用案例:使用协程做离散事件仿真

协程能自然地表述很多算法,例如仿真、游戏、异步 I/O,以及其他事件驱动型编程形式或协作式多任务。——Guido van Rossum和Phillip J. Eby PEP 342—Coroutines via Enhanced Generators

通过仿真系统能说明如何使用协程代替线程实现并发的活动, 而且对理解asyncio包有极大的帮助

离散事件仿真简介

离散事件仿真(Discrete Event Simulation,DES)是一种把系统建模成一系列事件的仿真类型。在离散事件仿真中,仿真“钟”向前推进的量不是固定的,而是直接推进到下一个事件模型的模拟时间。假如我们抽象模拟出租车的运营过程,其中一个事件是乘客上车,下一个事件则是乘客下车。不管乘客坐了 5 分钟还是 50 分钟,一旦乘客下车,仿真钟就会更新,指向此次运营的结束时间。使用离散事件仿真可以在不到一秒钟的时间内模拟一年的出租车运营过程。这与连续仿真不同,连续仿真的仿真钟以固定的量(通常很小)不断向前推进。显然,回合制游戏就是离散事件仿真的例子:游戏的状态只在玩家操作时变化,而且一旦玩家决定下一步怎么走了,仿真钟就会冻结。而实时游戏则是连续仿真,仿真钟一直在运行,游戏的状态在一秒钟之内更新很多次,因此反应慢的玩家特别吃亏。

这两种仿真类型都能使用多线程或在单个线程中使用面向事件的编程技术(例如事件循环驱动的回调或协程)实现。可以说,为了实现连续仿真,在多个线程中处理实时并行的操作更自然。而协程恰好为实现离散事件仿真提供了合理的抽象

在仿真领域,进程这个术语指代模型中某个实体的活动,与操作系统中的进程无关。仿真系统中的一个进程可以使用操作系统中的一个进程实现,但是通常会使用一个线程或一个协程实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import random
from queue import PriorityQueue


DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5

class Event:
    def __init__(self, time, taxi_id, action):
        self.time = time
        self.taxi_id = taxi_id
        self.action = action
    
    def __iter__(self):
        return iter((self.time, self.taxi_id, self.action))

    def __lt__(self, rhs):
        return self.time < rhs.time
    
    def __repr__(self):
        return 'Event(taxi_id={}, time={}, action={})'.format(self.taxi_id, self.time, self.action)


def taxi_process(taxi_id, trips, start=0):
    time = yield Event(start, taxi_id, 'leave garage')
    for i in range(trips):
        time = yield Event(time, taxi_id, 'pick up passenger')
        time = yield Event(time, taxi_id, 'drop off passenger')
    yield Event(time, taxi_id, 'going home')


def compute_duration(previous_action):
    if previous_action in ['leave garage', 'drop off passenger']:
        interval = SEARCH_DURATION
    elif previous_action == 'pick up passenger':
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1/interval)) + 1


class Simulator:
    def __init__(self, taxis):
        self.events = PriorityQueue()
        self.taxis = dict(taxis)
    
    def run(self, end):
        for _, taxi, in sorted(self.taxis.items()):
            first_event = next(taxi)
            self.events.put(first_event)

        sim_time = 0
        while sim_time < end:
            if self.events.empty():
                print('*** end of events ***')
                break
            
            current_event = self.events.get()
            sim_time, taxi_id, previous_action = current_event
            print('taxi:', taxi_id, taxi_id*' ', current_event)
            active_taxi = self.taxis[taxi_id]
            next_time = sim_time + compute_duration(previous_action)
            try:
                next_event = active_taxi.send(next_time)
            except StopIteration:
                del self.taxis[taxi_id]           
            else:
                self.events.put(next_event)    
        else:
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

taxis = {
    0: taxi_process(taxi_id=0, trips=2, start=0),
    1: taxi_process(taxi_id=1, trips=4, start=5),
    2: taxi_process(taxi_id=2, trips=6, start=10)
}

sim = Simulator(taxis)
sim.run(20)
>>> taxi: 0  Event(taxi_id=0, time=0, action=leave garage)
    taxi: 0  Event(taxi_id=0, time=2, action=pick up passenger)
    taxi: 1   Event(taxi_id=1, time=5, action=leave garage)
    taxi: 1   Event(taxi_id=1, time=8, action=pick up passenger)
    taxi: 2    Event(taxi_id=2, time=10, action=leave garage)
    taxi: 2    Event(taxi_id=2, time=14, action=pick up passenger)
    taxi: 1   Event(taxi_id=1, time=15, action=drop off passenger)
    taxi: 1   Event(taxi_id=1, time=16, action=pick up passenger)
    taxi: 2    Event(taxi_id=2, time=24, action=drop off passenger)
    *** end of simulation time: 3 events pending ***

给我的启发:

  • 存在多个协程,每个协程产出event,event有time属性用来在优先队列中排序,协程不停生产event,生产完成后结束释放,event的执行更具time属性决定。

生成器有三种不同的编码风格:拉取式,推送式,任务式

延伸阅读

David Beazley是Python生成器和协程的终极权威,它贡献了如下资料: