阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

asyncio模块

103次阅读
没有评论

共计 7037 个字符,预计需要花费 18 分钟才能阅读完成。

一、概述

  • asyncio 模块

    是 python3.4 版本引入的标准库,直接内置了对异步 IO 的操作

  • 编程模式

    是一个消息循环,我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步 IO

  • 说明

    到目前为止实现协程的不仅仅只有 asyncio,tornado 和 gevent 都实现了类似功能

  • 关键字的说明

    关键字 说明
    event_loop 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
    coroutine 协程对象,指一个使用 async 关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用
    task 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态
    future 代表将来执行或没有执行的任务的结果,它和 task 上没有本质上的区别
    async/await python3.5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口

二、asyncio 基本使用

  • 定义一个协程

    import asyncio import time # 通过 async 关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中 async def run(x): print("waiting:%d"%x) time.sleep(x) print("结束 run") # 得到一个协程对象 coroutine = run(2) # 创建一个消息循环 # 注意:真实是在 asyncio 模块中获取一个引用 loop = asyncio.get_event_loop() # 将协程对象加入到消息循环 loop.run_until_complete(coroutine)
  • 创建一个任务

    import asyncio import time async def run(x): print("waiting:%d"%x) time.sleep(x) print("结束 run") coroutine = run(2) # 创建任务 task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() # 协程对象加入到消息循环中,协程对象不能直接运行的,在注册消息循环时 run_until_complete 方法将加入的协程对象包装成一个任务,task 对象时 Future 类的子类对象,保存协程运行后的状态,用于未来获取协程的结果 # loop.run_until_complete(coroutine) # 将任务加入到消息循环 loop.run_until_complete(task)
  • 绑定回调

    回调:不需要手动调用,触发某种条件才会调用

    import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络 IO asyncio.sleep(5) data = "'%s' 的数据 "%(url) print("给你数据") return data # 定义一个回调函数(不需要手动调用,触发某种条件才会调用) def call_back(future): print("call_back:", future.result()) coroutine = run("百度") # 创建一个任务对象 task = asyncio.ensure_future(coroutine) # 给任务添加回调,在任务结束后调用回调函数 task.add_done_callback(call_back) loop = asyncio.get_event_loop() loop.run_until_complete(task) print("-------main------") while 1: time.sleep(2)

    注意:asyncio.sleep(5) 会报 RuntimeWarning

asyncio 模块

  • 阻塞和 await

    async 可以定义协程,使用 await 可以针对耗时操作进行挂起,就与生成器的 yield 一样,函数交出控制权。协程遇到 await,消息循环会挂起该协程,执行别的协程,直到其他协程也会挂起或者执行完毕,在进行下一次执行

    import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络 IO await asyncio.sleep(5) data = "'%s' 的数据 "%(url) print("给你数据") return data # 定义一个回调函数(不需要手动调用,触发某种条件才会调用) def call_back(future): print("call_back:", future.result()) coroutine = run("百度") task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) loop = asyncio.get_event_loop() loop.run_until_complete(task) print("-------main------")

三、多任务

  • 同步

    同时请求 ” 百度 ”,“阿里”,“腾讯”, “ 新浪 ” 四个网站,假设响应时长均为 2 秒

    import time def run(url): print("开始向'%s'要数据……"%(url)) # 向百度要数据,网络 IO time.sleep(2) data = "'%s' 的数据 "%(url) return data if __name__ == "__main__": t1 = time.time() for url in ["百度", "阿里", "腾讯", "新浪"]: print(run(url)) t2 = time.time() print("总耗时:%.2f"%(t2-t1))
  • 异步

    同时请求 ” 百度 ”,“阿里”,“腾讯”, “ 新浪 ” 四个网站,假设响应时长均为 2 秒

    import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s' 的数据 "%(url) return data def call_back(future): print("call_back:", future.result()) if __name__ == "__main__": loop = asyncio.get_event_loop() tasks = [] t1 = time.time() for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) # 同时添加 4 个异步任务 loop.run_until_complete(asyncio.gather(*tasks)) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

四、协程嵌套

使用 async 可以定义协程,协程用于耗时的 io 操作,我们也可以封装更多的 io 操作过程,这样就实现了嵌套的协程,即一个协程中 await 了另外一个协程,如此连接起来

asyncio 模块

