reactive 编程+redisson 如何分页的问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
bleulucaswu
V2EX    程序员

reactive 编程+redisson 如何分页的问题

  •  
  •   bleulucaswu 2024-10-11 05:13:09 +08:00 1694 次点击
    这是一个创建于 444 天前的主题,其中的信息可能已经有所发展或是发生改变。
    • SpringBoot WebFlux 3.1.0 + redisson 3.27.1 + redis stack 7 单机
    • webflux control 要求业务代码返回 mono
     @Bean public RouterFunction<ServerResponse> route(CacheHandler cacheHandler) { return RouterFunctions.route() .GET("......", cacheHandler::xxxxxxx) .build(); 
    • redissonReactiveClient 是 client

      • redissonReactiveClient.getSearch(codec).search(.....); 查询 redis 返回 mono<SearchResult>
      • 所以 cacheHandler::xxxxxxx 中 call 上面的查询
      • 如果做分页的话,SearchResult 里面有 total 和 List<Document>当前页数据
    • 但是我如何在一个请求中查出所有数据呢,也就是 reactive 编程怎么搞个循环查询啊

      • 只能将第一次查询的 mono block()拿到 total 吗,但是貌似 reactive 编程不该是这个写法...
      • 如果 Mono<SearchResult> cc = client.search(..第一页数据).cache();
        • 然后 return cc.flatMap(c -> client.search(.. c.getTotal, 然后传参剩下所有数据)).concatWith(cc).collectList();
        • 测试之后返回空的,因为 client.search 本身也是异步多线程的,这个操作在另一个线程进行,webflux 就给我空数据了,还有就是第二个 search 也拿不到真正的 total

    所以这个该咋做啊 -

    12 条回复    2024-10-16 03:16:00 +08:00
    bleulucaswu
        1
    bleulucaswu  
    OP
       2024-10-11 05:18:15 +08:00
    要做分页的原因是 redis search 这个是强制分页的 (ft.search index *)就你查询默认只给你 10 个
    所以必须 (ft.search index * 0 100) 就是必须给个 offset count 参数
    ZGame
        2
    ZGame  
       2024-10-11 08:33:28 +08:00
    请求所有数据可以通过 merge ,zip 等操作符吧, 另外可以看看 hs-web ,用的就是响应式。响应式更多时候有点像函数式编程,看看你的操作能不能通过函数式的方式,关注入参出参,还有函数转换方法,尽量减少副作用和外部的变量直接传入,so.. 其实你会发现用不好比用命令式的方式写代码更难受
    OctVan
        3
    OctVan  
       2024-10-11 09:30:21 +08:00
    https://projectreactor.io/docs/core/release/reference/#sinks
    可以用这个一直翻页,直到取完数据
    spkingr
        4
    spkingr  
       2024-10-11 09:46:14 +08:00
    遇事不决问 AI ,这种问题问 AI 最好,他都能给你写好。
    编程式风格确实不推荐使用 for 循环,用 for 也是配合 yield 。
    你这个 flatMap 按理来说应该是可以的,我复制你的问题给 AI ,AI 就简单的给了一个参考代码,你可以看看,看能不能改改用上:

    public Mono<SearchResult> searchWithPagination(int pageNumber, int pageSize) {
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize)) // 返回是 Mono<SearchResult>,有条数和页数吧
    .flatMap(initialSearchResult -> { // flatMap 抽取结果
    int totalPages = initialSearchResult.getTotal() / pageSize; // 修改一下,获取页数
    if (pageNumber < totalPages) {
    // 这里继续查询并合并
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize))
    // map 则是转换结果
    .map(nextSearchResult -> combineSearchResults(initialSearchResult, nextSearchResult));
    } else {
    // 没有数据
    return Mono.just(initialSearchResult);
    }
    });
    }

    private SearchResult combineSearchResults(SearchResult initialResult, SearchResult nextResult) {
    List<Document> combinedDocuments = new ArrayList<>(initialResult.getDocuments());
    combinedDocuments.addAll(nextResult.getDocuments());
    SearchResult combinedResult = new SearchResult();
    combinedResult.setTotal(initialReult.getTotal());
    combinedResult.setDocuments(combinedDocuments);
    return combinedResult;
    }
    spkingr
        5
    spkingr  
       2024-10-11 09:46:56 +08:00
    可以用 MD 吗:

    ```
    public Mono<SearchResult> searchWithPagination(int pageNumber, int pageSize) {
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize)) // 返回是 Mono<SearchResult>,有条数和页数吧
    .flatMap(initialSearchResult -> { // flatMap 抽取结果
    int totalPages = initialSearchResult.getTotal() / pageSize; // 修改一下,获取页数
    if (pageNumber < totalPages) {
    // 这里继续查询并合并
    return redissonReactiveClient.getSearch(StringCodec.INSTANCE)
    .search(SearchArgs(pageNumber, pageSize))
    // map 则是转换结果
    .map(nextSearchResult -> combineSearchResults(initialSearchResult, nextSearchResult));
    } else {
    // 没有数据
    return Mono.just(initialSearchResult);
    }
    });
    }

    private SearchResult combineSearchResults(SearchResult initialResult, SearchResult nextResult) {
    List<Document> combinedDocuments = new ArrayList<>(initialResult.getDocuments());
    combinedDocuments.addAll(nextResult.getDocuments());
    SearchResult combinedResult = new SearchResult();
    combinedResult.setTotal(initialResult.getTotal());
    combinedResult.setDocuments(combinedDocuments);
    return combinedResult;
    }

    ```
    yazinnnn0
        6
    yazinnnn0  
       2024-10-11 10:31:53 +08:00
    这个效果?


    用的 mutiny, 跟 reactor 差不多
    拿到 total 的 mono 时做 flatmap 转换, 然后把你的循环查询转换称序列+步进(都用 fp 了就别用传统 for 循环了...
    然后把序列做 fold 操作, java 里貌似需要用 reduce 去实现, fold 的初始值设置为 mono 形式的零值, 比如 Mono.just(List.empty())之类, fold 的 operation 去合并 acc 和新查出来的值(mutiny 用 combine, reactor 里应该有类似的 api)
    bleulucaswu
        7
    bleulucaswu  
    OP
       2024-10-11 17:26:25 +08:00
    @spkingr 这样是没错 可是如果 search 操作不止就两次呢 就不止向 redis 请求两次 我主要想请教下 reative 风格的编程中 循环 x 操作该怎么做 且 x 是异步执行在循环之外的一个线程中 然后 Xn 的部分参数和 Xn-1 有关 然后就是如果 Xn 触发了某个条件 如何让 Xn+1 Xn+2 ... 全部取消 X1~Xn 都是异步无前后顺序执行的 该怎么做到有序的取消操作
    bleulucaswu
        8
    bleulucaswu  
    OP
       2024-10-11 17:27:26 +08:00
    @yazinnnn0 兄弟 图裂了
    spkingr
        9
    spkingr  
       2024-10-12 10:20:16 +08:00
    @bleulucaswu 这个时候不得不提 CompletableFuture 大法好了!
    thenCombine/thenCompose/thenApplyAsync 这些方法很好用,建议参考这篇文章: https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
    bleulucaswu
        10
    bleulucaswu  
    OP
       2024-10-15 07:19:11 +08:00
    @yazinnnn0 请问下 如果 fold 里每次 search 操作都在一个额外的线程中进行,可以做到把结果 combine 到一起吗
    然后你这是 0..<20 step 5 来做循环的,假设 index=10 那次循环,因为触发了某个条件将它强制停止,有什么办法让 index=15 的那次循环以及之后所有的循环全部停止不做了(如果有的话),因为每次循环都是多线程乱序的,这能做到吗用 mutiny
    reactive 编程我还不熟悉 感觉用起来太难了
    bleulucaswu
        11
    bleulucaswu  
    OP
       2024-10-15 07:23:04 +08:00
    @yazinnnn0 就类似
    (1..<all step 5).fold(initial) { 多线程操作 }.until( index > 10 )
    就 index>10 的所有多线程里的操作都停掉,until 中肯定会有更复杂的判断条件,这里就举个例子
    bleulucaswu
        12
    bleulucaswu  
    OP
       2024-10-16 03:16:00 +08:00
    @yazinnnn0 ... 搞定了 reactor 有个 expand 方法很好用
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5423 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 22ms UTC 07:50 PVG 15:50 LAX 23:50 JFK 02:50
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86