Last active
September 9, 2017 09:43
-
-
Save nskeip/333855f0247b1aade4b68617b26275fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import sys | |
import sqlite3 | |
import sqlparse | |
from PyQt5.QtWidgets import * | |
from PyQt5.QtCore import QAbstractTableModel, Qt | |
class SQLite3TableModel(QAbstractTableModel):
    """Qt table model that runs a SQL script and exposes its last SELECT.

    Every non-SELECT statement in ``queriesText`` is executed and committed
    immediately in the constructor.  SELECT statements are not executed
    there; only the last one is remembered, and its rows are fetched lazily
    from ``data()`` as they are requested.

    The first database error encountered is stored in ``lastError``
    (``None`` when nothing failed).
    """

    def __init__(self, db, queriesText, *argv, **kwargs):
        """Parse ``queriesText`` and execute its non-SELECT statements.

        :param db: an open ``sqlite3`` connection
        :param queriesText: one or more SQL statements separated by ``;``
        :param argv: extra positional arguments for ``QAbstractTableModel``
        :param kwargs: extra keyword arguments for ``QAbstractTableModel``
        """
        super().__init__(*argv, **kwargs)

        # _rowCount and _columnCount start at 0, which is the right answer
        # for scripts without any SELECT: nothing is displayed.
        # As soon as a SELECT is seen they are reset to None, meaning
        # "not computed yet" (see the loop below).
        self.lastSelectQuery = None
        self.lastSelectQueryIsExecuted = False
        self._rowCount = 0
        self._columnCount = 0

        self.db = db
        self.cur = db.cursor()
        self._data = []          # rows fetched so far from lastSelectQuery
        self._maxRowFetched = 0  # number of rows currently cached in _data
        self.lastError = None
        self._headerData = None  # column names, loaded on first use

        for parsedQuery in sqlparse.parse(queriesText):
            query = self.removeSemicolonAndTrailingSpaces(str(parsedQuery))
            if not query:
                # nothing left after stripping (e.g. a stray ";")
                continue

            if self.queryIsSelect(query):
                # A SELECT is not executed here.
                # None in _rowCount / _columnCount marks that the numbers
                # of rows/cols are yet to be determined, and a non-None
                # lastSelectQuery means there is likely something to show.
                self.lastSelectQuery = query
                self._rowCount = None
                self._columnCount = None
                continue

            try:
                self.cur.execute(query)
                self.db.commit()  # a non-select statement can be committed now
            except sqlite3.Error as e:
                # sqlite3.Error is the base class of OperationalError,
                # IntegrityError, ProgrammingError, ...; stop at the first
                # failing statement.
                self.lastError = e
                break

    @staticmethod
    def removeSemicolonAndTrailingSpaces(query):
        """Return ``query`` stripped of outer whitespace and one final ``;``."""
        return re.sub(r"\s*;\s*$", "", query.strip())

    @staticmethod
    def queryIsSelect(query):
        """Return True if the first statement in ``query`` is a SELECT.

        Relies on sqlparse's ``Statement.get_type()``, which skips leading
        whitespace and comments.  Empty input yields False.
        """
        statements = sqlparse.parse(query)
        if not statements:
            return False
        return statements[0].get_type() == 'SELECT'

    def _countRows(self):
        """Return the number of rows of ``lastSelectQuery`` (computed once).

        On a database error the error is stored in ``lastError`` and the
        count is set to 0.
        """
        if self._rowCount is None:
            try:
                # a separate cursor is needed here: self.cur is reserved
                # for the lazy row fetching done by data()
                _cur = self.db.cursor()
                # the newline before ")" keeps a trailing "-- comment" in
                # the query from swallowing the closing parenthesis
                _cur.execute(
                    "SELECT COUNT (*) FROM (%s\n)" % self.lastSelectQuery)
                self._rowCount = _cur.fetchone()[0]
            except sqlite3.Error as e:
                self.lastError = e
                self._rowCount = 0
        return self._rowCount

    def _loadHeaderData(self):
        """Return the column names of ``lastSelectQuery`` (loaded once).

        Must only be called when ``lastSelectQuery`` is not None.  On a
        database error the error is stored in ``lastError`` and an empty
        list is cached.
        """
        if self._headerData is None:
            # a separate cursor is needed here as well
            _cur = self.db.cursor()
            try:
                _cur.execute(self.lastSelectQuery)
                self._headerData = [d[0] for d in (_cur.description or ())]
            except sqlite3.Error as e:
                self.lastError = e
                self._headerData = []
            finally:
                _cur.close()
        return self._headerData

    def _fetchUpTo(self, row):
        """Make sure rows ``0..row`` are cached in ``_data`` if they exist.

        The SELECT is executed on ``self.cur`` the first time rows are
        needed; afterwards only the missing rows are fetched.
        """
        missing = row + 1 - self._maxRowFetched
        if missing <= 0:
            return
        try:
            if not self.lastSelectQueryIsExecuted:
                self.cur.execute(self.lastSelectQuery)
                self.lastSelectQueryIsExecuted = True
            fetched = self.cur.fetchmany(size=missing)
        except sqlite3.Error as e:
            self.lastError = e
            return
        self._data += fetched
        # count what was really returned, not what was asked for: the
        # cursor may run out of rows before ``missing`` is reached
        self._maxRowFetched += len(fetched)

    def data(self, index, role=Qt.DisplayRole):
        """Return the cell value for ``index``, fetching rows on demand.

        Returns None for roles other than ``Qt.DisplayRole``, for invalid
        indexes, when there is no SELECT to show, and for cells that do
        not exist in the result.
        """
        if self.lastSelectQuery is None or role != Qt.DisplayRole:
            return None
        if not index.isValid():
            return None

        row = index.row()
        if row >= self._maxRowFetched:
            self._fetchUpTo(row)
        try:
            return self._data[row][index.column()]
        except IndexError:
            return None

    def rowCount(self, parent=None):
        """Return the number of rows; 0 for a valid ``parent``.

        A table model has no child items, so any valid parent index has
        zero rows.
        """
        if parent is not None and parent.isValid():
            return 0
        return self._countRows()

    def columnCount(self, parent=None):
        """Return the number of columns; 0 for a valid ``parent``.

        No columns are shown when the SELECT returns no rows.
        """
        if parent is not None and parent.isValid():
            return 0
        if self._columnCount is None:
            if self._countRows() == 0:
                self._columnCount = 0  # if no rows fetched, show no columns
            else:
                self._columnCount = len(self._loadHeaderData())
        return self._columnCount

    def headerData(self, section, orientation, role=Qt.DisplayRole):
        """Return the column name for horizontal display headers.

        Everything else (vertical headers, other roles, unknown sections)
        is delegated to ``QAbstractTableModel.headerData``.
        """
        if self.lastSelectQuery is not None and \
                orientation == Qt.Horizontal and \
                role == Qt.DisplayRole:
            header = self._loadHeaderData()
            if 0 <= section < len(header):
                return header[section]
        return super().headerData(section, orientation, role)
