wxPython实现sqlite3数据库的gui界面,wxpythonsqlite3,sql.py#!/usr
wxPython实现sqlite3数据库的gui界面,wxpythonsqlite3,sql.py#!/usr
sql.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# filename: sql.py
# version: 1.0.5
# description: The APIs for sqlite3.
# first create: ppxxyy1110
# history:
# 2012-09-22 | 0.0.1
# ppxxyy1110 create
# 2012-10-01 | 1.0.0
# ppxxyy1110 modified
# ------------------------------------------------------------------------------
"""SQL kernel for sqlite3: a small helper class around one connection."""
import logging
import re
import sqlite3
from contextlib import closing

logging.basicConfig(level=logging.DEBUG)

# Column-level "PRIMARY KEY" clause together with its optional modifiers.
_COLUMN_PK_RE = re.compile(
    r"(?:\s+CONSTRAINT\s+\S+)?\s+PRIMARY\s+KEY\b"
    r"(?:\s+(?:ASC|DESC)\b)?"
    r"(?:\s+ON\s+CONFLICT\s+\w+)?"
    r"(?:\s+AUTOINCREMENT\b)?",
    re.IGNORECASE)
# Table-level "[CONSTRAINT name] PRIMARY KEY (...)" definition.
_TABLE_PK_RE = re.compile(
    r"\s*(?:CONSTRAINT\s+\S+\s+)?PRIMARY\s+KEY\b", re.IGNORECASE)


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier.

    Identifiers cannot be bound with ``?`` placeholders, so they are quoted
    (embedded double quotes are doubled) before being put into a statement.
    """
    text = "%s" % (name,)
    return '"' + text.replace('"', '""') + '"'


def _split_definitions(field_statement):
    """Split the body of a CREATE TABLE statement on its top-level commas.

    Commas nested in parentheses (``decimal(10,2)``) or inside quoted
    names/strings do not split.
    """
    parts = []
    current = []
    depth = 0
    closer = None  # closing character of the quote we are inside, if any
    for char in field_statement:
        if closer is not None:
            if char == closer:
                closer = None
        elif char in "'\"`":
            closer = char
        elif char == "[":
            closer = "]"
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            parts.append("".join(current))
            current = []
            continue
        current.append(char)
    parts.append("".join(current))
    return parts


def _definition_name(definition):
    """Return the (unquoted) first identifier of a column definition."""
    text = definition.strip()
    if not text:
        return ""
    first = text[0]
    if first in "\"`'":
        pos = 1
        while pos < len(text):
            if text[pos] == first:
                if text[pos + 1:pos + 2] == first:  # doubled quote = escaped
                    pos += 2
                    continue
                return text[1:pos].replace(first * 2, first)
            pos += 1
        return text[1:]
    if first == "[":
        end = text.find("]")
        return text[1:end] if end >= 0 else text[1:]
    return text.split(None, 1)[0]


class MyDB(object):
    """Database helper that supplies a simple API over one sqlite3 connection.

    Values are always bound with ``?`` placeholders and table/column names
    are quoted, so values containing quotes and names containing spaces work.
    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.databasename = None
        self.cxn = None  # connection
        self.tables = None

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _fetch(self, statement, params=()):
        """Execute *statement* and return all rows; the cursor is always closed."""
        with closing(self.cxn.cursor()) as cur:
            cur.execute(statement, params)
            return cur.fetchall()

    def _modify(self, statement, params=()):
        """Execute a modifying statement and commit; roll back on failure."""
        self.logger.debug("statement = %s", statement)
        if params:
            self.logger.debug("params = %s", params)
        try:
            with closing(self.cxn.cursor()) as cur:
                cur.execute(statement, params)
            self.cxn.commit()
        except sqlite3.Error:
            self.cxn.rollback()
            raise

    @staticmethod
    def _where_clause(dic):
        """Build ``col = ? and ...`` from *dic*, skipping None values.

        Returns a ``(clause, params)`` pair; the clause is empty when no
        usable condition was given.
        """
        conditions = []
        params = []
        for col, value in dic.items():
            if value is None:
                continue
            conditions.append("%s = ?" % _quote_ident(col))
            params.append(value)
        return " and ".join(conditions), params

    def _field_statement(self, tblname):
        """Return the text between the outer parentheses of the table's
        CREATE statement.

        Raises sqlite3.OperationalError when the table does not exist or its
        definition cannot be read.
        """
        rows = self._fetch(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name = ?",
            (tblname,))
        if not rows or not rows[0][0]:
            raise sqlite3.OperationalError("no such table: %s" % tblname)
        create_statement = rows[0][0]
        start = create_statement.find("(")
        end = create_statement.rfind(")")
        if start < 0 or end < start:
            raise sqlite3.OperationalError(
                "cannot parse the definition of table %s" % tblname)
        return create_statement[start + 1:end]

    def _rebuild_table(self, tblname, field_statement_new):
        """Recreate *tblname* with new column definitions, keeping its rows.

        The rows are copied through a temporary table. On failure the
        transaction is rolled back and the temporary table is removed, so a
        later attempt does not hit "table TEMP_TABLE already exists".
        """
        table = _quote_ident(tblname)
        statements = (
            "CREATE TEMPORARY TABLE TEMP_TABLE (%s)" % field_statement_new,
            "INSERT INTO TEMP_TABLE SELECT * FROM %s" % table,
            "DROP TABLE %s" % table,
            "CREATE TABLE %s (%s)" % (table, field_statement_new),
            "INSERT INTO %s SELECT * FROM TEMP_TABLE" % table,
            "DROP TABLE TEMP_TABLE",
        )
        try:
            with closing(self.cxn.cursor()) as cur:
                for statement in statements:
                    cur.execute(statement)
            self.cxn.commit()
        except sqlite3.Error:
            self.cxn.rollback()
            with closing(self.cxn.cursor()) as cur:
                cur.execute("DROP TABLE IF EXISTS temp.TEMP_TABLE")
            raise

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def get_table_value(self, tablename):
        """Return every row of *tablename* as a list of tuples."""
        return self._fetch("select * from %s" % _quote_ident(tablename))

    def connect_db(self, databasename):
        """Open a connection to *databasename* and load its table list.

        A previously opened connection is closed first. On failure the error
        is logged and re-raised (sqlite3 reports problems as sqlite3.Error,
        not IOError), and the object is left disconnected.
        """
        self.close_db()
        self.databasename = databasename
        self.logger.debug("opening database %s", self.databasename)
        try:
            self.cxn = sqlite3.connect(self.databasename)
            self.tables = self.gettables()
        except (IOError, sqlite3.Error):
            self.logger.error('could not open database file')
            self.close_db()
            raise
        self.logger.info('open database sucess')

    def close_db(self):
        """Close the connection, if one is open."""
        if self.cxn is not None:
            self.cxn.close()
        self.cxn = None
        self.tables = None

    def gettables(self):
        """Return the names of all tables in the currently opened database."""
        self.logger.debug("cxn = %s", self.cxn)
        rows = self._fetch("select name from sqlite_master where type='table'")
        return [row[0] for row in rows]

    def addatable(self, newtable, cols):
        """Add a table to the database.

        newtable --- new table name
        cols     --- dictionary as colname:coltype (the type is used verbatim)
        """
        fields = ",".join(
            "%s %s" % (_quote_ident(name), coltype)
            for name, coltype in cols.items())
        statment = "CREATE TABLE %s (%s)" % (_quote_ident(newtable), fields)
        self._modify(statment)
        self.tables = self.gettables()

    def deleteatable(self, tblname):
        """Delete a table from the database."""
        self._modify("DROP TABLE %s" % _quote_ident(tblname))
        self.tables = self.gettables()

    def renametablename(self, oldname, newname):
        """Rename a table."""
        self._modify('ALTER TABLE %s RENAME TO %s'
                     % (_quote_ident(oldname), _quote_ident(newname)))
        self.tables = self.gettables()

    def additem(self, tblname, dic):
        """Insert one row into a table.

        tblname --- table name
        dic     --- a dictionary of colname:colvalue

        Values for columns declared as 'integer' or 'int' are converted with
        int() (ValueError on bad input); None is stored as NULL. An empty
        dictionary inserts a row of default values.
        """
        name_type = self.gettablecoltype(tblname)
        columns = []
        values = []
        for col, value in dic.items():
            if value is not None and name_type.get(col) in ('integer', 'int'):
                value = int(value)
            columns.append(_quote_ident(col))
            values.append(value)
        table = _quote_ident(tblname)
        if columns:
            statement = u"INSERT INTO %s (%s) VALUES (%s)" % (
                table, ",".join(columns), ",".join("?" * len(columns)))
        else:
            statement = u"INSERT INTO %s DEFAULT VALUES" % table
        self._modify(statement, values)

    def deleteitem(self, tblname, dic):
        """Delete the rows matching every colname:colvalue pair in *dic*.

        Pairs whose value is None are ignored. Raises
        sqlite3.OperationalError when no usable condition remains, because a
        DELETE without a WHERE clause would empty the table.
        """
        self.logger.debug("dic = %s", dic)
        self.logger.debug(
            "dic type = %s", [(type(item), item) for item in dic.values()])
        where, params = self._where_clause(dic)
        if not where:
            raise sqlite3.OperationalError(
                "deleteitem needs at least one condition that is not None")
        statement = "DELETE FROM %s WHERE %s" % (_quote_ident(tblname), where)
        self._modify(statement, params)

    def updateitem(self, tblname, dic, setdic):
        """Update the rows of a table.

        dic    --- where conditions as colname:colvalue (None values ignored)
        setdic --- new values as colname:colvalue (None is stored as NULL)

        Raises sqlite3.OperationalError when *setdic* is empty or *dic*
        holds no usable condition.
        """
        where, where_params = self._where_clause(dic)
        self.logger.debug("cols = %s ", where)
        self.logger.debug("dic = %s", dic)
        assignments = ["%s = ?" % _quote_ident(col) for col in setdic]
        set_params = [setdic[col] for col in setdic]
        setcols = " ,".join(assignments)
        self.logger.debug("setcols = %s ", setcols)
        self.logger.debug("setdic = %s", setdic)
        if not assignments:
            raise sqlite3.OperationalError("updateitem needs a value to set")
        if not where:
            raise sqlite3.OperationalError(
                "updateitem needs at least one condition that is not None")
        statement = "UPDATE %s SET %s WHERE %s" % (
            _quote_ident(tblname), setcols, where)
        self._modify(statement, set_params + where_params)

    def addkey(self, tblname, colname):
        """Make *colname* the primary key of *tblname*.

        SQLite cannot add a primary key in place, so the table is rebuilt
        and its rows copied over. Nothing is done (an error is logged) when
        the table already has a primary key. Raises sqlite3.OperationalError
        when the table or the column does not exist.
        """
        field_statement = self._field_statement(tblname)
        keyname = self.get_primary_key(tblname)
        if keyname is not None:
            self.logger.error(
                "this table already has a key as (%s), if you must add a key,"
                " delete this key first", keyname)
            return
        # this table has no primary key
        wanted = ("%s" % (colname,)).strip().lower()
        definitions = _split_definitions(field_statement)
        for index, definition in enumerate(definitions):
            # compare whole column names, not substrings of the statement
            if _definition_name(definition).lower() == wanted:
                definitions[index] = "%s PRIMARY KEY" % definition.rstrip()
                break
        else:
            raise sqlite3.OperationalError(
                "table %s has no column named %s" % (tblname, colname))
        self._rebuild_table(tblname, ",".join(definitions))

    def deletekey(self, tblname):
        """Remove the primary key from *tblname* by rebuilding the table.

        Nothing is done (an info message is logged) when the table has no
        primary key. Raises sqlite3.OperationalError when the table does not
        exist.
        """
        field_statement = self._field_statement(tblname)
        keyname = self.get_primary_key(tblname)
        if keyname is None:
            self.logger.info("this table has no key")
            return
        kept = []
        for definition in _split_definitions(field_statement):
            if _TABLE_PK_RE.match(definition):
                continue  # table-level PRIMARY KEY (...) constraint
            kept.append(_COLUMN_PK_RE.sub("", definition))
        field_statement_new = ",".join(kept)
        self.logger.debug("field_statement_new = %s", field_statement_new)
        self._rebuild_table(tblname, field_statement_new)

    def gettableinfo(self, tablename):
        """Return the rows of ``PRAGMA table_info`` for *tablename*.

        Each row is (cid, name, type, notnull, dflt_value, pk); the list is
        empty when the table does not exist.
        """
        info = self._fetch("PRAGMA table_info(%s)" % _quote_ident(tablename))
        self.logger.debug("info = %s", info)
        return info

    def gettablecoltype(self, tblname):
        """Return the table's colname:coltype dictionary."""
        return {row[1]: row[2] for row in self.gettableinfo(tblname)}

    def get_primary_key(self, tblname):
        """Return the name of the table's primary key column, or None.

        For a composite key the column names are joined with "," in key
        order. None is also returned when the table does not exist.
        """
        key_columns = sorted(
            (row[5], row[1]) for row in self.gettableinfo(tblname) if row[5])
        if not key_columns:
            return None
        return ",".join(name for _, name in key_columns)
