common.py


#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2013-2015 clowwindy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function, \
    with_statement

import socket
import struct
import logging
import hashlib
import hmac

ONETIMEAUTH_BYTES = 10
ONETIMEAUTH_CHUNK_BYTES = 12
ONETIMEAUTH_CHUNK_DATA_LEN = 2

def sha1_hmac(secret, data):
    return hmac.new(secret, data, hashlib.sha1).digest()

def onetimeauth_verify(_hash, data, key):
    return _hash == sha1_hmac(key, data)[:ONETIMEAUTH_BYTES]

def onetimeauth_gen(data, key):
    return sha1_hmac(key, data)[:ONETIMEAUTH_BYTES]

def compat_ord(s):
    if type(s) == int:
        return s
    return _ord(s)

def compat_chr(d):
    if bytes == str:
        return _chr(d)
    return bytes([d])

_ord = ord
_chr = chr
ord = compat_ord
chr = compat_chr

#首先,它检查当前 Python 解释器是否支持字节串(bytes)。在 Python 2 和 Python 3 中,字节串的类型和处理方式略有不同,因此这个函数首先进行了一些适配工作。
def to_bytes(s):
    if bytes != str:
        if type(s) == str:
            return s.encode('utf-8')
    return s

#正常就不相等 bytes和str
def to_str(s):
    if bytes != str:
        if type(s) == bytes:
            return s.decode('utf-8')
    return s

#二进制转字节串
def inet_ntop(family, ipstr):#返回类似于b'127.0.0.1'
    if family == socket.AF_INET:
        return to_bytes(socket.inet_ntoa(ipstr))#(bytes->str)->bytes
    elif family == socket.AF_INET6:
        import re
        v6addr = ':'.join(('%02X%02X' % (ord(i), ord(j))).lstrip('0')#lstrip是去掉开头的0 每个字节转为转为16进制数字
                          for i, j in zip(ipstr[::2], ipstr[1::2]))#zip返回迭代器,后面是每两个取出一个字节 从0和从1开始取
        v6addr = re.sub('::+', '::', v6addr, count=1)
        return to_bytes(v6addr)

