转:使用Python写一个m3u8多线程下载器

    技术2024-08-15  65

    转载:使用Python写一个m3u8多线程下载器

    可去看原文:https://blog.csdn.net/muslim377287976/article/details/104340242 文章目录 挖坑缘由 功能 代码 GUI 下载工具类 逻辑代码 总结 挖坑缘由 现在很多在线观看的视频为了防盗链使用了M3u8格式,想要下载的话比较麻烦,如果切分的ts文件名是递增的数字序号的还好说,但是很多是随机的字母,这种就无法通过使用迅雷的批量任务来下载了。然而网上搜到的m3u8downloader使用起来不是很满意,那个工具应该是单线程的,下载进度贼慢,而且如果有一个资源卡住了,就会一直卡在那里,另外我在开发这个下载工具时发现了很多m3u8资源指向是跨域的,不一定都在一个域名下,有可能我使用m3u8downloader时下载失败是这个原因导致的。 在被m3u8downloader折磨了一段时间后终于准备自己写一个下载器了。 先康康最终成果吧

    功能 1.使用线程池进行耗时操作 2.可保留所有ts文件 3.单个文件下载失败可手动下载单个文件,再通过shell命令合并 4.如果m3u8资源支持多分辨率,可以指定速度优先(下载分辨率最小的)和画质优先(下载分辨率最大的) 5.如果不填写视频名称,则使用随机字符串+数字的组合 6.引入ffmpeg,增加加密m3u8文件下载功能(2020.03.15更新)

    代码 GUI 界面部分使用tkinter,虽然丑了点但是挺好用的。。 逻辑代码部分需要与GUI进行交互,显示进度、弹框等,所以把GUI封装成了一个类。这里需要注意,GUI代码部分还没有与逻辑代码绑定。

    from tkinter import * from tkinter import ttk import tkinter.messagebox

    class M3u8Downloader:     def __init__(self, title="M3U8下载器", version=None, auth="莫近东墙"):         self.root = Tk()         self.title = title         self.version = version         self.auth = auth         self.root.title("%s-%s by %s" % (self.title, self.version, self.auth))         self.w = 350         self.h = 360         self.frm = LabelFrame(self.root, width=self.w - 20, height=170, padx=10, text="设置")         self.frm.place(x=10, y=5)         Label(self.frm, text="m3u8地址:", font=("Lucida Grande", 11)).place(x=0, y=0)         self.button_url = Entry(self.frm, width=30)         self.button_url.place(x=0, y=25)

            Label(self.frm, text="视频名称:(无需后缀名)", font=("Lucida Grande", 11)).place(x=0, y=50)         self.button_video_name = Entry(self.frm, width=30)         self.button_video_name.place(x=0, y=75)

            self.v = IntVar()         self.cb_status = IntVar()         self.v.set(1)         self.rb1 = Radiobutton(self.frm, text='速度优先', variable=self.v, value=1, font=("Lucida Grande", 11))         self.rb2 = Radiobutton(self.frm, text='画质优先', variable=self.v, value=2, font=("Lucida Grande", 11))         self.cb = Checkbutton(self.frm, text='保存源文件', variable=self.cb_status, font=("Lucida Grande", 11))         self.rb1.place(x=0, y=95)         self.rb2.place(x=100, y=95)         self.cb.place(x=200, y=95)

            self.button_start = Button(self.frm, text="开始下载", width=8, font=("Lucida Grande", 11))         self.button_start.place(x=230, y=15)         self.button_exit = Button(self.frm, text="退出", width=8, font=("Lucida Grande", 11))         self.button_exit.place(x=230, y=70)

            self.progress = ttk.Progressbar(self.frm, orient="horizontal", length=self.w - 40, mode="determinate")         self.progress.place(x=0, y=120)         self.progress["maximum"] = 100         self.progress["value"] = 0

            self.message_frm = LabelFrame(self.root, width=self.w - 20, height=170, padx=10, text="消息")         self.message_frm.place(x=10, y=180)

            self.scrollbar = Scrollbar(self.message_frm)         self.scrollbar.pack(side='right', fill='y')         self.message_v = StringVar()         self.message_s = ""         self.message_v.set(self.message_s)

            self.message = Text(self.message_frm, width=41, height='11')         self.message.insert('insert', self.message_s)         self.message.pack(side='left', fill='y')         # 以下两行代码绑定text和scrollbar         self.scrollbar.config(command=self.message.yview)         self.message.config(yscrollcommand=self.scrollbar.set)         self.message.config(state=DISABLED)

            ws, hs = self.root.winfo_screenwidth(), self.root.winfo_screenheight()         self.root.geometry('%dx%d+%d+%d' % (self.w, self.h, (ws / 2) - (self.w / 2), (hs / 2) - (self.h / 2)))         self.root.resizable(0, 0)         # self.root.mainloop()

        def alert(self, m):         print("%s" % m)         if m:             self.message.config(state=NORMAL)             self.message.insert(END, m + "\n")             # 确保scrollbar在底部             self.message.see(END)             self.message.config(state=DISABLED)         self.root.update()

        def clear_alert(self):         self.message.config(state=NORMAL)         self.message.delete('1.0', 'end')         self.message.config(state=DISABLED)         self.root.update()

        def show_info(self, m):         tkinter.messagebox.showinfo(self.title,  m)

    下载工具类 这里需要注意的是,requests的超时分为两种,请求超时和读取超时,请求超时是指连接不上,读取超时是指连接上了,但是资源下载不下来(常见于下载国外的资源),timeout=(10, 30)就是设置这两种超时时间。 header=Model_http_header.get_user_agent()是我专门写了一个类用来随机设置请求头的,毕竟很多网站设置了反爬虫。。

    import requests import Model_http_header

    def easy_download(url, cookie=None, header=Model_http_header.get_user_agent(), timeout=(10, 30),                   max_retry_time=3):     i = 1     while i <= max_retry_time:         try:             print("连接:%s" % url)             res = requests.get(url=(url.rstrip()).strip(), cookies=cookie, headers=header, timeout=timeout)             if res.status_code != 200:                 return None             return res         except Exception as e:             print(e)             i += 1     return None

    这个就是随机设置请求头的代码,其中需要注意的是'Accept-Encoding': 'gzip, deflate',可接受的编码格式里面我去掉了br,因为真的有网站把ts文件用br格式进行编码。但是requests默认是不支持解码br格式的。

    import random

    """随机设置user_agent""" user_agent_list = [     "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",     "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",     "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",     "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",     "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",     "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",     "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",     "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",     "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",     "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 "     "Safari/536.3",     "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",     "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",     "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",     "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",     "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",     "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",     "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",     "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24" ]

    def get_user_agent():     header = {         'Accept': 'application/json, text/javascript, */*; q=0.01',         'Accept-Encoding': 'gzip, deflate',         'content-type': 'application/json',         'x-requested-with': 'XMLHttpRequest',         'Accept-Language': 'zh-CN,zh;q=0.8',         'User-Agent': random.choice(user_agent_list)}     return header

    逻辑代码 各个方法注释的挺详细的,我只提一下几个比较重要的地方: 1.代码中会执行下载的耗时操作,需要另开一个线程来跑逻辑代码,不然GUI会卡住。 2.如果在GUI初始化的时候就绑定逻辑代码,就是把s()绑定到button_start这个按钮上,那么代码运行过程中show_info等方法是无法生效的,因为__init__的时候,已经把逻辑代码绑定好了,这时的m3还是None,因此只能等m3对象初始化完成以后,手动绑定按键事件。(我已经晕了) 3.获取ts下载地址是最麻烦的,首先大部分的m3u8文件里面会再嵌套一个m3u8文件,这样做原本是为了提供多分辨率资源可供选择,但是现在基本上都是用来屏蔽m3u8下载插件的。然后ts下载地址都是相对路径,但是这个相对路径有的是相对m3u8文件的,有的是相对域名的。甚至有的m3u8文件域名和嵌套的m3u8文件域名不一样。所以在正式开始下载以前只能先拿一个下载地址进行测试,测试通过了再开始下载

    #!/usr/bin/python3 import Model_download as dm import os import sys import shutil import threadpool import random import m3u8Downloader import threading

    m3 = None download_fail_list = [] running = False url_list = [] order_increase = True exit_flag = False save_source_file = False url_host = None url_path = None

    # 设置排序模式 def order_type(type_):     global order_increase     global m3     order_increase = type_     if type_:         m3.alert("设置速度优先")     else:         m3.alert("设置画质优先")

    # 是否保存源文件 def save_source():     global save_source_file     global m3     if m3.cb_status.get() == 0:         save_source_file = True         m3.alert("下载完成后保存源文件")     else:         save_source_file = False         m3.alert("下载完成后删除源文件")

    # 获取域名 def get_host(url):     url_param = url.split("//")     return url_param[0]+"//"+url_param[1].split("/")[0]+"/"

    # 获取目录 def get_dir(url):     host = get_host(url)     url = url.replace(host, '')     return ("/"+url[0:url.rfind("/")]+"/").replace("//", "/")

    # 获取域名+路径 def get_path(url):     if url.rfind("/") != -1:         return url[0:url.rfind("/")]+"/"     else:         return url[0:url.rfind("\\")] + "\\"

    # 检查地址是否正确 def check_href(m3u8_href):     if m3u8_href:         return True     else:         return False

    # 检查文件名是否正确 def check_video_name(name):     if name is None or "" == name:         a = "1234567890"         b = "abcdefghijklmnopqrstuvwxyz"         aa = []         bb = []         for i in range(6):             aa.append(random.choice(a))             bb.append(random.choice(b))         res = "".join(i + j for i, j in zip(aa, bb))         return res     return name.replace("\t", "").replace("\n", "")

    # 获取带宽 def get_band_width(info):     info_list = info.split("\n")[0].split(",")     for info in info_list:         if info.startswith("BANDWIDTH"):             return int(info.split("=")[1])     return 0

    # 排序 def order_list(o_type, o_list):     o_list.sort(key=get_band_width, reverse=o_type)     return o_list

    # 获取视频下载地址 def get_ts_add(m3u8_href):     global url_path     global url_host     global m3     m3.alert("获取ts下载地址,m3u8地址:\n%s" % m3u8_href)     url_host = get_host(m3u8_href)     url_path = get_path(m3u8_href)     response = dm.easy_download(m3u8_href)     if response is not None:         response = response.text     else:         return []     m3.alert("响应体:\n%s\n" % response)     response_list = response.split("#")     ts_add = []     m3u8_href_list_new = []     for res_obj in response_list:         if res_obj.startswith("EXT-X-KEY"):             m3.show_info("视频文件已加密,请等待后续版本")             break         if res_obj.startswith("EXT-X-STREAM-INF"):             # m3u8 作为主播放列表(Master Playlist),其内部提供的是同一份媒体资源的多份流列表资源(Variant Stream)             # file_add = res_obj.split("\n")[1]             file = res_obj.split(":")[1]             m3u8_href_list_new.append(file)         if res_obj.startswith("EXTINF"):             # 当 m3u8 文件作为媒体播放列表(Media Playlist),其内部信息记录的是一系列媒体片段资源             file = res_obj.split("\n")[1]             ts_add.append(file)     if len(m3u8_href_list_new) > 0:         # 根据画质优先/速度优先排序         m3u8_href_list_new = order_list(order_increase, m3u8_href_list_new)         for info in m3u8_href_list_new:             file = info.split("\n")[1]             ts_add = get_ts_add(url_host + file)             if len(ts_add) == 0:                 ts_add = get_ts_add(url_path + file)     return ts_add

    # 下载视频并保存为文件 def download_to_file(url, file_name):     global download_fail_list     global url_list     global exit_flag     if exit_flag:         return     response = dm.easy_download(url)     if response is None:         download_fail_list.append((url, file_name))         return     with open(file_name, 'wb') as file:         file.write(response.content)         p = count_file(file_name)/len(url_list)*100         set_progress(p)

    # 设置进度条 def set_progress(v):     global m3     m3.progress["value"] = v     m3.root.update()

    # 重新下载视频 def download_fail_file():     global download_fail_list     global m3     if len(download_fail_list) > 0:         for info in download_fail_list:             url = info[0]             file_name = info[1]             m3.alert("正在尝试重新下载%s" % file_name)             response = dm.easy_download(url=url, max_retry_time=50)             if response is None:                 m3.alert("%s下载失败,请手动下载:\n%s" % (file_name, url))                 continue             with open(file_name, 'wb') as file:                 file.write(response.content)                 p = count_file(file_name)/len(url_list)*100                 set_progress(p)

    # 合并文件 def merge_file(dir_name):     global m3     com = "copy /b \"" + dir_name + "\\*\" \"" + dir_name + ".ts\""     m3.alert("执行文件合并命令:%s" % com)     res = os.system(com)     if res == 0:         return True     else:         return False

    # 拼接下载用的参数 def get_download_params(head, dir_name):     global url_list     i = 0     params = []     while i < len(url_list):         index = "%05d" % i         param = ([head + url_list[i], dir_name + "\\" + index + ".ts"], None)         params.append(param)         i += 1     return params

    # 设置线程池开始下载 def start_download_in_pool(params):     global m3     m3.alert("已确认正确地址,开始下载")     pool = threadpool.ThreadPool(10)     thread_requests = threadpool.makeRequests(download_to_file, params)     [pool.putRequest(req) for req in thread_requests]     pool.wait()

    # 获取视频文件数量 def count_file(file_name):     path = get_path(file_name)     file_num = 0     for f_path, f_dir_name, f_names in os.walk(path):         for name in f_names:             if name.endswith(".ts"):                 file_num += 1     return file_num

    # 检查视频文件是否全部下载完成 def check_file(dir_name):     global url_list     path = dir_name     file_num = 0     for f_path, f_dir_name, f_names in os.walk(path):         for name in f_names:             if name.endswith(".ts"):                 file_num += 1     return file_num == len(url_list)

    # 测试下载地址 def test_download_url(url):     global m3     m3.alert("尝试使用%s下载视频" % url)     res = dm.easy_download(url, max_retry_time=10)     return res is not None

    def start(m3u8_href, video_name):     global download_fail_list     global running     global url_list     global m3     global url_path     global url_host

        m3.clear_alert()     set_progress(0)     # 检查地址是否合法     if check_href(m3u8_href) is False:         m3.alert("请输入正确的m3u8地址")         return     # 格式化文件名     video_name = check_video_name(video_name)     # 任务开始标志,防止重复开启下载任务     running = True     # 获取所有ts视频下载地址     url_list = get_ts_add(m3u8_href)     if len(url_list) == 0:         m3.alert("获取地址失败")         # 重置任务开始标志         running = False         return     # 获取程序所在目录     path = os.path.dirname(os.path.realpath(sys.argv[0]))     video_name = path+"\\"+video_name     if not os.path.exists(video_name):         os.makedirs(video_name)     m3.alert("总计%s个视频" % str(len(url_list)))     # 拼接正确的下载地址开始下载     if test_download_url(url_host+url_list[0]):         params = get_download_params(head=url_host, dir_name=video_name)         # 线程池开启线程下载视频         start_download_in_pool(params)     elif test_download_url(url_path+url_list[0]):         params = get_download_params(head=url_path, dir_name=video_name)         # 线程池开启线程下载视频         start_download_in_pool(params)     else:         m3.alert("地址连接失败")         running = False         return     # 重新下载先前下载失败的视频     download_fail_file()     # 检查ts文件总数是否对应     if check_file(video_name):         # 调用cmd方法合并视频         if merge_file(video_name):             if save_source_file is False:                 # 删除文件夹                 shutil.rmtree(video_name)             m3.alert("下载完成")             m3.show_info("下载完成")             set_progress(0)         else:             m3.alert("视频文件合并失败,请查看消息列表")             m3.show_info("视频文件合并失败,请查看消息列表")     else:         m3.alert("请手动下载缺失文件并合并")         m3.show_info("请手动下载缺失文件并合并")     # 清空下载失败视频列表     download_fail_list = []     # 重置任务开始标志     running = False

    def s():     global m3     if running is False:         m3u8_href = m3.button_url.get().rstrip()         video_name = m3.button_video_name.get().rstrip()         # 开启线程执行耗时操作,防止GUI卡顿         t = threading.Thread(target=start, args=(m3u8_href, video_name,))         # 设置守护线程,进程退出不用等待子线程完成         t.setDaemon(True)         t.start()     else:         m3.show_info("任务执行中,请勿重复开启任务")

    def e():     global exit_flag     exit_flag = True     sys.exit(0)

    def run():     global m3     m3 = m3u8Downloader.M3u8Downloader(version="3.6.8")     # 绑定点击事件     m3.rb1.bind("<Button-1>", lambda x: order_type(True))     m3.rb2.bind("<Button-1>", lambda x: order_type(False))     m3.cb.bind("<Button-1>", lambda x: save_source())     m3.button_start.bind("<Button-1>", lambda x: s())     m3.button_exit.bind("<Button-1>", lambda x: e())     # 手动加入消息队列     m3.root.mainloop()

    if __name__ == "__main__":     run()

    总结 贴出来的是我修改以后的第三个版本,日后有时间了再优化。 打包成可执行文件的工具下载地址:链接: https://pan.baidu.com/s/1go2awUhjJgoAQpxfRMeqVw 提取码: r8jx(2020.03.15更新3.7.0版本) 各位要注意身体啊 原文链接:https://blog.csdn.net/muslim377287976/article/details/104340242

    Processed: 0.014, SQL: 9