wxPython实现sqlite3数据库的GUI界面 (wxPython, sqlite3)


sql.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
# filename:     sql.py
# version:      1.0.5
# description:  The APIs for sqlite3.
# first create: ppxxyy1110
# history:
#       2012-09-22 | 0.0.1
#                    ppxxyy1110 create
#       2012-10-01 | 1.0.0
#                    ppxxyy1110 modified
# ------------------------------------------------------------------------------
"""A small convenience API on top of sqlite3.

Values are always passed to SQLite as bound parameters; table and column
names (which cannot be bound) are double-quoted before being spliced into a
statement.
"""
import logging
import re
import sqlite3
from contextlib import closing

logging.basicConfig(level=logging.DEBUG)

# Declared column types whose values additem() converts with int().
_INTEGER_TYPES = ("integer", "int")

# First bare word of a table-level constraint inside a CREATE TABLE body.
_CONSTRAINT_KEYWORDS = ("CONSTRAINT", "PRIMARY", "UNIQUE", "CHECK", "FOREIGN")

# A column-level PRIMARY KEY constraint together with the modifiers that are
# only legal next to it (they would be a syntax error if left behind).
_PRIMARY_KEY_RE = re.compile(
    r"(?:\bCONSTRAINT\s+\S+\s+)?\bPRIMARY\s+KEY\b"
    r"(?:\s+(?:ASC|DESC)\b)?"
    r"(?:\s+ON\s+CONFLICT\s+\w+)?"
    r"(?:\s+AUTOINCREMENT\b)?",
    re.IGNORECASE,
)

# A table-level constraint such as "PRIMARY KEY (a, b)".
_TABLE_PRIMARY_KEY_RE = re.compile(
    r"^\s*(?:CONSTRAINT\s+\S+\s+)?PRIMARY\s+KEY\b", re.IGNORECASE)


