Using any IPv6 address in the assigned prefix as the egress IP
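Who wouldn't want a proxy pool of 2^64 IPs? - zu1k
Building random egress with ip6tables - Type Boom
Using IPv6 to bypass Bilibili's anti-scraping | yllhwa's blog
Create your own IPv6 proxy pool (ndppd + openresty) - 企鹅大大's blog
IPv6 address allocation statistics - 运营商·运营人 - 通信人家园 - Powered by C114
IPv6 from an attacker's perspective - r0fus0d's blog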
Check the IPv6 address
ip -6 addr show scope global
The output looks like:
2: eno1: <BROADCAST,MULTICAST,ALLMULTI,UP,LOWER_UP> mtu 1500 state UP qlen 1000
inet6 240?:????:????:????:abcd:1234:5678:90ab/64 scope global temporary dynamic
valid_lft 258952sec preferred_lft 85972sec
inet6 240?:????:????:????:1234:5678:abcd:1124/64 scope global temporary deprecated dynamic
valid_lft 258952sec preferred_lft 0sec
inet6 240?:????:????:????:7890:3456:abcd:0101/64 scope global dynamic mngtmpaddr noprefixroute
valid_lft 258952sec preferred_lft 172552sec
Here 240?:????:????:????::/64 is the assigned IPv6 prefix.
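As a minimal sketch of how the /64 prefix follows from any one of the addresses above, Python's standard `ipaddress` module can do the masking. The address below comes from the 2001:db8::/32 documentation range and is only a placeholder, and `prefix_of` is a hypothetical helper name.

```python
import ipaddress


def prefix_of(addr_with_len: str) -> str:
    """Return the network (prefix) that an 'address/length' string belongs to."""
    iface = ipaddress.ip_interface(addr_with_len)
    # .network zeroes the host bits and keeps the prefix length
    return str(iface.network)


print(prefix_of("2001:db8:1:2:abcd:1234:5678:90ab/64"))
```

Feeding it the `inet6` value from the `ip -6 addr` output (address plus `/64`) gives the prefix to use in the later route and ndppd steps.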
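This step may not show a public IPv6 address starting with 240*. The usual cause is that the server is not connected directly to the optical modem but sits behind a router.
In that case, log in to the router's admin interface and change the IPv6 connection mode from "Obtain IP address automatically" (自动获取IP地址) to "Bridge mode" (桥模式). Then wait a few minutes and rerun the command above until an IPv6 address starting with 240* appears.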
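You can filter further to get the current public IPv6 address (the pattern 2409 below matches China Mobile; use your own carrier's prefix):
ip -6 addr show scope global | grep -E "2409.*" | grep -E "mngtmpaddr" | grep -v "deprecated"
The output looks like:
inet6 240?:????:????:????:7890:3456:abcd:0101/64 scope global dynamic mngtmpaddr noprefixroute
IPv6 address prefixes of the three major Chinese carriers:
- China Unicom: 2408
- China Mobile: 2409
- China Telecom: 240e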
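The carrier list above can be turned into a small lookup, which is handy for choosing the `grep` pattern automatically. This is only a sketch: `CARRIER_BY_FIRST_GROUP` and `carrier_of` are hypothetical names, and the mapping contains just the three prefixes listed above.

```python
import ipaddress

# First 16-bit group of the address -> carrier, taken from the list above.
CARRIER_BY_FIRST_GROUP = {
    "2408": "China Unicom",
    "2409": "China Mobile",
    "240e": "China Telecom",
}


def carrier_of(addr: str) -> str:
    """Look up the carrier by the first group of an IPv6 address."""
    # .exploded always writes each group as four lowercase hex digits
    first_group = ipaddress.IPv6Address(addr).exploded.split(":")[0]
    return CARRIER_BY_FIRST_GROUP.get(first_group, "unknown")
```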
Add a route
Add the IPv6 prefix obtained above to the routing table:
sudo ip route add local 240?:????:????:????::/64 dev eno1
- Replace 240?:????:????:????::/64 with the actual IPv6 prefix
- Replace eno1 with the actual network interface name
- dev is short for device and specifies the network interface
View routes
ip -6 route show
The output looks like:
::1 dev lo proto kernel metric 256 pref medium
240?:????:????:????::/64 dev eno1 proto ra metric 100 pref medium
240?:????:????:????::/60 via fe80::1 dev eno1 proto ra metric 100 pref high
fe80::/64 dev ????? proto kernel metric 256 pref medium
fe80::/64 dev eno1 proto kernel metric 1024 pref medium
default via fe80::1 dev eno1 proto ra metric 100 pref medium
To show only the public prefix, filter out lines containing fe80:: and lo:
ip -6 route show | grep -v 'fe80::' | grep -v 'lo'
The output looks like:
240?:????:????:????::/64 dev eno1 proto ra metric 100 pref medium
Edit sysctl.conf
View the current kernel parameters:
sudo sysctl -a | grep -E "(proxy_ndp|forwarding|nonlocal_bind)" | grep ipv6
Edit the kernel parameter file:
sudo nano /etc/sysctl.conf
Append the following to the end of the file and save:
net.ipv6.conf.all.proxy_ndp=1
net.ipv6.conf.all.forwarding=1
net.ipv6.conf.default.proxy_ndp=1
net.ipv6.conf.default.forwarding=1
net.ipv6.conf.eno1.proxy_ndp=1
net.ipv6.conf.eno1.forwarding=1
net.ipv6.ip_nonlocal_bind=1
- Replace eno1 with the actual network interface name
Apply the configuration:
sudo sysctl -p
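Since the only thing that varies in the sysctl block is the interface name, the lines can be generated rather than edited by hand. A minimal sketch, with `sysctl_lines` as a hypothetical helper; it only builds the text and does not write to /etc/sysctl.conf.

```python
def sysctl_lines(iface: str) -> list[str]:
    """Build the sysctl.conf lines shown above for a given interface name."""
    lines = []
    for scope in ("all", "default", iface):
        lines.append(f"net.ipv6.conf.{scope}.proxy_ndp=1")
        lines.append(f"net.ipv6.conf.{scope}.forwarding=1")
    # Allows binding sockets to addresses not configured on any interface
    lines.append("net.ipv6.ip_nonlocal_bind=1")
    return lines


print("\n".join(sysctl_lines("eno1")))
```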
Install ndppd
sudo apt install ndppd
Configure ndppd
sudo nano /etc/ndppd.conf
Add the following:
route-ttl 30000
proxy eno1 {
    router no
    timeout 500
    ttl 30000
    rule 240?:????:????:????::/64 {
        static
    }
}
- Replace eno1 with the actual network interface name
- Replace 240?:????:????:????::/64 with the actual IPv6 prefix
Start ndppd
sudo systemctl start ndppd
Enable start on boot:
sudo systemctl enable ndppd
Test the egress address
Pick a random IPv6 address inside the same prefix and test the egress IP:
curl --int 240?:????:????:????:abcd:9876:5678:0123 http://ifconfig.me/ip
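--int is short for --interface and sets the source (egress) address.
If the previous steps are correct, the returned IPv6 address should match the one passed to --int, for example: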
240?:????:????:????:abcd:9876:5678:0123
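Picking "a random address in the same prefix" can be sketched with the standard `ipaddress` and `random` modules. The prefix below is a documentation-range placeholder and `random_addr_in` is a hypothetical helper name; the result is printed in compressed form, which `curl --interface` accepts just as well.

```python
import ipaddress
import random


def random_addr_in(prefix: str) -> str:
    """Return a random address inside the given IPv6 prefix."""
    network = ipaddress.ip_network(prefix)
    # Offset 0 is the all-zero host part, so start from 1
    offset = random.randrange(1, network.num_addresses)
    return str(network[offset])


print(random_addr_in("2001:db8:1:2::/64"))
```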
Prefix changes
If the optical modem restarts or loses power, the IPv6 prefix may change, and the route must be added again:
sudo ip route add local 240x:xxxx:xxxx:xxxx::/64 dev eno1
- Here 240x:xxxx:xxxx:xxxx::/64 is the new IPv6 prefix
Reconfigure ndppd:
sudo nano /etc/ndppd.conf
Replace rule 240?:????:????:????::/64 with the new IPv6 prefix:
route-ttl 30000
proxy eno1 {
    router no
    timeout 500
    ttl 30000
    rule 240x:xxxx:xxxx:xxxx::/64 {
        static
    }
}
Restart ndppd:
sudo systemctl restart ndppd
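The manual edit of the `rule` line can also be done as a text substitution. The sketch below works on the config text only (reading and writing /etc/ndppd.conf and restarting the service are left out); `replace_rule_prefix` is a hypothetical helper, and the prefixes are documentation-range placeholders.

```python
import re

# Matches "rule <prefix> {" and captures the parts around the prefix
RULE_RE = re.compile(r"(rule\s+)(\S+)(\s*\{)")


def replace_rule_prefix(conf_text: str, new_prefix: str) -> str:
    """Return conf_text with every 'rule <prefix> {' pointing at new_prefix."""
    return RULE_RE.sub(lambda m: f"{m.group(1)}{new_prefix}{m.group(3)}", conf_text)


old_conf = (
    "route-ttl 30000\n"
    "proxy eno1 {\n"
    "    router no\n"
    "    rule 2001:db8:1:2::/64 {\n"
    "        static\n"
    "    }\n"
    "}\n"
)
print(replace_rule_prefix(old_conf, "2001:db8:3:4::/64"))
```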
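One-step automatic configuration
This script automates the following steps:
- Look up the machine's current IPv6 prefix
- Add the route
- Update the ndppd configuration and restart ndppd
Two steps are not included (they only need to be done once):
- Enable ip_nonlocal_bind
- Install ndppd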
ip_router.py
import netifaces
import random
import re
import requests
import requests.packages.urllib3.util.connection as urllib3_cn
import socket
import time

from pathlib import Path
from requests.adapters import HTTPAdapter
from tclogger import logger, logstr, decolored, shell_cmd
from typing import Union

REQUESTS_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0 Safari/537.36"
}


class IPv6Adapter(HTTPAdapter):
    # Transport adapter that binds outgoing connections to a given source address
    def __init__(self, source_address, *args, **kwargs):
        self.source_address = source_address
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs["source_address"] = self.source_address
        return super().init_poolmanager(*args, **kwargs)


class RequestsSessionIPv6Adapter:
    @staticmethod
    def force_ipv4():
        urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

    @staticmethod
    def force_ipv6():
        if urllib3_cn.HAS_IPV6:
            urllib3_cn.allowed_gai_family = lambda: socket.AF_INET6

    def adapt(self, session: requests.Session, ip: str):
        # Validate the address before mounting the adapter
        try:
            socket.inet_pton(socket.AF_INET6, ip)
        except Exception as e:
            raise ValueError(f"× Invalid IPv6 format: [{ip}]")
        adapter = IPv6Adapter((ip, 0))
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session


class IPv6Generator:
    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.interfaces = []
        # self.get_prefix()

    def get_addr_prefix(self, addr: str, netmask: str):
        # Each "f" in the netmask covers 4 bits of the prefix
        prefix_length = netmask.count("f")
        prefix = addr[: prefix_length // 4 * 5 - 1]
        return prefix, prefix_length * 4

    def get_network_interfaces(self):
        interfaces = netifaces.interfaces()
        for interface in interfaces:
            addresses = netifaces.ifaddresses(interface)
            if netifaces.AF_INET6 not in addresses:
                continue
            for addr_info in addresses[netifaces.AF_INET6]:
                # Skip non-global addresses (e.g. fe80::) but keep scanning the rest
                if not addr_info["addr"].startswith("2"):
                    continue
                addr = addr_info["addr"]
                netmask = addr_info.get("netmask") or addr_info.get("mask")
                prefix, prefix_bits = self.get_addr_prefix(addr, netmask)
                self.interfaces.append(
                    {
                        "interface": interface,
                        "addr": addr,
                        "netmask": netmask,
                        "prefix": prefix,
                        "prefix_bits": prefix_bits,
                    }
                )

    def get_prefix(self, return_netint: bool = False):
        logger.note("> Get ipv6 prefix:")
        self.get_network_interfaces()
        interface = self.interfaces[0]
        prefix = interface["prefix"].strip(":")
        prefix_bits = interface["prefix_bits"]
        netint = interface["interface"]
        if self.verbose:
            logger.note(f"> IPv6 prefix:", end=" ")
            logger.success(f"[{prefix}]", end=" ")
            logger.mesg(f"(/{prefix_bits})")
        self.netint = netint
        self.prefix = prefix
        self.prefix_bits = prefix_bits
        logger.file(f" * prefix: {logstr.okay(prefix)}")
        logger.file(f" * netint: {logstr.okay(netint)}")
        if return_netint:
            return self.prefix, netint
        else:
            return self.prefix

    def generate(
        self, prefix: str = None, return_segs: bool = False
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        prefix = prefix or self.prefix
        prefix_segs = prefix.split(":")
        # Fill the remaining 16-bit groups with random values
        suffix_seg_count = 8 - len(prefix_segs)
        suffix_segs = [f"{random.randint(0, 65535):x}" for _ in range(suffix_seg_count)]
        addr = ":".join(prefix_segs + suffix_segs)
        if return_segs:
            return addr, prefix_segs, suffix_segs
        else:
            return addr


class IPv6RouteModifier:
    def __init__(self, prefix: str, netint: str, ndppd_conf: Union[Path, str] = None):
        # Always store a Path so that .exists() works when a str is passed
        self.ndppd_conf = Path(ndppd_conf or "/etc/ndppd.conf")
        self.prefix = prefix
        self.netint = netint

    def is_ndppd_conf_latest(self):
        logger.note("> Check proxy (netint) and rule (prefix) in ndppd.conf:")
        if not self.ndppd_conf.exists():
            logger.mesg(f"ndppd.conf does not exist: {self.ndppd_conf}")
            return False
        with open(self.ndppd_conf, "r") as rf:
            lines = rf.readlines()

        # Build the log strings up front so both branches can use them
        netint_str = logstr.file(self.netint)
        is_netint_found = False
        netint_pattern = re.compile(rf"proxy\s+{re.escape(self.netint)}")
        for line in lines:
            if netint_pattern.search(line):
                logger.mesg(f" + Found proxy (netint): {netint_str}")
                is_netint_found = True
                break
        if not is_netint_found:
            logger.mesg(f" - Not found proxy (netint): {netint_str}")
            return False

        prefix_str = logstr.file(f"{self.prefix}::/64")
        is_prefix_found = False
        prefix_pattern = re.compile(rf"rule\s+{re.escape(self.prefix)}::/64")
        for line in lines:
            if prefix_pattern.search(line):
                logger.mesg(f" + Found rule (prefix/): {prefix_str}")
                is_prefix_found = True
                break
        if not is_prefix_found:
            logger.mesg(f" - Not found rule (prefix/): {prefix_str}")
            return False
        return True

    def add_route(self):
        logger.note("> Add IP route:")
        cmd = f"sudo ip route add local {self.prefix}::/64 dev {self.netint}"
        # logger.mesg(cmd)
        shell_cmd(cmd)

    def del_route(self):
        logger.note("> Delete IP route:")
        cmd = f"sudo ip route del local {self.prefix}::/64 dev {self.netint}"
        # logger.mesg(cmd)
        shell_cmd(cmd)

    def modify_ndppd_conf(self, overwrite: bool = False):
        if self.ndppd_conf.exists():
            with open(self.ndppd_conf, "r") as rf:
                old_ndppd_conf_str = rf.read()
            logger.note(f"> Read: {logstr.file(self.ndppd_conf)}")
            logger.mesg(f"{old_ndppd_conf_str}")
        if not self.ndppd_conf.exists() or overwrite:
            new_ndppd_conf_str = (
                f"route-ttl 30000\n"
                f"proxy {logstr.success(self.netint)} {{\n"
                f"    router no\n"
                f"    timeout 500\n"
                f"    ttl 30000\n"
                f"    rule {logstr.success(self.prefix)}::/64 {{\n"
                f"        static\n"
                f"    }}\n"
                f"}}\n"
            )
            logger.note(f"> Write: {logstr.file(self.ndppd_conf)}")
            logger.mesg(f"{new_ndppd_conf_str}")
            # Strip the color codes before writing to disk
            with open(self.ndppd_conf, "w") as wf:
                wf.write(decolored(new_ndppd_conf_str))
            logger.success(f"✓ Modified: {logstr.file(self.ndppd_conf)}")

    def restart_ndppd(self):
        logger.note("> Restart ndppd:")
        cmd = "sudo systemctl restart ndppd"
        # logger.mesg(cmd)
        shell_cmd(cmd)
        logger.success(f"✓ Restarted: {logstr.file('ndppd')}")

    def wait_ndppd_work(self, wait_seconds: int = 5):
        logger.note(f"> Waiting {wait_seconds} seconds for ndppd to work ...")
        time.sleep(wait_seconds)


def main():
    generator = IPv6Generator()
    prefix, netint = generator.get_prefix(return_netint=True)
    modifier = IPv6RouteModifier(prefix=prefix, netint=netint)
    modifier.add_route()
    if modifier.is_ndppd_conf_latest():
        logger.success(f"✓ ndppd.conf is up-to-date, skip restart.")
    else:
        modifier.modify_ndppd_conf(overwrite=True)
        modifier.restart_ndppd()
        modifier.wait_ndppd_work(wait_seconds=5)

    logger.note("> Testing ipv6 addrs:")
    session = requests.Session()
    adapter = RequestsSessionIPv6Adapter()
    for i in range(5):
        ipv6, prefix_segs, suffix_segs = generator.generate(return_segs=True)
        prefix = ":".join(prefix_segs)
        suffix = ":".join(suffix_segs)
        logger.note(f" > [{prefix}:{logstr.file(suffix)}]")
        adapter.adapt(session, ipv6)
        response = session.get("https://test.ipw.cn", headers=REQUESTS_HEADERS)
        resp_ipv6_str = response.text.split("\n")[0].strip()
        resp_ipv6_str = resp_ipv6_str.replace(":0:", "::")
        logger.mesg(f" * [{resp_ipv6_str}]")


if __name__ == "__main__":
    main()

    # sudo is needed to modify ndppd.conf

    # Case1: Run directly, need to type sudo password
    # sudo env "PATH=$PATH" python -m networks.ipv6.router

    # Case2: Run with piped password
    # echo $SUDOPASS | sudo -S env "PATH=$PATH" python -m networks.ipv6.router
Run:
sudo env "PATH=$PATH" python ip_router.py
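One detail worth noting in the script: `get_addr_prefix` slices the first 19 characters of the address string, which appears to assume that netifaces reports a /64 netmask with sixteen `f` characters and that each of the first four groups is written with four hex digits. If a group has leading zeros dropped (for example `2001:db8:1:2:...`), the slice would land in the wrong place. A hedged alternative using the standard `ipaddress` module is sketched below; `prefix_groups` is a hypothetical helper and only handles prefix lengths that fall on a 16-bit boundary.

```python
import ipaddress


def prefix_groups(addr: str, prefix_len: int = 64) -> str:
    """Return the leading 16-bit groups covered by prefix_len, zero-padded."""
    if prefix_len % 16 != 0:
        raise ValueError("this sketch only handles prefix lengths on a 16-bit boundary")
    # .exploded expands "::" and pads every group to four hex digits
    groups = ipaddress.IPv6Address(addr).exploded.split(":")
    return ":".join(groups[: prefix_len // 16])


print(prefix_groups("2001:db8:1:2:abcd::1"))
```

The returned string has the same shape as the `prefix` the script builds (four colon-separated groups), so it could be joined with random suffix groups in the same way.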
Generator script
ip_generator.py
import json
import netifaces
import random
import requests
import requests.packages.urllib3.util.connection as urllib3_cn
import socket
import threading

from pathlib import Path
from requests.adapters import HTTPAdapter
from tclogger import logger, logstr, get_now_str
from typing import Union

CACHE_ROOT = Path(__file__).parent

REQUESTS_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}


class IPv6Adapter(HTTPAdapter):
    # Transport adapter that binds outgoing connections to a given source address
    def __init__(self, source_address, *args, **kwargs):
        self.source_address = source_address
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs["source_address"] = self.source_address
        return super().init_poolmanager(*args, **kwargs)


class RequestsSessionIPv6Adapter:
    @staticmethod
    def force_ipv4():
        urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

    @staticmethod
    def force_ipv6():
        if urllib3_cn.HAS_IPV6:
            urllib3_cn.allowed_gai_family = lambda: socket.AF_INET6

    def adapt(self, session: requests.Session, ip: str):
        # Validate the address before mounting the adapter
        try:
            socket.inet_pton(socket.AF_INET6, ip)
        except Exception as e:
            raise ValueError(f"× Invalid IPv6 format: [{ip}]")
        adapter = IPv6Adapter((ip, 0))
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session


class IPv6Prefixer:
    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.interfaces = []
        self.init_prefix()

    def get_addr_prefix(self, addr: str, netmask: str):
        # Each "f" in the netmask covers 4 bits of the prefix
        prefix_length = netmask.count("f")
        prefix = addr[: prefix_length // 4 * 5 - 1]
        return prefix, prefix_length * 4

    def get_network_interfaces(self):
        interfaces = netifaces.interfaces()
        for interface in interfaces:
            addresses = netifaces.ifaddresses(interface)
            if netifaces.AF_INET6 not in addresses:
                continue
            for addr_info in addresses[netifaces.AF_INET6]:
                # Skip non-global addresses (e.g. fe80::) but keep scanning the rest
                if not addr_info["addr"].startswith("2"):
                    continue
                addr = addr_info["addr"]
                netmask = addr_info.get("netmask") or addr_info.get("mask")
                prefix, prefix_bits = self.get_addr_prefix(addr, netmask)
                self.interfaces.append(
                    {
                        "interface": interface,
                        "addr": addr,
                        "netmask": netmask,
                        "prefix": prefix,
                        "prefix_bits": prefix_bits,
                    }
                )

    def init_prefix(self) -> str:
        self.get_network_interfaces()
        interface = self.interfaces[0]
        prefix = interface["prefix"].strip(":")
        prefix_bits = interface["prefix_bits"]
        if self.verbose:
            logger.note(f"> IPv6 prefix:", end=" ")
            logger.okay(f"[{prefix}]", end=" ")
            logger.mesg(f"(/{prefix_bits})")
        self.prefix = prefix
        self.prefix_bits = prefix_bits
        return self.prefix


class IPv6Pool:
    # Thread-safe set of addresses that are currently in use
    def __init__(self, verbose: bool = False):
        self.lock = threading.Lock()
        self.using_addrs = set()
        self.verbose = verbose

    def push_addr_to_using(self, addr: str):
        with self.lock:
            self.using_addrs.add(addr)

    def pop_addr_from_using(self, addr: str):
        with self.lock:
            self.using_addrs.discard(addr)

    def is_addr_using(self, addr: str) -> bool:
        with self.lock:
            return addr in self.using_addrs


class IPv6Cacher:
    def __init__(self, cache_name: str = None, verbose: bool = False):
        self.cache_name = cache_name
        self.verbose = verbose
        self.lock = threading.Lock()
        self.init_paths()

    def init_paths(self):
        cache_name = self.cache_name or "ipv6_addrs_cache"
        self.cache_path = (CACHE_ROOT / cache_name).with_suffix(".json")

    def is_cache_exists(self) -> bool:
        return self.cache_path.exists()

    def read_cache(self) -> list[dict]:
        with open(self.cache_path, "r", encoding="utf-8") as rf:
            cache_data: list[dict] = json.load(rf)
        return cache_data

    def write_cache(self, cache_data: list[dict]):
        with open(self.cache_path, "w", encoding="utf-8") as wf:
            json.dump(cache_data, wf, ensure_ascii=False, indent=4)

    def push_addr_to_cache(self, addr: str):
        with self.lock:
            logger.mesg(f" + Push addr to cache: [{addr}]", verbose=self.verbose)
            if not self.is_cache_exists():
                self.cache_path.parent.mkdir(parents=True, exist_ok=True)
                cache_data: list[dict] = []
            else:
                cache_data: list[dict] = self.read_cache()
            addr_item = {
                "addr": addr,
                "cache_at": get_now_str(),
            }
            cache_data.append(addr_item)
            self.write_cache(cache_data)

    def pop_addr_from_cache(self, addr: str):
        with self.lock:
            logger.warn(f" - Pop addr from cache: [{addr}]", verbose=self.verbose)
            if not self.is_cache_exists() or not addr:
                return
            cache_data: list[dict] = self.read_cache()
            cache_data = [item for item in cache_data if item.get("addr") != addr]
            self.write_cache(cache_data)

    def addr_to_segs(self, prefix: str, addr: str):
        addr_segs = addr.split(":")
        prefix_segs = prefix.split(":")
        suffix_segs = addr_segs[len(prefix_segs) :]
        return addr_segs, prefix_segs, suffix_segs

    def get_cache_addr(
        self, prefix: str, return_segs: bool = False, pool: IPv6Pool = None
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        if not self.is_cache_exists():
            return None
        cache_data = self.read_cache()
        # filter using_addrs in pool from cache_data
        if pool and pool.using_addrs:
            filtered_cache_data = [
                item
                for item in cache_data
                if not pool.is_addr_using(item.get("addr", ""))
            ]
            if not filtered_cache_data:
                return None
        else:
            filtered_cache_data = cache_data
        # pick a random valid addr; addr stays None if nothing matches the prefix
        addr: str = None
        random.shuffle(filtered_cache_data)
        for item in filtered_cache_data:
            item_addr = item.get("addr", "")
            if item_addr.startswith(prefix):
                addr = item_addr
                break
        if not addr:
            return None
        if pool:
            pool.push_addr_to_using(addr)
        if return_segs:
            _, prefix_segs, suffix_segs = self.addr_to_segs(prefix, addr)
            return addr, prefix_segs, suffix_segs
        else:
            return addr


class IPv6Checker:
    def __init__(self, timeout: float = 10, verbose: bool = False):
        self.adapter = RequestsSessionIPv6Adapter()
        self.session = requests.Session()
        self.timeout = timeout
        self.verbose = verbose

    def check(self, addr: str) -> bool:
        self.adapter.adapt(self.session, addr)
        response = self.session.get(
            "https://test.ipw.cn", headers=REQUESTS_HEADERS, timeout=self.timeout
        )
        # Compare the echoed address with the bound one, ignoring colons
        addr_hash = addr.replace(":", "")
        resp_text = response.text.strip()
        resp_hash = resp_text.replace(":", "")
        is_success = addr_hash == resp_hash
        if self.verbose:
            if is_success:
                logger.okay(f" ✓ [{resp_text}]")
            else:
                logger.warn(f" x [{resp_text}]")
        return is_success


class IPv6Randomizer:
    def __init__(
        self,
        cacher: IPv6Cacher = None,
        checker: IPv6Checker = None,
        verbose: bool = False,
    ):
        self.cacher = cacher
        self.checker = checker
        self.verbose = verbose

    def get_random_addr_segs(self, prefix: str) -> tuple[str, list[str], list[str]]:
        prefix_segs = prefix.split(":")
        suffix_seg_count = 8 - len(prefix_segs)
        suffix_segs = [f"{random.randint(1, 65535):x}" for _ in range(suffix_seg_count)]
        addr = ":".join(prefix_segs + suffix_segs)
        return addr, prefix_segs, suffix_segs

    def get_random_addr(
        self,
        prefix: str,
        return_segs: bool = False,
        is_check: bool = True,
        max_retries: int = 5,
        is_cache_addr: bool = True,
        pool: IPv6Pool = None,
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        if is_check and self.checker:
            # Retry with fresh random addresses until one passes the check
            retry_idx = 0
            is_valid = False
            while retry_idx < max_retries and not is_valid:
                addr, prefix_segs, suffix_segs = self.get_random_addr_segs(prefix)
                is_valid = self.checker.check(addr)
                if is_valid:
                    break
                retry_idx += 1
        else:
            addr, prefix_segs, suffix_segs = self.get_random_addr_segs(prefix)
            is_valid = True
        if not is_valid:
            logger.warn(f" x [{addr}]")
            return None
        else:
            if is_cache_addr and self.cacher:
                self.cacher.push_addr_to_cache(addr)
            if pool:
                pool.push_addr_to_using(addr)
            if return_segs:
                return addr, prefix_segs, suffix_segs
            else:
                return addr


class IPv6Generator:
    def __init__(self, cache_name: str = None, verbose: bool = False):
        self.prefixer = IPv6Prefixer(verbose=verbose)
        self.pool = IPv6Pool(verbose=verbose)
        self.cacher = IPv6Cacher(cache_name=cache_name, verbose=verbose)
        self.checker = IPv6Checker(verbose=verbose)
        self.randomizer = IPv6Randomizer(
            cacher=self.cacher, checker=self.checker, verbose=verbose
        )
        self.lock = threading.Lock()
        self.verbose = verbose

    def generate(
        self,
        return_segs: bool = False,
        is_use_cache: bool = True,
        is_cache_addr: bool = True,
    ) -> Union[str, tuple[str, list[str], list[str]]]:
        with self.lock:
            prefix = self.prefixer.prefix
            # Prefer a cached address; fall back to a fresh random one
            if is_use_cache:
                cache_res = self.cacher.get_cache_addr(
                    prefix=prefix, return_segs=return_segs, pool=self.pool
                )
                if cache_res:
                    return cache_res
            random_res = self.randomizer.get_random_addr(
                prefix=prefix,
                return_segs=return_segs,
                is_cache_addr=is_cache_addr,
                pool=self.pool,
            )
            return random_res


def test_ipv6_generator():
    generator = IPv6Generator(cache_name="ipv6_addrs")
    checker = IPv6Checker(verbose=True)
    for i in range(5):
        ipv6, prefix_segs, suffix_segs = generator.generate(
            return_segs=True, is_use_cache=False, is_cache_addr=True
        )
        prefix = ":".join(prefix_segs)
        suffix = ":".join(suffix_segs)
        suffix_str = logstr.file(suffix)
        logger.note(f" > [{prefix}:{suffix_str}]")
        checker.check(ipv6)
    logger.note(f"> using_addrs:")
    for addr in generator.pool.using_addrs:
        logger.file(f" * [{addr}]")


def test_ipv6_generator_parallel():
    from concurrent.futures import ThreadPoolExecutor, as_completed

    generator = IPv6Generator(cache_name="ipv6_addrs", verbose=True)
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(
                generator.generate,
                return_segs=True,
                is_use_cache=False,
                is_cache_addr=True,
            )
            for _ in range(10)
        ]
        for future in as_completed(futures):
            addr, prefix_segs, suffix_segs = future.result()
            prefix = ":".join(prefix_segs)
            suffix = ":".join(suffix_segs)
            suffix_str = logstr.file(suffix)
            logger.note(f" > [{prefix}:{suffix_str}]")


if __name__ == "__main__":
    # test_ipv6_generator()
    test_ipv6_generator_parallel()

    # python -m networks.ipv6.generator
Run:
python ip_generator.py
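`IPv6Checker.check` compares the bound address with the echoed one after stripping colons. That works when both sides use the same textual form, but IPv6 allows several spellings of one address (for example zero compression with `::`), so a differently formatted reply could be reported as a mismatch. A hedged alternative is to compare parsed addresses with the standard `ipaddress` module; `same_ipv6` is a hypothetical helper name.

```python
import ipaddress


def same_ipv6(a: str, b: str) -> bool:
    """Compare two IPv6 address strings by value rather than by spelling."""
    try:
        return ipaddress.IPv6Address(a.strip()) == ipaddress.IPv6Address(b.strip())
    except ValueError:
        # Covers malformed input such as an HTML error page in the response
        return False


print(same_ipv6("2001:db8:0:0:0:0:0:1", "2001:db8::1"))
```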
Test script
ip_tester.py
import netifaces
import random
import requests
import requests.packages.urllib3.util.connection as urllib3_cn
import socket

from tclogger import logger
from requests.adapters import HTTPAdapter


class IPv6Extractor:
    def __init__(self):
        self.interfaces = []

    def extract_prefix(self, addr: str, netmask: str):
        # Each "f" in the netmask covers 4 bits of the prefix
        prefix_length = netmask.count("f")
        prefix = addr[: prefix_length // 4 * 5 - 1]
        return prefix, prefix_length * 4

    def get_network_interfaces(self):
        interfaces = netifaces.interfaces()
        for interface in interfaces:
            addresses = netifaces.ifaddresses(interface)
            if netifaces.AF_INET6 not in addresses:
                continue
            for addr_info in addresses[netifaces.AF_INET6]:
                # Skip non-global addresses (e.g. fe80::) but keep scanning the rest
                if not addr_info["addr"].startswith("2"):
                    continue
                addr = addr_info["addr"]
                netmask = addr_info["netmask"]
                prefix, prefix_bits = self.extract_prefix(addr, netmask)
                self.interfaces.append(
                    {
                        "interface": interface,
                        "addr": addr,
                        "netmask": netmask,
                        "prefix": prefix,
                        "prefix_bits": prefix_bits,
                    }
                )

    def get_prefix(self):
        self.get_network_interfaces()
        interface = self.interfaces[0]
        prefix = interface["prefix"]
        prefix_bits = interface["prefix_bits"]
        logger.note(f"> IPv6 prefix:", end=" ")
        logger.success(f"[{prefix}]", end=" ")
        logger.mesg(f"(/{prefix_bits})")
        return prefix

    def random_ipv6(self, prefix: str = None) -> str:
        if prefix is None:
            prefix = self.get_prefix()
        prefix_segs = prefix.split(":")
        # Fill the remaining 16-bit groups with random values
        suffix_seg_count = 8 - len(prefix_segs)
        suffix_seg = [f"{random.randint(0, 65535):x}" for _ in range(suffix_seg_count)]
        addr = ":".join(prefix_segs + suffix_seg)
        return addr


class IPv6Adapter(HTTPAdapter):
    # Transport adapter that binds outgoing connections to a given source address
    def __init__(self, source_address, *args, **kwargs):
        self.source_address = source_address
        super().__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **kwargs):
        kwargs["source_address"] = self.source_address
        return super().init_poolmanager(*args, **kwargs)


class IPTester:
    def __init__(self):
        self.url = "http://ifconfig.me/ip"

    def force_ipv4(self):
        urllib3_cn.allowed_gai_family = lambda: socket.AF_INET

    def force_ipv6(self):
        if urllib3_cn.HAS_IPV6:
            urllib3_cn.allowed_gai_family = lambda: socket.AF_INET6

    def check_ip_addr(self, ip: str = None):
        # No address given means "use the default IPv4 route"
        if not ip:
            return 4
        try:
            socket.inet_pton(socket.AF_INET6, ip)
            return 6
        except Exception as e:
            logger.warn(f"× Invalid ip string: [{ip}]")
            return None

    def set_session_adapter(self, session: requests.Session, ip: str = None):
        ip_version = self.check_ip_addr(ip)
        if ip_version == 4:
            self.force_ipv4()
        elif ip_version == 6:
            self.force_ipv6()
            adapter = IPv6Adapter((ip, 0))
            session.mount("http://", adapter)
            session.mount("https://", adapter)
        else:
            pass

    def get(self, ip: str = None):
        session = requests.Session()
        self.set_session_adapter(session, ip)
        logger.note(f" > Set:", end=" ")
        if not ip:
            logger.line(f"[ipv4]")
        else:
            logger.line(f"[{ip}]")
        try:
            resp = session.get(self.url, timeout=5)
            if resp and resp.status_code == 200:
                logger.file(f" * Get:", end=" ")
                logger.success(f"[{resp.text.strip()}]")
        except Exception as e:
            logger.error(f"× Error: {e}")


if __name__ == "__main__":
    extractor = IPv6Extractor()
    prefix = extractor.get_prefix()
    random_ipv6_addrs = [extractor.random_ipv6(prefix) for _ in range(5)]
    ip_tester = IPTester()
    # First request uses the default IPv4 route, then one request per random IPv6
    ip_tester.get()
    for ip in random_ipv6_addrs:
        ip_tester.get(ip)
Run:
# pip install netifaces requests tclogger
python ip_tester.py
The output looks like:
> IPv6 prefix: [240?:????:????:????] (/64)
> Set: [ipv4]
* Get: [???.???.???.???]
> Set: [240?:????:????:????:f618:ad3a:fd1a:f0d0]
* Get: [240?:????:????:????:f618:ad3a:fd1a:f0d0]
> Set: [240?:????:????:????:410b:9770:6504:de53]
* Get: [240?:????:????:????:410b:9770:6504:de53]
> Set: [240?:????:????:????:b05b:87b2:26b4:15f3]
* Get: [240?:????:????:????:b05b:87b2:26b4:15f3]
> Set: [240?:????:????:????:d5bc:58c9:dd74:b45]
* Get: [240?:????:????:????:d5bc:58c9:dd74:b45]