Pyqt制作B站视频下载软件
Pyqt制作B站视频下载软件
- 界面
- B站下载视频Url提取
- 视频和音频的合成
- 知识点
- 线程下载
- 提取了线程下载基类
- GitHub地址
界面
B站下载视频Url提取
需要注意的是,B站的音频流和视频流是分开提供的,也就是说需要分别下载两个文件,再自行合并才能得到一个完整的视频。提取下载地址的代码如下:
sText为请求的视频网站的内容
# Fetch the video page; sText is the HTML text that getVideoLinks() parses.
self.m_oRequest = requests.session()
oResult = self.m_oRequest.get(sUrl, headers=base_headers, timeout=3)
sText = oResult.text
def getVideoLinks(self, sText):
    """Extract the audio and video download URLs from a video page.

    The page embeds its stream description as JSON inside a
    ``<script>window.__playinfo__=...</script>`` tag. The first entry of
    ``data.dash.audio`` and of ``data.dash.video`` is used.

    Args:
        sText: HTML text of the video page.

    Returns:
        dict: ``{"AudioUrl": ..., "VideoUrl": ...}`` on success, or an empty
        dict when the page cannot be parsed.
    """
    import json
    import re

    def _forceHttps(sUrl):
        # Upgrade only a leading "http://". A blanket replace("http", "https")
        # turns "https://" into "httpss://" and also rewrites any "http"
        # that happens to appear later in the URL.
        if sUrl.startswith("http://"):
            return "https://" + sUrl[len("http://"):]
        return sUrl

    dctDownloadLink = {}
    try:
        data = re.findall(r'<script>window\.__playinfo__=(.*?)</script>', sText)[0]
        dctDash = json.loads(data)['data']['dash']
        sAudioUrl = _forceHttps(dctDash['audio'][0]["base_url"])
        sVideoUrl = _forceHttps(dctDash['video'][0]["base_url"])
    except (IndexError, KeyError, TypeError, ValueError, AttributeError):
        # Tag missing, malformed JSON, or unexpected structure.
        print("解析地址失败...")
    else:
        # Both links are stored together so the result is never half-filled.
        dctDownloadLink["AudioUrl"] = sAudioUrl
        dctDownloadLink["VideoUrl"] = sVideoUrl
    print("下载链接 ", dctDownloadLink)
    return dctDownloadLink
