Python MySQLdb模块简单封装的方法源码,pythonmysqldb,日常工作中可能会需要把MySQLdb封装起来
Python MySQLdb模块简单封装的方法源码,pythonmysqldb,日常工作中可能会需要把MySQLdb封装起来
日常工作中可能需要把 MySQLdb 封装起来,以方便查询等功能的使用。本文提供一份 Python MySQLdb 模块简单封装的方法源码,供大家参考使用。
Python MySQL数据库相关文章推荐:Python mysql数据库连接、判断、创建表的方法
MySQLdb模块简单封装方法源码如下:
#-*- encoding: utf-8 -*-
# Thin convenience wrapper around a MySQLdb cursor obtained from DBPool.
#
# NOTE: this module targets Python 2 -- it relies on the ``unicode`` builtin.
# Statement forms that also parse on Python 3 are used where that is free.
#
# Example:
#     db = Mysql()
#     if db.open("b2b_platform"):
#         db.query("select * from sc_goods")
#         print(db.fetch_assoc())
__author__ = "wentianle"
__version__ = "$Revision: 1.0 $"
__date__ = "$Date: 2009/02/19 $"
__license__ = "Python"

import MySQLdb
from db.DBPool import DBPool
from util import func_ext
from conf.config import Config
import string


class DB:
    """Base class holding the SQL-building and row-fetching helpers.

    ``open`` and ``query`` are stubs here; a subclass must implement them and
    set ``self.cur`` (cursor) and ``self.db_conn`` (connection), which the
    other methods read.

    Values are escaped with ``func_ext.addslashes(func_ext.uni_str(value))``.
    Table names and column names are interpolated into the SQL verbatim, so
    they must never come from untrusted input.
    """

    # Raw row most recently returned by fetch_assoc().
    fields = None

    def open(self, db_name, autoCommit=True):
        pass

    def query(self, sql):
        pass

    def error(self, e):
        """Print ``e.args[0]`` (numeric code) and ``e.args[1]`` (message)."""
        print("Error %d: %s" % (e.args[0], e.args[1]))

    def close(self):
        """Close the cursor, then the connection."""
        try:
            self.cur.close()
        finally:
            # Release the connection even if closing the cursor failed.
            self.db_conn.close()

    def next_record(self):
        """Advance the cursor to its next result set."""
        self.cur.nextset()

    def _escape_items(self, p_values):
        """Return ``[(key, escaped_value), ...]`` for the mapping *p_values*.

        A new list is built so the caller's dict is left untouched.  Mutating
        it in place would hand escaped data back to the caller and would
        double-escape when one dict is passed as both data and condition.
        """
        return [(key, func_ext.addslashes(func_ext.uni_str(value)))
                for key, value in p_values.items()]

    def _write_row(self, verb, p_table_name, p_data):
        """Run ``<verb> INTO table (`cols`) VALUES ('vals')`` for *p_data*."""
        pairs = self._escape_items(p_data)
        columns = "`,`".join([key for key, _ in pairs])
        values = "','".join([value for _, value in pairs])
        real_sql = (verb + " INTO " + p_table_name
                    + " (`" + columns + "`) VALUES ('" + values + "')")
        self.query("set names 'utf8'")
        return self.query(real_sql)

    def insert(self, p_table_name, p_data):
        """INSERT the column -> value mapping *p_data* into *p_table_name*.

        Returns whatever ``self.query`` returns for the statement.
        """
        return self._write_row("INSERT", p_table_name, p_data)

    def replace(self, p_table_name, p_data):
        """Same as :meth:`insert` but issues ``REPLACE INTO``."""
        return self._write_row("REPLACE", p_table_name, p_data)

    def update(self, p_table_name, p_data, p_where):
        """UPDATE *p_table_name*, setting *p_data* where *p_where* matches.

        Both arguments are column -> value mappings; the conditions in
        *p_where* are equality tests joined with ``AND``.
        Returns whatever ``self.query`` returns for the statement.
        """
        edit_sql = ",".join([str(key) + "='" + str(value) + "'"
                             for key, value in self._escape_items(p_data)])
        where_sql = " AND ".join([str(key) + "='" + str(value) + "'"
                                  for key, value in self._escape_items(p_where)])
        real_sql = ("UPDATE " + p_table_name + " SET " + edit_sql
                    + " WHERE " + where_sql)
        self.query("set names 'utf8'")
        return self.query(real_sql)

    def delete(self, p_table_name, p_where):
        """DELETE rows of *p_table_name* matching every pair in *p_where*.

        Returns whatever ``self.query`` returns for the statement.
        """
        where_sql = " AND ".join([key + "='" + value + "'"
                                  for key, value in self._escape_items(p_where)])
        real_sql = "DELETE FROM " + p_table_name + " WHERE " + where_sql
        self.query("set names 'utf8'")
        return self.query(real_sql)

    def isinstance(self, p_table_name, p_where):
        """Return ``count(*)`` of rows matching *p_where*, or 0.

        NOTE: the name shadows the builtin only as an attribute; calls to a
        bare ``isinstance(...)`` inside methods still reach the builtin.
        """
        where_sql = " AND ".join([key + "='" + value + "'"
                                  for key, value in self._escape_items(p_where)])
        real_sql = ("SELECT count(*) as cnt FROM " + p_table_name
                    + " WHERE " + where_sql)
        if self.query(real_sql):
            res = self.fetch_assoc()
            # fetch_assoc() returns a dict keyed by lower-cased column name,
            # so the count lives under "cnt" (indexing with 0 raised KeyError).
            if res:
                return res["cnt"]
        return 0

    def select(self, sql):
        """Run *sql* and return every row (see :meth:`fetch_all`)."""
        self.query("set names 'utf8'")
        self.query(sql)
        return self.fetch_all()

    def selectRow(self, sql):
        """Run *sql* and return the first row (see :meth:`fetch_assoc`)."""
        self.query("set names 'utf8'")
        self.query(sql)
        return self.fetch_assoc()

    def _row_to_dict(self, desc, row, upper):
        """Map one result *row* to ``{column_name: value}``.

        *desc* is the cursor description; element 0 of each entry is used as
        the column name, upper-cased when *upper* is truthy and lower-cased
        otherwise.  ``unicode`` values are converted to byte strings.
        """
        record = {}
        for column, value in zip(desc, row):
            name = column[0].upper() if upper else column[0].lower()
            if isinstance(value, unicode):
                # Encode explicitly with the codec query() uses: a bare
                # str() uses the interpreter default codec, which fails on
                # non-ASCII text when that default is ASCII.
                value = value.encode("utf8")
            record[name] = value
        return record

    def fetch_all(self, upper=0):
        """Return all remaining rows as a list of dicts.

        Returns ``None`` when ``get_num_rows()`` is falsy.
        """
        if not self.get_num_rows():
            return None
        rows = self.cur.fetchall()
        desc = self.cur.description
        return [self._row_to_dict(desc, row, upper) for row in rows]

    def fetch_assoc(self, upper=0):
        """Return the next row as a dict and store the raw row in ``fields``.

        Returns ``None`` when ``get_num_rows()`` is falsy or no row is left.
        """
        if not self.get_num_rows():
            return None
        desc = self.cur.description
        self.fields = self.cur.fetchone()
        if self.fields is None:
            # The result set was already consumed.
            return None
        return self._row_to_dict(desc, self.fields, upper)

    def get_num_rows(self):
        """Return the cursor's ``rowcount``."""
        return self.cur.rowcount

    def autoCommit(self, flag):
        """Issue ``Set AUTOCOMMIT = <flag>``."""
        self.query("Set AUTOCOMMIT = " + str(flag))

    def debug_sql(self, sql):
        """Hook for SQL debugging; currently a no-op in both branches."""
        if Config.getConf("public", "debug_sql") == "True":
            pass


