先安装pymysql
import pymysql
# 连接到数据库
connection = pymysql.connect(host='localhost',port=3306,
user='root',
password='123',
db='slb',
charset='utf8',
)
try:
# with connection.cursor() as cursor:
# # 插入新的数据
# sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
# cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
#
# # 默认情况下,连接不是自动提交。所以你必须自行保存好
# connection.commit()
with connection.cursor() as cursor:
# 读取单个记录
sql = "SELECT * FROM slb.slb_machine_maintain"
cursor.execute(sql)
result = cursor.fetchone()
print(result)
finally:
connection.close()
from DBUtils.PooledDB import PooledDB
pool = PooledDB(pymysql,30, host="127.0.0.1", user='root',passwd='root1234', db='slb', port=3306, charset="utf8")
def exec_sql(method, sql):
if len(method) == 0 or len(sql) == 0:
return {'status': -1, 'msg': 'sql or method 不能为空'}
conn = pool.connection()
try:
with conn.cursor() as cursor:
if method == 'select':
cursor.execute(sql)
data = cursor.fetchall() #data =('ding', 28), ('shaf', 25), ('akif', 32) select返回查询结果的元组
if len(data) == 0:
res = {'status': 0, 'msg': '查询结果为空'}
return res
res = {'status': 1, 'msg': data}
return res
elif method == 'insert' or 'update' or 'delete':
data = cursor.execute(sql)
print('返回的data:', data)
conn.commit()
if method == 'insert': # data结果为int, insert操作返回影响的行数
print('data = %s'%data)
if method == 'update':
print('data = %s'%data) #data结果为int, update操作返回影响的行数
if method == 'delete':
print(type(data)) #<class 'int'>
print('data = %s'%data) #data结果为int, update操作返回影响的行数
res = {'status': 1, 'msg': data}
return res
else:
res = {'status': -1, 'msg': '无此操作'}
return res
conn.close()
except Exception as e:
conn.rollback() # 发生错误时回滚
res = {'status': -1, 'msg': '执行sql:%s 发生异常!!:%s' % (sql, e)}
return res
print(exec_sql('delete', 'delete from dyf_test_table1 where age = 100'))