本文介绍python中操作mysql的方法
更新于 2021-06-14
python操作mysql的模块 python操作mysql常用的模块有如下的几种:
原生sql :
PyMySQL:支持python2,3;
MySQLdb:支持python2;
ORM框架:
SQLAlchemy:支持python2,3;
pymysql操作数据库 安装
使用pip进行安装:pip install pymysql
连接到数据库 使用connect()
方法,传入mysql相关的配置即可连接到数据库:
import pymysql

# Keyword arguments handed to pymysql.connect(); the keys must match the
# parameter names of connect() exactly ("port", "password").
mysql_info = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "root123",
    "db": "testdb",
    "charset": "utf8mb4",
}
conn = pymysql.connect(**mysql_info)
创建游标 连接到数据库后,需要获取一个游标,游标的作用是用于进行后续的操作:
# Two alternative ways to obtain a cursor; use one of them.
# Default cursor class.
cursor = conn.cursor()
# Explicit cursor class: DictCursor.
cursor = conn.cursor(pymysql.cursors.DictCursor)
执行sql语句 事先写好sql语句,传给游标的execute
方法即可,返回值为受影响的行数:
# Run a single statement.
effect_row1 = cursor.execute("select * from USER")
# Run the same statement once per parameter set; each set is a 1-tuple
# (note the trailing comma - ("jack") alone would just be the string "jack").
effect_row2 = cursor.executemany(
    "insert into USER (NAME) values(%s)",
    [("jack",), ("boom",), ("lucy",)],
)
execute和executemany方法返回的是受影响的行数。
如果是增、删、改类的sql(查询不需要提交),则执行后需要调用commit()方法进行提交保存:
conn.commit()
# Fetching data (alternatives; call whichever fits the query)
# Fetch all rows.
result = cursor.fetchall()
# Fetch a single row.
result = cursor.fetchone()
# Fetch several rows; the argument is the number of rows requested.
result = cursor.fetchmany(5)
# Read the cursor's lastrowid attribute.
new_id = cursor.lastrowid
关闭连接和游标 最后需要关闭连接和游标,释放资源:
# Release the cursor first, then the connection it belongs to.
cursor.close()
conn.close()
pymysql使用连接池 上述方式可能会在一些情况下频繁创建和断开数据库连接,可以使用DBUtils
来实现一个数据库连接池。连接池有两种连接方式:
为每个线程创建一个连接,线程调用了close()
方法也不会关闭,会把连接重新放到池中;
创建一批连接到连接池,供所有线程共享使用(推荐方式);
# Approach 1: one dedicated connection per thread
from DBUtils.PersistentDB import PersistentDB
import pymysql

POOL = PersistentDB(
    creator=pymysql,    # DB-API 2 module used to create the connections
    maxusage=None,      # no limit on how often one connection is reused
    setsession=[],      # SQL commands run when a session starts (none here)
    ping=0,             # 0 = never ping the server to check the connection
    closeable=False,    # False = conn.close() is ignored, the connection is kept
    threadlocal=None,   # use the default thread-local storage class
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='testdb',
    charset='utf8',
)


def func():
    """Run a sample query on the calling thread's connection and return all rows."""
    conn = POOL.connection(shareable=False)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('select * from USER')
            return cursor.fetchall()
        finally:
            # Release the cursor even when the query raises.
            cursor.close()
    finally:
        conn.close()


result = func()
print(result)
# Approach 2: a pool of connections shared by all threads
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection

POOL = PooledDB(
    creator=pymysql,    # DB-API 2 module used to create the connections
    maxconnections=6,   # upper bound on connections handed out by the pool
    mincached=2,        # idle connections opened when the pool is created
    maxcached=5,        # upper bound on idle connections kept in the pool
    maxshared=3,        # upper bound on shared connections
    blocking=True,      # True = wait for a free connection instead of raising
    maxusage=None,      # no limit on how often one connection is reused
    setsession=[],      # SQL commands run when a session starts (none here)
    ping=0,             # 0 = never ping the server to check the connection
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='testdb',
    charset='utf8',
)


def func():
    """Borrow a pooled connection, run a sample query and return all rows."""
    conn = POOL.connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('select * from USER')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # close() on a pooled connection hands it back to the pool.
        conn.close()


result = func()
print(result)
# Sharing one pymysql connection between threads by means of a lock
import pymysql
import threading
from threading import RLock

LOCK = RLock()
CONN = pymysql.connect(
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='testdb',
    charset='utf8',
)