# Database operation class, used together with the connection pool (dbpool).
class Mysql(DB):
    """MySQL implementation of DB using connections handed out by DBPool."""

    def get_mysql_version(self):
        """Return the value of ``MySQLdb.get_client_info()``."""
        return MySQLdb.get_client_info()

    def open(self, db_name, autoCommit=True):
        """Take a connection for *db_name* from the pool and prepare a cursor.

        Returns ``True`` on success.
        Raises ``Exception`` when DBPool yields no pool for *db_name*.
        """
        pool = DBPool.getInstance(db_name)
        if not pool:
            raise Exception("open database %s Failure" % (db_name))
        self.db_conn = pool.connection(0)
        self.cur = self.db_conn.cursor()
        if autoCommit:
            self.autoCommit(1)
        self.charset = pool.charset
        self.set_name()
        return True

    def set_name(self):
        """Send ``SET NAMES`` / ``SET CHARACTER SET`` with ``self.charset``."""
        self.query("SET NAMES " + self.charset)
        self.query("SET CHARACTER SET " + self.charset)

    def query(self, sql):
        """Execute *sql* and return the result of ``cursor.execute``.

        ``unicode`` statements are encoded to utf8 first.
        """
        if isinstance(sql, unicode):
            sql = sql.encode("utf8")
        self.debug_sql(sql)
        return self.cur.execute(sql)

    def getInsertId(self):
        """Return ``LAST_INSERT_ID()`` for the current connection."""
        self.query("SELECT LAST_INSERT_ID() AS lid")
        rs = self.cur.fetchone()
        return rs[0]

    def commit(self):
        """Commit the current transaction on the connection."""
        self.db_conn.commit()
