基于PyMySQL的数据库操作类使用

一 简介
    Python和MySQL交互的模块有 MySQLdb 和 PyMySQL(pymysql),MySQLdb是基于C 语言编写的,而且Python3 不在支持MySQLdb 。PyMySQL是一个纯Python写的MySQL客户端,它的目标是替代MySQLdb,可以在CPython、PyPy、IronPython和Jython环境下运行,PyMySQL在MIT许可下发布。

    在开发基于Python语言的项目中,为了以后系统能兼容Python3,我们使用了PyMySQL替换了MySQLdb。下面我们来熟悉一下pymysql的使用。
 
二 安装方式
  pymsql的源码 https://github.com/PyMySQL/PyMySQL ,目前还在持续更新。

安装要求:
 
Python -- one of the following:
 
    CPython >= 2.6 or >= 3.3

    PyPy >= 4.0

    IronPython 2.7

MySQL Server -- one of the following:

    MySQL >= 4.1 (tested with only 5.5~)

    MariaDB >= 5.1
安装
  pip install PyMySQL
  相关文档 , 强烈建议大家仔细阅读一遍。其用法和MySQLdb相差无几,核心用法一致。这样使用pymysql替换mysqldb的成本极小。


三 基于pymysql的数据库交互

 

#!/usr/bin/env python
 
# encoding: utf-8

"""
 
author: yangyi@youzan

time: 2015/6/8 上午11:34

func: 基于pymysql的数据库交互类,支持事务提交和回滚,返回结果记录行数,和insert的最新id

"""

import pymysql

from warnings import filterwarnings

filterwarnings('ignore', category=pymysql.Warning)

CONNECT_TIMEOUT = 100

IP = 'localhost'

PORT = 3306

USER = 'root'

PASSSWORD = ''

 

class QueryException(Exception):

    """
 
    """

 

 

class ConnectionException(Exception):

    """
 
    """


class MySQL_Utils():

    def __init__(

            self, ip=IP, port=PORT, user=USER, password=PASSSWORD,

            connect_timeout=CONNECT_TIMEOUT, remote=False, socket='', dbname='test'):

        self.__conn = None

        self.__cursor = None

        self.lastrowid = None

        self.connect_timeout = connect_timeout

        self.ip = ip

        self.port = port

        self.user = user

        self.password = password

        self.mysocket = socket

        self.remote = remote

        self.db = dbname

        self.rows_affected = 0

 

 

    def __init_conn(self):

        try:
            conn = pymysql.connect(

                    host=self.ip,

                    port=int(self.port),

                    user=self.user,

                    db=self.db,

                    connect_timeout=self.connect_timeout,

                    charset='utf8', unix_socket=self.mysocket)

        except pymysql.Error as e:

            raise ConnectionException(e)

        self.__conn = conn

 

 

    def __init_cursor(self):

        if self.__conn:

            self.__cursor = self.__conn.cursor(pymysql.cursors.DictCursor)

 

 

    def close(self):

        if self.__conn:

            self.__conn.close()

            self.__conn = None

 

    #专门处理select 语句
    def exec_sql(self, sql, args=None):

        try:

            if self.__conn is None:

                self.__init_conn()

                self.__init_cursor()

            self.__conn.autocommit = True

            self.__cursor.execute(sql, args)

            self.rows_affected = self.__cursor.rowcount

            results = self.__cursor.fetchall()

            return results

        except pymysql.Error as e:

            raise pymysql.Error(e)

        finally:

            if self.__conn:

                self.close()

 

    # 专门处理dml语句 delete,updete,insert 
    def exec_txsql(self, sql, args=None):

        try:

            if self.__conn is None:

                self.__init_conn()

                self.__init_cursor()

            if self.__cursor is None:

                self.__init_cursor()

 

 

            self.rows_affected=self.__cursor.execute(sql, args)
            self.lastrowid = self.__cursor.lastrowid
            return self.rows_affected
        except pymysql.Error as e:

            raise pymysql.Error(e)

        finally:

            if self.__cursor:

                self.__cursor.close()

                self.__cursor = None

 

    # 提交
    def commit(self):

        try:

            if self.__conn:

                self.__conn.commit()

        except pymysql.Error as e:

            raise pymysql.Error(e)

        finally:

            if self.__conn:

                self.close()

 

    #回滚操作
    def rollback(self):

        try:

            if self.__conn:

                self.__conn.rollback()
        except pymysql.Error as e:

            raise pymysql.Error(e)

        finally:

            if self.__conn:

                self.close()

    # 适用于需要获取插入记录的主键自增id

    def get_lastrowid(self):

        return self.lastrowid

    #获取dml操作影响的行数
    def get_affectrows(self):
        return self.rows_affected
      #MySQL_Utils初始化的实例销毁之后,自动提交
    def __del__(self):
        self.commit()
 
四 小结
  前几天刚刚将我们的系统中的MySQLdb 替换为PyMySQL, 还未遇到问题。欢迎大家使用测试上述脚本,有问题欢迎和我讨论。

相关推荐