视频和音频的合成
这里使用了第三方软件
ffmpeg
从网上下载该软件,压缩包内包含 ffmpeg 的 exe 可执行文件,将该可执行文件所在的目录加入系统环境变量 PATH 中。
合成代码如下
# Merge the audio and video streams
def combineVideo(self, sAudioPath, sVideoPath, sOutputPath):
    """Mux separately downloaded audio and video files into one file.

    Runs the ``ffmpeg`` command-line tool, which must be on PATH. Both
    streams are copied without re-encoding and an existing output file is
    overwritten (``-y``). ``self.combineFinish()`` is called on success.

    Args:
        sAudioPath: path of the downloaded audio file.
        sVideoPath: path of the downloaded video file.
        sOutputPath: path of the merged file to write.

    Raises:
        FileNotFoundError: the ffmpeg executable cannot be found.
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    import subprocess

    print("开始合成音频和视频...", sAudioPath, sVideoPath, sOutputPath)
    # An argument list without a shell passes paths verbatim, so spaces or
    # shell metacharacters in a path cannot break or alter the command.
    lstCmd = [
        'ffmpeg', '-i', sAudioPath, '-i', sVideoPath,
        '-vcodec', 'copy', '-acodec', 'copy', '-y', sOutputPath,
    ]
    # check=True: a failed merge must not be reported as finished.
    subprocess.run(lstCmd, check=True)
    print("合成完成...")
    self.combineFinish()
知识点
线程下载
支持暂停,继续,停止
class CDownloader(QThread):
    """Worker thread that downloads a video's audio and video files and merges them.

    Supports pause, resume and stop. The outcome is reported through
    ``oSignalFinish`` with flag 1 (success), -1 (error) or -2 (stopped).
    """

    oSignalDownload = pyqtSignal(QThread, int)  # progress: (thread, percent)
    oSignalFinish = pyqtSignal(QThread, int)  # finished: (thread, result flag)

    def __init__(self, iAvID, dctLinks):
        """Store the job description and make sure the output directory exists.

        Args:
            iAvID: id of the video; used for file names and the Referer header.
            dctLinks: mapping of link type to URL. The "AudioUrl" entry is
                saved as the audio file, every other entry as the video file.
        """
        super(CDownloader, self).__init__()
        self.m_iAvID = iAvID
        self.m_dctLinks = dctLinks
        self.m_oEventPause = threading.Event()  # set = running, cleared = paused
        self.m_oEventPause.set()
        self.m_oEventRunning = threading.Event()  # cleared = stop requested
        self.m_oEventRunning.set()
        self.m_sDir = os.path.join(os.getcwd(), "videos")
        os.makedirs(self.m_sDir, exist_ok=True)

    def run(self):
        """Download every link, merge the two files, then emit the finish flag."""
        # NOTE(review): the Origin/Referer URL literals were empty in the
        # source ('' % id raises TypeError). They are restored here to
        # Bilibili's public site and video-page URLs -- confirm.
        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Origin': 'https://www.bilibili.com',
            'Pragma': 'no-cache',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
        }
        headers['Referer'] = 'https://www.bilibili.com/video/av%s' % self.m_iAvID
        print("下载线程开启...")
        sAudioPath = self.m_sDir + os.sep + str(self.m_iAvID) + ".mp3"
        sVideoPath = self.m_sDir + os.sep + str(self.m_iAvID) + ".mp4"
        sOutputPath = self.m_sDir + os.sep + "av" + str(self.m_iAvID) + ".mp4"
        for sType, sUrl in self.m_dctLinks.items():
            sFileName = sAudioPath if sType == "AudioUrl" else sVideoPath
            iFlag = self._downloadFile(sUrl, sFileName, headers)
            if iFlag != 1:
                # Do not try to merge after a failed or stopped download.
                self.finishToEmit(iFlag)
                return
        try:
            self.combineVideo(sAudioPath, sVideoPath, sOutputPath)
            self.delSourceFile(sAudioPath, sVideoPath)
        except Exception as e:
            print("错误,请检查是否安装了ffmpeg", e)
            self.finishToEmit(-1)
            return
        self.finishToEmit(1)

    def _downloadFile(self, sUrl, sFileName, dctHeaders):
        """Stream one URL into sFileName.

        Returns:
            int: 1 on success, -1 on error, -2 when a stop was requested.
        """
        if not self.m_oEventRunning.is_set():
            return -2
        print("开始下载...", sFileName)
        try:
            with contextlib.closing(requests.get(sUrl, headers=dctHeaders, timeout=3, stream=True)) as oRequest:
                print("状态码: ", oRequest.status_code)
                if oRequest.status_code != 200:
                    print("状态码有误,下载失败")
                    return -1
                # The header may be absent; 0 means "size unknown".
                iContentSize = int(oRequest.headers.get('content-length', 0))
                # About 100 chunks per file; never 0 for tiny or unknown sizes.
                iChunkSize = iContentSize // 100 or 64 * 1024
                print('文件总大小: %.3f M, 单次请求最大值: %s' % (iContentSize / 1024 / 1024, iChunkSize))
                count = 0
                with open(sFileName, "wb") as oFile:
                    # iter_content streams to disk instead of buffering the
                    # whole body in memory.
                    for data in oRequest.iter_content(chunk_size=iChunkSize):
                        self.m_oEventPause.wait()  # blocks while paused
                        # Checked after the wait so a stop issued during a
                        # pause is noticed as soon as the thread wakes up.
                        if not self.m_oEventRunning.is_set():
                            return -2
                        oFile.write(data)
                        count += len(data)
                        if iContentSize > 0:
                            self.progressToEmit(min(100, count * 100 // iContentSize))
        except Exception as e:
            print("错误:", e)
            return -1
        print("下载完成! ", sFileName)
        return 1

    # Merge the audio and video streams
    def combineVideo(self, sAudioPath, sVideoPath, sOutputPath):
        """Mux the audio and video files into sOutputPath with the ffmpeg CLI.

        Raises:
            FileNotFoundError: the ffmpeg executable cannot be found.
            subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        """
        import subprocess

        print("开始合成音频和视频...", sAudioPath, sVideoPath, sOutputPath)
        # Argument list, no shell: paths with spaces are passed verbatim.
        lstCmd = [
            'ffmpeg', '-i', sAudioPath, '-i', sVideoPath,
            '-vcodec', 'copy', '-acodec', 'copy', '-y', sOutputPath,
        ]
        # check=True so the caller does not delete the sources after a failed merge.
        subprocess.run(lstCmd, check=True)
        print("合成完成...")
        self.combineFinish()

    # Delete the source files
    def delSourceFile(self, sAudioPath, sVideoPath):
        """Remove the separately downloaded audio and video files."""
        print("删除源文件: ", sAudioPath, sVideoPath)
        os.remove(sAudioPath)
        os.remove(sVideoPath)

    def combineFinish(self):
        """Hook called after a successful merge."""
        print("合成成功!")

    def pause(self):
        """Pause the download at the next chunk boundary."""
        self.m_oEventPause.clear()

    def resume(self):
        """Continue a paused download."""
        self.m_oEventPause.set()

    def stop(self):
        """Request the download to stop."""
        self.m_oEventRunning.clear()
        # Wake a paused thread; otherwise it would wait forever and never
        # observe the stop request.
        self.m_oEventPause.set()
        self.exit()

    def progressToEmit(self, iProgress):
        """Emit the progress signal with the given percentage."""
        self.oSignalDownload.emit(self, iProgress)

    def finishToEmit(self, iFlag):
        """Emit the finish signal with the given result flag."""
        self.oSignalFinish.emit(self, iFlag)
提取了线程下载基类
# -*- coding: utf-8 -*-
from PyQt5.QtCore import QThread, pyqtSignal
import threading
import contextlib
import requests
import os# 线程下载基类
class CDownloaderBase(QThread):
    """Worker thread that streams one URL to one file.

    Supports pause, resume and stop. The outcome is reported through
    ``oSignalFinish`` with flag 1 (success), -1 (error) or -2 (stopped).
    """

    oSignalDownload = pyqtSignal(QThread, int)  # progress: (thread, percent)
    oSignalFinish = pyqtSignal(QThread, int)  # finished: (thread, result flag)

    def __init__(self, sSavePath, sUrl):
        """
        Args:
            sSavePath: path of the file to write.
            sUrl: URL to download.
        """
        super(CDownloaderBase, self).__init__()
        self.m_sSavePath = sSavePath
        self.m_sUrl = sUrl
        self.m_oEventPause = threading.Event()  # set = running, cleared = paused
        self.m_oEventPause.set()
        self.m_oEventRunning = threading.Event()  # cleared = stop requested
        self.m_oEventRunning.set()
        self.m_dctHeader = {}

    def setHeader(self, dctHeader):
        """Set extra request headers; they override the defaults of getHeader()."""
        # A copy is stored so later changes by the caller have no effect here.
        self.m_dctHeader = dict(dctHeader or {})

    def getHeader(self):
        """Return the request headers: defaults overlaid with the custom headers.

        Entries whose value is empty are dropped so that no blank header
        (such as the placeholder Referer) is sent.
        """
        dctHeaders = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
            'Referer': "",
        }
        # dict.update() returns None, so merge first and return the dict itself.
        dctHeaders.update(self.m_dctHeader)
        return {sKey: sValue for sKey, sValue in dctHeaders.items() if sValue}

    def run(self):
        """Download ``m_sUrl`` to ``m_sSavePath`` and emit the finish flag."""
        dctHeaders = self.getHeader()
        print("下载线程开启...")
        if not self.m_oEventRunning.is_set():
            # Stopped before starting: do not report success.
            self.finishToEmit(-2)
            return
        print("开始下载...", self.m_sSavePath, self.m_sUrl)
        try:
            with contextlib.closing(requests.get(self.m_sUrl, headers=dctHeaders, timeout=5, stream=True)) as oRequest:
                print("状态码: ", oRequest.status_code)
                if oRequest.status_code != 200:
                    print("状态码有误,下载失败")
                    self.finishToEmit(-1)
                    return
                # The header may be absent; 0 means "size unknown".
                iContentSize = int(oRequest.headers.get('content-length', 0))
                # About 100 chunks per file; never 0 for tiny or unknown sizes.
                iChunkSize = iContentSize // 100 or 64 * 1024
                print('文件总大小: %.3f M, 单次请求最大值: %s' % (iContentSize / 1024 / 1024, iChunkSize))
                count = 0
                self.createFile(self.m_sSavePath)
                with open(self.m_sSavePath, "wb") as oFile:
                    # iter_content streams to disk instead of buffering the
                    # whole body in memory.
                    for data in oRequest.iter_content(chunk_size=iChunkSize):
                        self.m_oEventPause.wait()  # blocks while paused
                        # Checked after the wait so a stop issued during a
                        # pause is noticed as soon as the thread wakes up.
                        if not self.m_oEventRunning.is_set():
                            self.finishToEmit(-2)
                            return
                        oFile.write(data)
                        count += len(data)
                        if iContentSize > 0:
                            self.progressToEmit(min(100, count * 100 // iContentSize))
        except Exception as e:
            self.finishToEmit(-1)
            print("错误:", e)
            return
        print("下载完成! ", self.m_sSavePath)
        self.finishToEmit(1)

    def pause(self):
        """Pause the download at the next chunk boundary."""
        self.m_oEventPause.clear()

    def resume(self):
        """Continue a paused download."""
        self.m_oEventPause.set()

    def stop(self):
        """Request the download to stop."""
        self.m_oEventRunning.clear()
        # Wake a paused thread; otherwise it would wait forever and never
        # observe the stop request.
        self.m_oEventPause.set()
        self.exit()

    def progressToEmit(self, iProgress):
        """Emit the progress signal with the given percentage."""
        self.oSignalDownload.emit(self, iProgress)

    def finishToEmit(self, iFlag):
        """Emit the finish signal with the given result flag."""
        self.oSignalFinish.emit(self, iFlag)

    def createFile(self, sPath):
        """Create sPath as an empty file, with its parent directories, if missing."""
        import os
        sPath = sPath.replace("\\", "/")
        # dirname() yields "" for a bare file name; slicing with rfind("/")
        # would instead chop the last character and create a bogus directory.
        sDir = os.path.dirname(sPath)
        if sDir:
            os.makedirs(sDir, exist_ok=True)
        if not os.path.isfile(sPath):
            with open(sPath, mode="w", encoding="utf-8"):
                pass


# Download without using a thread
def downloadByUrl(url, sPath, headers = None, dctConfig = None):
    """Download url to sPath in the calling thread.

    Args:
        url: URL to download.
        sPath: path of the file to write; missing parent directories are created.
        headers: optional request headers that override the built-in defaults.
        dctConfig: optional settings. "ChunkSize" is the number of bytes per
            read (default 1024); "Timeout" is the request timeout in seconds
            (default 5).

    Returns:
        bool: True when the file was written, False when the server did not
        answer with status 200.
    """
    import contextlib
    import requests

    dctConfig = dctConfig or {}
    header_base = {
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
    }
    header_base.update(headers or {})
    chunk_size = dctConfig.get("ChunkSize", 1024)
    # Without a timeout an unresponsive server blocks the caller forever.
    timeout = dctConfig.get("Timeout", 5)
    # stream=True is required so the body is read chunk by chunk.
    with contextlib.closing(requests.get(url, headers=header_base, stream=True, timeout=timeout)) as oResponse:
        if oResponse.status_code != 200:
            print("状态码有误,下载失败", oResponse.status_code, url)
            return False
        # The header may be absent; 0 means "size unknown".
        content_size = int(oResponse.headers.get('content-length', 0))
        print('[文件大小]:%0.2f MB' % (content_size / 1024 / 1024))
        # Created only after the status check so a failed request leaves no
        # empty file behind.
        createFile(sPath)
        with open(sPath, 'wb') as file:
            for data in oResponse.iter_content(chunk_size=chunk_size):
                file.write(data)
    print("下载成功", url, sPath)
    return True


def createFile(sPath):
    """Create sPath as an empty file, with its parent directories, if missing."""
    import os
    sPath = sPath.replace("\\", "/")
    # dirname() yields "" for a bare file name; slicing with rfind("/")
    # would instead chop the last character and create a bogus directory.
    sDir = os.path.dirname(sPath)
    if sDir:
        os.makedirs(sDir, exist_ok=True)
    if not os.path.isfile(sPath):
        with open(sPath, mode="w", encoding="utf-8"):
            pass
GitHub地址
GitHub
Pyqt制作B站视频下载软件
Pyqt制作B站视频下载软件
- 界面
- B站下载视频Url提取
- 视频和音频的合成
- 知识点
- 线程下载
- 提取了线程下载基类
- GitHub地址
界面
B站下载视频Url提取
需要注意的是,B站的音频流和视频流是分开提供的,也就是说需要分别下载两个文件,再自行合并才能得到一个完整的视频。提取下载地址的代码如下:
sText为请求的视频网站的内容
# Fetch the video page; sText is the HTML text that getVideoLinks() parses.
self.m_oRequest = requests.session()
oResult = self.m_oRequest.get(sUrl, headers=base_headers, timeout=3)
sText = oResult.text
def getVideoLinks(self, sText):
    """Extract the audio and video download URLs from a video page.

    The page embeds its stream description as JSON inside a
    ``<script>window.__playinfo__=...</script>`` tag. The first entry of
    ``data.dash.audio`` and of ``data.dash.video`` is used.

    Args:
        sText: HTML text of the video page.

    Returns:
        dict: ``{"AudioUrl": ..., "VideoUrl": ...}`` on success, or an empty
        dict when the page cannot be parsed.
    """
    import json
    import re

    def _forceHttps(sUrl):
        # Upgrade only a leading "http://". A blanket replace("http", "https")
        # turns "https://" into "httpss://" and also rewrites any "http"
        # that happens to appear later in the URL.
        if sUrl.startswith("http://"):
            return "https://" + sUrl[len("http://"):]
        return sUrl

    dctDownloadLink = {}
    try:
        data = re.findall(r'<script>window\.__playinfo__=(.*?)</script>', sText)[0]
        dctDash = json.loads(data)['data']['dash']
        sAudioUrl = _forceHttps(dctDash['audio'][0]["base_url"])
        sVideoUrl = _forceHttps(dctDash['video'][0]["base_url"])
    except (IndexError, KeyError, TypeError, ValueError, AttributeError):
        # Tag missing, malformed JSON, or unexpected structure.
        print("解析地址失败...")
    else:
        # Both links are stored together so the result is never half-filled.
        dctDownloadLink["AudioUrl"] = sAudioUrl
        dctDownloadLink["VideoUrl"] = sVideoUrl
    print("下载链接 ", dctDownloadLink)
    return dctDownloadLink
视频和音频的合成
这里使用了第三方软件
ffmpeg
从网上下载该软件,压缩包内包含 ffmpeg 的 exe 可执行文件,将该可执行文件所在的目录加入系统环境变量 PATH 中。
合成代码如下
# Merge the audio and video streams
def combineVideo(self, sAudioPath, sVideoPath, sOutputPath):
    """Mux separately downloaded audio and video files into one file.

    Runs the ``ffmpeg`` command-line tool, which must be on PATH. Both
    streams are copied without re-encoding and an existing output file is
    overwritten (``-y``). ``self.combineFinish()`` is called on success.

    Args:
        sAudioPath: path of the downloaded audio file.
        sVideoPath: path of the downloaded video file.
        sOutputPath: path of the merged file to write.

    Raises:
        FileNotFoundError: the ffmpeg executable cannot be found.
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    import subprocess

    print("开始合成音频和视频...", sAudioPath, sVideoPath, sOutputPath)
    # An argument list without a shell passes paths verbatim, so spaces or
    # shell metacharacters in a path cannot break or alter the command.
    lstCmd = [
        'ffmpeg', '-i', sAudioPath, '-i', sVideoPath,
        '-vcodec', 'copy', '-acodec', 'copy', '-y', sOutputPath,
    ]
    # check=True: a failed merge must not be reported as finished.
    subprocess.run(lstCmd, check=True)
    print("合成完成...")
    self.combineFinish()
知识点
线程下载
支持暂停,继续,停止
class CDownloader(QThread):
    """Worker thread that downloads a video's audio and video files and merges them.

    Supports pause, resume and stop. The outcome is reported through
    ``oSignalFinish`` with flag 1 (success), -1 (error) or -2 (stopped).
    """

    oSignalDownload = pyqtSignal(QThread, int)  # progress: (thread, percent)
    oSignalFinish = pyqtSignal(QThread, int)  # finished: (thread, result flag)

    def __init__(self, iAvID, dctLinks):
        """Store the job description and make sure the output directory exists.

        Args:
            iAvID: id of the video; used for file names and the Referer header.
            dctLinks: mapping of link type to URL. The "AudioUrl" entry is
                saved as the audio file, every other entry as the video file.
        """
        super(CDownloader, self).__init__()
        self.m_iAvID = iAvID
        self.m_dctLinks = dctLinks
        self.m_oEventPause = threading.Event()  # set = running, cleared = paused
        self.m_oEventPause.set()
        self.m_oEventRunning = threading.Event()  # cleared = stop requested
        self.m_oEventRunning.set()
        self.m_sDir = os.path.join(os.getcwd(), "videos")
        os.makedirs(self.m_sDir, exist_ok=True)

    def run(self):
        """Download every link, merge the two files, then emit the finish flag."""
        # NOTE(review): the Origin/Referer URL literals were empty in the
        # source ('' % id raises TypeError). They are restored here to
        # Bilibili's public site and video-page URLs -- confirm.
        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Origin': 'https://www.bilibili.com',
            'Pragma': 'no-cache',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
        }
        headers['Referer'] = 'https://www.bilibili.com/video/av%s' % self.m_iAvID
        print("下载线程开启...")
        sAudioPath = self.m_sDir + os.sep + str(self.m_iAvID) + ".mp3"
        sVideoPath = self.m_sDir + os.sep + str(self.m_iAvID) + ".mp4"
        sOutputPath = self.m_sDir + os.sep + "av" + str(self.m_iAvID) + ".mp4"
        for sType, sUrl in self.m_dctLinks.items():
            sFileName = sAudioPath if sType == "AudioUrl" else sVideoPath
            iFlag = self._downloadFile(sUrl, sFileName, headers)
            if iFlag != 1:
                # Do not try to merge after a failed or stopped download.
                self.finishToEmit(iFlag)
                return
        try:
            self.combineVideo(sAudioPath, sVideoPath, sOutputPath)
            self.delSourceFile(sAudioPath, sVideoPath)
        except Exception as e:
            print("错误,请检查是否安装了ffmpeg", e)
            self.finishToEmit(-1)
            return
        self.finishToEmit(1)

    def _downloadFile(self, sUrl, sFileName, dctHeaders):
        """Stream one URL into sFileName.

        Returns:
            int: 1 on success, -1 on error, -2 when a stop was requested.
        """
        if not self.m_oEventRunning.is_set():
            return -2
        print("开始下载...", sFileName)
        try:
            with contextlib.closing(requests.get(sUrl, headers=dctHeaders, timeout=3, stream=True)) as oRequest:
                print("状态码: ", oRequest.status_code)
                if oRequest.status_code != 200:
                    print("状态码有误,下载失败")
                    return -1
                # The header may be absent; 0 means "size unknown".
                iContentSize = int(oRequest.headers.get('content-length', 0))
                # About 100 chunks per file; never 0 for tiny or unknown sizes.
                iChunkSize = iContentSize // 100 or 64 * 1024
                print('文件总大小: %.3f M, 单次请求最大值: %s' % (iContentSize / 1024 / 1024, iChunkSize))
                count = 0
                with open(sFileName, "wb") as oFile:
                    # iter_content streams to disk instead of buffering the
                    # whole body in memory.
                    for data in oRequest.iter_content(chunk_size=iChunkSize):
                        self.m_oEventPause.wait()  # blocks while paused
                        # Checked after the wait so a stop issued during a
                        # pause is noticed as soon as the thread wakes up.
                        if not self.m_oEventRunning.is_set():
                            return -2
                        oFile.write(data)
                        count += len(data)
                        if iContentSize > 0:
                            self.progressToEmit(min(100, count * 100 // iContentSize))
        except Exception as e:
            print("错误:", e)
            return -1
        print("下载完成! ", sFileName)
        return 1

    # Merge the audio and video streams
    def combineVideo(self, sAudioPath, sVideoPath, sOutputPath):
        """Mux the audio and video files into sOutputPath with the ffmpeg CLI.

        Raises:
            FileNotFoundError: the ffmpeg executable cannot be found.
            subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        """
        import subprocess

        print("开始合成音频和视频...", sAudioPath, sVideoPath, sOutputPath)
        # Argument list, no shell: paths with spaces are passed verbatim.
        lstCmd = [
            'ffmpeg', '-i', sAudioPath, '-i', sVideoPath,
            '-vcodec', 'copy', '-acodec', 'copy', '-y', sOutputPath,
        ]
        # check=True so the caller does not delete the sources after a failed merge.
        subprocess.run(lstCmd, check=True)
        print("合成完成...")
        self.combineFinish()

    # Delete the source files
    def delSourceFile(self, sAudioPath, sVideoPath):
        """Remove the separately downloaded audio and video files."""
        print("删除源文件: ", sAudioPath, sVideoPath)
        os.remove(sAudioPath)
        os.remove(sVideoPath)

    def combineFinish(self):
        """Hook called after a successful merge."""
        print("合成成功!")

    def pause(self):
        """Pause the download at the next chunk boundary."""
        self.m_oEventPause.clear()

    def resume(self):
        """Continue a paused download."""
        self.m_oEventPause.set()

    def stop(self):
        """Request the download to stop."""
        self.m_oEventRunning.clear()
        # Wake a paused thread; otherwise it would wait forever and never
        # observe the stop request.
        self.m_oEventPause.set()
        self.exit()

    def progressToEmit(self, iProgress):
        """Emit the progress signal with the given percentage."""
        self.oSignalDownload.emit(self, iProgress)

    def finishToEmit(self, iFlag):
        """Emit the finish signal with the given result flag."""
        self.oSignalFinish.emit(self, iFlag)
提取了线程下载基类
# -*- coding: utf-8 -*-
from PyQt5.QtCore import QThread, pyqtSignal
import threading
import contextlib
import requests
import os# 线程下载基类
class CDownloaderBase(QThread):
    """Worker thread that streams one URL to one file.

    Supports pause, resume and stop. The outcome is reported through
    ``oSignalFinish`` with flag 1 (success), -1 (error) or -2 (stopped).
    """

    oSignalDownload = pyqtSignal(QThread, int)  # progress: (thread, percent)
    oSignalFinish = pyqtSignal(QThread, int)  # finished: (thread, result flag)

    def __init__(self, sSavePath, sUrl):
        """
        Args:
            sSavePath: path of the file to write.
            sUrl: URL to download.
        """
        super(CDownloaderBase, self).__init__()
        self.m_sSavePath = sSavePath
        self.m_sUrl = sUrl
        self.m_oEventPause = threading.Event()  # set = running, cleared = paused
        self.m_oEventPause.set()
        self.m_oEventRunning = threading.Event()  # cleared = stop requested
        self.m_oEventRunning.set()
        self.m_dctHeader = {}

    def setHeader(self, dctHeader):
        """Set extra request headers; they override the defaults of getHeader()."""
        # A copy is stored so later changes by the caller have no effect here.
        self.m_dctHeader = dict(dctHeader or {})

    def getHeader(self):
        """Return the request headers: defaults overlaid with the custom headers.

        Entries whose value is empty are dropped so that no blank header
        (such as the placeholder Referer) is sent.
        """
        dctHeaders = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
            'Referer': "",
        }
        # dict.update() returns None, so merge first and return the dict itself.
        dctHeaders.update(self.m_dctHeader)
        return {sKey: sValue for sKey, sValue in dctHeaders.items() if sValue}

    def run(self):
        """Download ``m_sUrl`` to ``m_sSavePath`` and emit the finish flag."""
        dctHeaders = self.getHeader()
        print("下载线程开启...")
        if not self.m_oEventRunning.is_set():
            # Stopped before starting: do not report success.
            self.finishToEmit(-2)
            return
        print("开始下载...", self.m_sSavePath, self.m_sUrl)
        try:
            with contextlib.closing(requests.get(self.m_sUrl, headers=dctHeaders, timeout=5, stream=True)) as oRequest:
                print("状态码: ", oRequest.status_code)
                if oRequest.status_code != 200:
                    print("状态码有误,下载失败")
                    self.finishToEmit(-1)
                    return
                # The header may be absent; 0 means "size unknown".
                iContentSize = int(oRequest.headers.get('content-length', 0))
                # About 100 chunks per file; never 0 for tiny or unknown sizes.
                iChunkSize = iContentSize // 100 or 64 * 1024
                print('文件总大小: %.3f M, 单次请求最大值: %s' % (iContentSize / 1024 / 1024, iChunkSize))
                count = 0
                self.createFile(self.m_sSavePath)
                with open(self.m_sSavePath, "wb") as oFile:
                    # iter_content streams to disk instead of buffering the
                    # whole body in memory.
                    for data in oRequest.iter_content(chunk_size=iChunkSize):
                        self.m_oEventPause.wait()  # blocks while paused
                        # Checked after the wait so a stop issued during a
                        # pause is noticed as soon as the thread wakes up.
                        if not self.m_oEventRunning.is_set():
                            self.finishToEmit(-2)
                            return
                        oFile.write(data)
                        count += len(data)
                        if iContentSize > 0:
                            self.progressToEmit(min(100, count * 100 // iContentSize))
        except Exception as e:
            self.finishToEmit(-1)
            print("错误:", e)
            return
        print("下载完成! ", self.m_sSavePath)
        self.finishToEmit(1)

    def pause(self):
        """Pause the download at the next chunk boundary."""
        self.m_oEventPause.clear()

    def resume(self):
        """Continue a paused download."""
        self.m_oEventPause.set()

    def stop(self):
        """Request the download to stop."""
        self.m_oEventRunning.clear()
        # Wake a paused thread; otherwise it would wait forever and never
        # observe the stop request.
        self.m_oEventPause.set()
        self.exit()

    def progressToEmit(self, iProgress):
        """Emit the progress signal with the given percentage."""
        self.oSignalDownload.emit(self, iProgress)

    def finishToEmit(self, iFlag):
        """Emit the finish signal with the given result flag."""
        self.oSignalFinish.emit(self, iFlag)

    def createFile(self, sPath):
        """Create sPath as an empty file, with its parent directories, if missing."""
        import os
        sPath = sPath.replace("\\", "/")
        # dirname() yields "" for a bare file name; slicing with rfind("/")
        # would instead chop the last character and create a bogus directory.
        sDir = os.path.dirname(sPath)
        if sDir:
            os.makedirs(sDir, exist_ok=True)
        if not os.path.isfile(sPath):
            with open(sPath, mode="w", encoding="utf-8"):
                pass


# Download without using a thread
def downloadByUrl(url, sPath, headers = None, dctConfig = None):
    """Download url to sPath in the calling thread.

    Args:
        url: URL to download.
        sPath: path of the file to write; missing parent directories are created.
        headers: optional request headers that override the built-in defaults.
        dctConfig: optional settings. "ChunkSize" is the number of bytes per
            read (default 1024); "Timeout" is the request timeout in seconds
            (default 5).

    Returns:
        bool: True when the file was written, False when the server did not
        answer with status 200.
    """
    import contextlib
    import requests

    dctConfig = dctConfig or {}
    header_base = {
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.8,en;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Pragma': 'no-cache',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
    }
    header_base.update(headers or {})
    chunk_size = dctConfig.get("ChunkSize", 1024)
    # Without a timeout an unresponsive server blocks the caller forever.
    timeout = dctConfig.get("Timeout", 5)
    # stream=True is required so the body is read chunk by chunk.
    with contextlib.closing(requests.get(url, headers=header_base, stream=True, timeout=timeout)) as oResponse:
        if oResponse.status_code != 200:
            print("状态码有误,下载失败", oResponse.status_code, url)
            return False
        # The header may be absent; 0 means "size unknown".
        content_size = int(oResponse.headers.get('content-length', 0))
        print('[文件大小]:%0.2f MB' % (content_size / 1024 / 1024))
        # Created only after the status check so a failed request leaves no
        # empty file behind.
        createFile(sPath)
        with open(sPath, 'wb') as file:
            for data in oResponse.iter_content(chunk_size=chunk_size):
                file.write(data)
    print("下载成功", url, sPath)
    return True


def createFile(sPath):
    """Create sPath as an empty file, with its parent directories, if missing."""
    import os
    sPath = sPath.replace("\\", "/")
    # dirname() yields "" for a bare file name; slicing with rfind("/")
    # would instead chop the last character and create a bogus directory.
    sDir = os.path.dirname(sPath)
    if sDir:
        os.makedirs(sDir, exist_ok=True)
    if not os.path.isfile(sPath):
        with open(sPath, mode="w", encoding="utf-8"):
            pass
GitHub地址
GitHub
发布评论