
假如有一个文件,里面有 10 万个 url,需要对每个 url 发送 http 请求,并打印请求结果的状态码,如何编写代码尽可能快的完成这些任务呢?
Python 并发编程有很多方法,多线程的标准库 threading,concurrency,协程 asyncio,当然还有 grequests 这种异步库,每一个都可以实现上述需求,下面一一用代码实现一下,本文的代码可以直接运行,给你以后的并发编程作为参考:
队列+多线程
定义一个大小为 400 的队列,然后开启 200 个线程,每个线程都是不断的从队列中获取 url 并访问。
主线程读取文件中的 url 放入队列中,然后等待队列中所有的元素都被接收和处理完毕。代码如下:
- fromthreadingimportThread
- importsys
- fromqueueimportQueue
- importrequests
- concurrent=200
- defdoWork():
- whileTrue:
- url=q.get()
- status,url=getStatus(url)
- doSomethingWithResult(status,url)
- q.task_done()
- defgetStatus(ourl):
- try:
- res=requests.get(ourl)
- returnres.status_code,ourl
- except:
- return"error",ourl
- defdoSomethingWithResult(status,url):
- print(status,url)
- q=Queue(concurrent*2)
- foriinrange(concurrent):
- t=Thread(target=doWork)
- t.daemon=True
- t.start()
- try:
- forurlinopen("urllist.txt"):
- q.put(url.strip())
- q.join()
- exceptKeyboardInterrupt:
- sys.exit(1)
运行结果如下:

有没有 get 到新技能?
线程池
如果你使用线程池,推荐使用更高级的 concurrent.futures 库:
- importconcurrent.futures
- importrequests
- out=[]
- CONNECTIONS=100
- TIMEOUT=5
- urls=[]
- withopen("urllist.txt")asreader:
- forurlinreader:
- urls.append(url.strip())
- defload_url(url,timeout):
- ans=requests.get(url,timeout=timeout)
- returnans.status_code
- withconcurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS)asexecutor:
- future_to_url=(executor.submit(load_url,url,TIMEOUT)forurlinurls)
- forfutureinconcurrent.futures.as_completed(future_to_url):
- try:
- data=future.result()
- exceptExceptionasexc:
- data=str(type(exc))
- finally:
- out.append(data)
- print(data)
协程 + aiohttp
协程也是并发非常常用的工具了:
- importasyncio
- fromaiohttpimportClientSession,ClientConnectorError
- asyncdeffetch_html(url:str,session:ClientSession,**kwargs)->tuple:
- try:
- resp=awaitsession.request(method="GET",url=url,**kwargs)
- exceptClientConnectorError:
- return(url,404)
- return(url,resp.status)
- asyncdefmake_requests(urls:set,**kwargs)->None:
- asyncwithClientSession()assession:
- tasks=[]
- forurlinurls:
- tasks.append(
- fetch_html(url=url,session=session,**kwargs)
- )
- results=awaitasyncio.gather(*tasks)
- forresultinresults:
- print(f'{result[1]}-{str(result[0])}')
- if__name__=="__main__":
- importsys
- assertsys.version_info>=(3,7),"ScriptrequiresPython3.7+."
- withopen("urllist.txt")asinfile:
- urls=set(map(str.strip,infile))
- asyncio.run(make_requests(urls=urls))
grequests[1]
这是个第三方库,目前有 3.8K 个星,就是 Requests + Gevent[2],让异步 http 请求变得更加简单。Gevent 的本质还是协程。
使用前:
- pipinstallgrequests
使用起来那是相当的简单:
- importgrequests
- urls=[]
- withopen("urllist.txt")asreader:
- forurlinreader:
- urls.append(url.strip())
- rs=(grequests.get(u)foruinurls)
- forresultingrequests.map(rs):
- print(result.status_code,result.url)
注意 grequests.map(rs) 是并发执行的。运行结果如下:

也可以加入异常处理:
- >>>defexception_handler(request,exception):
- ...print("Requestfailed")
- >>>reqs=[
- ...grequests.get('http://httpbin.org/delay/1',timeout=0.001),
- ...grequests.get('http://fakedomain/'),
- ...grequests.get('http://httpbin.org/status/500')]
- >>>grequests.map(reqs,exception_handler=exception_handler)
- Requestfailed
- Requestfailed
- [None,None,<Response[500]>]
最后的话
今天分享了并发 http 请求的几种实现方式,有人说异步(协程)性能比多线程好,其实要分场景看的,没有一种方法适用所有的场景,笔者就曾做过一个实验,也是请求 url,当并发数量超过 500 时,协程明显变慢。
原文链接:https://mp.weixin.qq.com/s/tacsqShnGNFHO3DpuxOMPA








发表评论
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。