def task(arg):
    """Query the shared connection while holding LOCK, then print the rows.

    :param arg: thread index supplied by the caller; not used by the query.
    """
    # Only one thread at a time may touch the single shared connection.
    with LOCK:
        cursor = CONN.cursor()
        try:
            cursor.execute('select * from USER ')
            result = cursor.fetchall()
        finally:
            cursor.close()
    print(result)


for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    t.start()
# Example code: small helper module on top of a shared connection pool
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection

POOL = PooledDB(
    creator=pymysql,    # DB-API 2 module used to create the connections
    maxconnections=20,  # upper bound on connections handed out by the pool
    mincached=2,        # idle connections opened when the pool is created
    maxcached=5,        # upper bound on idle connections kept in the pool
    blocking=True,      # True = wait for a free connection instead of raising
    maxusage=None,      # no limit on how often one connection is reused
    setsession=[],      # SQL commands run when a session starts (none here)
    ping=0,             # 0 = never ping the server to check the connection
    host='127.0.0.1',
    port=3306,
    user='root',
    password='root123',
    database='testdb',
    charset='utf8',
)


def connect():
    """Borrow a connection from POOL and open a DictCursor on it.

    :return: ``(conn, cursor)`` tuple; pass both to :func:`close` when done.
    """
    conn = POOL.connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    return conn, cursor


def close(conn, cursor):
    """Close the cursor, then the connection."""
    cursor.close()
    conn.close()


def _execute_write(sql, args=None):
    """Execute a data-modifying statement, commit, and return the row count."""
    conn, cursor = connect()
    try:
        effect_row = cursor.execute(sql, args)
        conn.commit()
        return effect_row
    finally:
        # Always release the cursor and connection, even when execute() raises.
        close(conn, cursor)


def fetch_one(sql, args=None):
    """Run a query and return its first row.

    :param sql: SQL text, optionally containing ``%s`` placeholders
    :param args: values for the placeholders, or None when there are none
    :return: the value of ``cursor.fetchone()``
    """
    conn, cursor = connect()
    try:
        cursor.execute(sql, args)
        return cursor.fetchone()
    finally:
        close(conn, cursor)


def fetch_all(sql, args=None):
    """Run a query and return every row.

    :param sql: SQL text, optionally containing ``%s`` placeholders
    :param args: values for the placeholders, or None when there are none
    :return: the value of ``cursor.fetchall()``
    """
    conn, cursor = connect()
    try:
        cursor.execute(sql, args)
        return cursor.fetchall()
    finally:
        close(conn, cursor)


def insert(sql, args=None):
    """Insert data.

    :param sql: SQL text, optionally containing ``%s`` placeholders
    :param args: values for the placeholders, or None when there are none
    :return: value returned by ``cursor.execute``
    """
    return _execute_write(sql, args)


def delete(sql, args=None):
    """Delete data.

    :param sql: SQL text, optionally containing ``%s`` placeholders
    :param args: values for the placeholders, or None when there are none
    :return: value returned by ``cursor.execute``
    """
    return _execute_write(sql, args)


def update(sql, args=None):
    """Update data.

    :param sql: SQL text, optionally containing ``%s`` placeholders
    :param args: values for the placeholders, or None when there are none
    :return: value returned by ``cursor.execute``
    """
    return _execute_write(sql, args)
SQLAlchemy ORM操作数据库 安装
使用pip进行安装:pip install sqlalchemy
检查是否安装成功:
>>> import sqlalchemy
>>> sqlalchemy.__version__
'1.3.20'
# Connecting to the database
from sqlalchemy import create_engine

# The URL carries user, password, host, port, database name and charset.
engine = create_engine('mysql://root:root123@172.21.1.100:3306/news?charset=utf8')
使用create_engine
连接数据库,格式为:
mysql://<用户名>:<密码>@<数据库地址>:<端口>/<库名>?charset=<编码>
# Defining the model
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean

engine = create_engine('mysql://root:root123@172.21.1.100:3306/news?charset=utf8')
Base = declarative_base()


class News(Base):
    """Declarative model mapped to the ``news`` table."""

    __tablename__ = 'news'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String(2000), nullable=False)
    types = Column(String(10), nullable=False)
    image = Column(String(300))
    author = Column(String(20))
    view_count = Column(Integer)
    created_at = Column(DateTime)
    is_valid = Column(Boolean)
创建表 在定义好表结构后,可以这样进行创建表结构:
>>> from mysql_orm import News, engine
>>> News.metadata.create_all(engine)
# Create / read / update / delete with the ORM
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.orm import sessionmaker

engine = create_engine('mysql://root:root123@172.21.1.100:3306/news?charset=utf8')
Base = declarative_base()
Session = sessionmaker(bind=engine)


