
情况: 在 main()
from multiprocessing import Manager from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # 建立全局变量字典 GOLVAR = Manager().dict() # SQL 处理队列 SQLQueue = Manager().Queue() # 处理 SQL 队列功能,一个单独进程在运行 ProcessSQLQueue = futures.ProcessPoolExecutor(max_workers=1) # 启动 ProcessSQLQueueRet = ProcessSQLQueue.submit(procSQLcmd, SQLServerInfo, SQLQueue, GOLVAR) def AA(someData,sqlqueue): #略 XXX sqlCommand = XXX sqlqueue.put(sqlCommand) retrun def BB(someData,sqlqueue): #略,和 AA 结构一样,最后往队列里 put(sqlCommand) sqlqueue.put(sqlCommand) retrun def CC(someData,sqlqueue): #略 sqlqueue.put(sqlCommand) retrun # 开动制造 while True: # AA,BB,CC,DD 等处理函数按顺序,循环制造 SQL 语句,运行 AA,BB,CC,DD 等处理数据的函数处理上,其实几乎都不怎么占 CPU,I/O,最后向 Manager().Queue() put 入大量 SQL 语句 # 进去的 SQL 语句只有四种, # INSERT INTO tblname (x) VALUE (x); # INSERT INTO ... SELECT FROM XXX(最复杂也就嵌了 3 层); # UPDATE SET... # DELETE FROM... # SQLQueue 量高的时候 1 秒进 4 万条,低的时候,200 秒不进 1 条 time.sleep(100) # 处理 SQL 队列 def procSQLcmd(sqlinfo, sqlqueue, golvar): import time import datetime from dbutils.pooled_db import PooledDB import pymysql from concurrent.futures import ThreadPoolExecutor from MYFunc import SQLcmdData from myFunc import colrRedB from myFunc import TranDicttoSQLcmd from warnings import filterwarnings filterwarnings("error", category=pymysql.Warning) POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxcOnnections=600, # 连接池允许的最大连接数,0 和 None 表示不限制连接数 mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建 maxcached=5, # 链接池中最多闲置的链接,0 和 None 不限制 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待; False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None 表示无限制 setsession=[], # 开始会话前执行的命令列表。 ping=1, # ping MySQL 服务端,检查是否服务可用。 host=sqlinfo['ip'], port=sqlinfo['port'], user=sqlinfo['user'], password=sqlinfo['password'], database=sqlinfo['database'], charset=sqlinfo['charset'] ) DBcOnn= POOL.connection() def exeCu(conn, sqltext): try: cur = conn.cursor() cur.execute(sqltext) # cur.commit() cur.close() except pymysql.Warning as e: # print(f'#detial:{str(e)}\n',colrRedB(f"SQL ERR: {sqltext}")) sqlsqlcmd = sqltext.replace("'","\\'").replace('"','\\"') resOnsql= str(e).replace("'","\\'").replace('"','\\"') SQLErrorDict = {'sqlcmd': sqlsqlcmd, 'reson': resonsql, 'UpdateTime': datetime.datetime.now().replace(microsecOnd=0)} SQLCmd = TranDicttoSQLcmd('MYSQLERRLog', SQLErrorDict, None) SQLcmdData(sqlinfo, SQLCmd) return while True: if sqlqueue.qsize() == 0: # 开关 if golvar['stopsqlflag'] == False: time.sleep(2) break # SQL 语句执行,必须按队列 FIFO 顺序写入 while not sqlqueue.empty(): with ThreadPoolExecutor(1) as executor: executor.submit(exeCu, DBconn, sqlqueue.get()) DBconn.close() return 请教问题: 1 、这样的设计,写入每秒是 800 ~ 2500 条左右,虽然能做到对 MySQL 服务器写入浪涌的削峰填谷,但 SQLQueue 在峰值的时候,很容易一下就超了 17 万,太多的未写入,也影响了 main()的大循环 2 、从 MySQL 的服务器的性能判断来看, SHOW STATUS WHERE (Variable_name like '%thre%' OR Variable_name like '%conn%' OR Variable_name like '%cache%'); SHOW PROCESSLIST; MySQL 服务器其实跟睡着了没区别,瞬时链接数 3,4 个,没有感受到什么事情(是对 PooledDB 的用法有问题?) 3 、以前以为是服务器 I/O 的问题,换 8 核 16 线程 CPU 的机器,换上 SSD,内存 64GB,my.cnf 的 cache 调到 65%,都没有太大改善 4 、请教如何调整做法,从 SQLQueue 取出 SQL 语句怼服务器,可以拉满拉爆? 5 、小范围,小应用,上大工业架构的方式就算了,折腾不起。。。 1 BBCCBB 2021-08-28 20:07:18 +08:00 看你这个貌似没用到批量写入? 可以尝试批量 |
2 heyjei 2021-08-28 20:37:14 +08:00 代码没细看,但思路其实很简单,攒一波数据,到 1 千条或者 1 千条没到但 1 秒钟到了,再批量输入。如果批量写入的方案还是不满足,可以把数据写入到文件里,然后再定时调用 load data infile,load data infile 的写入速度可以达到磁盘的最大 IO 速度(前提是使用 MyISAM,并且没有索引) |
3 heyjei 2021-08-28 20:45:00 +08:00 还有一种改动最小的一种方式: 我们的 SQL 语句是 insert into table_name (column1, column2) values (value1, value2) 在下面的语句中,你不要把整个语句 put 进去,把 (value1, value2) put 进去 sqlqueue.put(sqlCommand) 在下面的语句,get 之后,不要立即执行,攒够 1000 个数据,或者 1 秒超时,然后拼接 SQL 成完整的语句并执行。 # SQL 语句执行,必须按队列 FIFO 顺序写入 while not sqlqueue.empty(): with ThreadPoolExecutor(1) as executor: executor.submit(exeCu, DBconn, sqlqueue.get()) |
4 uti6770werty OP |
5 F281M6Dh8DXpD1g2 2021-08-28 21:19:46 +08:00 你的 mysql 服务器有几个核心? 600 个 connection 太多了 |
6 uti6770werty OP 上面忘了说一个事情,就是就算是峰值 17W 条数据里也好,平时 5,6 千条也好,很多时候都队列的数据,是表里已经有的了,表的索引机制已经避免了重复插入数据,所以存表里的数据量其实不多的。。。 @liprais 8 核,16 线程,CentOS 6 + MySQL 5.5,按月份分表,最多的表数据不过 800 万 |
7 uti6770werty OP @liprais PooledDB 的 connection 很奇怪的,它这里 600 只是最高允许 600 而已,我现在只是一台数据处理电脑向 MySQL 写数据而已,SHOW PROCESSLIST 看,也就 7,8 条连接 |
8 noparking188 2021-08-29 09:01:51 +08:00 所以这段代码真是生产上用的嘛? 看这段代码是用线程做并发,线程开销比较大,这种 IO 密集任务不大行,可以换协成程测下效果,更轻量级,几年前写过类似脚本用的 gevent,ayncio 我没研究过,楼主也可以看看,个人感觉主要是想办法提高并发处理能力吧,和 MySQL 没啥关系 说的不对还望指正 |
9 noparking188 2021-08-29 09:15:39 +08:00 个人感觉比较简单的做法就是换 redis 做队列,生成 SQL 单独一个程序跑,消费队列数据发 SQL 请求的一个程序,要提高并发起简单地多个进程就行了,多进程+多线程(协程)的方式,用 supervisor 托管更好 当然这样的方案是建立在示例代码用线程并发造成网络 IO 请求瓶颈的猜测上 |
10 todd7zhang 2021-08-30 10:16:35 +08:00 没动啊,既然严格要求 FIFO, 为啥去执行 SQL 的时候,还要开 ThreadPoolExecutor ?这种情况,从 sqlqueue 拿出来虽然是顺序的,但是感觉执行过去就可能乱序啊,毕竟 while 里面在不停的开 executor 。 我感觉都不用 POOL, 就一个 conn 不停的执行 sql 就好了? with conn.cursor() as cr: while not sqlqueue.empty(): cr.execute(sqlqueue.get()) |