一些多进程多线程对文件操作的一些总结 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
firejoke
V2EX    Python

一些多进程多线程对文件操作的一些总结

  •  
  •   firejoke 2018-05-12 22:16:10 +08:00 3902 次点击
    这是一个创建于 2721 天前的主题,其中的信息可能已经有所发展或是发生改变。
    读写文件,多进程和多线程

    # coding:utf-8
    """
    把大文件分块
    big_file.txt 是一个 500M 的 5 位数的乱序文档
    多线程并没有提升速度
    """

    import time
    txtfile = ''
    import threading
    def txtmap(txtup):
    with open('big_file.txt','r') as f:
    i = 0
    while i < 100000:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    # txtmap(txtfile)
    start = time.time()
    for i in range(100):
    txti = threading.Thread(target = txtmap(txtfile))
    txti.start()
    txti.join()
    print(time.time() - start)
    def txtmap2(txtup):
    with open('big_file.txt','r') as f:
    i = 0
    while i < 1000000:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    start = time.time()
    for i in range(10):
    txtmap2(txtfile)
    print(time.time() - start)


    多进程和队列初试
    import time
    from multiprocessing import Process
    from multiprocessing import Queue
    num = 0
    qnum = Queue()
    qnum.put(num)
    def testnum(num):
    num += 1
    qnum.put(num)
    for i in range(10):
    p = Process(target = testnum,args = (qnum.get(),))
    p.start()
    p.join()
    # testnum(num)
    print(qnum.get(),qnum.empty())
    在这里,qnum 属于实例化对象,不需要用 global 标记



    多次测试,发现多进程加队列,必须把文件指针位置也放进去,不然下一个读取位置就会乱跳
    with open('big_file.txt','r') as f:
    q.put ((txtfile3,f.tell()))
    def txtmap(qget):
    txtup = qget[0]
    i = 0
    f.seek(qget[1])
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    q.put((txtup,f.tell()))
    start = time.time()
    for i in range(10):
    txtp2i = Process(target = txtmap,args =(q.get(),))
    txtp2i.start()
    txtp2i.join()
    print('多进程加队列',time.time() - start,'\n',q.get())

    以及这个 args 的赋值真是烦人,明明从 q.get()出来的就是元组,它非要你=后面必须是一个元组的形式才行


    # coding:utf-8
    """
    把大文件分块
    big_file.txt 是一个 500M 的 5 位数的乱序文档
    多进加队列速度 < 多进程全局变量 < 多线程加队列 < 多线程加全局变量 < 普通全局变量
    多进程加队列不稳定 多进程因为进程间通讯必须借助队列 Queue 或者管道 pipe 否则改变不了全局变量
    """

    import os
    import time
    from multiprocessing import Process
    from multiprocessing import Queue
    import threading
    txtfile = ''
    txtfile2 = ''
    txtfile3 = ''
    txtfile4 = ''
    q = Queue()

    #因为本子是 4 核的,所以我只创建了 4 个进程,python 的 GIL 的限制决定 4 核只能同时跑 4 个 python 进程,多了就要等待
    with open('big_file.txt','r') as f:
    def txtmap():
    i = 0
    global txtfile
    while i < 25:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtfile += txt
    print(txt)
    print (os.getpid ())
    print(txtfile)
    start = time.time()
    for i in range(4):
    txtpi = Process(target = txtmap)
    txtpi.start()
    txtpi.join()
    print('多进程全局变量',time.time() - start,'\n',txtfile)
    if txtfile:
    print(True)
    else:
    print(False)

    with open('big_file.txt','r') as f:
    def txtmap():
    i = 0
    global txtfile2
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtfile2 += txt
    start = time.time()
    for i in range(10):
    txtti = threading.Thread(target = txtmap)
    txtti.start()
    txtti.join()
    print('多线程全局变量',time.time() - start,'\n',txtfile2)


    with open('big_file.txt','r') as f:
    q.put ((txtfile3,f.tell()))
    def txtmap(qget):
    txtup = qget[0]
    i = 0
    f.seek(qget[1])
    while i < 25:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    print (os.getpid ())
    q.put((txtup,f.tell()))
    start = time.time()
    for i in range(4):
    txtp2i = Process(target = txtmap,args =(q.get(),))
    txtp2i.start()
    txtp2i.join()
    print('多进程加队列',time.time() - start,'\n',q.get()[0])

    #因为队列 q 内的消息已被取完,所以再放进去一次,不然会一直处于阻塞状态等待插入消息
    q.put(txtfile3)
    with open('big_file.txt','r') as f:
    def txtmap(txtup):
    i = 0
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    q.put(txtup)
    start = time.time()
    for i in range(10):
    txtt2i = threading.Thread(target txtmap,args = (q.get(),))
    txtt2i.start()
    txtt2i.join()
    print('多线程加队列',time.time() - start,'\n',q.get())

    with open('big_file.txt','r') as f:
    def txtmap2():
    i = 0
    global txtfile4
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtfile4+= txt
    start = time.time()
    for i in range(10):
    txtmap2()
    print('普通全局变量',time.time() - start,'\n',txtfile4)

    #os.path.geisize 返回的文件大小就是 seek 指针的最后一个位置
    print(os.path.getsize('big_file.txt'))

    with open('big_file.txt','r') as f:
    print(f.seek(0,2))



    #终于看到了多进程同时进行,哦呼!甚是欢喜!而且和文件的指针并不会冲突
    from multiprocessing import Process
    import time
    with open('test.txt', 'r') as f:
    def readseek(num):
    f.seek(num)
    print(time.time(),f.read(10))
    time.sleep(10)
    p1 = Process(target = readseek, args = (20,))
    p2 = Process(target = readseek, args = (60,))
    p3 = Process(target = readseek, args = (120,))
    p4 = Process(target = readseek, args = (160,))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    print(time.time())
    第 1 条附言    2018-05-14 08:20:23 +08:00
    放到了 GitHub 上,看着会舒服很多
    第 2 条附言    2018-05-17 14:12:15 +08:00
    下面的链接放错了
    是这个 https://github.com/firejoke/python_test.git
    16 条回复    2018-05-18 08:47:57 +08:00
    firejoke
        1
    firejoke  
    OP
       2018-05-12 22:37:45 +08:00
    第一次发帖,改了大概 3 次后发现不能更改了......
    想再多进程全局变量那里加一个:多进程并不能改变全局变量,只能通过队列 Queue,或者管道 Pipe
    testsec
        2
    testsec  
       2018-05-12 22:39:51 +08:00 via iPhone
    Python 多线程只适用 io 密集场景
    firejoke
        3
    firejoke  
    OP
       2018-05-12 22:59:48 +08:00
    @testsec 是的,但是同一时刻也是只能运行一个线程,都是在等待,所以我觉得,要是文件真的超过 1G 的那种,仍然要用和核心数一样多的进程数,这样就可以同时运行和核心数一样多的线程
    neoblackcap
        4
    neoblackcap  
       2018-05-12 23:11:56 +08:00
    @firejoke 我建议你贴到外面的网站上,比如 github gist。你贴出来的代码实在影响大家的交流,缩进丢失,非常难以理解。
    我觉得你说的多线程没有用大概是实现的一些地方不是那么好,有其他因素抵消了多线程的优势。IO 本身是会释放 GIL,跟是不是多进程没关系
    lukefan
        5
    lukefan  
       2018-05-12 23:12:23 +08:00   1
    Python 的 IO 操作以及部分在底层释放 GIL 锁的类库并不受 GIL 限制.

    一般起 CPU 核数的进程针对的是 CPU 密集型操作, 并不是 IO 密集型操作.

    本地的文件 IO 一般情况下搞个单独的消费者线 /进程就够了, 多了性能反而差.

    一般爆协程、线程处理的 IO 密集型操作的 IO 主要是 socket、PIPE 这类, 并不是文件 IO.
    laqow
        6
    laqow  
       2018-05-12 23:14:02 +08:00 via Android
    感觉 python 的文本处理性能就是屎,特别是 utf8 这种变长编码的,既然不用管内容的话按二进制文件处理会快很多
    Applenice
        7
    Applenice  
       2018-05-12 23:58:09 +08:00
    像楼上们说的 Python 线程还是在 I/O 密集型应用上用的多,另外没有缩进看起来真的难受....0.0
    wspsxing
        8
    wspsxing  
       2018-05-13 04:50:31 +08:00 via Android
    依赖缩进的代码不要乱贴,如果不支持代码块就放到 github 之类,gist 在国内被 q 了。

    另外,不要用随机读写来加速磁盘访问。
    firejoke
        9
    firejoke  
    OP
       2018-05-13 12:51:35 +08:00
    @neoblackcap 第一次发帖,改了几次发现还是改不了,我贴到 GitHub 看下
    firejoke
        10
    firejoke  
    OP
       2018-05-13 12:53:13 +08:00
    @lukefan 我设想的是把大文件分四个块分别给四个进程处理,这样不就可以缩短时间吗?
    firejoke
        11
    firejoke  
    OP
       2018-05-13 12:53:47 +08:00
    @laqow 这个我没想过,我试试用 pickle 弄一下
        12
    firejoke  
    OP
       2018-05-13 12:54:39 +08:00
    @Applenice 第一贴 第一贴 第一贴,谅解一下下~我来发到 GitHub
    firejoke
        13
    firejoke  
    OP
       2018-05-13 12:57:52 +08:00
    @wspsxing 随机读写?读倒不是随机,就是命名的时候用的随机
    RicardoScofileld
        14
    RicardoScofileld  
       2018-05-14 13:24:29 +08:00
    可以试试 Dpark
    firejoke
        15
    firejoke  
    OP
       2018-05-17 14:02:33 +08:00
    firejoke
        16
    firejoke  
    OP
       2018-05-18 08:47:57 +08:00
    @RicardoScofileld 第一次知道这个框架,好像很好用,我仔细看看
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5395 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 01:33 PVG 09:33 JFK 21:33
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86