生产者消费者模型,在生产和消费同时进行的状态下,如何判断消费者应该结束(没有产品可以消费) - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Hopetree
V2EX    Python

生产者消费者模型,在生产和消费同时进行的状态下,如何判断消费者应该结束(没有产品可以消费)

  •  
  •   Hopetree
    Hopetree 2018-05-10 10:59:17 +08:00 5270 次点击
    这是一个创建于 2709 天前的主题,其中的信息可能已经有所发展或是发生改变。

    说一下我的使用场景:

    首先,我有一个列表,放了很多 URL,用来测试请求的,然后使用 queue 队列把它们添加到一个队列 q 里面备用(这在生产者消费者模型里面算是原材料吧)

    然后,设置一个生产者,它其实也是一个消费者,消费原材料,然后把请求的结果添加到另一个队列 w 中,到这一步我觉得还没有什么问题,因为原材料的数量是一开始就订好了,所以当生产者消费的时候,我判断一下原材料是不是为空就可以退出生产,具体的代码是:

     def run(self): while not self.q.empty(): url = self.q.get() do something 

    现在,生产者一直在生产,直到发现原材料为空,就停止了 while 循环

    我又有一个消费者,它会消费生产者产生的请求,去解析请求的信息,但是问题来了,如果生产者和消费者同时开启线程的时候,消费者要怎么判断生产者已经没有生产产品了,该退出 while 循环了呢?我的消费者的 run 是这样的:

     while True: if self.w.empty(): pass else: info = self.w.get() do something 

    很明显,消费者跟生产者不同,它不能去判断当 w 队列为空的时候跳出循环,因为 w 为空有可能只是消费者把当前的产品消费完了,生产者可能还在继续生产,需要等一下而已

    所以,现在的问题是,我如果设置多个生产者和消费者线程的话,最后会完成所有的任务,但是消费者线程一直在 while 循环中,不会退出(因为没有退出的条件)

    21 条回复    2018-05-10 17:29:23 +08:00
    troycheng
        1
    troycheng  
       2018-05-10 11:07:51 +08:00   1
    消费者没有内容消费的时候,应该阻塞等待,你这样写就变成了死循环的轮训。至于阻塞等待如何实现,方法很多,教科书上的概念叫 PV 原语,具体实现可以是锁,信号量等手段
    Hopetree
        2
    Hopetree  
    OP
       2018-05-10 11:21:00 +08:00
    @troycheng 我看到的例子要么就是设定了指定的生产数量,要么就是跟我的一样,一直在生产和消费,并没有看到可以判断生产者已经停止生产的例子,如果方便,麻烦给我发个例子我看看,谢谢
    jfry
        3
    jfry  
       2018-05-10 11:36:48 +08:00
    通过生产者和消费者共享一个变量即可,这个模型之前我实现过一个,你可以看一下这里:https://github.com/toaco/carry/blob/6d46dd65f3539ba9767281637bb59ad96b0a4b97/carry/task.py#L381
    381 行是生产者生产结束之后的逻辑
    404 行是消费者等待的时候处理的逻辑
    这里也实现了生产者发生异常后通知消费者让消费者自行推出的逻辑.有需要的话也可以参考一下.
    CSM
        4
    CSM  
       2018-05-10 11:40:47 +08:00 via Android
    赞同一楼。
    生产者可以在生产结束后往队列里添加一个特殊的标志,消费者收到后就可以退出了。
    jfry
        5
    jfry  
       2018-05-10 11:45:43 +08:00
    @CSM 好想法,不过该方法有个局限是无法让消费者立刻收到消息.比如生产者有紧急的信息需要通知给消费者,但是队列里面仍然有许多未消费的内容,这样消费者只能在一段时间后才能收到紧急信息.
    troycheng
        6
    troycheng  
       2018-05-10 11:59:36 +08:00
    写 c++的,不过随手搜了一下,semphore,mutex 一类 IPC 的东西,python 里也有,https://blog.csdn.net/u014595589/article/details/53288168,可以看看是否有所帮助
    CSM
        7
    CSM  
       2018-05-10 12:17:25 +08:00 via Android   1
    @jfry 是有这个问题。还有就是需要消费者在拿到标志之后在退出前需要将标志重新加入队列,以通知其他正在等待的消费者退出。
    hcymk2
        8
    hcymk2  
       2018-05-10 12:19:49 +08:00
    @CSM
    消费者要这么做才能在拿到标志之后在退出前将标志重新加入队列?
    Wicked
        9
    Wicked  
       2018-05-10 12:34:10 +08:00 via iPhone   2
    @jfry 这些上层逻辑都可以通过 producer 和 consumer 之间的通讯方式实现
    最简单的,用个共享的 atomic 标记,在 costumer 循环里面每处理完一条请求就检查一下这个标记,这就是你所说的“紧急消息”
    本质上就是构建异步通讯模型,开源解决方案都很多,asio,zeromq,了解一下
    CSM
        10
    CSM  
       2018-05-10 12:34:20 +08:00 via Android
    @hcymk2 就和生产者往队列添加的方法一样啊 q.put
    xuxueli
        11
    xuxueli  
       2018-05-10 12:52:39 +08:00 via Android
    我有一款爬虫框架,异步多线程方式并行采集网页。遇到的问题和你一模一样。

    这个问题我解决了,可以通过判断采集线程状态与队列状态来判断。源码如下:

    http://www.xuxueli.com/xxl-crawler/#/
    CSM
        12
    CSM  
       2018-05-10 12:59:58 +08:00
    Hopetree
        13
    Hopetree  
    OP
       2018-05-10 14:17:48 +08:00
    @CSM 你发的我打不开,不过我在 so 上面找了一个例子,这里的做法是在消费者线程 start()之后在队列里面插入一个元素,这个元素就是用来判断的,不过我有点没弄清楚这里为什么可以这样操作,可能是我对 python 的线程的阻塞不够清晰吧,还在找相关资料理解
    Hopetree
        14
    Hopetree  
    OP
       2018-05-10 14:18:10 +08:00
    CSM
        15
    CSM  
       2018-05-10 14:28:49 +08:00 via Android
    @Hopetree gist 需要梯子。
    你发的链接里那个回答是在生产者结束之后( producer.join()之后)才插入那个 None 作为标志的。
    我估计你是没理解队列那个模块,如果 queue 为空,queue.get()是会阻塞的,直到有新的数据加入才会返回。
    ilucio
        16
    ilucio  
       2018-05-10 15:36:13 +08:00
    生产者生产完了后给消费者发送一个 None,然后消费者者会退出执行了
    ilucio
        17
    ilucio  
       2018-05-10 15:38:11 +08:00
    消费者那里的代码要改一下:
    while True:
    if self.w.empty():
    pass
    else:
    info = self.w.get()
    if info is not None:
    do something
    bany
        18
    bany  
       2018-05-10 15:56:47 +08:00
    一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。详细可以参考《 python cookbook 》第十二章 21.3 节 线程间通信。
    Hopetree
        19
    Hopetree  
    OP
       2018-05-10 16:16:55 +08:00
    @ilucio
    @bany
    感谢,我现在是使用的这种方式,只不过没有搞懂为什么是这样使用,目前还在理解
    lyc1116
        20
    lyc1116  
       2018-05-10 17:24:11 +08:00
    毒丸对象了解一下
    q397064399
        21
    q397064399  
       2018-05-10 17:29:23 +08:00
    @Hopetree #19 搜索一下 生产者 投毒
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1098 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 23:16 PVG 07:16 LAX 16:16 JFK 19:16
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86