Using any IPv6 address in the assigned prefix as the egress IP

View the IPv6 address

sh
ip -6 addr show scope global

The output looks like:

sh
2: eno1: <BROADCAST,MULTICAST,ALLMULTI,UP,LOWER_UP> mtu 1500 state UP qlen 1000
    inet6 240?:????:????:????:abcd:1234:5678:90ab/64 scope global temporary dynamic
       valid_lft 258952sec preferred_lft 85972sec
    inet6 240?:????:????:????:1234:5678:abcd:1124/64 scope global temporary deprecated dynamic
       valid_lft 258952sec preferred_lft 0sec
    inet6 240?:????:????:????:7890:3456:abcd:0101/64 scope global dynamic mngtmpaddr noprefixroute
       valid_lft 258952sec preferred_lft 172552sec

Here, 240?:????:????:????::/64 is the IPv6 prefix assigned to this machine.
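As a cross-check, the /64 can be derived from any of the listed addresses with Python's standard ipaddress module. The sketch below is only illustrative: network_of is a hypothetical helper name, and a documentation address from 2001:db8::/32 stands in for the real (masked) one.

```python
import ipaddress


def network_of(addr_with_len: str) -> ipaddress.IPv6Network:
    """Return the network that an "address/prefixlen" string belongs to."""
    # IPv6Interface accepts the same "addr/len" form that `ip -6 addr` prints
    return ipaddress.IPv6Interface(addr_with_len).network


# Stand-in address; replace it with one of the inet6 entries shown above
net = network_of("2001:db8:1234:5678:abcd:1234:5678:90ab/64")
print(net)  # 2001:db8:1234:5678::/64
```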

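This step may not show a public IPv6 address starting with 240*. The usual cause is that the server is not connected directly to the optical modem, but reaches it through a router.

In that case, log in to the router's admin interface and change the IPv6 connection mode from "Obtain IP address automatically" to "Bridge mode". Then wait a few minutes and rerun the command above until an IPv6 address starting with 240* appears.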

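You can further filter for the current public IPv6 address (the 2409 pattern below matches China Mobile; adjust it for your carrier):

sh
ip -6 addr show scope global | grep -E "2409.*" | grep -E "mngtmpaddr" | grep -v "deprecated"

The output looks like:

sh
    inet6 240?:????:????:????:7890:3456:abcd:0101/64 scope global dynamic mngtmpaddr noprefixroute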

IPv6 addresses of the three major Chinese carriers start with:

  • China Unicom: 2408
  • China Mobile: 2409
  • China Telecom: 240e
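The list above can be turned into a small lookup. This is a minimal sketch that assumes the carrier is identified by the first 16-bit group exactly as listed; carrier_of and CARRIER_BY_FIRST_GROUP are hypothetical names, not part of any library.

```python
import ipaddress
from typing import Optional

# First 16-bit group of the address, taken from the list above
CARRIER_BY_FIRST_GROUP = {
    0x2408: "China Unicom",
    0x2409: "China Mobile",
    0x240E: "China Telecom",
}


def carrier_of(addr: str) -> Optional[str]:
    """Return the carrier name for an IPv6 address, or None if it is not listed."""
    # An IPv6 address is a 128-bit integer; shifting right by 112 keeps the first group
    first_group = int(ipaddress.IPv6Address(addr)) >> 112
    return CARRIER_BY_FIRST_GROUP.get(first_group)


print(carrier_of("2409:8a00::1"))  # China Mobile
```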

Add a route

Add the IPv6 prefix obtained above to the routing table:

sh
sudo ip route add local 240?:????:????:????::/64 dev eno1
  • Replace 240?:????:????:????::/64 with your actual IPv6 prefix
  • Replace eno1 with your actual network interface name
  • dev is short for device and specifies the network interface
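If the command is assembled programmatically, the prefix can be computed instead of typed by hand. The following is a sketch under that assumption; route_add_argv is a hypothetical helper and the address is a documentation stand-in. It only builds the argument list and does not run anything.

```python
import ipaddress


def route_add_argv(addr_with_len: str, dev: str) -> list[str]:
    """Build the argv for `ip route add local <prefix> dev <dev>`."""
    # Derive the network from one of the interface's global addresses
    net = ipaddress.IPv6Interface(addr_with_len).network
    return ["sudo", "ip", "route", "add", "local", str(net), "dev", dev]


argv = route_add_argv("2001:db8:1:2:abcd::1/64", "eno1")
print(" ".join(argv))  # sudo ip route add local 2001:db8:1:2::/64 dev eno1
```

The resulting list could be passed to subprocess.run(argv, check=True) on a machine where the route should really be added.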

View routes

sh
ip -6 route show

The output looks like:

sh
::1 dev lo proto kernel metric 256 pref medium
240?:????:????:????::/64 dev eno1 proto ra metric 100 pref medium
240?:????:????:????::/60 via fe80::1 dev eno1 proto ra metric 100 pref high
fe80::/64 dev ????? proto kernel metric 256 pref medium
fe80::/64 dev eno1 proto kernel metric 1024 pref medium
default via fe80::1 dev eno1 proto ra metric 100 pref medium

To show only the public prefix, filter out the lines that contain fe80:: or lo:

sh
ip -6 route show | grep -v 'fe80::' | grep -v 'lo'

The output looks like:

sh
240?:????:????:????::/64 dev eno1 proto ra metric 100 pref medium

Modify sysctl.conf

View the current kernel parameters:

sh
sudo sysctl -a | grep -E "(proxy_ndp|forwarding|nonlocal_bind)" | grep ipv6

Edit the kernel parameter file:

sh
sudo nano /etc/sysctl.conf

Append the following to the end of the file and save it:

sh
net.ipv6.conf.all.proxy_ndp=1
net.ipv6.conf.all.forwarding=1
net.ipv6.conf.default.proxy_ndp=1
net.ipv6.conf.default.forwarding=1
net.ipv6.conf.eno1.proxy_ndp=1
net.ipv6.conf.eno1.forwarding=1
net.ipv6.ip_nonlocal_bind=1
  • Replace eno1 with your actual network interface name
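Since only the interface name varies, the block above can be generated rather than edited by hand. A minimal sketch, assuming the same seven settings as listed; sysctl_lines is a hypothetical helper name.

```python
def sysctl_lines(iface: str) -> list[str]:
    """Return the sysctl.conf lines from the step above for a given interface."""
    lines = []
    # proxy_ndp and forwarding are set for "all", "default" and the interface itself
    for scope in ("all", "default", iface):
        lines.append(f"net.ipv6.conf.{scope}.proxy_ndp=1")
        lines.append(f"net.ipv6.conf.{scope}.forwarding=1")
    # allow binding sockets to addresses that are not configured on any interface
    lines.append("net.ipv6.ip_nonlocal_bind=1")
    return lines


print("\n".join(sysctl_lines("eno1")))
```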

Apply the configuration:

sh
sudo sysctl -p

Install ndppd

sh
sudo apt install ndppd

Configure ndppd

sh
sudo nano /etc/ndppd.conf

Add the following:

lua
route-ttl 30000
proxy eno1 {
    router no
    timeout 500
    ttl 30000
    rule 240?:????:????:????::/64 {
        static
    }
}
  • Replace eno1 with your actual network interface name
  • Replace 240?:????:????:????::/64 with your actual IPv6 prefix

Start ndppd

sh
sudo systemctl start ndppd

Enable it at boot:

sh
sudo systemctl enable ndppd

Test the egress address

Pick a random IPv6 address in the same prefix and test the egress IP:

sh
curl --int 240?:????:????:????:abcd:9876:5678:0123 http://ifconfig.me/ip
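  • --int is short for --interface, which here specifies the egress IP

If the previous steps are correct, the returned IPv6 address should match the one passed to --int, for example: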

sh
240?:????:????:????:abcd:9876:5678:0123
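When this check is automated, comparing the two strings directly can give false mismatches, because the service may print the address in a different textual form (for example without leading zeros). A small sketch that compares the parsed values instead; same_ipv6 is a hypothetical helper and the addresses are documentation stand-ins.

```python
import ipaddress


def same_ipv6(requested: str, reported: str) -> bool:
    """Compare two IPv6 addresses by value rather than by spelling."""
    # strip() removes the trailing newline that HTTP echo services usually send
    return ipaddress.IPv6Address(requested.strip()) == ipaddress.IPv6Address(reported.strip())


print(same_ipv6("2001:db8:1:2:abcd:9876:5678:0123",
                "2001:db8:1:2:abcd:9876:5678:123\n"))  # True
```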

Prefix changes

If the optical modem restarts or loses power, the IPv6 prefix may change, and the route has to be added again:

sh
sudo ip route add local 240x:xxxx:xxxx:xxxx::/64 dev eno1
  • Here 240x:xxxx:xxxx:xxxx::/64 is the new IPv6 prefix

Reconfigure ndppd:

sh
sudo nano /etc/ndppd.conf

Replace the prefix in rule 240?:????:????:????::/64 with the new IPv6 prefix:

lua
route-ttl 30000
proxy eno1 {
    router no
    timeout 500
    ttl 30000
    rule 240x:xxxx:xxxx:xxxx::/64 {
        static
    }
}

Restart ndppd:

sh
sudo systemctl restart ndppd
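Whether these manual steps are needed can be decided by comparing the rule in ndppd.conf with the interface's current address. The sketch below assumes the configuration layout shown above; RULE_RE, configured_rules and needs_update are hypothetical names, and the sample text uses a documentation prefix.

```python
import ipaddress
import re

# Matches lines such as "    rule 2001:db8:1:2::/64 {"
RULE_RE = re.compile(r"^\s*rule\s+(\S+)\s*\{", re.MULTILINE)


def configured_rules(conf_text: str) -> list[ipaddress.IPv6Network]:
    """Return every rule prefix found in the text of an ndppd.conf file."""
    return [ipaddress.IPv6Network(m) for m in RULE_RE.findall(conf_text)]


def needs_update(conf_text: str, current_addr_with_len: str) -> bool:
    """True if the interface's current prefix is missing from the rules."""
    current = ipaddress.IPv6Interface(current_addr_with_len).network
    return current not in configured_rules(conf_text)


SAMPLE_CONF = """route-ttl 30000
proxy eno1 {
    router no
    timeout 500
    ttl 30000
    rule 2001:db8:1:2::/64 {
        static
    }
}
"""

print(needs_update(SAMPLE_CONF, "2001:db8:1:2::99/64"))  # False
print(needs_update(SAMPLE_CONF, "2001:db8:1:3::99/64"))  # True
```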

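One-step automatic configuration

The script automates the following steps:

  • Look up the machine's current IPv6 prefix
  • Add the route
  • Update the ndppd configuration and restart ndppd

Two steps are not included, because they only need to be done once:

  • Enable ip_nonlocal_bind
  • Install ndppd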
ip_router.py
py
import netifaces
import random
import re
import requests
import requests.packages.urllib3.util.connection as urllib3_cn
import socket
import time

from pathlib import Path
from requests.adapters import HTTPAdapter
from tclogger import logger, logstr, decolored, shell_cmd
from typing import Union

REQUESTS_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}


class IPv6Adapter(HTTPAdapter):
    def __init__(self, source_address, *args, **kwargs):
        self.source_address = source_address
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs["source_address"] = self.source_address
        return super().init_poolmanager(*args, **kwargs)


class RequestsSessionIPv6Adapter:
    @staticmethod
    def force_ipv4():
        urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

    @staticmethod
    def force_ipv6():
        if urllib3_cn.HAS_IPV6:
            urllib3_cn.allowed_gai_family = lambda: socket.AF_INET6

    def adapt(self, session: requests.Session, ip: str):
        try:
            socket.inet_pton(socket.AF_INET6, ip)
        except Exception as e:
            raise ValueError(f"× Invalid IPv6 format: [{ip}]")

        adapter = IPv6Adapter((ip, 0))
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session


class IPv6Generator:
    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.interfaces = []
        # self.get_prefix()

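    def get_addr_prefix(self, addr: str, netmask: str):
        # each "f" in the netmask covers 4 prefix bits ("ffff:ffff:ffff:ffff::" -> 64)
        prefix_bits = netmask.count("f") * 4
        # expand to 16 raw bytes first, so shortened groups or "::" in addr
        # cannot shift the prefix; drop any "%iface" scope suffix before parsing
        packed = socket.inet_pton(socket.AF_INET6, addr.split("%")[0])
        groups = [
            f"{int.from_bytes(packed[i : i + 2], 'big'):x}"
            for i in range(0, prefix_bits // 8, 2)
        ]
        return ":".join(groups), prefix_bits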

    def get_network_interfaces(self):
        interfaces = netifaces.interfaces()
        for interface in interfaces:
            addresses = netifaces.ifaddresses(interface)
            if netifaces.AF_INET6 not in addresses:
                continue
            for addr_info in addresses[netifaces.AF_INET6]:
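                if not addr_info["addr"].startswith("2"):
                    # skip link-local and other non-global addresses,
                    # but keep scanning the rest of this interface
                    continue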
                addr = addr_info["addr"]
                netmask = addr_info.get("netmask") or addr_info.get("mask")
                prefix, prefix_bits = self.get_addr_prefix(addr, netmask)
                self.interfaces.append(
                    {
                        "interface": interface,
                        "addr": addr,
                        "netmask": netmask,
                        "prefix": prefix,
                        "prefix_bits": prefix_bits,
                    }
                )

    def get_prefix(self, return_netint: bool = False):
        logger.note("> Get ipv6 prefix:")
        self.get_network_interfaces()
        interface = self.interfaces[0]
        prefix = interface["prefix"].strip(":")
        prefix_bits = interface["prefix_bits"]
        netint = interface["interface"]
        if self.verbose:
            logger.note(f"> IPv6 prefix:", end=" ")
            logger.success(f"[{prefix}]", end=" ")
            logger.mesg(f"(/{prefix_bits})")
        self.netint = netint
        self.prefix = prefix
        self.prefix_bits = prefix_bits
        logger.file(f"  * prefix: {logstr.okay(prefix)}")
        logger.file(f"  * netint: {logstr.okay(netint)}")
        if return_netint:
            return self.prefix, netint
        else:
            return self.prefix

    def generate(
        self, prefix: str = None, return_segs: bool = False
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        prefix = prefix or self.prefix
        prefix_segs = prefix.split(":")
        suffix_seg_count = 8 - len(prefix_segs)
        suffix_segs = [f"{random.randint(0, 65535):x}" for _ in range(suffix_seg_count)]
        addr = ":".join(prefix_segs + suffix_segs)
        if return_segs:
            return addr, prefix_segs, suffix_segs
        else:
            return addr


class IPv6RouteModifier:
    def __init__(self, prefix: str, netint: str, ndppd_conf: Union[Path, str] = None):
        # always store a Path, so .exists() also works when a str is passed in
        self.ndppd_conf = Path(ndppd_conf or "/etc/ndppd.conf")
        self.prefix = prefix
        self.netint = netint

    def is_ndppd_conf_latest(self):
        logger.note("> Check proxy (netint) and rule (prefix) in ndppd.conf:")
        if not self.ndppd_conf.exists():
            logger.mesg(f"ndppd.conf does not exist: {self.ndppd_conf}")
            return False

        with open(self.ndppd_conf, "r") as rf:
            lines = rf.readlines()

        # build the log labels up front, so the "not found" branches can use them too
        netint_str = logstr.file(self.netint)
        is_netint_found = False
        netint_pattern = re.compile(rf"proxy\s+{re.escape(self.netint)}")
        for line in lines:
            if netint_pattern.search(line):
                logger.mesg(f"  + Found proxy (netint): {netint_str}")
                is_netint_found = True
                break
        if not is_netint_found:
            logger.mesg(f"  - Not found proxy (netint): {netint_str}")
            return False

        prefix_str = logstr.file(f"{self.prefix}::/64")
        is_prefix_found = False
        prefix_pattern = re.compile(rf"rule\s+{re.escape(self.prefix)}::/64")
        for line in lines:
            if prefix_pattern.search(line):
                logger.mesg(f"  + Found rule (prefix/): {prefix_str}")
                is_prefix_found = True
                break
        if not is_prefix_found:
            logger.mesg(f"  - Not found rule (prefix/): {prefix_str}")
            return False

        return True

    def add_route(self):
        logger.note("> Add IP route:")
        cmd = f"sudo ip route add local {self.prefix}::/64 dev {self.netint}"
        # logger.mesg(cmd)
        shell_cmd(cmd)

    def del_route(self):
        logger.note("> Delete IP route:")
        cmd = f"sudo ip route del local {self.prefix}::/64 dev {self.netint}"
        # logger.mesg(cmd)
        shell_cmd(cmd)

    def modify_ndppd_conf(self, overwrite: bool = False):
        if self.ndppd_conf.exists():
            with open(self.ndppd_conf, "r") as rf:
                old_ndppd_conf_str = rf.read()
            logger.note(f"> Read: {logstr.file(self.ndppd_conf)}")
            logger.mesg(f"{old_ndppd_conf_str}")

        if not self.ndppd_conf.exists() or overwrite:
            new_ndppd_conf_str = (
                f"route-ttl 30000\n"
                f"proxy {logstr.success(self.netint)} {{\n"
                f"    router no\n"
                f"    timeout 500\n"
                f"    ttl 30000\n"
                f"    rule {logstr.success(self.prefix)}::/64 {{\n"
                f"        static\n"
                f"    }}\n"
                f"}}\n"
            )
            logger.note(f"> Write: {logstr.file(self.ndppd_conf)}")
            logger.mesg(f"{new_ndppd_conf_str}")
            with open(self.ndppd_conf, "w") as wf:
                wf.write(decolored(new_ndppd_conf_str))
            logger.success(f"✓ Modified: {logstr.file(self.ndppd_conf)}")

    def restart_ndppd(self):
        logger.note("> Restart ndppd:")
        cmd = "sudo systemctl restart ndppd"
        # logger.mesg(cmd)
        shell_cmd(cmd)
        logger.success(f"✓ Restarted: {logstr.file('ndppd')}")

    def wait_ndppd_work(self, wait_seconds: int = 5):
        logger.note(f"> Waiting {wait_seconds} seconds for ndppd to work ...")
        time.sleep(wait_seconds)


def main():
    generator = IPv6Generator()
    prefix, netint = generator.get_prefix(return_netint=True)
    modifier = IPv6RouteModifier(prefix=prefix, netint=netint)
    modifier.add_route()

    if modifier.is_ndppd_conf_latest():
        logger.success(f"✓ ndppd.conf is up-to-date, skip restart.")
    else:
        modifier.modify_ndppd_conf(overwrite=True)
        modifier.restart_ndppd()
        modifier.wait_ndppd_work(wait_seconds=5)

    logger.note("> Testing ipv6 addrs:")
    session = requests.Session()
    adapter = RequestsSessionIPv6Adapter()
    for i in range(5):
        ipv6, prefix_segs, suffix_segs = generator.generate(return_segs=True)
        prefix = ":".join(prefix_segs)
        suffix = ":".join(suffix_segs)
        logger.note(f"  > [{prefix}:{logstr.file(suffix)}]")
        adapter.adapt(session, ipv6)
        response = session.get("https://test.ipw.cn", headers=REQUESTS_HEADERS)
        resp_ipv6_str = response.text.split("\n")[0].strip()
        resp_ipv6_str = resp_ipv6_str.replace(":0:", "::")
        logger.mesg(f"  * [{resp_ipv6_str}]")


if __name__ == "__main__":
    main()

    # sudo is needed to modify ndppd.conf

    # Case1: Run directly, need to type sudo password
    # sudo env "PATH=$PATH" python -m networks.ipv6.router

    # Case2: Run with piped password
    # echo $SUDOPASS | sudo -S env "PATH=$PATH" python -m networks.ipv6.router

Run:

sh
sudo env "PATH=$PATH" python ip_router.py

Generator script

ip_generator.py
py
import json
import netifaces
import random
import requests
import requests.packages.urllib3.util.connection as urllib3_cn
import socket
import threading

from pathlib import Path
from requests.adapters import HTTPAdapter
from tclogger import logger, logstr, get_now_str
from typing import Union

CACHE_ROOT = Path(__file__).parent

REQUESTS_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}


class IPv6Adapter(HTTPAdapter):
    def __init__(self, source_address, *args, **kwargs):
        self.source_address = source_address
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs["source_address"] = self.source_address
        return super().init_poolmanager(*args, **kwargs)


class RequestsSessionIPv6Adapter:
    @staticmethod
    def force_ipv4():
        urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

    @staticmethod
    def force_ipv6():
        if urllib3_cn.HAS_IPV6:
            urllib3_cn.allowed_gai_family = lambda: socket.AF_INET6

    def adapt(self, session: requests.Session, ip: str):
        try:
            socket.inet_pton(socket.AF_INET6, ip)
        except Exception as e:
            raise ValueError(f"× Invalid IPv6 format: [{ip}]")

        adapter = IPv6Adapter((ip, 0))
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session


class IPv6Prefixer:
    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.interfaces = []
        self.init_prefix()

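    def get_addr_prefix(self, addr: str, netmask: str):
        # each "f" in the netmask covers 4 prefix bits ("ffff:ffff:ffff:ffff::" -> 64)
        prefix_bits = netmask.count("f") * 4
        # expand to 16 raw bytes first, so shortened groups or "::" in addr
        # cannot shift the prefix; drop any "%iface" scope suffix before parsing
        packed = socket.inet_pton(socket.AF_INET6, addr.split("%")[0])
        groups = [
            f"{int.from_bytes(packed[i : i + 2], 'big'):x}"
            for i in range(0, prefix_bits // 8, 2)
        ]
        return ":".join(groups), prefix_bits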

    def get_network_interfaces(self):
        interfaces = netifaces.interfaces()
        for interface in interfaces:
            addresses = netifaces.ifaddresses(interface)
            if netifaces.AF_INET6 not in addresses:
                continue
            for addr_info in addresses[netifaces.AF_INET6]:
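                if not addr_info["addr"].startswith("2"):
                    # skip link-local and other non-global addresses,
                    # but keep scanning the rest of this interface
                    continue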
                addr = addr_info["addr"]
                netmask = addr_info.get("netmask") or addr_info.get("mask")
                prefix, prefix_bits = self.get_addr_prefix(addr, netmask)
                self.interfaces.append(
                    {
                        "interface": interface,
                        "addr": addr,
                        "netmask": netmask,
                        "prefix": prefix,
                        "prefix_bits": prefix_bits,
                    }
                )

    def init_prefix(self) -> str:
        self.get_network_interfaces()
        interface = self.interfaces[0]
        prefix = interface["prefix"].strip(":")
        prefix_bits = interface["prefix_bits"]
        if self.verbose:
            logger.note(f"> IPv6 prefix:", end=" ")
            logger.okay(f"[{prefix}]", end=" ")
            logger.mesg(f"(/{prefix_bits})")
        self.prefix = prefix
        self.prefix_bits = prefix_bits
        return self.prefix


class IPv6Pool:
    def __init__(self, verbose: bool = False):
        self.lock = threading.Lock()
        self.using_addrs = set()
        self.verbose = verbose

    def push_addr_to_using(self, addr: str):
        with self.lock:
            self.using_addrs.add(addr)

    def pop_addr_from_using(self, addr: str):
        with self.lock:
            self.using_addrs.discard(addr)

    def is_addr_using(self, addr: str) -> bool:
        with self.lock:
            return addr in self.using_addrs


class IPv6Cacher:
    def __init__(self, cache_name: str = None, verbose: bool = False):
        self.cache_name = cache_name
        self.verbose = verbose
        self.lock = threading.Lock()
        self.init_paths()

    def init_paths(self):
        cache_name = self.cache_name or "ipv6_addrs_cache"
        self.cache_path = (CACHE_ROOT / cache_name).with_suffix(".json")

    def is_cache_exists(self) -> bool:
        return self.cache_path.exists()

    def read_cache(self) -> list[dict]:
        with open(self.cache_path, "r", encoding="utf-8") as rf:
            cache_data: list[dict] = json.load(rf)
        return cache_data

    def write_cache(self, cache_data: list[dict]):
        with open(self.cache_path, "w", encoding="utf-8") as wf:
            json.dump(cache_data, wf, ensure_ascii=False, indent=4)

    def push_addr_to_cache(self, addr: str):
        with self.lock:
            logger.mesg(f"  + Push addr to cache: [{addr}]", verbose=self.verbose)
            if not self.is_cache_exists():
                self.cache_path.parent.mkdir(parents=True, exist_ok=True)
                cache_data: list[dict] = []
            else:
                cache_data: list[dict] = self.read_cache()
            addr_item = {
                "addr": addr,
                "cache_at": get_now_str(),
            }
            cache_data.append(addr_item)
            self.write_cache(cache_data)

    def pop_addr_from_cache(self, addr: str):
        with self.lock:
            logger.warn(f"  - Pop addr from cache: [{addr}]", verbose=self.verbose)
            if not self.is_cache_exists() or not addr:
                return
            cache_data: list[dict] = self.read_cache()
            cache_data = [item for item in cache_data if item.get("addr") != addr]
            self.write_cache(cache_data)

    def addr_to_segs(self, prefix: str, addr: str):
        addr_segs = addr.split(":")
        prefix_segs = prefix.split(":")
        suffix_segs = addr_segs[len(prefix_segs) :]
        return addr_segs, prefix_segs, suffix_segs

    def get_cache_addr(
        self, prefix: str, return_segs: bool = False, pool: IPv6Pool = None
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        if not self.is_cache_exists():
            return None

        cache_data = self.read_cache()

        # filter using_addrs in pool from cache_data
        if pool and pool.using_addrs:
            filtered_cache_data = [
                item
                for item in cache_data
                if not pool.is_addr_using(item.get("addr", ""))
            ]
            if not filtered_cache_data:
                return None
        else:
            filtered_cache_data = cache_data

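        # pick a random cached addr that still belongs to the current prefix;
        # addr stays None when no cached entry matches
        addr: str = None
        random.shuffle(filtered_cache_data)
        for item in filtered_cache_data:
            candidate = item.get("addr", "")
            if candidate.startswith(f"{prefix}:"):
                addr = candidate
                break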

        if not addr:
            return None

        if pool:
            pool.push_addr_to_using(addr)

        if return_segs:
            _, prefix_segs, suffix_segs = self.addr_to_segs(prefix, addr)
            return addr, prefix_segs, suffix_segs
        else:
            return addr


class IPv6Checker:
    def __init__(self, timeout: float = 10, verbose: bool = False):
        self.adapter = RequestsSessionIPv6Adapter()
        self.session = requests.Session()
        self.timeout = timeout
        self.verbose = verbose

    def check(self, addr: str) -> bool:
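        self.adapter.adapt(self.session, addr)
        try:
            response = self.session.get(
                "https://test.ipw.cn", headers=REQUESTS_HEADERS, timeout=self.timeout
            )
        except requests.RequestException as e:
            # a timeout or connection error means this address is not usable
            logger.warn(f"  x [{addr}]: {e}", verbose=self.verbose)
            return False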
        addr_hash = addr.replace(":", "")
        resp_text = response.text.strip()
        resp_hash = resp_text.replace(":", "")
        is_success = addr_hash == resp_hash
        if self.verbose:
            if is_success:
                logger.okay(f"  ✓ [{resp_text}]")
            else:
                logger.warn(f"  x [{resp_text}]")
        return is_success


class IPv6Randomizer:
    def __init__(
        self,
        cacher: IPv6Cacher = None,
        checker: IPv6Checker = None,
        verbose: bool = False,
    ):
        self.cacher = cacher
        self.checker = checker
        self.verbose = verbose

    def get_random_addr_segs(self, prefix: str) -> tuple[str, list[str], list[str]]:
        prefix_segs = prefix.split(":")
        suffix_seg_count = 8 - len(prefix_segs)
        suffix_segs = [f"{random.randint(1, 65535):x}" for _ in range(suffix_seg_count)]
        addr = ":".join(prefix_segs + suffix_segs)
        return addr, prefix_segs, suffix_segs

    def get_random_addr(
        self,
        prefix: str,
        return_segs: bool = False,
        is_check: bool = True,
        max_retries: int = 5,
        is_cache_addr: bool = True,
        pool: IPv6Pool = None,
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        if is_check and self.checker:
            retry_idx = 0
            is_valid = False
            while retry_idx < max_retries and not is_valid:
                addr, prefix_segs, suffix_segs = self.get_random_addr_segs(prefix)
                is_valid = self.checker.check(addr)
                if is_valid:
                    break
                retry_idx += 1
        else:
            addr, prefix_segs, suffix_segs = self.get_random_addr_segs(prefix)
            is_valid = True

        if not is_valid:
            logger.warn(f"  x [{addr}]")
            return None
        else:
            if is_cache_addr and self.cacher:
                self.cacher.push_addr_to_cache(addr)
            if pool:
                pool.push_addr_to_using(addr)
            if return_segs:
                return addr, prefix_segs, suffix_segs
            else:
                return addr


class IPv6Generator:
    def __init__(self, cache_name: str = None, verbose: bool = False):
        self.prefixer = IPv6Prefixer(verbose=verbose)
        self.pool = IPv6Pool(verbose=verbose)
        self.cacher = IPv6Cacher(cache_name=cache_name, verbose=verbose)
        self.checker = IPv6Checker(verbose=verbose)
        self.randomizer = IPv6Randomizer(
            cacher=self.cacher, checker=self.checker, verbose=verbose
        )
        self.lock = threading.Lock()
        self.verbose = verbose

    def generate(
        self,
        return_segs: bool = False,
        is_use_cache: bool = True,
        is_cache_addr: bool = True,
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        with self.lock:
            prefix = self.prefixer.prefix
            if is_use_cache:
                cache_res = self.cacher.get_cache_addr(
                    prefix=prefix, return_segs=return_segs, pool=self.pool
                )
                if cache_res:
                    return cache_res

            random_res = self.randomizer.get_random_addr(
                prefix=prefix,
                return_segs=return_segs,
                is_cache_addr=is_cache_addr,
                pool=self.pool,
            )
            return random_res


def test_ipv6_generator():
    generator = IPv6Generator(cache_name="ipv6_addrs")
    checker = IPv6Checker(verbose=True)
    for i in range(5):
        res = generator.generate(
            return_segs=True, is_use_cache=False, is_cache_addr=True
        )
        if not res:
            # generate() returns None when no valid address was found
            continue
        ipv6, prefix_segs, suffix_segs = res
        prefix = ":".join(prefix_segs)
        suffix = ":".join(suffix_segs)
        suffix_str = logstr.file(suffix)
        logger.note(f"  > [{prefix}:{suffix_str}]")
        checker.check(ipv6)

    logger.note(f"> using_addrs:")
    for addr in generator.pool.using_addrs:
        logger.file(f"  * [{addr}]")


def test_ipv6_generator_parallel():
    from concurrent.futures import ThreadPoolExecutor, as_completed

    generator = IPv6Generator(cache_name="ipv6_addrs", verbose=True)
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(
                generator.generate,
                return_segs=True,
                is_use_cache=False,
                is_cache_addr=True,
            )
            for _ in range(10)
        ]
        for future in as_completed(futures):
            res = future.result()
            if not res:
                # generate() returns None when no valid address was found
                continue
            addr, prefix_segs, suffix_segs = res
            prefix = ":".join(prefix_segs)
            suffix = ":".join(suffix_segs)
            suffix_str = logstr.file(suffix)
            logger.note(f"  > [{prefix}:{suffix_str}]")


if __name__ == "__main__":
    # test_ipv6_generator()
    test_ipv6_generator_parallel()

    # python -m networks.ipv6.generator

Run:

sh
python ip_generator.py

Test script

ip_tester.py
py
import netifaces
import random
import requests
import requests.packages.urllib3.util.connection as urllib3_cn
import socket

from tclogger import logger
from requests.adapters import HTTPAdapter


class IPv6Extractor:
    def __init__(self):
        self.interfaces = []

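    def extract_prefix(self, addr: str, netmask: str):
        # each "f" in the netmask covers 4 prefix bits ("ffff:ffff:ffff:ffff::" -> 64)
        prefix_bits = netmask.count("f") * 4
        # expand to 16 raw bytes first, so shortened groups or "::" in addr
        # cannot shift the prefix; drop any "%iface" scope suffix before parsing
        packed = socket.inet_pton(socket.AF_INET6, addr.split("%")[0])
        groups = [
            f"{int.from_bytes(packed[i : i + 2], 'big'):x}"
            for i in range(0, prefix_bits // 8, 2)
        ]
        return ":".join(groups), prefix_bits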

    def get_network_interfaces(self):
        interfaces = netifaces.interfaces()
        for interface in interfaces:
            addresses = netifaces.ifaddresses(interface)
            if netifaces.AF_INET6 not in addresses:
                continue
            for addr_info in addresses[netifaces.AF_INET6]:
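                if not addr_info["addr"].startswith("2"):
                    # skip link-local and other non-global addresses,
                    # but keep scanning the rest of this interface
                    continue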
                addr = addr_info["addr"]
                netmask = addr_info["netmask"]
                prefix, prefix_bits = self.extract_prefix(addr, netmask)
                self.interfaces.append(
                    {
                        "interface": interface,
                        "addr": addr,
                        "netmask": netmask,
                        "prefix": prefix,
                        "prefix_bits": prefix_bits,
                    }
                )

    def get_prefix(self):
        self.get_network_interfaces()
        interface = self.interfaces[0]
        prefix = interface["prefix"]
        prefix_bits = interface["prefix_bits"]
        logger.note(f"> IPv6 prefix:", end=" ")
        logger.success(f"[{prefix}]", end=" ")
        logger.mesg(f"(/{prefix_bits})")
        return prefix

    def random_ipv6(self, prefix: str = None) -> str:
        if prefix is None:
            prefix = self.get_prefix()
        prefix_segs = prefix.split(":")
        suffix_seg_count = 8 - len(prefix_segs)
        suffix_seg = [f"{random.randint(0, 65535):x}" for _ in range(suffix_seg_count)]
        addr = ":".join(prefix_segs + suffix_seg)
        return addr


class IPv6Adapter(HTTPAdapter):
    def __init__(self, source_address, *args, **kwargs):
        self.source_address = source_address
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs["source_address"] = self.source_address
        return super().init_poolmanager(*args, **kwargs)


class IPTester:
    def __init__(self):
        self.url = "http://ifconfig.me/ip"

    def force_ipv4(self):
        urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

    def force_ipv6(self):
        if urllib3_cn.HAS_IPV6:
            urllib3_cn.allowed_gai_family = lambda: socket.AF_INET6

    def check_ip_addr(self, ip: str = None):
        if not ip:
            return 4

        try:
            socket.inet_pton(socket.AF_INET6, ip)
            return 6
        except Exception as e:
            logger.warn(f"× Invalid ip string: [{ip}]")
            return None

    def set_session_adapter(self, session: requests.Session, ip: str = None):
        ip_version = self.check_ip_addr(ip)
        if ip_version == 4:
            self.force_ipv4()
        elif ip_version == 6:
            self.force_ipv6()
            adapter = IPv6Adapter((ip, 0))
            session.mount("http://", adapter)
            session.mount("https://", adapter)
        else:
            pass

    def get(self, ip: str = None):
        session = requests.Session()
        self.set_session_adapter(session, ip)
        logger.note(f"  > Set:", end=" ")
        if not ip:
            logger.line(f"[ipv4]")
        else:
            logger.line(f"[{ip}]")
        try:
            resp = session.get(self.url, timeout=5)
            if resp and resp.status_code == 200:
                logger.file(f"  * Get:", end=" ")
                logger.success(f"[{resp.text.strip()}]")
        except Exception as e:
            logger.error(f"× Error: {e}")


if __name__ == "__main__":
    extractor = IPv6Extractor()
    prefix = extractor.get_prefix()
    random_ipv6_addrs = [extractor.random_ipv6(prefix) for _ in range(5)]

    ip_tester = IPTester()
    ip_tester.get()
    for ip in random_ipv6_addrs:
        ip_tester.get(ip)

Run:

sh
# pip install netifaces requests tclogger
python ip_tester.py

The output looks like:

sh
> IPv6 prefix: [240?:????:????:????] (/64)
  > Set: [ipv4]
  * Get: [???.???.???.???]
  > Set: [240?:????:????:????:f618:ad3a:fd1a:f0d0]
  * Get: [240?:????:????:????:f618:ad3a:fd1a:f0d0]
  > Set: [240?:????:????:????:410b:9770:6504:de53]
  * Get: [240?:????:????:????:410b:9770:6504:de53]
  > Set: [240?:????:????:????:b05b:87b2:26b4:15f3]
  * Get: [240?:????:????:????:b05b:87b2:26b4:15f3]
  > Set: [240?:????:????:????:d5bc:58c9:dd74:b45]
  * Get: [240?:????:????:????:d5bc:58c9:dd74:b45]
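The scripts above build addresses by joining random hex groups onto the prefix string. The same idea can be sketched with the standard ipaddress module, which also copes with prefix lengths that are not a multiple of 16 bits. random_addr_in is a hypothetical helper, and 2001:db8:1:2::/64 is a documentation prefix standing in for the real one.

```python
import ipaddress
import random
from typing import Optional


def random_addr_in(network: str, rng: Optional[random.Random] = None) -> str:
    """Return a random address inside the given IPv6 network."""
    net = ipaddress.IPv6Network(network)
    rng = rng or random.Random()
    host_bits = 128 - net.prefixlen
    # draw the host part as one integer and add it to the network base address
    offset = rng.getrandbits(host_bits) if host_bits else 0
    return str(net.network_address + offset)


addr = random_addr_in("2001:db8:1:2::/64")
print(addr, ipaddress.IPv6Address(addr) in ipaddress.IPv6Network("2001:db8:1:2::/64"))
```

The returned string can be passed as the source address in the same way as the addresses generated by the scripts above.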