用 netty 编写代理服务器,切换出口 IP,不能及时生效 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
montaro2017
V2EX    Java

用 netty 编写代理服务器,切换出口 IP,不能及时生效

  •  
  •   montaro2017 13 小时 21 分钟前 992 次点击

    公司有一台服务器,有很多个公网 IP ,就想着能不能利用起来。

    然后现在有一个任务是用浏览器打开指定网址,返回网页源代码,我就打算把这个服务器做成代理服务器。

    本来是计划每个 IP 做一个代理服务器,代理服务器根据入口 IP 用对应的 IP 连接目标服务器。

    开启每个浏览器的时候设置代理地址,比如 1.2.3.4:8639, 1.2.3.5:8639 这样 。

    然后发现多开浏览器非常吃性能,要充分利用所有的公 IP 得开几十个浏览器,这时候已经卡到动不了了,肯定不行。

    所以我就想开 4 个浏览器,每个浏览器设置一个代理,然后通过接口去切换代理后端的出口。

    代理服务器是用 netty 写的,逻辑改成了绑定不同端口,然后通过接口指定端口号和出口 IP ,来切换不同端口对应代理的出口 IP 。

    其实就是存了一个 Map<Integer, String>,调接口修改这个 map ,netty 代理服务器连接目标服务器的时候使用出口地址去连接。

    现在问题在于,调用接口切换出口 IP 后,日志显示已经使用新的出口 IP 了,但是访问查询 IP 的网站,还是使用之前的 IP ,好像要等一段时间才生效,这是什么问题,求各位大佬指教

    @Log4j2 public class ProxyFrontendHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final AddressFunction addressFunction; public ProxyFrontendHandler(AddressFunction addressFunction) { this.addressFunction = addressFunction; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) { if (HttpMethod.CONNECT.equals(req.method())) { handleConnectRequest(ctx, req); return; } handleHttpRequest(ctx, req); } private void handleConnectRequest(ChannelHandlerContext ctx, FullHttpRequest req) { List<String> split = StrUtil.split(req.uri(), ":"); String host = CollUtil.getFirst(split); int port = Convert.toInt(CollUtil.get(split, 1), 443); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(ctx.channel().eventLoop()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new RelayHandler(ctx.channel())); } }); ChannelFuture connectFuture; InetSocketAddress remoteAddress = new InetSocketAddress(host, port); InetSocketAddress sourceAddress = addressFunction.apply(ctx); if (sourceAddress != null) { log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString()); cOnnectFuture= bootstrap.connect(remoteAddress, sourceAddress); } else { cOnnectFuture= bootstrap.connect(remoteAddress); } connectFuture.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { Channel outboundChannel = future.channel(); DefaultFullHttpResponse respOnse= new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK ); response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> { try { ctx.pipeline().remove(HttpServerCodec.class); ctx.pipeline().remove(HttpObjectAggregator.class); ctx.pipeline().addLast(new RelayHandler(outboundChannel)); } catch (Exception ignored) { } }); } else { sendErrorResponse(ctx, "无法连接到目标服务器"); closeOnFlush(ctx.channel()); } }); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { String host = req.headers().get(HttpHeaderNames.HOST); if (host == null) { sendErrorResponse(ctx, "缺少 Host 头"); closeOnFlush(ctx.channel()); return; } String[] hostParts = host.split(":"); String targetHost = hostParts[0]; int targetPort = hostParts.length > 1 ? Integer.parseInt(hostParts[1]) : 80; // 修改请求 URI 为绝对路径 req.setUri(req.uri().replace("http://" + host, "")); req.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); // 复制请求以避免在异步操作期间被释放 FullHttpRequest copiedReq = req.copy(); // 创建到目标服务器的连接 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(ctx.channel().eventLoop()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new HttpClientCodec()); ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024)); // 增加到 1MB ch.pipeline().addLast(new RelayHandler(ctx.channel())); } }); ChannelFuture connectFuture; InetSocketAddress remoteAddress = new InetSocketAddress(targetHost, targetPort); InetSocketAddress sourceAddress = addressFunction.apply(ctx); if (sourceAddress != null) { log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString()); cOnnectFuture= bootstrap.connect(remoteAddress, sourceAddress); } else { cOnnectFuture= bootstrap.connect(remoteAddress); } connectFuture.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { future.channel().writeAndFlush(copiedReq); } else { closeOnFlush(ctx.channel()); } if (copiedReq.refCnt() != 0) { copiedReq.release(); } }); } private void sendErrorResponse(ChannelHandlerContext ctx, String message) { FullHttpResponse respOnse= new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_GATEWAY, Unpooled.wrappedBuffer(message.getBytes()) ); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset="); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (cause instanceof SocketException) { closeOnFlush(ctx.channel()); return; } log.error(cause.getMessage()); closeOnFlush(ctx.channel()); } private void closeOnFlush(Channel ch) { if (ch.isActive()) { ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } } 
    @Log4j2 public class RelayHandler extends ChannelInboundHandlerAdapter { private final Channel relayChannel; public RelayHandler(Channel relayChannel) { this.relayChannel = relayChannel; } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.read(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (relayChannel.isActive()) { relayChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { ctx.read(); // 继续读取数据 } else { future.channel().close(); } }); } else { closeOnFlush(ctx.channel()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(cause); closeOnFlush(ctx.channel()); } private void closeOnFlush(Channel ch) { if (ch.isActive()) { ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } } 
    第 1 条附言    12 小时 49 分钟前

    日志显示已经从110切换到120,但刷新了几次访问还是显示110

    第 2 条附言    8 小时 44 分钟前

    后续来了,用了一种比较迂回的解决方法,PAC + 浏览器Proxy Re-apply。

    写了一个接口返回PAC,切换IP时,返回对应代理的PAC,然后浏览器打开chrome://net-internals/#proxy,点一下Re-apply,这样就会获取新的PAC并使用代理,然后再访问目标网址。

    15 条回复    2025-10-30 20:38:41 +08:00
    aladdinding
        1
    aladdinding  
       13 小时 13 分钟前
    浏览器连接池的问题吧
    Ipsum
        2
    Ipsum  
       12 小时 59 分钟前 via Android
    我猜没有保存 conn ,切换时,没有关闭旧链接。
    montaro2017
        3
    montaro2017  
    OP
       12 小时 56 分钟前
    @Ipsum #2
    ChannelFuture connectFuture;
    InetSocketAddress remoteAddress = new InetSocketAddress(targetHost, targetPort);
    InetSocketAddress sourceAddress = addressFunction.apply(ctx);
    if (sourceAddress != null) {
    log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
    cOnnectFuture= bootstrap.connect(remoteAddress, sourceAddress);
    } else {
    cOnnectFuture= bootstrap.connect(remoteAddress);
    }
    这里在连接目标服务器的时候获取了出口地址,然后指定用这个地址去连接的,日志有打印出来,只是连接的时候不知道为什么用的还是之前的出口地址
    montaro2017
        4
    montaro2017  
    OP
       12 小时 42 分钟前
    @aladdinding #1 还真是这个问题,我用 curl 试了,IP 立马就变了
    montaro2017
        5
    montaro2017  
    OP
       12 小时 34 分钟前
    @aladdinding #1 但为啥我日志里显示是用新的出口 IP
    cq65617875
        6
    cq65617875  
       12 小时 9 分钟前
    浏览器的缓存 用 curl
    cq65617875
        7
    cq65617875  
       12 小时 9 分钟前
    @cq65617875 或者尝试每次测试都新起一个隐私窗口
    montaro2017
        8
    montaro2017  
    OP
       12 小时 2 分钟前
    @cq65617875 #7 就是因为每次开新窗口开销大,才选择代理后端切换的,浏览器是用 selenium 控制的
    5waker
        9
    5waker  
       11 小时 52 分钟前
    ```rust
    5waker
        10
    5waker  
       11 小时 48 分钟前
    @montaro2017 我刚好昨天也遇到了类似的问题,我的做法是在一个长连接里不断检测 header ,然后根据内容再做转发
    ```rust
    loop {
    // 读取到完整头部
    let header_end = loop {
    if let Some(pos) = headers_end_pos(&buf) {
    break Some(pos);
    }
    let n = client_stream.read(&mut tmp).await?;
    if n == 0 {
    // 客户端关闭或无更多数据
    if buf.is_empty() {
    return Ok(());
    } else {
    return Ok(());
    }
    }
    buf.extend_from_slice(&tmp[..n]);
    if buf.len() > 128 * 1024 {
    error!("请求头过大,终止连接");
    return Ok(());
    }
    };
    let header_end = header_end.unwrap();
    let headers_vec: Vec<u8> = buf[..header_end].to_vec();

    let virtual_env = parse_virtual_env_from_headers(&headers_vec);
    let content_length = parse_content_length(&headers_vec);
    let chunked = is_chunked(&headers_vec);

    // 读取完整正文
    let body_len = if let Some(cl) = content_length {
    while buf.len() < header_end + cl {
    let n = client_stream.read(&mut tmp).await?;
    if n == 0 {
    error!("Content-Length 指定但连接提前关闭");
    return Ok(());
    }
    buf.extend_from_slice(&tmp[..n]);
    }
    cl
    } else if chunked {
    loop {
    if let Some(end) = chunked_body_end_pos(&buf[header_end..]) {
    break end;
    }
    let n = client_stream.read(&mut tmp).await?;
    if n == 0 {
    error!("chunked 正文未完整但连接已关闭");
    return Ok(());
    }
    buf.extend_from_slice(&tmp[..n]);
    }
    } else {
    0
    };

    let request_end = header_end + body_len;
    let body = &buf[header_end..request_end];
    let mut new_headers = rewrite_connection_close(&headers_vec);
    new_headers.extend_from_slice(body);

    // 路由:每个请求一个目标连接;响应仅回写到客户端
    if let Some(env) = virtual_env {
    let ctrl_opt = {
    let envs = (*VIRTUAL_ENVS).lock().unwrap();
    envs.get(&env).cloned()
    };
    if let Some(mut ctrl) = ctrl_opt {
    match ctrl.open_stream().await {
    Ok(mut sub) => {
    sub.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut sub, &mut client_stream).await;
    }
    Err(e) => {
    error!("打开虚拟环境 {} 的子流失败: {:?}", env, e);
    let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
    upstream_stream.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
    }
    }
    } else {
    let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
    upstream_stream.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
    }
    } else {
    let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
    upstream_stream.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
    }

    // 删除已消费的请求字节,保留后续请求(若已到达)
    if request_end < buf.len() {
    let remaining = buf.split_off(request_end);
    buf = remaining;
    } else {
    buf.clear();
    }
    }
    ```
    montaro2017
        11
    montaro2017  
    OP
       11 小时 42 分钟前
    @5waker #10 我要用浏览器自动化来获取网页源代码,所以只能用代理服务器
    Gilfoyle26
        12
    Gilfoyle26  
       11 小时 12 分钟前
    用 c 写,解决一切烦恼
    ronyin
        13
    ronyin  
       11 小时 8 分钟前
    这是搞爬虫么。。。
    montaro2017
        14
    montaro2017  
    OP
       10 小时 53 分钟前
    @ronyin #13 对
    testFor
        15
    testFor  
       2 小时 33 分钟前
    写这个 netty 不如 go 好用
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2656 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 24ms UTC 15:12 PVG 23:12 LAX 08:12JFK 11:12
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86