当前位置: 移动技术网 > IT编程>脚本编程>Python > Python开发【模块】:asyncio

Python开发【模块】:asyncio

2018年11月05日  | 移动技术网IT编程  | 我要评论

日本旅游报价,福建日报电子版,vero moda官方旗舰店

异步asyncio

asyncio是一个使用async / await语法编写并发代码的库

asyncio用作多个python异步框架的基础,这些框架提供高性能的网络和web服务器,数据库连接库,分布式任务队列等。

asyncio通常非常适合io绑定和高级 结构化网络代码。

 

asyncio提供了一组高级 api:

此外,还有一些用于库和框架开发人员的低级 api 

  • 创建和管理,提供异步api ,运行,处理等;os signals
  • 使用实现有效的协议 ;
  • 使用async / await语法基于回调的库和代码。

 

conroutines

使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,以下代码片段(需要python 3.7+)打印“hello”,等待1秒,然后打印“world”:

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')

asyncio.run(main())

# hello
# world

上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在python 3.10中删除。

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

asyncio.run(main())

asyncio实际等同于下面的工作(参数为an asyncio.future, a coroutine or an awaitable is required)

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())

# hello
# world
 1     this function runs the passed coroutine, taking care of
 2     managing the asyncio event loop and finalizing asynchronous
 3     generators.
 4 
 5     this function cannot be called when another asyncio event loop is
 6     running in the same thread.
 7 
 8     if debug is true, the event loop will be run in debug mode.
 9 
10     this function always creates a new event loop and closes it at the end.
11     it should be used as a main entry point for asyncio programs, and should
12     ideally only be called once.
asyncio.run功能介绍

实际运行协程asyncio提供了三种主要机制:

1、the asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子)

2、awaiting on a coroutine 以下代码片段将在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” 

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%x')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%x')}")

asyncio.run(main())

# started at 11:54:48
# hello
# world
# finished at 11:54:51

3、与asyncio同时运行协同程序功能tasks让我们修改上面的例子并同时运行两个say_after协同程序 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%x')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%x')}")
    # wait until both tasks are completed (should take
    # around 2 seconds.)

    await task1
    await task2
    print(f"finished at {time.strftime('%x')}")

asyncio.run(main())

# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24

 稍微改变一下形式,可以理解的更多

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%x')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%x')}")
    # wait until both tasks are completed (should take
    # around 2 seconds.)
    await asyncio.sleep(3)

    # await task1
    # await task2
    print(f"finished at {time.strftime('%x')}")

asyncio.run(main())

# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44

  

 awaitables

我们说如果一个对象可以在表达式中使用,那么它就是一个等待对象许多asyncio api旨在接受等待。

有三种主要类型的等待对象:coroutinestasks, and futures.

coroutines

python coroutines are awaitables and therefore can be awaited from other coroutines:

import asyncio

async def nested():
    return 42

async def main():
    # nothing happens if we just call "nested()".
    # a coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

# 42

重要

在本文档中,术语“coroutine”可用于两个密切相关的概念:

  • 一个协程功能:一个功能;
  • 一个协程对象:通过调用协同程序函数返回的对象 

tasks

任务用于调度协同程序并发。

当一个协程被包装到一个task中时,会像)一样  conroutine自动安排很快运行:

import asyncio

async def nested():
    return 42

async def main():
    # schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

futures

future是一个特殊的低级别等待对象,它表示异步操作最终结果

等待 future对象时,它意味着协程将等到future在其他地方解析。

需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用。

通常,不需要在应用程序级代码中创建future对象。

可以等待有时通过库和一些asyncio api公开的未来对象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回future对象的低级函数的一个很好的例子是

 

asyncio方法

1、运行asyncio程序 

 asyncio.runcoro*debug = false 

  此函数运行传递的协同程序,负责管理asyncio事件循环并最终确定异步生成器

  当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数。

  如果debugtrue,则事件循环将以调试模式运行。

  此函数始终创建一个新的事件循环并在结束时将其关闭。它应该用作asyncio程序的主要入口点,理想情况下应该只调用一次。

  版本3.7中的新功能重要:此功能已添加到python 3.7中的asyncio中

 

2、创建任务 

asyncio.create_task(coro)

  将coro 包装成a task 并安排执行。返回task对象。

  任务在返回的循环中执行,如果当前线程中没有运行循环, runtimeerror则引发该任务

  python 3.7中添加了此功能在python 3.7之前,可以使用低级函数:

async def coro():
    ...

# in python 3.7+
task = asyncio.create_task(coro())
...

# this works in all python versions but is less readable
task = asyncio.ensure_future(coro())

 

3、sleeping 

coroutine asyncio.sleep(delayresult=none*loop=none)

  阻止 delay seconds.

  如果提供了result ,则在协程完成时将其返回给调用者。

  leep() 始终挂起当前任务,允许其他任务运行。

  该loop 参数已被弃用,并定于去除在python 3.10。

  协程示例每秒显示当前日期5秒:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while true:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

  

4、同时运行任务