sql_char.py
#!/usr/bin/env python
# ------------------------------------------------------------------------------
#
# filename: sql_char.py
# version: 1.0.5
# description: The Command User Interface for sqlite3.
# first create: ppxxyy1110
# history:
# 2012-10-01 | 0.0.1
# ppxxyy1110 create
# ------------------------------------------------------------------------------
"""Character (command-line) interface for the sqlite3 helper in sql.py."""
import sys

import sql

try:
    _input = raw_input  # Python 2
except NameError:
    _input = input  # Python 3

ERRORMSG = 'please enter a correct cmd, you can enter help'
DATA_BASE = sql.MyDB()

# Declared column types whose values are converted to int before use.
_INTEGER_TYPES = ('integer', 'int')


def _run_menu(prompt, msgs, commands, can_return=True):
    """Run one menu loop until the user returns or exits.

    prompt     --- text shown before every command
    msgs       --- dictionary of help lines, printed by the 'help' command
    commands   --- dictionary of command name:callable taking no arguments
    can_return --- when true, 'r' leaves this menu and returns to the caller

    'help' and 'exit' are always available. Commands are dispatched through
    the dictionary (no eval). An error raised by a command is reported and
    the menu keeps running.
    """
    def myhelp():
        """get help info"""
        print("%" + ("=" * 40) + "%")
        for msg in msgs.values():
            print(msg)
        print("%" + ("=" * 40) + "%")

    myhelp()
    while True:
        try:
            cmd = _input(prompt).strip()
        except EOFError:  # end of input: leave like 'exit'
            sys.exit()
        if cmd == 'help':
            myhelp()
        elif cmd == 'exit':
            sys.exit()
        elif cmd == 'r' and can_return:
            return
        elif cmd in commands:
            try:
                commands[cmd]()
            except Exception as err:  # command boundary: report, keep going
                print("error: %s" % err)
        else:
            print(ERRORMSG)


