获取稳定IPTV直播源的方法

原理与实现过程

源码地址:点我

1、使用网络空间资产搜索引擎fofazoomeye,找到提供iptv服务的ip

查询语法"iptv/live/zh_cn.js" && country="CN" && region="Anhui"

image-20240730134148288

2、通过列表推导式构造链接并获得频道名称和播放地址

http://ip + 端口 + /iptv/live/1000.json?key=txiptv

例如:http://220.180.112.196:9901/iptv/live/1000.json?key=txiptv,如果是有效链接,访问就会返回包含频道信息的json文件

image-20240730134745160

构造出该ip同一子网下所有可用链接,在ip.py源码中:

def to_check_url(self):
        result_urls = []
        ip_db = self.db
        ips = ip_db.view('ip')
        if ips:
            ips = [i[1] for i in ips]
            for ip in ips:
                # 构造json_link
                t = re.sub(r'\.\d+:', '##:', ip).split('##')
                urls = [t[0] + '.' + str(i) + t[1] + "/iptv/live/1000.json?key=txiptv" for i in range(1, 256)]
                result_urls += urls
            result_urls = list(set(result_urls))
            to_check_urls = [url for url in result_urls if 'http' or 'https' in url]
            return to_check_urls
        else: return []

访问可用的json链接,获取频道名称和播放地址,通过channel.py实现

3、 测试频道是否可用

m3u8.py实现,核心函数如下:

async def testm3u(self, data, semaphore, dp=False):
    '''
    输入m3u8源链接,返回速度
    :param url: m3u8文件链接
    :param semaphore: 并行数'''
    try:
        url = data[1]
        name = data[0]
        async with semaphore:
            conn = aiohttp.TCPConnector(limit=100, ssl=False)
            async with aiohttp.ClientSession(connector= conn) as session:
                async with session.get(url) as response:
                    channel_url_t = url.rstrip(url.split('/')[-1])  # m3u8链接前缀
                    res = await response.text()
                    lines = res.strip().split('\n')  # 获取m3u8文件内容
                    ts_lists = [line.split('/')[-1] for line in lines if line.startswith('#') == False]  # 获取m3u8文件下视频流后缀
                    ts_url = channel_url_t + ts_lists[0]
                
                # 获取视频分辨率
                if dp:
                    speed, height = await self.m_speed(session, ts_url) 
                else:
                    speed = await self.speed(session, ts_url)
                    height = 0
                # self.delete_file(f_path)
                if speed > 10:
                    print(f"正在测试{url}\n下载速度为:{speed} KB/s")
                    return (name, url, speed, height)
                else:
                    return 0
    except Exception as e:
        # print(f"Error: {e}")
        return 0

使用semaphore控制并发数,可以防止高并发请求时卡死。


获取稳定IPTV直播源的方法
https://szsc.asia/2024/07/30/huo-de-wen-ding-iptv-zhi-bo-yuan-de-fang-fa/
Author
AW
Posted on
2024年7月30日
Licensed under