def inet_pton(family, addr):#返回类似于b'\x7f\x00\x00\x01' input:str
    addr = to_str(addr)
    if family == socket.AF_INET:
        #返回二进制字节串
        return socket.inet_aton(addr)
    elif family == socket.AF_INET6:
        if '.' in addr:  # a v4 addr
            #最后一个:后面的地址 addr.rindex返回的是索引 127.0.0.1
            v4addr = addr[addr.rindex(':') + 1:]
            #返回一个包含了这个地址二进制表示的 4 字节的字节串 b'\x7f\x00\x00\x01' 字节串则是4个部分 可以逐步运行
            v4addr = socket.inet_aton(v4addr)
            #转为ascii 逐字节处理 返回类似于['7F', '00', '00', '01']
            v4addr = map(lambda x: ('%02X' % ord(x)), v4addr)
            #

            v4addr.insert(2, ':')
            newaddr = addr[:addr.rindex(':') + 1] + ''.join(v4addr)

            #处理一次没有嵌套得了
            return inet_pton(family, newaddr)
        dbyts = [0] * 8  # 8 groups
        #::1 结果是‘’ ‘’ ‘1’
        grps = addr.split(':')
        for i, v in enumerate(grps): #index, value
            if v:
                dbyts[i] = int(v, 16)
            else:
                for j, w in enumerate(grps[::-1]):#[None:None:-1]。这表示从序列的开头到末尾,以步长为 -1 的方式进行切片,实现反转序列的效果。
                    if w:
                        #int(w, 16) 是将十六进制字符串 w 转换为整数。这里的 16 表示使用十六进制解析 w 出来是整数了 双冒号的处理
                        dbyts[7 - j] = int(w, 16)
                    else:
                        break
                #退出了外面的循环
                break
        #用ascii编码存了地址 在字节串中 0也有 把缩写也打开了
        return b''.join((chr(i // 256) + chr(i % 256)) for i in dbyts)
    else:
        raise RuntimeError("What family?")

'''
这个 is_ip 函数用于检测一个给定的地址是否是有效的 IP 地址(IPv4 或 IPv6)。

它的工作原理是:

首先,它遍历了一个地址族的列表 (socket.AF_INET, socket.AF_INET6),分别表示 IPv4 和 IPv6 地址族。
对于每个地址族,它尝试使用 inet_pton() 函数将传入的地址解析为该地址族对应的格式。
如果能够成功解析,且不抛出 TypeError, ValueError, OSError, IOError 异常,则说明传入的地址是有效的 IPv4 或 IPv6 地址,函数会返回相应的地址族常量 (socket.AF_INET 或 socket.AF_INET6)。
如果无法成功解析,或者抛出了上述异常,就继续尝试下一个地址族。
如果对所有地址族的尝试都失败了,函数最终返回 False,表示传入的地址不是有效的 IPv4 或 IPv6 地址。
这个函数是一个通用的地址检查函数,用于判断传入的地址是否符合 IPv4 或 IPv6 地址的格式。
'''

#返回是v4还算v6             return socket.AF_INET6之类的

def is_ip(address):
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            if type(address) != str:
                address = address.decode('utf8')
            inet_pton(family, address)
            return family
        #不是ip地址则报异常 换一个 最后不是就是false
        except (TypeError, ValueError, OSError, IOError):
            pass
    return False

def patch_socket():#返回二进制字节串 ip地址的 用上面的函数做补丁
    if not hasattr(socket, 'inet_pton'):#str->bytes 字符串点分十进制 到 二进制标识
        socket.inet_pton = inet_pton

    if not hasattr(socket, 'inet_ntop'):#bytes->bytes 二进制表示到点分十进制
        socket.inet_ntop = inet_ntop

patch_socket()

ADDRTYPE_IPV4 = 0x01
ADDRTYPE_IPV6 = 0x04
ADDRTYPE_HOST = 0x03
ADDRTYPE_AUTH = 0x10
ADDRTYPE_MASK = 0xF

def pack_addr(address): #打包地址 如果错了 可能是主机 跳过 ipv4返回的是b'\x01和32位的串' ipv6是b'\x04和128位的串' 地址是b'\x03+长度+源地址'
    address_str = to_str(address)
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            r = socket.inet_pton(family, address_str)
            if family == socket.AF_INET6:
                return b'\x04' + r
            else:
                return b'\x01' + r
        except (TypeError, ValueError, OSError, IOError):
            pass
    if len(address) > 255:
        address = address[:255]  # TODO
    return b'\x03' + chr(len(address)) + address#chr返回字符

'''
        +----+-----+-------+------+----------+----------+
        |拿走 | 拿走 |  拿走  | ATYP | DST.ADDR | DST.PORT |
        +----+-----+-------+------+----------+----------+
        |VER | CMD |  RSV  | ATYP | DST.ADDR | DST.PORT |
        +----+-----+-------+------+----------+----------+
        | 1  |  1  | X'00' |  1   | Variable |    2     |
        +----+-----+-------+------+----------+----------+
'''
def parse_header(data):
    addrtype = ord(data[0])#返回Unicode 码点
    dest_addr = None
    dest_port = None
    header_length = 0
    if addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV4:
        if len(data) >= 7:
            dest_addr = socket.inet_ntoa(data[1:5])#转为字符串

            dest_port = struct.unpack('>H', data[5:7])[0]#索引5/6的端口号
            header_length = 7
        else:
            logging.warn('header is too short')
    elif addrtype & ADDRTYPE_MASK == ADDRTYPE_HOST:
        if len(data) > 2:
            addrlen = ord(data[1])#取出地址长度
            if len(data) >= 4 + addrlen:
                dest_addr = data[2:2 + addrlen]
                dest_port = struct.unpack('>H', data[2 + addrlen:4 +
                                                     addrlen])[0]
                header_length = 4 + addrlen
            else:
                logging.warn('header is too short')
        else:
            logging.warn('header is too short')
    elif addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV6:
        if len(data) >= 19:
            dest_addr = socket.inet_ntop(socket.AF_INET6, data[1:17])
            dest_port = struct.unpack('>H', data[17:19])[0]
            header_length = 19
        else:
            logging.warn('header is too short')
    else:
        logging.warn('unsupported addrtype %d, maybe wrong password or '
                     'encryption method' % addrtype)
    if dest_addr is None:
        return None
    return addrtype, to_bytes(dest_addr), dest_port, header_length

#shell.check_config会调用
class IPNetwork(object):#基类是object
    ADDRLENGTH = {socket.AF_INET: 32, socket.AF_INET6: 128, False: 0}

    def __init__(self, addrs): #shell.check_config中IPNetwork     IPNetwork(config.get('forbidden_ip', '127.0.0.0/8,::1/128')) 会调用 传入的参数地址 每一个以元组(网络号,后边0的个数)进入列表
        self._network_list_v4 = []
        self._network_list_v6 = []
        if type(addrs) == str:
            addrs = addrs.split(',')
        #list() 函数将迭代器转换为列表 每个列表成员都执行了函数
        list(map(self.add_network, addrs))

    def add_network(self, addr):#将二进制的网络字节序的加入列表
        if addr is "":
            return
        block = addr.split('/')
        #返回的是那一族ip socket.AF_INET或者socket.AF_INET6
        addr_family = is_ip(block[0])
        addr_len = IPNetwork.ADDRLENGTH[addr_family]
        if addr_family is socket.AF_INET:#inet_aton 字节串 ;! 表示使用网络字节序(big-endian),即最高有效位在前的字节序 实际没有点分十进制 例如127.0.0.1 二进制转换2130706433 保存为无符号整数
            # 小端序解析出来0001000000000000000001111111 是16777343 最后一个字节000000001是高位(前面的位置) struct.pack 解析的顺序 只是解析字节串的顺序
            ip, = struct.unpack("!I", socket.inet_aton(block[0]))
        elif addr_family is socket.AF_INET6:
            hi, lo = struct.unpack("!QQ", inet_pton(addr_family, block[0])) #拆分为两个 64 位无符号整数。 高位high 低位low  Q 表示无符号长长整型(64 位无符号整数)  I 表示无符号整型(32 位无符号整数
            ip = (hi >= 1 #右移一位
                prefix_size += 1
            logging.warn("You did't specify CIDR routing prefix size for %s, "
                         "implicit treated as %s/%d" % (addr, addr, addr_len))
        elif block[1].isdigit() and int(block[1]) >= prefix_size
        else:
            raise Exception("Not a valid CIDR notation: %s" % addr) #异常推出了
        if addr_family is socket.AF_INET:
            self._network_list_v4.append((ip, prefix_size)) #列表的append 添加元组(网络号和后面的0的后缀数(注意不是前缀))
            #存入的缩短了的ip 去掉了主机号的 类似于0.0.0.01000000 存的是 (0.0.0.01,1)
        else:
            self._network_list_v6.append((ip, prefix_size))

#__contains__ 方法允许自定义类 MyClass 实现了包含性检查。当使用 in 关键字检查某个元素是否存在于 MyClass 实例时,实际上会调用 __contains__ 方法来执行检查操作。
    def __contains__(self, addr):#检查地址在IPNetwork._network_list_v4列表中吗
        addr_family = is_ip(addr)
        if addr_family is socket.AF_INET:
            ip, = struct.unpack("!I", socket.inet_aton(addr))
            return any(map(lambda n_ps: n_ps[0] == ip >> n_ps[1],#主要查看ip在cidr段中不,否则不用这么麻烦
                           self._network_list_v4)) #对self._network_list_v4每一个操作,n_ps[0] == (ip >> n_ps[1]):将ip右移列表中各个掩码位,看是否相等,map返回迭代器,any查看使得否有true相等的。
        elif addr_family is socket.AF_INET6:
            hi, lo = struct.unpack("!QQ", inet_pton(addr_family, addr))#Q 表示一个无符号长长整型数 64位 ipv6 128位正好两个
            ip = (hi > n_ps[1],
                           self._network_list_v6))
        else:
            return False

def test_inet_conv():
    ipv4 = b'8.8.4.4'
    b = inet_pton(socket.AF_INET, ipv4)
    assert inet_ntop(socket.AF_INET, b) == ipv4
    ipv6 = b'2404:6800:4005:805::1011'
    b = inet_pton(socket.AF_INET6, ipv6)
    assert inet_ntop(socket.AF_INET6, b) == ipv6

def test_parse_header():
    assert parse_header(b'\x03\x0ewww.google.com\x00\x50') == \
        (3, b'www.google.com', 80, 18)
    assert parse_header(b'\x01\x08\x08\x08\x08\x00\x35') == \
        (1, b'8.8.8.8', 53, 7)
    assert parse_header((b'\x04$\x04h\x00@\x05\x08\x05\x00\x00\x00\x00\x00'
                         b'\x00\x10\x11\x00\x50')) == \
        (4, b'2404:6800:4005:805::1011', 80, 19)

def test_pack_header():
    assert pack_addr(b'8.8.8.8') == b'\x01\x08\x08\x08\x08'
    assert pack_addr(b'2404:6800:4005:805::1011') == \
        b'\x04$\x04h\x00@\x05\x08\x05\x00\x00\x00\x00\x00\x00\x10\x11'
    assert pack_addr(b'www.google.com') == b'\x03\x0ewww.google.com'

def test_ip_network():
    ip_network = IPNetwork('127.0.0.0/24,::ff:1/112,::1,192.168.1.1,192.0.2.0')
    assert '127.0.0.1' in ip_network
    assert '127.0.1.1' not in ip_network
    assert ':ff:ffff' in ip_network
    assert '::ffff:1' not in ip_network
    assert '::1' in ip_network
    assert '::2' not in ip_network
    assert '192.168.1.1' in ip_network
    assert '192.168.1.2' not in ip_network
    assert '192.0.2.1' in ip_network
    assert '192.0.3.1' in ip_network  # 192.0.2.0 is treated as 192.0.2.0/23
    assert 'www.google.com' not in ip_network

if __name__ == '__main__':
    test_inet_conv()
    test_parse_header()
    test_pack_header()
    test_ip_network()


发表评论