def _show_columns(tblname):
    """Print the (name, type) pairs of a table and return its table info."""
    tableinfo = DATA_BASE.gettableinfo(tblname)
    colname = [tp[1] for tp in tableinfo]
    coltype = [tp[2] for tp in tableinfo]
    print(list(zip(colname, coltype)))
    return tableinfo


def _read_column_values(tableinfo, prompt):
    """Ask for one value per column and return them as colname:value.

    Entering 'None' skips a column. Values of integer columns are converted
    with int(); a value that is not an integer is asked for again.
    """
    values = {}
    for column in tableinfo:
        name, coltype = column[1], column[2]
        while True:
            val = _input(prompt % name)
            if val == 'None':
                break
            if coltype in _INTEGER_TYPES:
                try:
                    values[name] = int(val)
                except ValueError:
                    print("'%s' is not an integer, please try again" % val)
                    continue
            else:
                values[name] = val
            break
    return values


def shell_show():
    """show all tables"""
    print(DATA_BASE.gettables())


def shell_table():
    """in this shell you can add,delete,rename a table"""
    msgs = {'add': 'add--- add a table',
            'delete': 'delete--- delete a table',
            'rename': 'rename--- rename a table',
            'r': 'r--- return to up level menu',
            'exit': 'exit --- exit'
            }
    commands = {'add': add_a_table,
                'delete': delete_a_table,
                'rename': rename_a_table,
                }
    _run_menu("sqlite/table>>", msgs, commands)


