请教一个并发设计问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
swqslwl
V2EX    Go 编程语言

请教一个并发设计问题

  • nbsp;
  •   swqslwl 2023-05-08 07:37:55 +08:00 via Android 4460 次点击
    这是一个创建于 887 天前的主题,其中的信息可能已经有所发展或是发生改变。
    我在做一个监测流量的项目。每秒会从数据源中获取 1w 条 json 格式的流量信息,我希望对这些流量进行分析,但是现在会出现丢数据的情况。

    我的做法是
    1.接受到数据后先传入 channelA
    2.启动一个协程循环从 A 中读取数据存入切片 B
    3.另起一个协程处理切片 B 的数据,同时在处理业务时利用 mutex 锁住 B

    实际调试中发现,mutex 的次数会影响数据的丢失量
    请问我这样设计是否有问题,是否会导致丢数据
    第 1 条附言    2023-05-08 11:05:04 +08:00
     var fmutex = sync.Mutex{} var A = make(chan string, 1048576) var B = make([]flowStatistic,0) func foo(){ go getData() go txData() go handleData() } //接受数据 func getData(){ for { ... data := conn.Read() A<-data } } func txData(){ for{ var fs flowStatistic err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析A传过来的数据 ... B = append(B,fs) //仅仅在这里会插入B } } func handleData(){ //这里每5秒钟对B中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住B,处理完后清空B中数据并解锁 for{ time.Sleep(5 * time.Second) fmutex.Lock() ... B = make([]flowStatistic,0) fmutex.Unlock() } } 

    有老哥提到了append B的时候应该锁住,我试了下发现实际上还是会丢数据。

    第 2 条附言    2023-05-08 11:32:11 +08:00
    var fmutex = sync.Mutex{} var A = make(chan string, 1048576) var B = make([]flowStatistic,0) func foo(){ go getData() go txData() go handleData() } //接受数据 func getData(){ for { ... addr, err := net.ResolveUnixAddr("unixgram", sock) conn, err := net.ListenUnixgram("unixgram", addr) data := conn.ReadFromUnix() A<-data } } func txData(){ for{ var fs flowStatistic err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析A传过来的数据 ... fmutex.Lock() B = append(B,fs) fmutex.Unlock() } } func handleData(){ //这里每5秒钟对B中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住B,处理完后清空B中数据并解锁 for{ time.Sleep(5 * time.Second) fmutex.Lock() ... B = make([]flowStatistic,0) fmutex.Unlock() } } 

    改进了一下

    39 条回复    2023-05-09 11:03:17 +08:00
    RH
        1
    RH  
       2023-05-08 07:58:19 +08:00
    需要 demo 才能分析,你描述的逻辑里有很多不确定性。
    lrh3321
        2
    lrh3321  
       2023-05-08 08:18:57 +08:00 via Android
    要 demo, 你两个协程都会写切片 B
    ox180
        3
    ox180  
       2023-05-08 08:44:29 +08:00
    @lrh3321 冒泡
    piaodazhu
        4
    piaodazhu  
       2023-05-08 08:54:24 +08:00   1
    我就提一个可能,切片 B 扩容,导致这种特殊情况:
    时刻 1 ,goroutine1 加锁,用 B=append(B, item)向切片 B 追加一个元素。刚好触发了扩容,B 的底层数组指针发生了转移。即,append 的参数 B 和返回值 B 中的 ptr 不同。
    时刻 2 紧接着时刻 1 ,goroutine2 拿到锁,这个时候在 goroutine2 看来,B 只是一个由(size,cap,ptr)构成结构体,它察觉不到 B 底层数组指针的变化,所以看不到 goroutine1 追加的数据。

    具体可以检查一下代码。
    xuboying
        5
    xuboying  
       2023-05-08 09:04:24 +08:00
    我感觉 sync.Pool 是干这个事情的。但是我一直没有掌握 sync.Pool 的正确用法,希望有大佬解释一下。
    ding2dong
        6
    ding2dong  
       2023-05-08 09:04:28 +08:00
    调大 A 的 bufsize
    ding2dong
        7
    ding2dong  
       2023-05-08 09:05:47 +08:00
    另外写入 B 的时候也要 mutex ,否则会被污染
    lysS
        8
    lysS  
       2023-05-08 09:07:39 +08:00
    “但是现在会出现丢数据的情况” 这是为什么呢?实际没有从数据源中获取到 1w 条?
    liangkang1436
        9
    liangkang1436  
       2023-05-08 09:08:47 +08:00 via Android
    有没有考虑用时序数据库来存储这些数据然后订阅? 1w/s ,这个数据量不小了
    fregie
        10
    fregie  
       2023-05-08 09:21:16 +08:00 via Android
    2 中存入 b 的过程也要锁。
    其实这里不用切片用队列比较合适
    ghost024
        11
    ghost024  
       2023-05-08 09:21:48 +08:00
    没看到你的代码,粗略的分析,你的第一个协程,从 channelA 中写到切片 b 也需要先获得 b 的 mutex 锁的,要不然,如果在锁 b 的时候你从 channelA 中获取数据,因为 b 锁住了,你写不进去就丢了
    WispZhan
        12
    WispZhan  
       2023-05-08 09:28:28 +08:00   1
    The Golden Rule - Don't Bloc the Event Loop or Coroutine.
    Martens
        13
    Martens  
       2023-05-08 09:32:34 +08:00
    写切片 B 和读切片 B 的时候都要加锁
    dode
        14
    dode  
       2023-05-08 09:40:47 +08:00
    先放 kafka ,再批量读出来处理呢
    8355
        15
    8355  
       2023-05-08 09:42:27 +08:00
    @ghost024 #11 +1 跟我理解的一样
    而且整个过程感觉效率并不高
    使用中间件哪怕 redis stream 整个代码都可以简单很多
    joesonw
        16
    joesonw  
       2023-05-08 09:52:00 +08:00 via iPhone
    能确定数量就用 channel ,不行的话用 linked list 。尽量避免用锁,传递锁的时候要传指针&。
    matrix1010
        17
    matrix1010  
       2023-05-08 10:09:41 +08:00
    丢数据算 bug 吗? 如果算请写个并发的单元测试并加上-race 测一下
    Nazz
        18
    Nazz  
       2023-05-08 10:19:36 +08:00
    数据量有点大, 建议使用 sync.Pool + 任务队列
    swqslwl
        19
    swqslwl  
    OP
       2023-05-08 11:21:56 +08:00
    @lrh3321
    @RH 老哥代码放上了
    swqslwl
        20
    swqslwl  
    OP
       2023-05-08 11:22:55 +08:00
    @ding2dong
    @fregie

    @ghost024
    @Martens
    @8355 对,这里确实是有问题。但是我加上后发现还是会丢
    leonshaw
        21
    leonshaw  
       2023-05-08 11:24:30 +08:00
    conn 是什么协议?
    把加放锁和处理数据的位置再标一下。
    rrfeng
        22
    rrfeng  
       2023-05-08 11:37:29 +08:00
    channel 里读 N 条出来直接处理掉,不要用切片缓存 /交互数据,就没这个问题了。

    这个切片设计的根本没什么道理。
    pkoukk
        23
    pkoukk  
       2023-05-08 11:39:19 +08:00
    没理由 append B 加锁了还能丢数据啊
    你可能丢数据的地方在 err := json.Unmarshal([]byte(<-A), &fs)
    oldshensheep
        24
    oldshensheep  
       2023-05-08 11:53:53 +08:00 via Android
    看你最终的代码感觉没什么问题。

    建议写个可以复现的 demo ,之前我也是出 bug ,感觉是用的第三方的库的问题。后来写了个可以复现的 demo ,发现是我代码的问题。

    我有很多莫名其妙的 bug 都是在写 demo 的时候发现代码真正错误的地方。

    比如说你这个代码,里面有网络连接,写数据库啥的,都给简化了,最终就是纯粹的逻辑代码,慢慢调试就发现问题了。
    而且也方便别人运行调试。
    ns09005264
        25
    ns09005264  
       2023-05-08 11:55:40 +08:00
    handleData 里加锁处理数据,但是 txData 里 append 却没有加锁,
    所以当 handleData 正在处理数据的时候,txData 还在往里面 append 数据,
    等 handleData 处理完,清空了 B ,txData 在 handleData 处理数据的过程中所添加的数据也就被清除了。
    没有给写入加锁只给读取加锁,等于没加锁。

    另外你想用 handleData 异步处理数据,但是如果在 txData 里给 append 加锁,其实就等于同步处理数据了,没什么意义。考虑在 txData 里对数据进行分块或按时间进行分块,再将分块的数据传给 handleData ,连锁都不用。
    8355
        26
    8355  
       2023-05-08 12:08:29 +08:00
    我的理解 handleData 这里完全没必要 也没必要用锁
    可以把写库代码直接放到 appendB = append(B,fs) 位置执行
    其次 db 本身是支持并发写库的,这里加锁意义不大,加了锁也都是在等待锁反而更慢
    leonshaw
        27
    leonshaw  
       2023-05-08 13:47:13 +08:00
    检查一下发送端的返回值。
    如上面所说的,这样实现并没有并发。如果处理能力大于上游,同步处理就行;如果小于上游,最终结果就是一个协程在处理,一个在等锁,一个在等 channel 缓冲空间。
    reliefe
        28
    reliefe  
       2023-05-08 14:25:00 +08:00
    这个问题根本应该在于多个线程操作同一个切片导致的,这里就会有很大不确定性。我问了 GPT-4 ,它给了很好的建议,把 B 换成 chan 而不是切片试试
    ```
    var A = make(chan string, 1048576)
    var B = make(chan flowStatistic, 1048576) // 使用带缓冲的 channel 而非切片
    ...

    func txData() {
    for {
    var fs flowStatistic
    err := json.Unmarshal([]byte(<-A), &fs)
    ...
    B <- fs // 将 fs 传递给 handleData
    }
    }

    func handleData() {
    var buffer []flowStatistic
    timer := time.NewTimer(5 * time.Second)

    for {
    select {
    case fs := <-B:
    buffer = append(buffer, fs)
    case <-timer.C:
    // 处理 buffer 中的数据
    ...
    buffer = make([]flowStatistic, 0)
    timer.Reset(5 * time.Second)
    }
    }
    }
    ```
    完整回复: https://flowus.cn/share/533684c0-2869-4507-8375-297103f09c77
    PS: 顺便一提在我的小站就可以随时用 GPT-4 了, liaobots.com
    quzard
        29
    quzard  
       2023-05-08 14:39:20 +08:00
    ```go
    var fmutex = sync.Mutex{}
    var A = make(chan string, 1048576)
    var B = sync.Pool{
    New: func() interface{} {
    return make([]flowStatistic, 0, 10000)
    },
    }

    func foo(){
    go getData()
    go txData()
    go handleData()
    }

    // 接收数据
    func getData(){
    for {
    // ...
    data := conn.Read()
    A<-data
    }
    }


    func txData() {
    for {
    var fs flowStatistic
    err := json.Unmarshal([]byte(<-A), &fs)
    // ...

    fmutex.Lock()
    currB := B.Get().([]flowStatistic)
    currB = append(currB, fs)
    B.Put(currB)
    fmutex.Unlock()
    }
    }

    func handleData() {
    for {
    time.Sleep(5 * time.Second)
    fmutex.Lock()
    currB := B.Get().([]flowStatistic)

    // 进行数据聚合和存储操作
    // ...
    // 清空 B
    currB = currB[:0]
    B.Put(currB)
    fmutex.Unlock()
    }
    }

    ```
    quzard
        30
    quzard  
       2023-05-08 14:40:52 +08:00
    @quzard #29 怎么发送后格式就乱了呢
    Anivial
        31
    Anivial  
       2023-05-08 15:21:25 +08:00
    感觉可以换一种思路,通过 time.Ticker 和 select 来代替锁保证缓存数据不会被互相抢占影响
    for {
    select {
    case data := <-A:
    ...
    B = append(B,fs)
    case t := <-ticker.C: // ticker := time.NewTicker(5 * time.Second)
    // 聚合处理数据
    process(B)

    // 清空 B 保留容量
    B = B[:0:cap(B)]
    }
    }
    piaodazhu
        32
    piaodazhu  
       2023-05-08 15:28:16 +08:00
    在楼主给的第二份代码其实也没有解决上面我提的那个问题,因为 goroutine1 在等待 goroutine2 放锁的时候,它栈里面的变量 B 就是旧的 B (底层指针不会变成你清空后新赋值的指针),所以 goroutine2 的清空操作 goroutine1 在这一次执行中是不可见的。

    试试这样修改?
    ```
    var fmutex = sync.Mutex{}
    var A = make(chan string, 1048576)
    var B_array = make([]flowStatistic,0) // <------
    var B = &B // <------

    func foo(){
    go getData()
    go txData()
    go handleData()
    }

    //接受数据
    func getData(){
    for {
    ...
    addr, err := net.ResolveUnixAddr("unixgram", sock)
    conn, err := net.ListenUnixgram("unixgram", addr)
    data := conn.ReadFromUnix()
    A<-data
    }
    }

    func txData(){
    for{
    var fs flowStatistic
    err := json.Unmarshal([]byte(<-A), &fs) //这里不断解析 A 传过来的数据
    ...
    fmutex.Lock()
    *B = append(*B,fs) // <------
    fmutex.Unlock()
    }
    }

    func handleData(){
    //这里每 5 秒钟对 B 中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住 B ,处理完后清空 B 中数据并解锁
    for{
    time.Sleep(5 * time.Second)
    fmutex.Lock()
    ...
    *B = make([]flowStatistic,0) // <------
    fmutex.Unlock()
    }
    }
    ```

    感觉大概率是这里的问题
    piaodazhu
        33
    piaodazhu  
       2023-05-08 15:43:16 +08:00
    @piaodazhu 不好意思看错了,B 不在栈上,上面这个请忽略。。。

    另外,在 handleData()里面,可以在加锁之后:
    fmutex.Lock()
    tmp := B
    B = make([]flowStatistic, 0)
    fmutex.Unlock()
    ... // processing tmp

    可以减少加锁时间,看不能减少或者消除数据丢失?
    PythonYXY
        34
    PythonYXY  
       2023-05-08 16:05:37 +08:00
    数据量也不小了,感觉还是上 Flink 吧,基于滚动窗口+RocksDB 状态后端做实时分析。
    picone
        35
    picone  
       2023-05-08 16:44:18 +08:00
    感觉代码没有问题,但是有些能优化的地方,可以改成无锁化
    ```go
    func txData() {
    ticker := time.NewTicker()
    for {
    select {
    case <- ticker.C:
    go func() // report your data
    B = make()
    case evt <- A:
    B = append(B, evet)
    case <-ctx.Done():
    return
    }
    }
    }
    ```
    liuxu
        36
    liuxu  
       2023-05-08 17:19:19 +08:00
    第二条附言的代码应该没问题了,golang 所有基础类型都不是线程安全的,txData()在不断自动扩容 B ,而 handleData()拿到的是旧指针,处理完旧指针的数据清空新 B 指针,导致了旧指针和新 B 指针这段时间 append()的数据丢失

    第一个附言等于没锁,handleData()内部没有线程安全问题,是单线程的,竞态出在 txData()的 append()和 handleData()的 B = make([]flowStatistic,0)之间
    ccde8259
        37
    ccde8259  
       2023-05-08 20:33:03 +08:00 via iPhone
    这种地方 mutex 写 slice 不如写 chan……
    doraf
        38
    doraf  
       2023-05-09 09:43:00 +08:00
    如果还有问题,能不能试试 atomic.Value 来存取 B 。
    txData 和 handleData 之间,能不能使用 chan 来传递 flowStatistic 。
    5 秒处理一次的话,在 txData 缓存数据,每 5 秒调用一次 go handleData 行不行(传递缓存数据给 handleData ),不知道语义还对不对。
    要不要考虑考虑 kafka 、flink 这种。
    xurh
        39
    xurh  
       2023-05-09 11:03:17 +08:00
    我之前做爬虫收集数据也遇到过类似的问题,把数据聚合进行批量插入减少 io 。

    我采用的 chan ,然后启动一个协程监听 chan ,当收集一定数量的数据或者时间满足,就把数据写入 db

    ```go

    type DBWriter[T any] struct {
    Size int
    Interval time.Duration
    done chan struct{}
    ch chan T
    insertDB func([]T, int)
    }

    func (w *DBWriter[T]) Start() {
    ticker := time.NewTicker(w.Interval)
    records := make([]T, 0, w.Size)
    insert := func() {
    if len(records) == 0 {
    return
    }
    w.insertDB(records, w.Size)
    records = make([]T, 0, w.Size)
    }

    for {
    select {
    case <-w.done:
    insert()
    return

    case <-ticker.C:
    insert()

    case data := <-w.ch:
    records = append(records, data)

    if len(records) == w.Size {
    insert()
    }
    }
    }
    }

    ```
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5566 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 37ms UTC 07:13 PVG 15:13 LAX 00:13 JFK 03:13
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86