V2EX zhaoyy0513 的所有回复 第 1 页 / 共 3 页
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX    zhaoyy0513    全部回复第 1 页 / 共 3 页
回复总数  49
1  2  3  
如果技术栈是 java,python 会用 vue ,这种容易找工作吗,零基础去日本学校学语言到找工作大约多久啊,老哥
@getty123 某宝倒是有加内存的,但是可能售后失效
老哥,内存条时序有点高啊,最次也换成 c32 的吧,显卡建议再加点上 9070XT 能抢到 4999 的话,抢不到就算了
@bettercallbalds 夸克网盘吧,蹲一手
@HMYDK 404 了哥们
没有 广告系统领域经验 , 路过帮顶
@yangxin0 招 JAVA 吗 老哥
2023-05-10 10:15:03 +08:00
回复了 CoCoMcRee 创建的主题 分享创造 我把 ChatGPT+ Midjourney 接入了钉钉群聊.
该群不存在或群主未开启群可被搜索功能 ...
2023-05-09 11:50:14 +08:00
回复了 Braisdom 创建的主题 程序员 智能 SQL 分析系统(我的新作品)
等一波开源
2023-05-08 10:18:17 +08:00
回复了 slomo 创建的主题 Linux 我该用哪个 Linux 发行版
@onice 确实很不错 打算用来做开发环境了
2023-05-06 10:21:21 +08:00
回复了 slomo 创建的主题 Linux 我该用哪个 Linux 发行版
推荐 debian 的 mint 我现在就是用的 mint 回复你的
要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:

使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。

在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。

在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> cOnsumer= new KafkaConsumer<>(props);

// 订阅"topic-A"主题
consumer.subscribe(Collections.singletonList("topic-A"));

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.key().equals("key-B")) {
// 处理消息
}
}
consumer.commitSync();
}
```

为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put"enable.auto.commit", "true");
KafkaConsumer<String, String> cOnsumer= new KafkaConsumer<>(props);
```
使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。





上面两条回复都是 chatgpt 回复的
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class KafkaStreamsExample {
public static void main(String[] args) {
// 设置 Kafka Streams 属性
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 创建 Kafka Streams 实例
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("topic-A");

// 根据消息的 key 将消息路由到不同的分区中
stream.selectKey((key, value) -> key)
.through("topic-A-shuffle")
.groupByKey()
.foreach((key, value) -> {
// 处理消息
System.out.println("Processed message: " + value);
});

// 将处理后的消息发送到下游服务
stream.mapValues(value -> "processed " + value)
.to("topic-B", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
}
}

在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。

需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。
@Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊
@wuwukai007 确实叼
1  2  3  
关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5490 人在线   最高记录 6679       Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.5 40ms UTC 06:05 PVG 14:05 LAX 23:05 JFK 02:05
Do have faith in what you're doing.
ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86