校园网免流看B站视频-基于socks5的再探索

引言

校园网下通过修改hosts方式实现基于ipv6的免流观看b站的尝试 – jcy1998的WordPress校园网免流看b站(bilibili/哔哩哔哩)视频——基于 Technitium DNS Server – jcy1998的WordPress中,我使用了类似于修改hosts的方法,将B站的视频CDN,全部统一定向到某个ipv6地址.由于Windows的机制,不能使用通配符匹配,所以我选取了本地DNS服务器的思路。

在那之后,我学习了套接字编程,熟读了Shadowsocks的程序设计,一直想要再有所想法,却又有点不知如何下手。在午睡看本子,更新app时,偶然看到了Pica Comic的开发文档:在flutter/dart中使用ip进行https请求 (wgh136.github.io),这是避免漫画站点域名被污染,而采取直接使用ip访问的思路。

在这个过程中,我就有了灵感,即通过socks5代理操作——将b站视频源的地址直接强制换成指定ipv6地址,故有了这么一篇博客。做了一个雏形的程序,验证了可行性,有很多要进一步完善的事情,这里先放上来第一版,以给自己满足感,并激励下一步的完善。

这篇博客只是简单阐述想法,不详述,还有一些很意思的事情想写,比如http中host头的问题,这里先搁置。

程序



import select
import socket
import struct
import logging
import re

from socketserver import StreamRequestHandler, ThreadingTCPServer

SOCKS_VERSION = 5
ipv6_address = '240e:978:90d:1000::19'

class SocksProxy(StreamRequestHandler):
    def handle(self):
        print('Accepting connection from {}'.format(self.client_address))
        # 协商
        # 从客户端读取并解包两个字节的数据
        header = self.connection.recv(2)
        version, nmethods = struct.unpack("!BB", header)
        # 设置socks5协议,METHODS字段的数目大于0
        assert version == SOCKS_VERSION
        assert nmethods > 0
        # 接受支持的方法
        methods = self.get_available_methods(nmethods)
        # 无需认证
        if 0 not in set(methods):
            self.server.close_request(self.request)
            return
        # 发送协商响应数据包
        self.connection.sendall(struct.pack("!BB", SOCKS_VERSION, 0))
        # 请求
        version, cmd, _, address_type = struct.unpack("!BBBB", self.connection.recv(4))
        assert version == SOCKS_VERSION
        if address_type == 1:  # IPv4
            address = socket.inet_ntoa(self.connection.recv(4))
        elif address_type == 3:  # Domain name
            domain_length = self.connection.recv(1)[0]
            address = self.connection.recv(domain_length)
            change_flag = 0
            #address = socket.gethostbyname(address.decode("UTF-8"))  # 将域名转化为IP,这一行可以去掉

            if self.match_domain(address.decode('utf-8')):
                print(f"{address} 匹配")
                address= ipv6_address
                change_flag = 1
                #address=socket.inet_ntoa( address)
                print(address)

        elif address_type == 4: # IPv6
            addr_ip = self.connection.recv(16)
            address = socket.inet_ntoa(socket.AF_INET6, addr_ip)
        else:
            self.server.close_request(self.request)
            return
        port = struct.unpack('!H', self.connection.recv(2))[0]
        # 响应,只支持CONNECT请求
        try:
            if cmd == 1:  # CONNECT
                if address_type == 1 or (address_type == 3 and change_flag == 0):
                    remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)#这个是ipv4
#                print("要连接"+address+ str(port))
                elif address_type == 4 or (address_type == 3 and change_flag == 1):
                    remote = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
                print((address, port))
                remote.connect((address, port))
                bind_address = remote.getsockname()
                print('Connected to {} {} '.format(address, port))
            else:
                self.server.close_request(self.request)
            #addr = struct.unpack("!I", socket.inet_aton(bind_address[0]))[0]
            port = bind_address[1]
            #reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, address_type, addr, port)
            # 注意:按照标准协议,返回的应该是对应的address_type,但是实际测试发现,当address_type=3,也就是说是域名类型时,会出现卡死情况,但是将address_type该为1,则不管是IP类型和域名类型都能正常运行
            reply = struct.pack("!BBBBIH", SOCKS_VERSION, 0, 0, 1, 0, port) #bind_address获得的是远程的地址和端口,用来socks5回去没啥用
        except Exception as err:
            logging.error(err)
            # 响应拒绝连接的错误
            reply = self.generate_failed_reply(address_type, 5)
        self.connection.sendall(reply)
        # 建立连接成功,开始交换数据
        if reply[1] == 0 and cmd == 1:
            self.exchange_loop(self.connection, remote)
        self.server.close_request(self.request)
    def get_available_methods(self, n):
        methods = []
        for i in range(n):
            methods.append(ord(self.connection.recv(1)))
        return methods
    def generate_failed_reply(self, address_type, error_number):
        return struct.pack("!BBBBIH", SOCKS_VERSION, error_number, 0, address_type, 0, 0)
    def exchange_loop(self, client, remote):
        while True:
            # 等待数据
            r, w, e = select.select([client, remote], [], [])
            if client in r:
                data = client.recv(4096)
                if remote.send(data) <= 0:
                    break
            if remote in r:
                data = remote.recv(4096)
                if client.send(data) <= 0:
                    break

    def match_domain(self,domain):
        pattern1 = r'.+\.bilivideo\.com' #r'\w+\.mcdn\.bilivideo\.cn$|\w+\.bilivideo\.com$'
        pattern2 = r'\w+\.mcdn\.bilivideo\.cn'
        match1 = re.match(pattern1, domain)
        match2 = re.match(pattern2, domain)
        if match1 or match2:
            return bool(1)
        else:
            return bool(0)

       # return bool(re.match(pattern1, domain))

   # def special_for_bilibili(self):

if __name__ == '__main__':
    # 使用socketserver库的多线程服务器ThreadingTCPServer启动代理
    with ThreadingTCPServer(('127.0.0.1', 1080), SocksProxy) as server:
        server.serve_forever()


验证

ipv6-bilibili-socks5-verify

结语

这是草稿,一时兴起所作,不完善。记录一点自己有趣的想法。

发表评论