请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
uti6770werty
V2EX    Python

请教,从 FIFO 队列中取出 SQL 语句,写入到 MySQL,如何可以拉满拉爆性能?

  •  
  •   uti6770werty 2021-08-28 19:17:06 +08:00 2999 次点击
    这是一个创建于 1595 天前的主题,其中的信息可能已经有所发展或是发生改变。

    情况: 在 main()

     from multiprocessing import Manager from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # 建立全局变量字典 GOLVAR = Manager().dict() # SQL 处理队列 SQLQueue = Manager().Queue() # 处理 SQL 队列功能,一个单独进程在运行 ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1) # 启动 ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR) def AA(someData,sqlqueue): #略 XXX sqlCommand = XXX sqlqueue.put(sqlCommand) retrun def BB(someData,sqlqueue): #略,和 AA 结构一样,最后往队列里 put(sqlCommand) sqlqueue.put(sqlCommand) retrun def CC(someData,sqlqueue): #略 sqlqueue.put(sqlCommand) retrun # 开动制造 while True: # AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句 # 进去的 SQL 语句只有四种, # INSERT INTO tblname (x) VALUE (x); # INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层); # UPDATE SET... # DELETE FROM... # SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条 time.sleep(100) 
    # 处理 SQL 队列 def procSQLcmd(sqlinfo, sqlqueue, golvar): import time import datetime from dbutils.pooled_db import PooledDB import pymysql from concurrent.futures import ThreadPoolExecutor from MYFunc import SQLcmdData from myFunc import colrRedB from myFunc import TranDicttoSQLcmd from warnings import filterwarnings filterwarnings("error", category=pymysql.Warning) POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxcOnnections=600, # 连接池允许的最大连接数,0 和 None 表示不限制连接数 mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建 maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制 setsession=[], # 开始会话前执行的命令列表。 ping=1, # ping MySQL 服务端,检查是否服务可用。 host=sqlinfo['ip'], port=sqlinfo['port'], user=sqlinfo['user'], password=sqlinfo['password'], database=sqlinfo['database'], charset=sqlinfo['charset'] ) DBcOnn= POOL.connection() def exeCu(conn, sqltext): try: cur = conn.cursor() cur.execute(sqltext) # cur.commit() cur.close() except pymysql.Warning as e: # print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}")) sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"') resOnsql= str(e).replace("'","\\'").replace('"','\\"') SQLErrorDict = {'sqlcmd': sqlsqlcmd, 'reson': resonsql, 'UpdateTime': datetime.datetime.now().replace(microsecOnd=0)} SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None) SQLcmdData(sqlinfo, SQLCmd) return while True: if sqlqueue.qsize() == 0: # 开关 if golvar['stopsqlflag'] == False: time.sleep(2) break # SQL 语句执行,必须按队列 FIFO 顺序写入 while not sqlqueue.empty(): with ThreadPoolExecutor(1) as executor: executor.submit(exeCu, DBconn, sqlqueue.get()) DBconn.close() return 
    请教问题: 1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环 2 、从 MySQL 的服务器的性能判断来看, SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%'); SHOW PROCESSLIST; MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?) 3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善 4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆? 5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。 
    10 条回复    2021-08-30 10:16:35 +08:00
    BBCCBB
        1
    BBCCBB  
       2021-08-28 20:07:18 +08:00
    看你这个貌似没用到批量写入? 可以尝试批量
    heyjei
        2
    heyjei  
       2021-08-28 20:37:14 +08:00
    代码没细看,但思路其实很简单,攒一波数据,到 1 千条或者 1 千条没到但 1 秒钟到了,再批量输入。如果批量写入的方案还是不满足,可以把数据写入到文件里,然后再定时调用 load data infile,load data infile 的写入速度可以达到磁盘的最大 IO 速度(前提是使用 MyISAM,并且没有索引)
    heyjei
        3
    heyjei  
       2021-08-28 20:45:00 +08:00
    还有一种改动最小的一种方式:

    我们的 SQL 语句是 insert into table_name (column1, column2) values (value1, value2)

    在下面的语句中,你不要把整个语句 put 进去,把 (value1, value2) put 进去
    sqlqueue.put(sqlCommand)

    在下面的语句,get 之后,不要立即执行,攒够 1000 个数据,或者 1 秒超时,然后拼接 SQL 成完整的语句并执行。
    # SQL 语句执行,必须按队列 FIFO 顺序写入
    while not sqlqueue.empty():
    with ThreadPoolExecutor(1) as executor:
    executor.submit(exeCu, DBconn, sqlqueue.get())
    uti6770werty
        4
    uti6770werty  
    OP
       2021-08-28 21:10:00 +08:00
    @BBCCBB

    @heyjei

    队列里,不全是 INSERT INTO 。。,也许还有偶然一两个 ALERT 也不一定,要按 FIFO 顺序,所以就不好套批量模板了。。。

    by the way,有试过 sqlcmd + ";" + sqlcmd,这样操作过,但似乎 PooledDB.conn.cursor().excute 不支持这种多语句组装命令执行? 前几天有试过,当时没成功,没研究下去,后面去研究如何高并发去了,结果更迷糊,就这个场合用,现有的高并发非常折腾。。。
    F281M6Dh8DXpD1g2
        5
    F281M6Dh8DXpD1g2  
       2021-08-28 21:19:46 +08:00
    你的 mysql 服务器有几个核心?
    600 个 connection 太多了
    uti6770werty
        6
    uti6770werty  
    OP
       2021-08-28 21:37:09 +08:00
    上面忘了说一个事情,就是就算是峰值 17W 条数据里也好,平时 5,6 千条也好,很多时候都队列的数据,是表里已经有的了,表的索引机制已经避免了重复插入数据,所以存表里的数据量其实不多的。。。

    @liprais 8 核,16 线程,CentOS 6 + MySQL 5.5,按月份分表,最多的表数据不过 800 万
    uti6770werty
        7
    uti6770werty  
    OP
       2021-08-28 21:40:36 +08:00
    @liprais PooledDB 的 connection 很奇怪的,它这里 600 只是最高允许 600 而已,我现在只是一台数据处理电脑向 MySQL 写数据而已,SHOW PROCESSLIST 看,也就 7,8 条连接
    noparking188
        8
    noparking188  
       2021-08-29 09:01:51 +08:00
    所以这段代码真是生产上用的嘛?
    看这段代码是用线程做并发,线程开销比较大,这种 IO 密集任务不大行,可以换协成程测下效果,更轻量级,几年前写过类似脚本用的 gevent,ayncio 我没研究过,楼主也可以看看,个人感觉主要是想办法提高并发处理能力吧,和 MySQL 没啥关系
    说的不对还望指正
    noparking188
        9
    noparking188  
       2021-08-29 09:15:39 +08:00
    个人感觉比较简单的做法就是换 redis 做队列,生成 SQL 单独一个程序跑,消费队列数据发 SQL 请求的一个程序,要提高并发起简单地多个进程就行了,多进程+多线程(协程)的方式,用 supervisor 托管更好
    当然这样的方案是建立在示例代码用线程并发造成网络 IO 请求瓶颈的猜测上
    todd7zhang
        10
    todd7zhang  
       2021-08-30 10:16:35 +08:00
    没动啊,既然严格要求 FIFO, 为啥去执行 SQL 的时候,还要开 ThreadPoolExecutor ?这种情况,从 sqlqueue 拿出来虽然是顺序的,但是感觉执行过去就可能乱序啊,毕竟 while 里面在不停的开 executor 。

    我感觉都不用 POOL, 就一个 conn 不停的执行 sql 就好了?
    with conn.cursor() as cr:
    while not sqlqueue.empty():
    cr.execute(sqlqueue.get())
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2792 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 02:48 PVG 10:48 LAX 18:48 JFK 21:48
    Do have faith in what you'e doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86