import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s' 的数据 "%(url) return data def call_back(future): print("call_back:", future.result()) async def main(): tasks = [] for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) # task.add_done_callback(call_back) tasks.append(task) # #1、可以没有回调函数 # dones, pendings = await asyncio.wait(tasks) # #处理数据,类似回调,建议使用回调 # for t in dones: # print("数据:%s"%(t.result())) # #2、可以没有回调函数 # results = await asyncio.gather(*tasks) # # 处理数据,类似回调,建议使用回调 # for result in results: # print("数据:%s"%(result)) # 3、 # return await asyncio.wait(tasks) # 4、 # return await asyncio.gather(*tasks) # 5、 # for t in asyncio.as_completed(tasks): # await t # 6、 for t in asyncio.as_completed(tasks): # 可以没有回调 result = await t print("数据:%s"%(result)) if __name__ == "__main__": loop = asyncio.get_event_loop() t1 = time.time() #1、 # loop.run_until_complete(main()) #2、 # loop.run_until_complete(main()) # # 3、 # dones, pendings = loop.run_until_complete(main()) # #处理数据,类似回调,建议使用回调 # for t in dones: # print("数据:%s"%(t.result())) # 4、 # results = loop.run_until_complete(main()) # for result in results: # print("数据:%s"%(result)) # 5、 # loop.run_until_complete(main()) # 6、 loop.run_until_complete(main()) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

整理协程嵌套

import time import asyncio async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s' 的数据 "%(url) return data def call_back(future): print("call_back:", future.result()) async def main(): tasks = [] for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) await asyncio.wait(tasks) if __name__ == "__main__": loop = asyncio.get_event_loop() t1 = time.time() loop.run_until_complete(main()) t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

五、消息循环在另一个线程中启动

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被 block

import asyncio import threading import time def run(url): print("开始向'%s'要数据……"%(url)) time.sleep(2) data = "'%s' 的数据 "%(url) print("结束请求") return data def start_loop(loop): # 启动消息循环 asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": # 创建消息循环 (类似死循环) # 注意:此时消息循环没有启动 loop = asyncio.get_event_loop() threading.Thread(target=start_loop, args=(loop,)).start() t1 = time.time() # 给消息循环添加任务 # loop.run_until_complete(create_tasks()) loop.call_soon_threadsafe(run, "百度") loop.call_soon_threadsafe(run, "腾讯") t2 = time.time() print("总耗时:%.2f" % (t2 - t1))

六、asyncio 终极使用

使用到协程嵌套与消息循环在另一个线程中启动相关联

import asyncio import threading async def run(url): print("开始向'%s'要数据……"%(url)) await asyncio.sleep(2) data = "'%s' 的数据 "%(url) print("结束请求") return data def call_back(future): print("call_back:", future.result()) async def create_tasks(): tasks = [] for url in ["百度", "阿里", "腾讯", "新浪"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) task.add_done_callback(call_back) tasks.append(task) await asyncio.wait(tasks) def start_loop(loop): # 启动消息循环 asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": # 创建消息循环 (类似死循环) # 注意:此时消息循环没有启动 loop = asyncio.get_event_loop() threading.Thread(target=start_loop, args=(loop,)).start() # 给消息循环添加任务 asyncio.run_coroutine_threadsafe(create_tasks(), loop) # asyncio.run_coroutine_threadsafe(run("百度"), loop) # asyncio.run_coroutine_threadsafe(run("腾讯"), loop) # asyncio.run_coroutine_threadsafe(run("阿里"), loop) # asyncio.run_coroutine_threadsafe(run("新浪"), loop)

七、获取网页信息

import asyncio import threading async def run(url): print("开始加载'%s'页面……" % (url)) # 发起链接,耗时 IO connet = asyncio.open_connection(url, 80) reader, writer = await connet # 链接成功 # 发起请求,耗时 IO header = "GET / HTTP/1.0\r\nHost: %s\r\n\r\n"%(url) writer.write(header.encode("utf-8")) await writer.drain() # 接收数据 with open(url + ".html", "wb") as fp: while True: line = await reader.readline() if line == b"\r\n": break else: fp.write(line) fp.flush() async def create_tasks(): tasks = [] for url in ["www.baidu.com", "www.zutuanxue.com", "www.sina.com.cn"]: coroutine = run(url) task = asyncio.ensure_future(coroutine) tasks.append(task) await asyncio.wait(tasks) def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": loop = asyncio.get_event_loop() threading.Thread(target=start_loop, args=(loop,)).start() asyncio.run_coroutine_threadsafe(create_tasks(), loop)

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-05-26发表,共计7037字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中