def shell_item():
    """in this shell you can add,delete,update a item from a table"""
    msgs = {'add': 'add--- add a item',
            'delete': 'delete--- delete a item',
            'update': 'update--- update a item',
            'r': 'r--- return to up level menu',
            'exit': 'exit --- exit'
            }
    commands = {'add': add_a_item,
                'delete': delete_a_item,
                'update': update_a_item,
                # the help text says 'update'; 'rename' is kept as an alias
                # because it was the only name that used to work
                'rename': update_a_item,
                }
    _run_menu("sqlite/item>>", msgs, commands)


def shell_key():
    """in this shell you can add,delete a key from a table"""
    msgs = {'add': 'add--- add primary key',
            'delete': 'delete--- delete primary key',
            'r': 'r--- return to up level menu',
            'exit': 'exit --- exit'
            }
    commands = {'add': add_primary_key,
                'delete': delete_primary_key,
                }
    _run_menu("sqlite/key>>", msgs, commands)


def add_a_table():
    """this function allow you add a table to current database"""
    newtable = _input('please input the new table name').strip()
    cols = {}
    print('please input fieldname and type, if no more field, just input end')
    while True:
        # the separator is ':', so the prompt shows that format
        text = _input('format fieldname:type or "end": ').strip()
        if text == "end":
            break
        name, sep, coltype = text.partition(":")
        name = name.strip()
        coltype = coltype.strip()
        if not sep or not name or not coltype:
            print('expected fieldname:type, got "%s"' % text)
            continue
        cols[name] = coltype
    if not cols:
        print('no field given, table not created')
        return
    DATA_BASE.addatable(newtable, cols)