class SqlExecutor(QWidget):
    """Main window: connect to a SQLite database, run SQL, show the result.

    ``self.con`` holds the open ``sqlite3`` connection, or None while
    disconnected.
    """

    def __init__(self, *argv, **kwargs):
        """Create the window in the disconnected state and build its UI."""
        super().__init__(*argv, **kwargs)
        self.con = None
        self.initUI()

    def center(self):
        """Move the window to the center of the available desktop area."""
        qr = self.frameGeometry()
        cp = QDesktopWidget().availableGeometry().center()
        qr.moveCenter(cp)
        self.move(qr.topLeft())

    def initUI(self):
        """Build the widgets, lay them out in a grid and show the window."""
        self.setGeometry(300, 300, 600, 400)
        self.setWindowTitle('Sqlite query executor')
        self.center()

        grid = QGridLayout()
        grid.setSpacing(10)

        # connection string: label, input and button
        connectionStringLabel = QLabel('Connection string:')
        connectionStringEdit = QLineEdit('books.db')
        self.connectionStringEdit = connectionStringEdit

        connectButton = QPushButton('Connect', self)
        # checkable: the button stays pressed while a connection is open
        connectButton.setCheckable(True)
        connectButton.setDefault(False)
        connectButton.clicked.connect(self.connectButtonClicked)
        self.connectButton = connectButton

        grid.addWidget(connectionStringLabel, 1, 0)
        grid.addWidget(connectionStringEdit, 1, 1)
        grid.addWidget(connectButton, 1, 2)

        # sql query string: label, input and button
        sqlLabel = QLabel('Query text:')
        sqlEdit = QPlainTextEdit('select * from books')
        self.sqlEdit = sqlEdit

        executeButton = QPushButton('Run', self)
        executeButton.setDefault(True)
        executeButton.clicked.connect(self.executeButtonClicked)
        self.executeButton = executeButton

        # initial state: a connection string can be entered, SQL cannot
        self.setReadyToTakeConnectionString(True)
        self.setReadyToTakeSQLText(False)

        grid.addWidget(sqlLabel, 2, 0)
        grid.addWidget(sqlEdit, 2, 1)
        grid.addWidget(executeButton, 2, 2)

        # result table below the two input rows; the -1 spans are passed
        # so that it takes the rest of the grid
        table = QTableView()
        grid.addWidget(table, 3, 0, -1, -1)
        self.table = table

        self.setLayout(grid)
        self.show()

    def isConnected(self):
        """Return True while a database connection is open."""
        return self.con is not None

    def createConnect(self, dbname):
        """Open ``dbname`` with ``sqlite3.connect`` and enable the SQL input.

        Raises ``sqlite3.Error`` when the database cannot be opened; in
        that case ``self.con`` is left unchanged.
        """
        self.con = sqlite3.connect(dbname)
        self.setReadyToTakeSQLText(True)

    def closeConnect(self):
        """Close the connection (if any) and disable the SQL input."""
        # Detach the model first: it fetches rows lazily through the
        # connection and would fail once the connection is closed.
        self.table.setModel(None)
        if self.con is not None:
            self.con.close()
            self.con = None
        self.setReadyToTakeSQLText(False)

    def setReadyToTakeSQLText(self, b):
        """Enable or disable the query editor and the Run button."""
        self.sqlEdit.setEnabled(b)
        self.executeButton.setEnabled(b)

    def setReadyToTakeConnectionString(self, b):
        """Enable or disable the connection string input."""
        self.connectionStringEdit.setEnabled(b)

    def connectButtonClicked(self):
        """Toggle the connection and bring the widgets in line with it."""
        if self.isConnected():
            self.closeConnect()
        else:
            try:
                self.createConnect(self.connectionStringEdit.text())
            except sqlite3.Error as e:
                # stay disconnected and tell the user why
                self.mb("Error", str(e), QMessageBox.Critical)

        connected = self.isConnected()
        # the click has already toggled the checkable button; force it
        # back to the real state in case connecting failed
        self.connectButton.setChecked(connected)
        self.setReadyToTakeConnectionString(not connected)
        self.setReadyToTakeSQLText(connected)

    def mb(self, title, message, icon=QMessageBox.Information):
        """Show a modal message box with the given title, text and icon."""
        msg = QMessageBox(self)
        msg.setIcon(icon)
        msg.setWindowTitle(title)
        msg.setText(message)
        msg.exec_()

    def executeButtonClicked(self):
        """Run the text of the query editor and show the outcome."""
        if not self.isConnected():
            self.mb("Error", "Not connected to a database",
                    QMessageBox.Critical)
            return

        model = SQLite3TableModel(self.con, self.sqlEdit.toPlainText())
        self.table.setModel(model)
        if model.lastError is not None:
            self.mb("Error", str(model.lastError), QMessageBox.Critical)
        else:
            self.mb("Query status", "Success")

    def closeEvent(self, event):
        """Close the database connection when the window is closed."""
        if self.isConnected():
            self.closeConnect()
        super().closeEvent(event)
if __name__ == '__main__':
    # Script entry point: create the Qt application and the main window,
    # run the event loop, and hand its return value to sys.exit.
    application = QApplication(sys.argv)
    # keep a reference to the window for as long as the event loop runs
    window = SqlExecutor()
    exitCode = application.exec_()
    sys.exit(exitCode)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment