34. 自动化测试开发之使用mysql异步连接池实现mysql数据库操作
Python自动化测试之数据库操作封装深度解析
一、核心类结构设计
1.1 DataBase基类实现
class DataBase:
def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):
# 参数存储
self._args, self._kwargs = args, kwargs
self._autocommit = autocommit # ✅ 自动提交设置
if database.lower() == 'mysql':
self._database = create_pool # 🎯 aiomysql异步连接池
self._ini = INIReader(DATABASE_INI_PATH).data # 📖 配置读取
self._loop = asyncio.new_event_loop() # 🔄 创建独立事件循环
asyncio.set_event_loop(self._loop)
self._mysql_pool = self.mysql_pool # ⚡ 立即初始化
elif database.lower() == 'oracle':
self._database = SessionPool # 🗃️ cx_Oracle连接池
self._ini = INIReader(DATABASE_INI_PATH, section='oracle').data
self._oracle_pool = self.oracle_pool # ⚡ 立即初始化
参数配置说明表
参数 | 类型 | 默认值 | 作用域 |
---|---|---|---|
host | str | localhost | MySQL服务器地址 |
port | int | 3306 | 服务端口 |
user | str | root | 数据库用户 |
maxsize | int | 20 | 最大连接数 |
minsize | int | 5 | 最小连接数 |
1.2 MysqlClient子类扩展
class MysqlClient(DataBase):
@classmethod
def setup(cls, *args, **kwargs):
return cls(*args, **kwargs) # 🏭 工厂方法创建实例
async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):
async with self._mysql_pool.acquire() as conn: # 🛡️ 连接自动管理
async with conn.cursor(DictCursor) as cur: # 📋 字典游标
await cur.execute(sql.replace('?', '%s'), param) # ⚠️ SQL参数化
return await (cur.fetchmany(rows) if rows else cur.fetchall())
def select(self, *args, **kwargs):
return self._loop.run_until_complete(self._select(*args, **kwargs)) # 🔄 同步化执行
async def _execute(self, sql: str, param: tuple = ()):
async with self._mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql.replace('?', '%s'), param)
return cur.rowcount # 📊 返回影响行数
def execute(self, *args, **kwargs):
return self._loop.run_until_complete(self._execute(*args, **kwargs))
二、实战操作示例
2.1 数据库列表查询
# 初始化客户端
mysql = MysqlClient.setup()
# 查询所有数据库
databases = mysql.select(r'SHOW DATABASES;', rows=None)
print("数据库列表:")
for db in databases:
print(f" - {db['Database']}")
# 示例输出:
"""
数据库列表:
- information_schema
- myemployees
- mysql
- performance_schema
- sys
"""
2.2 条件查询演示
# 带参数查询职位信息
jobs = mysql.select(
r'SELECT * FROM myemployees.jobs WHERE JOB_ID = ?',
('AC_ACCOUNT',),
rows=None
)
print("职位详情:")
print(f"职位ID:{jobs[0]['JOB_ID']}")
print(f"职位名称:{jobs[0]['JOB_TITLE']}")
print(f"薪资范围:{jobs[0]['MIN_SALARY']}-{jobs[0]['MAX_SALARY']}")
# 示例输出:
"""
职位详情:
职位ID:AC_ACCOUNT
职位名称:Public Accountant
薪资范围:4200-9000
"""
2.3 数据更新操作
# 更新职位名称
affected_rows = mysql.execute(
r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?',
('高级会计师', 'AC_ACCOUNT')
)
print(f"更新影响行数:{affected_rows}")
# 验证更新结果
updated_job = mysql.select(
r'SELECT JOB_TITLE FROM myemployees.jobs WHERE JOB_ID = ?',
('AC_ACCOUNT',)
)
print(f"更新后职位名称:{updated_job[0]['JOB_TITLE']}")
# 示例输出:
"""
更新影响行数:1
更新后职位名称:高级会计师
"""
三、企业级优化建议
3.1 现存问题清单
问题描述 | 风险等级 | 改进方案 |
---|---|---|
SQL参数替换不安全 | 高 | 使用原生参数化查询 |
缺乏事务管理机制 | 高 | 添加事务上下文管理器 |
未处理连接超时 | 中 | 添加connect_timeout参数 |
同步异步混合使用 | 中 | 统一异步协程实现 |
未实现连接重试机制 | 低 | 集成tenacity重试库 |
3.2 增强型实现方案
from contextlib import asynccontextmanager
from tenacity import retry, stop_after_attempt
class SafeMysqlClient(MysqlClient):
@retry(stop=stop_after_attempt(3))
async def _execute(self, sql: str, param: tuple = ()):
async with self._mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql, param) # ✅ 使用原生参数化
return cur.rowcount
@asynccontextmanager
async def transaction(self):
async with self._mysql_pool.acquire() as conn:
async with conn.begin(): # 🛡️ 事务管理
yield conn
# 使用示例
async def safe_update():
async with SafeMysqlClient.setup().transaction() as conn:
await conn.execute("UPDATE ...")
mysql._loop.run_until_complete(safe_update())
3.3 最佳实践指南
某金融系统数据库操作规范:
- 所有写操作必须使用事务
- 查询结果超过100条需分页处理
- 敏感字段查询必须记录审计日志
- 生产环境禁止使用字符串拼接SQL
- 执行时间超过1秒的操作需添加超时控制
# 安全查询示例
async def secure_query():
async with mysql._mysql_pool.acquire() as conn:
async with conn.cursor(DictCursor) as cur:
await cur.execute(
"SELECT * FROM users WHERE id = %s",
(user_id,), # ✅ 原生参数化
timeout=5.0 # ⏱️ 查询超时设置
)
return await cur.fetchall()
四、完整代码
"""
Python :3.13.3
Selenium: 4.31.0
database.py
"""
import asyncio
from chap5.file_reader import INIReader
from setting import DATABASE_INI_PATH
from aiomysql import create_pool, DictCursor
from cx_Oracle import SessionPool
from asyncio import ensure_future
class DataBase:
def __init__(self, database: str = 'mysql', autocommit: bool = True, *args, **kwargs):
self._args, self._kwargs = args, kwargs
self._autocommit = autocommit
if database.lower() == 'mysql':
self._database = create_pool
self._ini = INIReader(DATABASE_INI_PATH).data
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._mysql_pool = self.mysql_pool
if database.lower() == 'oracle':
self._database = SessionPool
self._ini = INIReader(DATABASE_INI_PATH, section='oracle').data
self._oracle_pool = self.oracle_pool
@property
def oracle_pool(self): # 建立Oracle连接池的方法
return self._database(*self._args, **self._ini, **self._kwargs)
@property
def mysql_pool(self): # 建立Mysql连接池的方法
self._ini['autocommit'] = self._autocommit
pool_task = ensure_future(self._database(*self._args, **self._ini, **self._kwargs))
self._loop.run_until_complete(pool_task)
return pool_task.result()
class MysqlClient(DataBase):
@classmethod
def setup(cls, *args, **kwargs):
return cls(
*args, **kwargs
)
async def _select(self, sql: str, param: tuple = (), rows: [int, None] = 1):
async with self._mysql_pool.acquire() as conn:
async with conn.cursor(DictCursor) as cur:
await cur.execute(sql.replace('?', '%s'), param)
if rows:
rs = await cur.fetchmany(rows)
else:
rs = await cur.fetchall()
return rs
def select(self, *args, **kwargs):
self._loop.run_until_complete(select_task := ensure_future(self._select(*args, **kwargs)))
return select_task.result()
async def _execute(self, sql: str, param: tuple = ()):
async with self._mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql.replace('?', '%s'), param)
return cur.rowcount
def execute(self, *args, **kwargs):
self._loop.run_until_complete(execute_task := ensure_future(self._execute(*args, **kwargs)))
return execute_task.result()
mysql = MysqlClient.setup()
print(mysql.select(r'SHOW DATABASES;', (), rows=None))
print(mysql.select(r'SELECT * FROM myemployees.jobs where JOB_ID=?', ('AC_ACCOUNT'), rows=None))
print(mysql.execute(r'UPDATE myemployees.jobs SET JOB_TITLE = ? WHERE JOB_ID = ?', ('演示', 'AC_ACCOUNT')))
「小贴士」:点击头像→【关注】按钮,获取更多软件测试的晋升认知不迷路! 🚀