def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier."""
    return '"%s"' % name.replace('"', '""')


def _split_fields(field_statement):
    """Split a CREATE TABLE body on its top-level commas.

    Commas inside parentheses (``decimal(10,2)``) or inside quoted names do
    not separate fields.
    """
    fields = []
    depth = 0
    closer = None  # closing character of the quoted run we are inside, if any
    start = 0
    for pos, char in enumerate(field_statement):
        if closer is not None:
            if char == closer:
                closer = None
        elif char in "\"'`":
            closer = char
        elif char == "[":
            closer = "]"
        elif char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            fields.append(field_statement[start:pos])
            start = pos + 1
    fields.append(field_statement[start:])
    return fields


def _field_name(field):
    """Return the column name a field definition starts with.

    Returns None for an empty field or a table-level constraint.
    """
    text = field.strip()
    if not text:
        return None
    closer = {'"': '"', "'": "'", "`": "`", "[": "]"}.get(text[0])
    if closer is None:
        token = re.split(r"[\s(]", text, maxsplit=1)[0]
        if token.upper() in _CONSTRAINT_KEYWORDS:
            return None
        return token
    chars = []
    pos = 1
    while pos < len(text):
        if text[pos] == closer:
            # A doubled quote character is an escaped quote, not the end.
            if closer != "]" and text[pos + 1:pos + 2] == closer:
                chars.append(closer)
                pos += 2
                continue
            break
        chars.append(text[pos])
        pos += 1
    return "".join(chars)


class MyDB:
    """Wrap one sqlite3 connection and expose table/row/key helpers.

    Attributes:
        logger: logger named after the class.
        databasename: path given to the last connect_db() call, or None.
        cxn: the open sqlite3 connection, or None when not connected.
        tables: table names found when the database was opened.
    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.databasename = None
        self.cxn = None  # connection
        self.tables = None

    # -- internal helpers ----------------------------------------------------

    def _execute(self, statement, params=(), commit=False):
        """Run one statement and return its rows; the cursor is always closed."""
        with closing(self.cxn.cursor()) as cur:
            cur.execute(statement, params)
            rows = cur.fetchall()
        if commit:
            self.cxn.commit()
        return rows

    def _check_columns(self, tblname, names):
        """Raise sqlite3.OperationalError unless every name is a column.

        A double-quoted name that matches no column can be read by SQLite as
        a string literal, which would turn a misspelt column in a WHERE
        clause into a comparison between two strings instead of an error.
        """
        known = set(name.lower() for name in self.gettablecoltype(tblname))
        if not known:
            raise sqlite3.OperationalError("no such table: %s" % tblname)
        for name in names:
            if name.lower() not in known:
                raise sqlite3.OperationalError("no such column: %s" % name)

    def _where_clause(self, tblname, dic):
        """Build ``(clause, params)`` from a colname:colvalue dictionary.

        Entries whose value is None are ignored.  Raises
        sqlite3.OperationalError (the error the malformed statement used to
        produce) when no condition remains.
        """
        columns = []
        params = []
        for col, value in dic.items():
            if value is None:
                continue
            columns.append(col)
            params.append(value)
        if not columns:
            raise sqlite3.OperationalError(
                "no condition given for table %s" % tblname)
        self._check_columns(tblname, columns)
        clause = " and ".join(
            "%s = ?" % _quote_identifier(col) for col in columns)
        return clause, params

    def _field_statement(self, tblname):
        """Return the text between the outer brackets of CREATE TABLE.

        Raises sqlite3.OperationalError when the table does not exist.
        """
        rows = self._execute(
            "SELECT sql FROM sqlite_master "
            "WHERE type='table' AND name = ? COLLATE NOCASE", (tblname,))
        if not rows or rows[0][0] is None:
            raise sqlite3.OperationalError("no such table: %s" % tblname)
        create_statement = rows[0][0]
        # First "(" to last ")" also works when the schema spans several lines.
        return create_statement[
            create_statement.find("(") + 1:create_statement.rfind(")")]

    def _rebuild_table(self, tblname, field_statement):
        """Recreate *tblname* with a new field list, keeping its rows."""
        table = _quote_identifier(tblname)
        with closing(self.cxn.cursor()) as cur:
            try:
                # A previous failed attempt may have left the scratch table
                # behind; "temp." makes sure only the temporary one is hit.
                cur.execute("DROP TABLE IF EXISTS temp.TEMP_TABLE")
                cur.execute(
                    "CREATE TEMPORARY TABLE TEMP_TABLE (%s)" % field_statement)
                cur.execute("INSERT INTO TEMP_TABLE SELECT * FROM %s" % table)
                cur.execute("DROP TABLE %s" % table)
                cur.execute(
                    "CREATE TABLE %s (%s)" % (table, field_statement))
                cur.execute("INSERT INTO %s SELECT * FROM TEMP_TABLE" % table)
                cur.execute("DROP TABLE TEMP_TABLE")
            except sqlite3.Error:
                self.cxn.rollback()
                raise
        self.cxn.commit()

    # -- connection ----------------------------------------------------------

    def connect_db(self, databasename):
        """Open *databasename* and cache its table names in ``self.tables``.

        Any connection opened earlier is closed first.  A failure to open is
        logged and the sqlite3 error is re-raised.
        """
        self.close_db()
        self.databasename = databasename
        self.logger.debug("opening database %s", self.databasename)
        try:
            self.cxn = sqlite3.connect(self.databasename)
        except sqlite3.Error:
            self.logger.error('could not open database file')
            raise
        self.logger.info('open database sucess')
        self.tables = self.gettables()

    def close_db(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.cxn is not None:
            self.cxn.close()
            self.cxn = None

    # -- tables --------------------------------------------------------------

    def get_table_value(self, tablename):
        """Return every row of *tablename* as a list of tuples."""
        return self._execute(
            "select * from %s" % _quote_identifier(tablename))

    def gettables(self):
        """Return the names of all tables in the opened database."""
        self.logger.debug("cxn = %s", self.cxn)
        rows = self._execute(
            "select name from sqlite_master where type='table'")
        return [row[0] for row in rows]

    def addatable(self, newtable, cols):
        """Create a table.

        newtable --- new table name
        cols --- dictionary as colname:coltype; the type text is inserted
                 verbatim, so it may carry constraints
        """
        fields = ",".join(
            "%s %s" % (_quote_identifier(name), coltype)
            for name, coltype in cols.items())
        statment = "CREATE TABLE %s (%s)" % (
            _quote_identifier(newtable), fields)
        self.logger.debug("statement = %s", statment)
        self._execute(statment)

    def deleteatable(self, tblname):
        """Drop a table from the database."""
        self._execute("DROP TABLE %s" % _quote_identifier(tblname))

    def renametablename(self, oldname, newname):
        """Rename a table."""
        self._execute('ALTER TABLE %s RENAME TO %s' % (
            _quote_identifier(oldname), _quote_identifier(newname)))

    def gettableinfo(self, tablename):
        """Return the rows of ``PRAGMA table_info`` for *tablename*.

        The list is empty when the table does not exist.
        """
        info = self._execute(
            "PRAGMA table_info(%s)" % _quote_identifier(tablename))
        self.logger.debug("info = %s", info)
        return info

    def gettablecoltype(self, tblname):
        """Return the table's colname:coltype dictionary."""
        return {row[1]: row[2] for row in self.gettableinfo(tblname)}

    # -- rows ----------------------------------------------------------------

    def additem(self, tblname, dic):
        """Insert one row.

        tblname --- table name
        dic --- dictionary of colname:colvalue; None is stored as NULL and an
                empty dictionary inserts a row of default values

        Values for columns declared ``integer``/``int`` are converted with
        int().  Raises KeyError for a column the table does not have.
        """
        name_type = self.gettablecoltype(tblname)
        columns = []
        params = []
        for col, value in dic.items():
            if name_type[col] in _INTEGER_TYPES and value is not None:
                value = int(value)
            columns.append(_quote_identifier(col))
            params.append(value)
        table = _quote_identifier(tblname)
        if columns:
            statement = u"INSERT INTO %s (%s) VALUES (%s)" % (
                table, ",".join(columns), ",".join("?" * len(columns)))
        else:
            statement = u"INSERT INTO %s DEFAULT VALUES" % table
        self.logger.debug("statement = %s", statement)
        self._execute(statement, params, commit=True)

    def deleteitem(self, tblname, dic):
        """Delete the rows matching every colname:colvalue pair in *dic*.

        Pairs whose value is None are ignored.  Raises
        sqlite3.OperationalError when no pair is left, so an empty
        condition can never wipe the table.
        """
        self.logger.debug("dic = %s", dic)
        where, params = self._where_clause(tblname, dic)
        statement = "DELETE FROM %s WHERE %s" % (
            _quote_identifier(tblname), where)
        self.logger.debug("statement = %s ", statement)
        self._execute(statement, params, commit=True)

    def updateitem(self, tblname, dic, setdic):
        """Update rows.

        dic --- dictionary of colname:colvalue selecting the rows (None
                values are ignored)
        setdic --- dictionary of colname:colvalue to assign (None stores NULL)

        Raises sqlite3.OperationalError when either part is empty or names a
        column the table does not have.
        """
        where, where_params = self._where_clause(tblname, dic)
        self.logger.debug("dic = %s", dic)
        if not setdic:
            raise sqlite3.OperationalError(
                "no value to set given for table %s" % tblname)
        self._check_columns(tblname, setdic)
        set_columns = []
        set_params = []
        for col, value in setdic.items():
            set_columns.append("%s = ?" % _quote_identifier(col))
            set_params.append(value)
        self.logger.debug("setdic = %s", setdic)
        statement = "UPDATE %s SET %s WHERE %s" % (
            _quote_identifier(tblname), " ,".join(set_columns), where)
        self.logger.debug("statement = %s", statement)
        self._execute(statement, set_params + where_params, commit=True)

    # -- primary key ---------------------------------------------------------

    def get_primary_key(self, tblname):
        """Return the primary key column name, or None if there is no key.

        For a composite key the names are joined with commas in key order.
        A table that does not exist has no key and yields None.
        """
        # table_info rows are (cid, name, type, notnull, dflt_value, pk);
        # pk is non-zero for key columns.
        key_columns = sorted(
            (row[5], row[1]) for row in self.gettableinfo(tblname) if row[5])
        if not key_columns:
            return None
        return ",".join(name for _, name in key_columns)

    def addkey(self, tblname, colname):
        """Make *colname* the primary key of *tblname*.

        Nothing is changed (an error is logged) when the table already has a
        key or has no such column.  SQLite cannot add a key in place, so the
        table is rebuilt and its rows copied across.
        """
        keyname = self.get_primary_key(tblname)
        if keyname is not None:
            self.logger.error(
                "this table already has a key as (%s), if you must add a "
                "key, delete this key first", keyname)
            return
        # this table has no primary key
        fields = _split_fields(self._field_statement(tblname))
        wanted = colname.strip().lower()
        for index, field in enumerate(fields):
            name = _field_name(field)
            if name is not None and name.lower() == wanted:
                fields[index] = "%s PRIMARY KEY" % field.rstrip()
                break
        else:
            self.logger.error(
                "table %s has no column %s", tblname, colname)
            return
        self._rebuild_table(tblname, ",".join(fields))

    def deletekey(self, tblname):
        """Remove the primary key from *tblname* (rows are kept)."""
        keyname = self.get_primary_key(tblname)
        if keyname is None:
            self.logger.info("this table has no key")
            return
        fields = []
        for field in _split_fields(self._field_statement(tblname)):
            if _TABLE_PRIMARY_KEY_RE.match(field):
                continue  # drop "PRIMARY KEY (a, b)" as a whole
            fields.append(_PRIMARY_KEY_RE.sub("", field))
        field_statement_new = ",".join(fields)
        self.logger.debug("field_statement_new = %s", field_statement_new)
        self._rebuild_table(tblname, field_statement_new)

sql_char.py

#!/usr/bin/env python# ------------------------------------------------------------------------------## filename:     sql_char.py# version:      1.0.5# description:  The Command User Interface for sqlite3.# first create: ppxxyy1110# history:#       2012-10-01 | 0.0.1#                    ppxxyy1110 create# ------------------------------------------------------------------------------"""this a sql char interface"""import sqlERRORMSG = 'please enter a correct cmd, you can enter help'DATA_BASE = sql.MyDB()def shell_show():    """ show all tables"""    ret = DATA_BASE.gettables()    print retdef shell_table():    """in this shell you can add,delete,rename a table"""    msgs = {'add':'add--- add a table',            'delete':'delete--- delete a table',            'rename':'rename--- rename a table',            'r':'r--- return to up level menu',            'exit':'exit --- exit'    }    funs = {'add':'add_a_table()',           'delete':'delete_a_table()',           'rename':'rename_a_table()',           'r':'return',           'exit':'exit()',           'help':'myhelp()'           }    def myhelp():        """get help info"""        print "%" + ("="*40) + "%"        for msg in msgs.values():            print msg        print "%" + ("="*40) + "%"    myhelp()    cmd = raw_input("sqlite/table>>")    while True:        if cmd not in funs.keys():            print ERRORMSG        else:            eval(funs[cmd])        cmd = raw_input("sqlite/table>>")def shell_item():    """in this shell you can add,delete,update a item from a table"""    msgs = {'add':'add--- add a item',            'delete':'delete--- delete a item',            'update':'update--- update a item',            'r':'r--- return to up level menu',            'exit':'exit --- exit'            }    funs = {'add':'add_a_item()',           'delete':'delete_a_item()',           'rename':'update_a_item()',           'exit':'exit()',           'help':'myhelp()'           }    def myhelp():        """get help info"""    
    print "%" + ("="*40) + "%"        for msg in msgs.values():            print msg        print "%" + ("="*40) + "%"    myhelp()    cmd = raw_input("sqlite/item>>")    while True:        if cmd not in funs.keys():            print ERRORMSG        else:            eval(funs[cmd])        cmd = raw_input("sqlite/item>>")def shell_key():    """in this shell you can add,delete a key from a table"""    msgs = {'add':'add--- add primary key',            'delete':'delete--- delete primary key',            'r':'r--- return to up level menu',            'exit':'exit --- exit'            }    funs = {'add':'add_primary_key()',           'delete':'delete_primary_key()',           'exit':'exit()',           'help':'myhelp()'           }    def myhelp():        """get help info"""        print "%" + ("="*40) + "%"        for msg in msgs.values():            print msg        print "%" + ("="*40) + "%"    myhelp()    cmd = raw_input("sqlite/key>>")    while True:        if cmd not in funs.keys():            print ERRORMSG        else:            eval(funs[cmd])        cmd = raw_input("sqlite/key>>")def add_a_table():    """this function allow you add a table to current database"""    newtable = raw_input('please input the new table name')    newtable = newtable.strip()    cols = {}    print 'please input fieldname and type, if no more field, just input end'    while True:        text = raw_input('format filename,type or "end": ')        if text == "end":            break        else:            tmp = text.strip().split(":")            cols[tmp[0]] = tmp[1]    DATA_BASE.addatable(newtable, cols)def delete_a_table():    """this function allow you delete a table from database"""    tblname = raw_input('please input the table name you want to delete')    tblname = tblname.strip()    DATA_BASE.deleteatable(tblname)def rename_a_table():    """rename a table"""    oldname = raw_input('please input the old table name: ')    newname = raw_input('please input the new table name: ')    
DATA_BASE.renametablename(oldname, newname)def add_a_item():    """insert a row or item to a table"""    tblname = raw_input('please input the tablename: ')    tableinfo = DATA_BASE.gettableinfo(tblname)    colname = [tp[1] for tp in tableinfo]    coltype = [tp[2] for tp in tableinfo]    print zip(colname, coltype)    dic = {}    i = 0    for col in colname:        val = raw_input("please input the value of %s, if no value just input 'None': " % col)        if val != 'None':            if coltype[i] == 'integer':                dic[col] = int(val)            else:                dic[col] = val        i = i + 1    DATA_BASE.additem(tblname, dic)def delete_a_item():    """delete a row or a item"""    tblname = raw_input('please input the tablename: ')    tableinfo = DATA_BASE.gettableinfo(tblname)    colname = [tp[1] for tp in tableinfo]    coltype = [tp[2] for tp in tableinfo]    print zip(colname, coltype)    dic = {}    i = 0    for col in colname:        val = raw_input("please input the value of %s, if no value just input 'None'" % col)        if val != 'None':            if coltype[i] == 'integer' or coltype[i] == 'int':                dic[col] = int(val)            else:                dic[col] = val        i += 1    DATA_BASE.deleteitem(tblname, dic)def update_a_item():    """update a row or a item"""    tblname = raw_input('please input the tablename: ')    tableinfo = DATA_BASE.gettableinfo(tblname)    colname = [tp[1] for tp in tableinfo]    coltype = [tp[2] for tp in tableinfo]    print zip(colname, coltype)    print "please input the where"    dic = {}    i = 0    for col in colname:        val = raw_input("please input the value of %s, if no value just input 'None': " % col)        if val != 'None':            if coltype[i] == 'integer':                dic[col] = int(val)            else:                dic[col] = val        i = i + 1    #make set    print "please input the set statement"    setdic = {}    i = 0    for col in colname:        val = 
raw_input("please input the value of %s, if no value just input 'None': " % col)        if val != 'None':            if coltype[i] == 'integer':                setdic[col] = int(val)            else:                setdic[col] = val        i = i + 1    DATA_BASE.updateitem(tblname, dic, setdic)def add_primary_key():    """add a primary key to table"""    tblname = raw_input('please input the tablename: ')    tableinfo = DATA_BASE.gettableinfo(tblname)    colname = [tp[1] for tp in tableinfo]    coltype = [tp[2] for tp in tableinfo]    print zip(colname, coltype)    keyname = DATA_BASE.get_primary_key(tblname)    if keyname != None:        print "table %s already has a key as '%s', if you must add a key, delete this key first" \        % (tblname, keyname)    else:        colname = raw_input("Enter the fieldname as a primary key: ")        DATA_BASE.addkey(tblname, colname)def delete_primary_key():    """delete a primary key from a table"""    tblname = raw_input('please input the tablename: ')    tableinfo = DATA_BASE.gettableinfo(tblname)    colname = [tp[1] for tp in tableinfo]    coltype = [tp[2] for tp in tableinfo]    print zip(colname, coltype)    DATA_BASE.deletekey(tblname)def shell_top():    """if this module run as top module this method would be called"""    import sys    database = DATA_BASE    if len(sys.argv) < 2:        dbname = raw_input("Enter a database name: ")    else:        dbname = sys.argv[1]    database.connect_db(dbname)    msgs = {'show':'show --- show all tables',            'table':'table --- add,delete,rename a table',            'item':'item --- add,delete,update a item in a table',            'key':'key --- add,delete a key',            'exit':'exit --- exit'            }    funs = {'show':'shell_show()',           'table':'shell_table()',           'item':'shell_item()',           'key':'shell_key()',           'exit':'exit()',           'help':'myhelp()'           }    def myhelp():        """get help info"""        print "%" + 
("="*40) + "%"        for msg in msgs.values():            print msg        print "%" + ("="*40) + "%"    myhelp()    while True:        cmd = raw_input("sqlite>>")        if cmd not in funs.keys():            print ERRORMSG        else:            eval(funs[cmd])if __name__ == "__main__":    shell_top()

Screenshot from 2012-10-20 21:37:44.png

imgs/asCode/20213929_T7C9.png

评论关闭