awaitable asyncio.gather(*awsloop=nonereturn_exceptions=false)

  同时aws 序列中运行 

  如果在aws中任何awaitable 是协程,它将自动安排为任务

  如果所有等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序

  如果return_exceptionsfalse(默认),则第一个引发的异常会立即传播到等待的任务gather()

  如果return_exceptionstrue,异常的处理方式一样成功的结果,并在结果列表汇总。

  如果gather()取消,所有提交的awaitables(尚未完成)也被取消

  如果aws序列中的task or future取消,则将其视为已引发cancellederror在这种情况下不会取消gather() 呼叫这是为了防止取消一个提交的tasks/futures 以导致其他任务/期货被取消。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"task {name}: compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"task {name}: factorial({number}) = {f}")

async def main():
    # schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("a", 2),
        factorial("b", 3),
        factorial("c", 4),
    )

asyncio.run(main())

# expected output:
#
#     task a: compute factorial(2)...
#     task b: compute factorial(2)...
#     task c: compute factorial(2)...
#     task a: factorial(2) = 2
#     task b: compute factorial(3)...
#     task c: compute factorial(3)...
#     task b: factorial(3) = 6
#     task c: compute factorial(4)...
#     task c: factorial(4) = 24

 获取返回结果,异常情况

import asyncio

async def factorial(name, number):

    print(name)
    if name == 'a':
        return name
    elif name == 'b':
        raise syntaxerror(name)
    await asyncio.sleep(number)


async def main():
    # schedule three calls *concurrently*:
    result = await asyncio.gather(
        factorial("a", 2),
        factorial("b", 3),
        factorial("c", 4),
        return_exceptions=true
    )
    print(result)

asyncio.run(main())

# a
# b
# c
# ['a', syntaxerror('b'), none]

版本3.7中已更改:如果取消聚集本身,则无论return_exceptions如何,都会传播取消

 

5、shielding from cancellation

awaitable asyncio.shield(aw*loop=none)

  protect an  from being cancelled.

  if aw is a coroutine it is automatically scheduled as a task.

  the statement:

res = await shield(something())

  等同于

res = await something()

  除非取消包含它的协程,否则something()不会取消运行的任务从观点来看something(),取消没有发生。虽然它的来电者仍然被取消,所以“等待”表达仍然提出了一个cancellederror

  如果something()通过其他方式取消(即从内部取消)也会取消shield()

  如果希望完全忽略取消(不推荐),则该shield()函数应与try / except子句结合使用,如下所示:

try:
    res = await shield(something())
except cancellederror:
    res = none

 

6、超时

coroutine asyncio.wait_for(aw, timeout, *, loop=none)

  wait for the aw  to complete with a timeout.

  if aw is a coroutine it is automatically scheduled as a task.

  timeout可以是none或等待的float或int秒数。如果超时none,将等到完成

  如果发生超时,它将取消任务并加注 asyncio.timeouterror

  要避免该任务cancellation,请将其包装

  该函数将一直等到将来实际取消,因此总等待时间可能会超过超时

  如果等待被取消,则未来的aw也会被取消。

  该循环参数已被弃用,并定于去除在python 3.10。

async def eternity():
    # sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.timeouterror:
        print('timeout!')

asyncio.run(main())

# expected output:
#
#     timeout!

改变在3.7版本:aw被取消,由于超时,wait_for等待aw被取消。以前,它asyncio.timeouterror立即提出 

 

7、超时原语

coroutine asyncio.wait(aws*loop=nonetimeout=nonereturn_when=all_completed)

  同时运行aws中的 并阻塞,直到return_when指定的条件为止

  如果在aws中任何等待的是协程,它将自动安排为任务。wait()直接传递协同程序对象 已被弃用,因为它会导致 

  返回两组任务/期货:(done, pending)

  用法:

done, pending = await asyncio.wait(aws)

  该循环参数已被弃用,并定于去除在python 3.10。

  timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数。

  请注意,此功能不会引发asyncio.timeouterror超时发生时未完成的期货或任务仅在第二组中返回。

  return_when表示此函数何时返回。它必须是以下常量之一:

不变描述
first_completed 当任何未来完成或取消时,该函数将返回。
first_exception 当任何未来通过引发异常完成时,函数将返回。如果没有未来引发异常则等同于 all_completed
all_completed 所有期货结束或取消时,该功能将返回。

  不像wait()当发生超时不会取消期货。   

  注意 wait()将协同程序自动调度为任务,然后在 集合中返回隐式创建的任务对象。因此,以下代码将无法按预期方式工作:(done, pending)

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # this branch will never be run!

  以下是如何修复上述代码段:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # everything will work as expected now.

  wait()不推荐直接传递协程对象。

 

8、 scheduling from other threads

asyncio.run_coroutine_threadsafe(coroloop)

  将协程提交给给定的事件循环。线程安全的。

  返回a concurrent.futures.future以等待另一个os线程的结果。

  此函数旨在从与运行事件循环的os线程不同的os线程调用。例:

# create a coroutine
coro = asyncio.sleep(1, result=3)

# submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# wait for the result with an optional timeout argument
assert future.result(timeout) == 3

  如果在协程中引发异常,则会通知返回的future。它还可以用于取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.timeouterror:
    print('the coroutine took too long, cancelling the task...')
    future.cancel()
except exception as exc:
    print(f'the coroutine raised an exception: {exc!r}')
else:
    print(f'the coroutine returned: {result!r}')

  请参阅 文档部分。

  与其他asyncio函数不同,此函数需要 显式传递循环参数。

  3.5.1版中的新功能。

 

9、自省

asyncio.current_taskloop = none 

  返回当前正在运行的task实例,或者none没有正在运行的任务。

  如果loopnone 用来获取loop。

  版本3.7中的新功能。

asyncio.all_tasksloop = none 

  返回task循环运行的一组尚未完成的对象。

  如果loopnone则用于获取当前循环。

  版本3.7中的新功能。

 

任务对象

class asyncio.task(coro,*,loop = none 

future-like object that runs a python . not thread-safe.

任务用于在事件循环中运行协同程序。如果一个协程在future上等待,则task暂停执行协程并等待future的完成。当future 完成后,包装协程的执行将恢复。

事件循环使用协作调度:事件循环一次运行一个任务。当一个task等待完成future时,事件循环运行其他任务,回调或执行io操作。

使用高级功能创建任务,或低级别或 功能。不鼓励手动实例化任务。

要取消正在运行的任务,请使用该cancel()方法。调用它将导致task将cancellederror异常抛出到包装的协同程序中。如果在取消期间协程正在等待future对象,则future对象将被取消。

cancelled()可用于检查任务是否被取消。true如果包装的协程没有抑制cancellederror异常并且实际上被取消,则该方法返回

asyncio.task继承自future其所有api,除了future.set_result()和 future.set_exception()

任务支持该contextvars模块。创建任务时,它会复制当前上下文,然后在复制的上下文中运行其协程。

版本3.7中已更改:添加了对contextvars模块的支持

cancel

  请求取消任务。

  这会安排cancellederror在事件循环的下一个循环中将异常抛入包装的协程。

  协程则有机会通过抑制异常与清理,甚至拒绝请求... ... ... ... ... ... 块。因此,不同于不保证任务将被取消,尽管完全抑制取消并不常见,并且积极地不鼓励。exceptcancellederrorfuture.cancel()task.cancel()

  以下示例说明了协同程序如何拦截取消请求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.cancellederror:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # create a "cancel_me" task
    task = asyncio.create_task(cancel_me())

    # wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.cancellederror:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled
  true如果任务被取消,则返回。
  请求取消时取消任务, cancel()并且包装的协同程序将cancellederror异常传播 到其中。
done
  true如果任务完成则返回。
  一个任务完成时,包裹协程要么返回的值,引发异常,或者任务被取消。
result
  返回任务的结果。
  如果任务完成,则返回包装协程的结果(或者如果协程引发异常,则重新引发该异常。)
  如果已取消任务,则此方法会引发cancellederror异常。
  如果task的结果尚不可用,则此方法会引发invalidstateerror异常。
exception
  返回task的例外。
  如果包装的协同程序引发异常,则返回异常。如果包装的协程正常返回,则此方法返回none
  如果已取消任务,则此方法会引发 cancellederror异常。
  如果尚未完成任务,则此方法会引发 invalidstateerror异常。
add_done_callback回调*上下文=无
  添加要在任务完成时运行的回调。
  此方法仅应在基于低级回调的代码中使用。 
  有关future.add_done_callback() 详细信息,请参阅文档。
remove_done_callback回调
  从回调列表中删除回调
  此方法仅应在基于低级回调的代码中使用。
  有关future.remove_done_callback() 详细信息,请参阅文档。
get_stack*limit = none 
  返回此任务的堆栈帧列表。
  如果未完成包装的协同程序,则会返回挂起它的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表。
  帧始终从最旧到最新排序。
  对于挂起的协程,只返回一个堆栈帧。
  可选的limit参数设置要返回的最大帧数; 默认情况下,返回所有可用的帧。返回列表的排序取决于是返回堆栈还是返回:返回堆栈的最新帧,但返回最旧的回溯帧。(这与回溯模块的行为相匹配。)
print_stack*limit = nonefile = none 
  打印此任务的堆栈或回溯。
  这会为检索到的帧生成类似于回溯模块的输出get_stack()
  该极限参数传递给get_stack()直接。
  的文件参数是其中输出被写入的i / o流; 默认输出写入。
classmethod all_tasks(loop = none 
  返回一组事件循环的所有任务。
  默认情况下,返回当前事件循环的所有任务。如果是loopnone,则该函数用于获取当前循环。
  不推荐使用此方法,将在python 3.9中删除。请改用此功能。
classmethod current_task(loop = none 
  返回当前正在运行的任务或none
  如果是loopnone,则该函数用于获取当前循环。
  不推荐使用此方法,将在python 3.9中删除。请改用此功能。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网