关于Python MySQLdb模块简单封装的方法:第二段源码(数据库连接池句柄)
#-*- encoding: utf-8 -*-
# NOTE: this module targets Python 2 -- it relies on the ``unicode`` builtin.
__author__ = "wentianle (wenzaile@163.com)"
__version__ = "$Revision: 1.0 $"
__date__ = "$Date: 2009/02/19 $"
__license__ = "Python"

from DBUtils.PooledDB import PooledDB
import MySQLdb
import time
from conf.config import Config
from util import func_ext


# Database connection pool handle.
class DBPool:
    """Registry of one ``PooledDB`` instance per configured database name."""

    # db_name -> PooledDB, shared by every caller in the process.
    pool = {}

    @staticmethod
    def getInstance(db_name):
        """Return the pool for *db_name*, creating it on first use.

        Pool sizing is read from the ``dbpool_config`` section and the
        connection parameters from the ``db_<db_name>`` section of ``Config``.
        The created pool carries an extra ``charset`` attribute.

        Any exception is logged through ``func_ext.error_log`` and the
        attempt is repeated after 5 seconds, so this call blocks until it
        succeeds.
        """
        # A loop rather than self-recursion: during a long outage the retries
        # would otherwise pile up stack frames until the recursion limit.
        while True:
            try:
                if isinstance(db_name, unicode):
                    db_name = db_name.encode("utf8")

                cached = DBPool.pool.get(db_name)
                if cached:
                    return cached

                section = 'db_' + db_name
                charset = Config.getConf(section, "charset")
                # Keyword arguments keep the many integer settings from
                # being silently mis-ordered.
                tmp_pool = PooledDB(
                    MySQLdb,
                    mincached=int(Config.getConf("dbpool_config", "db_mincached")),
                    maxcached=int(Config.getConf("dbpool_config", "db_maxcached")),
                    maxshared=int(Config.getConf("dbpool_config", "db_maxshared")),
                    maxconnections=int(Config.getConf("dbpool_config", "db_maxconnections")),
                    blocking=int(Config.getConf("dbpool_config", "db_blocking")),
                    maxusage=int(Config.getConf("dbpool_config", "db_maxusage")),
                    setsession=["SET NAMES utf8", "SET CHARACTER SET utf8"],
                    host=Config.getConf(section, "dbhost"),
                    user=Config.getConf(section, "dbuser"),
                    passwd=Config.getConf(section, "dbpass"),
                    db=Config.getConf(section, "dbname"),
                    charset=charset,
                    port=int(Config.getConf(section, "dbport")))
                tmp_pool.charset = charset
                DBPool.pool[db_name] = tmp_pool
                return tmp_pool
            except Exception as e:
                func_ext.error_log(e)
                time.sleep(5)


if __name__ == '__main__':
    # Manual smoke test: fetch one row every few seconds, forever.
    while True:
        pool = DBPool.getInstance("plat")
        time.sleep(2)
        db_conn = pool.connection()
        try:
            cur = db_conn.cursor()
            try:
                cur.execute("select * from sc_supplier")
                res = cur.fetchone()
                print(res)
            finally:
                cur.close()
        finally:
            # Hand the connection back instead of waiting for it to be
            # garbage-collected.
            db_conn.close()
        time.sleep(2)
编橙之家文章,
相关内容
- Python Google talk聊天机器人源码,python聊天机器人,Pytho
- Python语言判断输入的是否是回文数的方法,,Python语言如
- Python localtime()方法计算今天是一年中第几周,pythonloc
- Python实现自动提取国家地理每日图片,,用Python urll
- Python自动输出文件夹下符合条件的全路径名,,Python自动
- Python urllib2发送即时消息到twitter的实现方法,urllib2tw
- Python 按月增加datetime月份的问题,pythondatetime,今天要为
- Python方法解决Url与Tinyurl地址互换问题,pythontinyurl,Pyt
- Python简易邮件查看器源码示例详解,python查看器,Pytho
- Python将汉字数字转换成阿拉伯数字的方法,python阿拉伯
评论关闭