Homogeneous Computations: Thoughts on Generator Coroutines
Hello, everyone, and welcome back to Cameron’s Corner! This week, I have a treat. We received a fantastic question in our Discord Server—which you should join if you haven’t yet—about the usage of a generator coroutine in Python. Specifically, the question sought to disambiguate a call to __next__ from a call to .send(None) on a generator instance.
Before I get started, I want to remind you about the seminar coming up tomorrow, September 7th, titled “How Do I Write ‘Constructively’ Correct Code with Metaclasses & Decorators?” Join James Powell as he delves into the powerful concept of leveraging Python’s object-construction mechanism to enforce code correctness. Discover how metaclasses, decorators, and other language features can be used to validate and coerce input data, define selective object hierarchies, and implement abstract base classes.
Back to the question at hand. While there are workarounds to this problem, none of them feels optimal, and all involve contorting Python to your will. This led James to create a thoughtful response, laying out the approach we take with generators, linking it to forms of encoding (in-band vs. out-of-band), and demonstrating how we can use this thinking to guide our application of generator coroutines. This week, I wanted to share some of that Q&A with you all.
The Question:
I have a specific doubt about coroutines. How do I stop someone from doing a coro.send(None)? I want my coro to behave differently during a next(coro) as opposed to a coro.send(something). For more context, I have a wrapper for a time tracker:
def track2(iterable, *, total_steps=None):
    total = len(iterable) if total_steps is None else total_steps
    timer = Timer(total_steps)
    for item in iterable:
        info = yield item
        timer(info=info)
        if info is not None:
            yield  # just to ensure the send operation stops and waits for the actual __next__ call
and, during looping, I want to log something if they send something meaningful; otherwise, just go to the next step:
for i in (tracker := track2([1, 2, 3, 4, 5])):
    time.sleep(0.1)
    info = f'My Info: {i}'
    tracker.send(info)  # THIS MUST BE OPTIONAL, i.e., if .send is not called, go for the next iteration
But, in the above code, if someone sends None to the tracker, that basically wastes the next iteration, and it just fails silently.
Any insight will be appreciated.
Answer
When we talk about generators and coroutines, we often describe them as the consequence of adding structure to a computation.
Background
in-band vs out-of-band encoding
Consider the challenge of representing (or “encoding”) three entities that we want to automate over: “Alice,” “Bob,” and “Charlie.” We can represent these three entities in a variety of ways.
We could encode these data as a delimiter-separated str:
# encode the entities
entities = 'Alice,Bob,Charlie'

# automate (i.e., iterate) over the entities
for ent in entities.split(','):
    print(f'{ent = }')
ent = 'Alice'
ent = 'Bob'
ent = 'Charlie'
Or, we could encode these data using a list:
# encode the entities
entities = ['Alice', 'Bob', 'Charlie']

# automate (i.e., iterate) over the entities
for ent in entities:
    print(f'{ent = }')
ent = 'Alice'
ent = 'Bob'
ent = 'Charlie'
The above choices differ formally: one uses str and the other list. We are interested in understanding the deepest difference between these two choices—what differences are present beyond the superficial choice of Python data type? In the scope of the choices we can make, choosing between str and list for representing these data is, indeed, quite superficial. The meaningful difference is in how the structure is encoded: the str stores the boundaries between entities in-band (the ',' delimiter occupies the same channel as the data itself), while the list stores them out-of-band (the structure is maintained separately from the values).
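To see why this distinction matters in practice, here is a small sketch (the name 'Smith, Bob' is a hypothetical entity chosen because it happens to contain the delimiter):

```python
# in-band: the ',' delimiter occupies the same channel as the data,
# so a value that itself contains a comma corrupts the structure
entities = 'Smith, Bob,Alice'
print(entities.split(','))  # three apparent entities, though we meant two

# out-of-band: the list structure is stored apart from the values,
# so the values may contain anything, including the delimiter
entities = ['Smith, Bob', 'Alice']
print(len(entities))  # two entities, as intended
```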
What does this have to do with generators?
Well, generators are a way for us to add an out-of-band structuring to a computation.
For example, the following function computes three results…
def f(data):
    x = data + 1
    y = data * 2
    z = data ** 3
    return x, y, z

a, b, c = f(123)
and the following generator computes the same three results, but it allows us to delineate each ‘step’ of the computation…
def g(data):
    yield (x := data + 1)
    yield (y := data * 2)
    yield (z := data ** 3)

gi = g(123)
a = next(gi)
b = next(gi)
c = next(gi)
Note that, in the above, each “part” or “step” of the computation corresponds to one line of the source code. But, if we look at this function with the dis module, we can clearly see an alternate interpretation: each part of the computation corresponds to one Python bytecode.
from dis import dis

def f(data):
    x = data + 1
    y = data * 2
    z = data ** 3
    return x, y, z

dis(f)
  4           0 LOAD_FAST                0 (data)
              2 LOAD_CONST               1 (1)
              4 BINARY_ADD
              6 STORE_FAST               1 (x)

  5           8 LOAD_FAST                0 (data)
             10 LOAD_CONST               2 (2)
             12 BINARY_MULTIPLY
             14 STORE_FAST               2 (y)

  6          16 LOAD_FAST                0 (data)
             18 LOAD_CONST               3 (3)
             20 BINARY_POWER
             22 STORE_FAST               3 (z)

  7          24 LOAD_FAST                1 (x)
             26 LOAD_FAST                2 (y)
             28 LOAD_FAST                3 (z)
             30 BUILD_TUPLE              3
             32 RETURN_VALUE
In fact, for the purposes of the Python interpreter’s Global Interpreter Lock (“GIL”), each (atomic) “step” of this computation is, indeed, one bytecode: the GIL is a coarse-grained lock that locks the interpreter at the granularity of the bytecode evaluation loop, so that only one thread can execute bytecode at a time.
(Note that one line of Python source code readily requires multiple bytecodes, and, in the Python threading model, threads are preempted at the bytecode level. In other words, in the Python threading model, threads do not guarantee that the execution of a line of Python source code is atomic.)
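We can verify this with the dis module: even a single augmented assignment compiles to several bytecodes, between any two of which a thread may be preempted. (The bump function below is a hypothetical example introduced here for illustration.)

```python
from dis import get_instructions

def bump(x):
    x += 1
    return x

# one line of source corresponds to several bytecodes; the interpreter
# may switch threads between any two of them
ops = [ins.opname for ins in get_instructions(bump)]
print(ops)
```

The exact opcode names vary between CPython versions, but the list is always longer than one entry per source line.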
In practice, executing one line of Python source code may involve the execution of multiple Python bytecodes that are not visible or available for programmers to use. I am not aware of any guarantees that the Python core developers provide to end users about how Python bytecodes are generated or how they map to source text. As a result, the authors of code-transformation or deep metaprogramming frameworks may choose to perform transformations at the abstract syntax tree (“AST”) level (supported by the standard library’s ast module) rather than risk chasing after version-to-version changes that occur at the bytecode level.
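As a minimal sketch of what an AST-level transformation looks like (the function f and the renaming below are purely illustrative, not part of any real framework):

```python
import ast

src = '''
def f(data):
    return data + 1
'''

tree = ast.parse(src)

# transform at the AST level: rename the function (a trivial stand-in
# for the kinds of rewrites a metaprogramming framework might perform)
tree.body[0].name = 'g'

ns = {}
exec(compile(tree, '<ast>', 'exec'), ns)
print(ns['g'](41))  # 42
```

Because the ast module’s node types are documented and comparatively stable, this kind of rewrite is far less fragile than patching bytecode.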
Lazy Out-of-band Computation
Let’s pick up where we left off…
A generator (or generator coroutine) is how we take a computation and break it down into parts so that we can do something useful with this decomposition.
%%timeit -n1 -r 1
from time import sleep
from random import Random

def compute(x):
    ''' does something slowly '''
    sleep(.1)
    return x ** 3

def process(dataset):
    rv = []
    for x in dataset:
        rv.append(compute(x))
    return rv

if __name__ == '__main__':
    rnd = Random(0)
    dataset = [rnd.randint(-100, +100) for _ in range(10)]

    # find the first three positive values
    results = []
    for x in process(dataset):
        if x >= 0:
            results.append(x)
            if len(results) == 3:
                break
    print(f'{results = }')
results = [830584, 343, 27000]
1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
And, because the process computation is one indivisible “part,” we are forced to “eagerly” compute values for the entire dataset, even though we only want a small subset of these values. As a result, we waste significant memory and computational time.
However, if we add some structuring to this computation…
%%timeit -n 1 -r 1
from time import sleep
from random import Random

def compute(x):
    ''' does something slowly '''
    sleep(.1)
    return x ** 3

def process(dataset):
    for x in dataset:
        yield compute(x)

if __name__ == '__main__':
    rnd = Random(0)
    dataset = [rnd.randint(-100, +100) for _ in range(10)]

    # find the first three positive values
    results = []
    for x in process(dataset):
        if x >= 0:
            results.append(x)
            if len(results) == 3:
                break
    print(f'{results = }')
results = [830584, 343, 27000]
601 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
then we can use this structuring for a purpose. Instead of “eagerly” computing all values for the entire dataset, we can “lazily” compute only the values we desire. As a result, we use only the exact amount of memory and exact amount of computational time necessary.
In the above example, we identify that each iteration through for x in dataset constitutes one “step” of the computation, and we indicate this with the yield keyword. We could consider the yield keyword in the body of a Python generator to be the equivalent of the , in a list: it unambiguously delineates where each part of the computation begins and ends.
The advantages of structuring a computation go far beyond reducing resource use, which is why generators and generator coroutines are such a fundamental approach with such extreme power.
If we can accept that a generator is an out-of-band structure for a computation, serving some useful purpose for the end user (i.e., computation = [step, step, step]), then we can switch our attention to describing the structuring provided. A list represents a linear structuring with nesting. In other words, we can proceed through the components of a list only in a sequential, forwards or backwards ordering.
xs = ['a', 'b', 'c']

for x in xs:
    print(f'{x = }')

for x in reversed(xs):
    print(f'{x = }')
x = 'a'
x = 'b'
x = 'c'
x = 'c'
x = 'b'
x = 'a'
Note that the use of sorted does not constitute another kind of ordering, because sorted returns a new list, over which we iterate sequentially in a forward order. Similarly, the Python list provides us with fast (constant-time) random access, from which we may construct something that appears to be a non-sequential ordering. For example…
from random import Random

def randomed(xs, *, random_state=None):
    rnd = Random() if random_state is None else random_state
    idxs = [*range(len(xs))]
    rnd.shuffle(idxs)
    return [xs[idx] for idx in idxs]

xs = ['a', 'b', 'c']
for x in randomed(xs, random_state=Random(0)):
    print(f'{x = }')
x = 'a'
x = 'c'
x = 'b'
However, it should be clear that there is a level of indirection here, and an alternate implementation of list (e.g., a linked-list implementation) would not provide us this capability while providing very similar functionality.
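To make this concrete, here is a hypothetical, minimal singly linked list: it supports very similar sequential iteration, but “random access” degrades to walking the chain.

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Node:
    ''' one cell of a singly linked list '''
    value: Any
    next: 'Optional[Node]' = None

def from_iterable(xs):
    ''' build the chain back-to-front so the head holds the first element '''
    head = None
    for x in reversed(xs):
        head = Node(x, head)
    return head

def nth(head, idx):
    ''' "random access" must traverse idx links: O(n), unlike list's O(1) '''
    node = head
    for _ in range(idx):
        node = node.next
    return node.value

ll = from_iterable(['a', 'b', 'c'])
print(nth(ll, 2))  # 'c', but only after walking two links
```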
A numpy.ndarray is a container that provides non-linear orderings.
from numpy.random import default_rng

rng = default_rng(0)
xs = rng.integers(-10, +10, size=(3, 3))

for x in xs:    # iterate over rows
    print(f'{x = }')

for x in xs.T:  # iterate over columns
    print(f'{x = }')
x = array([7, 2, 0])
x = array([ -5, -4, -10])
x = array([ -9, -10, -7])
x = array([ 7, -5, -9])
x = array([ 2, -4, -10])
x = array([ 0, -10, -7])
Note that, since we can specify whether the numpy.ndarray is stored in Fortran-style column-major (“colexicographical”) order or in C-style row-major (“lexicographical”) order, the consideration that memory addresses are fundamentally linear is irrelevant. Neither ordering is guaranteed to be more “native,” more efficient, or “closer to the machine.”
A generator (or generator coroutine) provides an ordering over the “steps” of a computation, but, unlike list, it allows only forward iteration. It is not meaningful to iterate in a backward direction over the steps of a computation; you cannot reversed(…) a generator.
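We can confirm this directly; attempting to reverse a generator raises a TypeError:

```python
def g():
    yield 'a'
    yield 'b'

# a generator defines no __reversed__ (nor __len__/__getitem__),
# so reversed(…) has nothing to work with
try:
    reversed(g())
except TypeError as e:
    print(e)
```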
If we consider only linearly ordered structures—and consider only forward iterations of such structures—we can discover another important distinction in our container types.
Homogeneous vs Heterogeneous Containers
A list is a linearly ordered structure containing (loosely) homogeneous entities (in the sense that we can treat each value as being of an equivalent type, based on a common interface). We generally loop over the contents of a list and perform the same operation on each entity…
from numbers import Number

xs = [1, 2.3, 4+5j]

assert len({type(x) for x in xs}) > 1                 # “strictly” heterogeneous
assert len({isinstance(x, Number) for x in xs}) == 1  # “loosely” homogeneous

for x in xs:
    print(f'{x + 1 = }')
x + 1 = 2
x + 1 = 3.3
x + 1 = (5+5j)
While it’s common to randomly access either the very first or very last entry in a list, it is unusual to randomly access other entities. In other words, we would not pause if we saw xs[0] or xs[-1] in some code, but we would wonder what was going on if we saw xs[3] or xs[7]—why is the entity at index three or seven so special?
Because a list is (typically) loosely homogeneous, we consider entities near the middle to be indistinguishable from each other, such that we cannot privilege one middle entity over another middle entity. (Privileging the very first or very last entity is merely a consequence of observing a “first”→“middle”→“last” modality.)
On the other hand, a tuple is a linearly ordered structure containing (loosely) heterogeneous components of a single entity. (These are loosely heterogeneous in the sense that we treat each value as being distinct, despite their types being potentially identical.) We generally unpack the contents of a tuple and perform a different operation on each component:
user = 'Smith, Bob', 'bsmith', '192.168.1.100'

assert len({type(x) for x in user}) == 1  # “strictly” homogeneous

# “loosely” heterogeneous because we cannot perform the same actions
# on an IP address that we can perform on a username or a human name
name, username, ip_address = user
print(f'{name} connects from {ip_address} as {username.upper()}')
Smith, Bob connects from 192.168.1.100 as BSMITH
It is very common for us to see random access on arbitrary components of a tuple, and the primary reason we would avoid t[2] in our code is that it is inconvenient or inefficient. (Thus, we might encourage turning the tuple into a collections.namedtuple.)
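For instance, using hypothetical field names for the user example above:

```python
from collections import namedtuple

# hypothetical field names for illustration
User = namedtuple('User', ['name', 'username', 'ip_address'])
user = User('Smith, Bob', 'bsmith', '192.168.1.100')

# named access documents *which* component we mean…
print(user.ip_address)

# …while the result is still an ordinary tuple underneath
assert user[2] == user.ip_address
```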
In short, a list is how we typically represent a homogeneous collection of multiple entities; a tuple is how we typically represent a heterogeneous grouping of fields related to a single entity. (A tuple is when we have one thing; a list is when we have many things.)
A generator provides us with a forwards-only sequential ordering of the steps of a computation. This means that a generator provides us with a mechanism that can enforce the sequencing of events.
Generators and… Context Managers?#
We commonly use another feature of Python for sequencing events: the PEP 343 context manager. When we use the with statement, we want to ensure that if a “before” event occurs, an “after” event is guaranteed to occur:
class Context:
    def __enter__(self):
        print('before')
    def __exit__(self, exc_type, exc_value, traceback):
        print('after')

with Context():
    pass
before
after
Note that a context manager provides us with a modality on the latter action because we can decide what “after” action to perform based on the presence and nature of any exceptions that arise during the execution of the indented block of code.
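A small sketch of that modality (the class name Suppressing is illustrative): __exit__ receives any in-flight exception and can choose its “after” action accordingly, even suppressing the exception by returning a truthy value.

```python
class Suppressing:
    def __enter__(self):
        print('before')
    def __exit__(self, exc_type, exc_value, traceback):
        # decide the “after” action based on the exception (if any)
        if exc_type is ValueError:
            print('after (suppressing ValueError)')
            return True  # a truthy return suppresses the exception
        print('after (no exception)')

with Suppressing():
    raise ValueError('oops')  # suppressed; execution continues past the block
```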
It should make sense, then, that one of the more common ways for us to write a PEP 343 context manager is by writing a generator and using the contextlib.contextmanager decorator to wrap it in a class that provides the necessary __enter__ and __exit__ methods:
from contextlib import contextmanager

@contextmanager
def context():
    print('before')
    yield
    print('after')

with context():
    pass
before
after
Fundamentally, a generator coroutine allows us to enforce the sequencing of what computations are performed when.
Since a generator coroutine can take additional inputs via .send(…), it is common for us to include branching logic within these structures, and we could argue that a generator coroutine lets us compute with potentially non-linear forward orderings:
def coro():
    inp = yield 'a'
    if inp:
        yield 'b'
    else:
        yield 'c'

ci = coro()
print(f'{next(ci) = }')
print(f'{ci.send(False) = }')
next(ci) = 'a'
ci.send(False) = 'c'
If we think about taking a computation and breaking it down into ‘parts,’ then it stands to reason that a generator coroutine is one way for us to encode a state machine. A state machine is a computation where the ‘steps’ of the computation have been explicitly delineated as distinct states (nodes) and whose non-linear topology has been explicitly defined as edges between these states (nodes).
Therefore, the following two are equivalent.
Here’s the first example:
class api:
    def first(self):
        return 'a'
    def second(self, inp):
        self.state = inp
        return 'b'
    def last(self):
        if self.state:
            return 'c'
        else:
            return 'd'

obj = api()
print(f'{obj.first() = }')
print(f'{obj.second(True) = }')
print(f'{obj.last() = }')
obj.first() = 'a'
obj.second(True) = 'b'
obj.last() = 'c'
Here’s the second:
def api():
    inp = yield 'a'
    yield 'b'
    if inp:
        yield 'c'
    else:
        yield 'd'

obj = api()
print(f'{next(obj) = }')
print(f'{obj.send(True) = }')
print(f'{next(obj) = }')
next(obj) = 'a'
obj.send(True) = 'b'
next(obj) = 'c'
There are a few differences:
- In the first example (the class), we can accidentally run the steps out of order. However, in the second (the generator coroutine), we cannot.
- In the first example, the sequencing that demands second must be called before last may not be obvious from the source text. But, in the second example, the fact that 'b' comes before 'c' or 'd' is clear from the source text.
Consider, however, the task of transforming an arbitrary multi-step API from a class-style formulation to a generator-coroutine formulation:
class Api:
    def first(self):
        return 'first'
    def second(self):
        return 'second'
    def third(self):
        return 'third'
    def last(self):
        return 'last'

obj = Api()
print(f'{obj.first() = }')
print(f'{obj.second() = }')
print(f'{obj.third() = }')
print(f'{obj.last() = }')

def api():
    yield 'first'
    yield 'second'
    yield 'third'
    yield 'last'

obj = api()
print(f'{next(obj) = }')
print(f'{next(obj) = }')
print(f'{next(obj) = }')
print(f'{next(obj) = }')
obj.first() = 'first'
obj.second() = 'second'
obj.third() = 'third'
obj.last() = 'last'
next(obj) = 'first'
next(obj) = 'second'
next(obj) = 'third'
next(obj) = 'last'
In the class-based example, you cannot enforce the sequence of operations: you cannot prevent someone from calling .first after .last. In the generator example, you can enforce the sequence of operations, but you can’t tell which operation you’re executing on each line of code! It’s just a bunch of next(…) calls!
While the computation’s steps are clearly different, we cannot distinguish them in the latter formulation.
As a result, if a human being is proceeding through the computation manually via next, we likely won’t be able to perform a transformation like the above.
Generators → Async!
But, before we jump in, I want to let you know about my upcoming seminar, “Working with Polars”! This seminar is designed to provide attendees with a comprehensive understanding of Polars and equip them with the skills to leverage its full potential. We will explore various aspects of Polars, including its core features, data transformation techniques, and best practices for data manipulation and analysis.
Now, for the topic at hand!
Last time, we left off talking about generators and context managers. Now, let’s talk about generators and async and how they can help solve our problem.
We might perform such a transformation only if we are stepping through the computation automatically, via either a loop… or an event loop.
Commonly, in asynchronous code, we take a single computation and break it down into parts so that we can do something useful with it. When we break the computation down into parts, we allow an application-level event loop to cooperatively schedule the parts of this computation, which allows us to interleave operations!
If we look at the implementation of async def asynchronous functions that use the await keyword, we will see that they are very similar to def generator coroutines using the yield keyword.
For example, the two following examples are roughly equivalent:
from asyncio import sleep as aio_sleep, gather
from time import sleep

async def task(name):
    for _ in range(2):
        print(f'{name = }')
        await aio_sleep(0)  # idiom for “return control to the event loop”
        sleep(.5)

# top-level await works in an IPython/Jupyter session
await gather(
    task('task#1'),
    task('task#2'),
    task('task#3'),
)
name = 'task#1'
name = 'task#2'
name = 'task#3'
name = 'task#1'
name = 'task#2'
name = 'task#3'
[None, None, None]
Now, compare the above to this:
from time import sleep

def task(name):
    for i in range(2):
        print(f'{name = }')
        yield
        sleep(.5)

def gather(*tasks):
    complete = {t: False for t in tasks}
    while not all(complete.values()):
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                complete[t] = True

@lambda main: main()
def main():
    gather(
        task('task#1'),
        task('task#2'),
        task('task#3'),
    )
name = 'task#1'
name = 'task#2'
name = 'task#3'
name = 'task#1'
name = 'task#2'
name = 'task#3'
Note that, in the second example, the progression through the computation is handled automatically by the scheduling loop in gather; thus, there is no confusion about which action each next(…) will perform.
Given the two following transformations, how can we distinguish between them? First, let’s look at the more likely one. Here’s our starting point:
def f(dataset):
    rv = []
    for x in dataset:
        rv.append(x ** 2)
    return rv

for x in f(range(100)):
    pass
And, here’s what it transitions to:
def g(dataset):
    for x in dataset:
        yield x ** 2

for x in g(range(100)):
    pass
On the other hand, this second option is less likely. We start the transition here:
class T:
    def first(self, x):
        self.x = x
        return 'first', self.x
    def second(self, x, y):
        self.x += x
        self.y = y
        return 'second', self.x, self.y
    def last(self, x, y, z):
        self.x += x
        self.y += y
        self.z = z
        return 'third', self.x, self.y, self.z

obj = T()
print(f'{obj.first(1) = }')
print(f'{obj.second(20, 30) = }')
print(f'{obj.last(400, 500, 600) = }')
obj.first(1) = ('first', 1)
obj.second(20, 30) = ('second', 21, 30)
obj.last(400, 500, 600) = ('third', 421, 530, 600)
Then, we transition to the following:
from functools import wraps

@(pumped :=
    lambda coro: wraps(coro)(
        lambda *a, **kw: [ci := coro(*a, **kw), next(ci)][0]
    )
)
def c():
    x = yield
    x_, y = yield 'first', x
    x += x_
    x_, y_, z = yield 'second', x, y
    x += x_; y += y_
    yield 'third', x, y, z

obj = c()
print(f'{obj.send(1) = }')
print(f'{obj.send((20, 30)) = }')
print(f'{obj.send((400, 500, 600)) = }')
obj.send(1) = ('first', 1)
obj.send((20, 30)) = ('second', 21, 30)
obj.send((400, 500, 600)) = ('third', 421, 530, 600)
So, given the most and least likely transition examples, what’s the difference? What makes one reasonable and the other unreasonable? If we look closely at these two examples, we will notice that, in the first transformation, each “step” was similar to every other “step.” In the second, the “steps” were dissimilar (at minimum, they take different parameters). In other words, the first is a computation that has been broken down into “homogeneous parts,” and the second is a computation that has been broken down into “heterogeneous parts.”
We would not perform the second transformation because the heterogeneous nature of the parts of the computation would cause confusion, given the homogeneous nature of how we proceed through the computation. In other words, we can’t—without confusion—break a computation down into multiple parts that are very different, yet invoke each part in the same way (with .send or next(…)). We would inadvertently make very different things look similar!
(Note that, when working with asynchronous code and an event loop, the parts of the computation might look different to us, but they are similar in the eyes of the event loop. The event loop doesn’t know or care what each step does; its job is merely to schedule when the steps occur. Similarly, while xs = [1, 2.3, 4+5j] is “strictly” heterogeneous, it’s “loosely” homogeneous from the perspective of the common arithmetic operations we want to perform. At the same time, while the elements of xs = [1, 'two', {3}] don’t support common arithmetic operations, from the perspective of a loop that just performs print(x), they can be considered all the same—they’re all objects that support __repr__, __str__, or __format__.)
Solution
I believe this theory underlies the problem you have.
I took another look at the original question on Discord and rewrote the code provided to try to understand the poster’s motivation. For reference, you can view the original code here.
Here is my draft:
from random import Random
from time import perf_counter, sleep

class Timer:
    def __call__(self, info):
        print(f'{perf_counter() = :.2f}', f'{info = }' if info is not None else '')

def track(xs):
    t = Timer()
    for x in xs:
        info = yield
        t(info=info)
        if info is not None:
            yield

rnd = Random(0)
xs = [rnd.randint(-10, +10) for _ in range(5)]

match (mode_of_operation := 'buggy'):
    case 'ok':
        for _ in zip((ci := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                ci.send(...)
    case 'buggy':
        for x in zip((ci := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                ci.send(None)
    case 'also buggy':
        for _ in zip((ci := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                ci.send(...)
                ci.send(...)
perf_counter() = 8247.91
perf_counter() = 8247.91
perf_counter() = 8247.92
perf_counter() = 8247.93
perf_counter() = 8247.93
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Input In [29], in <cell line: 19>()
16 rnd = Random(0)
17 xs = [rnd.randint(-10, +10) for _ in range(5)]
19 match (mode_of_operation := 'buggy'):
20 case 'ok':
21 for _ in zip((ci := track(xs)), xs, strict=True):
22 sleep(.01)
23 if rnd.choices([True, False], weights=[.9, .1])[0]:
24 ci.send(...)
25 case 'buggy':
---> 26 for x in zip((ci := track(xs)), xs, strict=True):
27 sleep(.01)
28 if rnd.choices([True, False], weights=[.9, .1])[0]:
29 ci.send(None)
30 case 'also buggy':
31 for _ in zip((ci := track(xs)), xs, strict=True):
32 sleep(.01)
33 if rnd.choices([True, False], weights=[.9, .1])[0]:
34 ci.send(...)
35 ci.send(...)
ValueError: zip() argument 2 is longer than argument 1
In the above code, in the mode_of_operation := 'ok' mode of operation, we optionally .send a value into the coroutine, which gets emitted alongside the timing. The sent value is assumed to be non-None. This code works because the for … in … loop mechanism is guaranteed to perform exactly one next(…) per iteration and, if there is a .send in the loop body, that .send guarantees the presence of an additional yield. Therefore, the for … in … loop mechanism always steps through the first part of the computation (up to the first yield), and a .send in the loop body always steps through the second part of the computation (up to the second yield).
However, in the mode_of_operation := 'buggy' or mode_of_operation := 'also buggy' modes, someone might .send a None value. In this case, we skip the second yield, and the for … in … loop mechanism inadvertently triggers another “step” through the coroutine. In the code above, this results in “missing” values in the loop body. Similarly, if we .send twice, we also inadvertently knock the coroutine “off-kilter” and see too few values in the loop body.
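The hazard reduces to a small, self-contained sketch: for a suspended generator, gen.send(None) is specified to behave exactly like next(gen), so a None send silently consumes an iteration just as the loop’s own next(…) does.

```python
def coro():
    for item in 'abc':
        received = yield item  # received is None for both next(…) and .send(None)

ci = coro()
print(next(ci))       # 'a'
print(ci.send(None))  # advances exactly like next(ci): 'b'
print(next(ci))       # 'c': the send has silently consumed an iteration
```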
With this understanding, I believe that the fundamental problem is that the original code has two yields that delineate two very different parts of the computation. In one case, you accept input; in the other, you do not. In one case, you expect that the computation will be triggered by the next(…) that occurs in the execution of for … in …; in the other, you expect that the computation will be triggered by an explicit .send. Only in the .send case is this optional.
The original poster is trying both to distinguish these parts of the computation from the human’s perspective (who triggers one through for … in … and the other through .send), while also making them indistinguishable to the interpreter. I believe these requirements are at odds, leading to this intractable dilemma.
As the previous theoretical discussion outlines, I think it is confusing to model a computation with heterogeneous “parts” as a coroutine that is manually operated via next or .send.
Therefore, I would suggest the following options.
You could keep next and .send homogeneous and pass the additional data in via another (“out-of-band”?) mechanism such as .throw:
from random import Random
from time import perf_counter, sleep

class Timer:
    def __call__(self, *info):
        print(f'{perf_counter() = :.2f}', f'{info = }' if info else '')

class Info(Exception):
    pass

def track(xs):
    def supplement(*info):
        while True:
            try:
                yield
                break
            except Info as e:
                info = *info, *filter(lambda x: x is not None, e.args)
        t(*info)

    t = Timer()
    for x in xs:
        try:
            yield
            t()
        except Info as e:
            yield from supplement(*filter(lambda x: x is not None, e.args))

rnd = Random(0)
xs = [rnd.randint(-10, +10) for _ in range(5)]

match (mode_of_operation := 'ok'):
    case 'ok':
        for _ in zip((ci := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                ci.throw(Info(...))
    case 'also ok':
        for _ in zip((ci := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                ci.throw(Info(None))
    case 'still ok':
        for _ in zip((ci := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                ci.throw(Info(...))
                ci.throw(Info(...))
perf_counter() = 8248.46 info = (Ellipsis,)
perf_counter() = 8248.47
perf_counter() = 8248.48 info = (Ellipsis,)
perf_counter() = 8248.49
perf_counter() = 8248.50 info = (Ellipsis,)
Unfortunately, this is just… kind of weird, and users very rarely use .throw, especially for routine business logic or control flow.
Alternatively, we could decompose the single heterogeneous coroutine into two distinct entities:
from random import Random
from time import perf_counter, sleep
from dataclasses import dataclass, field
from collections.abc import Iterable

class Timer:
    def __call__(self, *info):
        print(f'{perf_counter() = :.2f}', f'{info = }' if info else '')

@dataclass
class track:
    items : Iterable
    timer : Timer  = field(default_factory=Timer)
    info  : object = ()
    def __iter__(self):
        for x in self.items:
            yield
            self.timer(*self.info)
            self.info = ()
    def __call__(self, info):
        if info is None:
            return
        self.info = (*self.info, info) if self.info else (info,)
rnd = Random(0)
xs = [rnd.randint(-10, +10) for _ in range(5)]

match (mode_of_operation := 'ok'):
    case 'ok':
        for _ in zip((obj := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                obj(...)
    case 'also ok':
        for _ in zip((obj := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                obj(None)
    case 'still ok':
        for _ in zip((obj := track(xs)), xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                obj(...)
                obj(...)
perf_counter() = 8248.52 info = (Ellipsis,)
perf_counter() = 8248.53
perf_counter() = 8248.54 info = (Ellipsis,)
perf_counter() = 8248.55
perf_counter() = 8248.56 info = (Ellipsis,)
An equivalent formulation to the above use of a dataclass would be using a closure to link the two returned entities:
from random import Random
from time import perf_counter, sleep
from collections import namedtuple
from functools import wraps

class Timer:
    def __call__(self, *info):
        print(f'{perf_counter() = :.2f}', f'{info = }' if info else '')

@lambda f: wraps(f)(
    lambda *a, **kw:
        namedtuple(f.__name__, map(lambda x: x.__name__, rv := f(*a, **kw)))(*rv)
)
def track(xs):
    t = Timer()
    info = ()

    def loop():
        nonlocal info
        for x in xs:
            info = ()
            yield
            t(*info)

    def supplement(info_):
        nonlocal info
        if info_ is None:
            return
        info = (*info, info_) if info else (info_,)

    return loop(), supplement
rnd = Random(0)
xs = [rnd.randint(-10, +10) for _ in range(5)]

match (mode_of_operation := 'ok'):
    case 'ok':
        for _ in zip((obj := track(xs)).loop, xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                obj.supplement(...)
    case 'also ok':
        for _ in zip((obj := track(xs)).loop, xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                obj.supplement(None)
    case 'still ok':
        for _ in zip((obj := track(xs)).loop, xs, strict=True):
            sleep(.01)
            if rnd.choices([True, False], weights=[.9, .1])[0]:
                obj.supplement(...)
                obj.supplement(...)
perf_counter() = 8248.58 info = (Ellipsis,)
perf_counter() = 8248.59
perf_counter() = 8248.60 info = (Ellipsis,)
perf_counter() = 8248.61
perf_counter() = 8248.62 info = (Ellipsis,)
The implementation using a dataclasses.dataclass relies on the implicit call to iter(…) in the evaluation of for … in …:

xs = [...]
for x in xs:
    pass

# … equivalent to …
xi = iter(xs)
while True:
    try:
        x = next(xi)
    except StopIteration:
        break