class News(Base):
    # Body elided in the original listing; fill in __tablename__ and the
    # Column definitions of the News model before running this example.
    ...


class ORM(object):
    """Sample CRUD operations on News using one session per instance."""

    def __init__(self):
        self.session = Session()

    def insert(self):
        """Add one News row, commit, and return the new object."""
        new_obj = News(
            title="this is title",
            content="this is content",
            types="type1",
        )
        self.session.add(new_obj)
        self.session.commit()
        return new_obj

    def select_on(self):
        """Fetch a single row by primary key."""
        # Query.get() takes the primary-key value positionally.
        result = self.session.query(News).get(1)
        return result

    def select_more(self):
        """Build a query for the rows with is_valid=True, ordered by id."""
        # Order by the mapped column, not the builtin id() function.
        result = self.session.query(News).filter_by(is_valid=True).order_by(News.id)
        return result

    def update(self):
        """Set is_valid to False on the rows with id 10 and commit.

        Returns the Query that selected the rows.
        """
        obj = self.session.query(News).filter_by(id=10)
        for item in obj:
            item.is_valid = False
            self.session.add(item)
        self.session.commit()
        return obj

    def delete(self):
        """Delete the rows with id 11 and commit."""
        data = self.session.query(News).filter_by(id=11)
        for item in data:
            # Delete the row object itself, not the Query that produced it.
            self.session.delete(item)
        self.session.commit()
操作已存在的表 首先连接到数据库:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import Session


class Mysql(object):
    """Holds the server credentials and opens SQLAlchemy handles per database."""

    def __init__(self):
        self.host = "127.0.0.1"
        self.pwd = "root123"
        self.port = "3306"
        self.user = "root"

    def connect(self, db):
        """Create an engine, bound metadata and session for one database.

        :param db: name of the database (schema) appended to the URL
        :return: dict with the keys ``'session'``, ``'metadata'`` and ``'engine'``
        """
        uri = 'mysql://{}:{}@{}:{}/{}'.format(self.user, self.pwd, self.host, self.port, db)
        engine = create_engine(uri, echo=True)
        metadata = MetaData(engine)
        session = Session(engine)
        return {'session': session, 'metadata': metadata, 'engine': engine}
查询数据:
from sqlalchemy import Table

# connect() expects the database name, not the table name.
db_name = "testdb"  # NOTE(review): assumed database name - replace with the real one
table = "t_test_table"
db_meta = Mysql().connect(db_name)
# Build the Table object with autoload instead of declaring its columns by hand.
table_obj = Table(table, db_meta['metadata'], autoload=True, autoload_with=db_meta['engine'])
# Pass the Column object to value() rather than a bare string.
rest = db_meta['session'].query(table_obj).filter_by(id=1).value(table_obj.c.name)
print(rest)
修改数据:
from sqlalchemy import Table

# connect() expects the database name, not the table name.
db_name = "testdb"  # NOTE(review): assumed database name - replace with the real one
table = "t_test_table"
db_meta = Mysql().connect(db_name)
# Build the Table object with autoload instead of declaring its columns by hand.
table_obj = Table(table, db_meta['metadata'], autoload=True, autoload_with=db_meta['engine'])
# UPDATE ... SET name = 'jack' WHERE id = 2, then commit the change.
db_meta['session'].execute(table_obj.update().where(table_obj.c.id == 2).values(name="jack"))
db_meta['session'].commit()
# Read the value back to confirm the change.
rest = db_meta['session'].query(table_obj).filter_by(id=2).value(table_obj.c.name)
print(rest)
删除数据:
from sqlalchemy import Table

# connect() expects the database name, not the table name.
db_name = "testdb"  # NOTE(review): assumed database name - replace with the real one
table = "t_test_table"
db_meta = Mysql().connect(db_name)
# Build the Table object with autoload instead of declaring its columns by hand.
table_obj = Table(table, db_meta['metadata'], autoload=True, autoload_with=db_meta['engine'])
# DELETE ... WHERE id = 3; commit so the deletion is persisted.
db_meta['session'].execute(table_obj.delete().where(table_obj.c.id == 3))
db_meta['session'].commit()
新增数据:
from sqlalchemy import Table

# connect() expects the database name, not the table name.
db_name = "testdb"  # NOTE(review): assumed database name - replace with the real one
table = "t_test_table"
db_meta = Mysql().connect(db_name)
# Build the Table object with autoload instead of declaring its columns by hand.
table_obj = Table(table, db_meta['metadata'], autoload=True, autoload_with=db_meta['engine'])
# INSERT one row; commit so the new row is persisted.
db_meta['session'].execute(table_obj.insert(), {"name": "mike"})
db_meta['session'].commit()