原文地址:https://www.douyacun.com/article/e244bb20907ed56e08164ac69ec6d249
版本
平台
QT:5.15.7
aiohttp: 3.8.3
qasync: 0.23.0
思路一:
启动一个线程QThread后台去执行,下载任务和下载进度 都是 通过pyqtSignal进行交互
class DownloadThread(QtCore.QThread):
def __init__(self):
super(DownloadThread, self).__init__()
def run(self) -> None:
loop = qasync.QEventLoop(self)
asyncio.set_event_loop(loop)
loop.run_forever()
class Download(QtCore.QObject):
url_signal = pyqtSignal(str)
progress_signal = pyqtSignal(str)
def __init__(self):
super(Download, self).__init__()
self.url_signal.connect(self.schedule)
@asyncSlot(str)
async def schedule(self, data):
if data == "":
return
video = json.loads(data, object_hook=decodeVideo)
loop = asyncio.get_event_loop()
loop.set_exception_handler(self.exception_handle)
session_timeout = aiohttp.ClientTimeout(total=None, sock_connect=10)
async with aiohttp.ClientSession(loop=loop, timeout=session_timeout) as session:
cwlist = [loop.create_task(self.fetch_limit(semaphore, session, video, dir)) for video in videos]
await asyncio.gather(*cwlist)
async def fetch(self, video):
filename = video.title if video.title else video.id
file = path.join(dir, filename + ".mp4")
try:
with open(file, "wb+") as fd:
logging.info("%s\t%s\t%s" % (video.title, video.video_url, file))
# 在windows上这里会被卡住,不再仅需运行,替换使用requests都可以正常返回
async with session.get(video.video_url, headers=self.getHeaders()) as resp:
video.content_length = resp.headers["Content-Length"]
logging.info("content-length: " + video.content_length)
logging.info("file: " + file)
async for chunk in resp.content.iter_chunked(chunk_size):
fd.write(chunk)
video.size += len(chunk)
video.progress = int(video.size / int(video.content_length) * 100)
self.progress_signal.emit(json.dumps(video, default=encodeVideo))
except asyncio.exceptions.TimeoutError as e:
print("fetch timeout: ", e)
class Batch(QtWidgets.QWidget):
def __init__(self, context: Context):
super(IndexBatch, self).__init__()
self.context = context
self.thread = DownloadThread()
self.download = Download()
self.download.moveToThread(self.thread)
self.thread.start()
self.setUi()
def onDownload(self):
for video in self._videos:
if video.checked:
self.download.url_signal.emit(json.dumps(video, default=encodeVideo),
path.join(self.save_dir, self._author_nickname))
问题:
思路一的代码在mac上可以运行,session请求是可以响应的,但是在win10上,async with session.get(video.video_url) as resp
会被卡住不再执行,解决思路
reqeusts.get(video.video_url)
是可以正常响应的asyncio.exceptions.TimeoutError
异常打印,可能是async with session.get
没有进入到执行队列中去~思路二
尝试去掉DownloadThread执行下载任务的思路,直接将下载任务放到异步执行队列(loop)中去
class Download(QtCore.QObject):
progress_signal = pyqtSignal(str)
def __init__(self):
super(Download, self).__init__()
async def schedule(self, videos, dir: str):
logging.info("download count: %d" % (len(videos)))
loop = asyncio.get_event_loop()
loop.set_exception_handler(self.exception_handle)
session_timeout = aiohttp.ClientTimeout(total=None, sock_connect=10)
async with aiohttp.ClientSession(loop=loop, timeout=session_timeout) as session:
cwlist = [loop.create_task(self.fetch(semaphore, session, video, dir)) for video in videos]
await asyncio.gather(*cwlist)
def exception_handle(self, *args):
logging.info("asyncio.exception.handle: {}" % args)
async def fetch(self, session, video, dir):
filename = video.title if video.title else video.id
file = path.join(dir, filename + ".mp4")
with open(file, "wb+") as fd:
logging.info("%s\t%s\t%s" % (video.title, video.video_url, file))
async with session.get(video.video_url, headers=self.getHeaders()) as resp:
video.content_length = resp.headers["Content-Length"]
async for chunk in resp.content.iter_chunked(chunk_size):
fd.write(chunk)
video.size += len(chunk)
video.progress = int(video.size / int(video.content_length) * 100)
self.progress_signal.emit(json.dumps(video, default=encodeVideo))
class Batch(QtWidgets.QWidget):
def __init__(self, context: Context):
super(IndexBatch, self).__init__()
self.context = context
self.download = Download()
self.setUi()
def onConnect(self):
self._ui.download_btn.clicked.connect(self.onDownload)
self.download.progress_signal.connect(self.onDownloadProcess)
# 订阅Download下载进度更新进度条
def onDownloadProcess(self, data):
vd = json.loads(data, object_hook=decodeVideo)
bar = self._progress_bar_dict[vd.id]
bar.setValue(vd.progress)
@asyncSlot()
async def onDownload(self):
videos = []
for video in self._videos:
if video.checked and video.progress == 0:
videos.append(video)
await self.download.schedule(videos, self.save_dir)
async def main():
def close_future(future, loop):
loop.call_later(10, future.cancel)
future.cancel()
loop = asyncio.get_event_loop()
future = asyncio.Future()
app = QApplication.instance()
if hasattr(app, "aboutToQuit"):
getattr(app, "aboutToQuit").connect(
functools.partial(close_future, future, loop)
)
m = MainWindow()
m.run()
await future
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
QtCore.QCoreApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling)
try:
qasync.run(main())
except asyncio.exceptions.CancelledError:
sys.exit(1)
效果截图