pyqt5异步批量下载

原创:pyqt11/08/2022发布pv:0uv:0ip:0twitter #pyqt

原文地址:https://www.douyacun.com/article/e244bb20907ed56e08164ac69ec6d249

批量下载视频文件,不影响GUI运行,GUI展示相应的进度

版本

  • 平台

    • mac 11
    • windows 10
  • QT:5.15.7

  • aiohttp: 3.8.3

  • qasync: 0.23.0

思路一:

启动一个线程QThread后台去执行,下载任务和下载进度 都是 通过pyqtSignal进行交互

class DownloadThread(QtCore.QThread):
    def __init__(self):
        super(DownloadThread, self).__init__()

    def run(self) -> None:
        loop = qasync.QEventLoop(self)
        asyncio.set_event_loop(loop)
        loop.run_forever()


class Download(QtCore.QObject):
    url_signal = pyqtSignal(str)
    progress_signal = pyqtSignal(str)

    def __init__(self):
        super(Download, self).__init__()
        self.url_signal.connect(self.schedule)

		@asyncSlot(str)
    async def schedule(self, data):
        if data == "":
            return
        video = json.loads(data, object_hook=decodeVideo)
        loop = asyncio.get_event_loop()
        loop.set_exception_handler(self.exception_handle)
        session_timeout = aiohttp.ClientTimeout(total=None, sock_connect=10)
        async with aiohttp.ClientSession(loop=loop, timeout=session_timeout) as session:
            cwlist = [loop.create_task(self.fetch_limit(semaphore, session, video, dir)) for video in videos]
            await asyncio.gather(*cwlist)
          
    async def fetch(self, video):
      	filename = video.title if video.title else video.id
        file = path.join(dir, filename + ".mp4")
        try:
            with open(file, "wb+") as fd:
                logging.info("%s\t%s\t%s" % (video.title, video.video_url, file))
                # 在windows上这里会被卡住,不再仅需运行,替换使用requests都可以正常返回
                async with session.get(video.video_url, headers=self.getHeaders()) as resp:
                    video.content_length = resp.headers["Content-Length"]
                    logging.info("content-length: " + video.content_length)
                    logging.info("file: " + file)
                    async for chunk in resp.content.iter_chunked(chunk_size):
                        fd.write(chunk)
                        video.size += len(chunk)
                        video.progress = int(video.size / int(video.content_length) * 100)
                        self.progress_signal.emit(json.dumps(video, default=encodeVideo))
        except asyncio.exceptions.TimeoutError as e:
            print("fetch timeout: ", e)
                        
 class Batch(QtWidgets.QWidget):
    def __init__(self, context: Context):
        super(IndexBatch, self).__init__()
        self.context = context
        self.thread = DownloadThread()
        self.download = Download()
        self.download.moveToThread(self.thread)
        self.thread.start()
        self.setUi()                       
		
    def onDownload(self):
        for video in self._videos:
            if video.checked:
                self.download.url_signal.emit(json.dumps(video, default=encodeVideo),
                                              path.join(self.save_dir, self._author_nickname))

问题:

思路一的代码在mac上可以运行,session请求是可以响应的,但是在win10上,async with session.get(video.video_url) as resp 会被卡住不再执行,解决思路

  • 使用reqeusts.get(video.video_url)是可以正常响应的
  • aiohttp设置sock_connect没有收到asyncio.exceptions.TimeoutError 异常打印,可能是async with session.get没有进入到执行队列中去~

思路二

尝试去掉DownloadThread执行下载任务的思路,直接将下载任务放到异步执行队列(loop)中去

class Download(QtCore.QObject):
    progress_signal = pyqtSignal(str)

    def __init__(self):
        super(Download, self).__init__()

    async def schedule(self, videos, dir: str):
        logging.info("download count: %d" % (len(videos)))
        loop = asyncio.get_event_loop()
        loop.set_exception_handler(self.exception_handle)
        session_timeout = aiohttp.ClientTimeout(total=None, sock_connect=10)
        async with aiohttp.ClientSession(loop=loop, timeout=session_timeout) as session:
            cwlist = [loop.create_task(self.fetch(semaphore, session, video, dir)) for video in videos]
            await asyncio.gather(*cwlist)

    def exception_handle(self, *args):
        logging.info("asyncio.exception.handle: {}" % args)

    async def fetch(self, session, video, dir):
        filename = video.title if video.title else video.id
        file = path.join(dir, filename + ".mp4")
        with open(file, "wb+") as fd:
          logging.info("%s\t%s\t%s" % (video.title, video.video_url, file))
          async with session.get(video.video_url, headers=self.getHeaders()) as resp:
            video.content_length = resp.headers["Content-Length"]
            async for chunk in resp.content.iter_chunked(chunk_size):
              fd.write(chunk)
              video.size += len(chunk)
              video.progress = int(video.size / int(video.content_length) * 100)
              self.progress_signal.emit(json.dumps(video, default=encodeVideo))
              
class Batch(QtWidgets.QWidget):
    def __init__(self, context: Context):
        super(IndexBatch, self).__init__()
        self.context = context
        self.download = Download()
        self.setUi()                       
		
    def onConnect(self):
        self._ui.download_btn.clicked.connect(self.onDownload)
        self.download.progress_signal.connect(self.onDownloadProcess)

    # 订阅Download下载进度更新进度条
    def onDownloadProcess(self, data):
        vd = json.loads(data, object_hook=decodeVideo)
        bar = self._progress_bar_dict[vd.id]
        bar.setValue(vd.progress)
        
    @asyncSlot()
    async def onDownload(self):
        videos = []
        for video in self._videos:
            if video.checked and video.progress == 0:
                videos.append(video)
        await self.download.schedule(videos, self.save_dir)
        
async def main():
    def close_future(future, loop):
        loop.call_later(10, future.cancel)
        future.cancel()

    loop = asyncio.get_event_loop()
    future = asyncio.Future()

    app = QApplication.instance()
    if hasattr(app, "aboutToQuit"):
        getattr(app, "aboutToQuit").connect(
            functools.partial(close_future, future, loop)
        )
    m = MainWindow()
    m.run()
    await future


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    QtCore.QCoreApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling)
    try:
        qasync.run(main())
    except asyncio.exceptions.CancelledError:
        sys.exit(1)

效果截图

pyqt5-asyncio-download

异步加载图片,不影响GUI运行, 待补充