def delete_a_table():
    """this function allow you delete a table from database"""
    tblname = _input('please input the table name you want to delete')
    DATA_BASE.deleteatable(tblname.strip())


def rename_a_table():
    """rename a table"""
    oldname = _input('please input the old table name: ').strip()
    newname = _input('please input the new table name: ').strip()
    DATA_BASE.renametablename(oldname, newname)


def add_a_item():
    """insert a row or item to a table"""
    tblname = _input('please input the tablename: ')
    tableinfo = _show_columns(tblname)
    dic = _read_column_values(
        tableinfo,
        "please input the value of %s, if no value just input 'None': ")
    DATA_BASE.additem(tblname, dic)


def delete_a_item():
    """delete a row or a item"""
    tblname = _input('please input the tablename: ')
    tableinfo = _show_columns(tblname)
    dic = _read_column_values(
        tableinfo,
        "please input the value of %s, if no value just input 'None'")
    DATA_BASE.deleteitem(tblname, dic)


def update_a_item():
    """update a row or a item"""
    tblname = _input('please input the tablename: ')
    tableinfo = _show_columns(tblname)
    prompt = "please input the value of %s, if no value just input 'None': "
    print("please input the where")
    dic = _read_column_values(tableinfo, prompt)
    # make set
    print("please input the set statement")
    setdic = _read_column_values(tableinfo, prompt)
    DATA_BASE.updateitem(tblname, dic, setdic)


def add_primary_key():
    """add a primary key to table"""
    tblname = _input('please input the tablename: ')
    _show_columns(tblname)
    keyname = DATA_BASE.get_primary_key(tblname)
    if keyname is not None:
        print("table %s already has a key as '%s', if you must add a key, "
              "delete this key first" % (tblname, keyname))
    else:
        colname = _input("Enter the fieldname as a primary key: ")
        DATA_BASE.addkey(tblname, colname)


def delete_primary_key():
    """delete a primary key from a table"""
    tblname = _input('please input the tablename: ')
    _show_columns(tblname)
    DATA_BASE.deletekey(tblname)


def shell_top():
    """if this module run as top module this method would be called

    The database name is taken from the first command-line argument, or
    asked for when no argument is given.
    """
    if len(sys.argv) < 2:
        dbname = _input("Enter a database name: ")
    else:
        dbname = sys.argv[1]
    DATA_BASE.connect_db(dbname)
    msgs = {'show': 'show --- show all tables',
            'table': 'table --- add,delete,rename a table',
            'item': 'item --- add,delete,update a item in a table',
            'key': 'key --- add,delete a key',
            'exit': 'exit --- exit'
            }
    commands = {'show': shell_show,
                'table': shell_table,
                'item': shell_item,
                'key': shell_key,
                }
    try:
        # the top menu has no upper level, so 'r' is not a command here
        _run_menu("sqlite>>", msgs, commands, can_return=False)
    finally:
        DATA_BASE.close_db()


if __name__ == "__main__":
    shell_top()
Screenshot from 2012-10-20 21:37:44.png
imgs/asCode/20213929_T7C9.png
相关内容
- 简单的生成html,简单生成html,[Python]代码cl
- Levenshtein字符串相似度,Levenshtein字符串,Levenshtein距
- 抓取网上的小说章节并写入txt文件,抓取网上章节txt
- 批量删除所下载的.git文件夹,批量删除.git文件夹,批量
- 得到N以内的所有的质数,得到N质数,getPrime.pyi
- peewee的简单封装,peewee简单封装,[Python]代码im
- python 生成IP段,python生成ip,[Python]代码#!
- Python 扫描IP段 指定端口是否开放,pythonip,TCP21.py#!/u
- 使用python Tk的实现tablepanel+treeview,pythontablepanel,window
- python BeautifulSoup 抓取网页内指定内容,,# _*_ coding
评论关闭