Skip to content

Instantly share code, notes, and snippets.

@Daniel-Abrecht
Last active February 4, 2024 20:51
Show Gist options
  • Save Daniel-Abrecht/a0baffe3025bfadefbc8b45512b9ff1d to your computer and use it in GitHub Desktop.
Save Daniel-Abrecht/a0baffe3025bfadefbc8b45512b9ff1d to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import sys
import json
import pymysql
from datetime import datetime, timezone
mariadb=pymysql
# --- Command line parsing -------------------------------------------------
# Leading options are consumed first; what remains in `a` is positional:
#   host user password db table [key value]...
a = [*sys.argv[1:]]
ignore = False
update = False
format = None
usage = False

# startswith() instead of a[0][0]: an empty-string argument (for example an
# empty host) must simply end option parsing rather than raise IndexError.
while a and a[0].startswith('-'):
    o = a.pop(0)
    if o == '--':
        # Explicit end of options.
        break
    if o in ('--ignore', '-i'):
        ignore = True
    elif o in ('--update', '-u'):
        update = True
    elif o in ('--json', '-j'):
        format = 'json'
    elif o.startswith('--format='):
        format = o[9:]
    else:
        # Unknown option: remember it and print the usage text below.
        usage = True

# A "--" among the remaining arguments separates the key/value pairs from
# the list of column names to return (kept in `r`; None when absent).
r = None
if '--' in a:
    i = a.index('--')
    r = a[i+1:]
    a = a[0:i]

# Five fixed positionals plus complete key/value pairs means an odd count.
if (
    usage
    or len(a) < 5
    or len(a) % 2 != 1
    or (update and ignore)
    or format not in ('json', 'keyvalue', 'value', 'tsv', None)
):
    sys.exit("Usage: "+sys.argv[0]+" [--update|--ignore|--format=[json|keyvalue|value|tsv]] [--] host user password db table [key value]... [-- key1 key2...]")
def _quote_identifier(name):
    """Return *name* wrapped in backticks, with backticks and backslashes removed."""
    return '`' + name.replace('`', '').replace('\\', '') + '`'


# The first four positional arguments are the connection settings, the fifth
# is the table name; afterwards `a` holds only the key/value pairs.
conf = dict(
    zip(('host', 'user', 'password', 'database'), a[:4]),
    autocommit=True,
)
table = a[4].replace('`', '').replace('\\', '')
del a[:5]

# Even positions of `a` are column names, odd positions their values.
keys = [_quote_identifier(column) for column in a[::2]]

# Pick the columns to return and the default output format.
if r is None:
    format = format or 'json'
    r = ['*']
else:
    format = format or 'value'
    r = [_quote_identifier(column) for column in r]

# The 'value' format iterates plain row tuples; every other format needs
# column names and therefore dict rows.
cu = pymysql.cursors.Cursor if format == 'value' else pymysql.cursors.DictCursor

# Optional duplicate-key clause: --update overwrites every given column,
# --ignore assigns the first column to itself.
if update and keys:
    dku = ' ON DUPLICATE KEY UPDATE ' + ','.join(
        column + '=VALUES(' + column + ')' for column in keys
    )
elif ignore and keys:
    dku = ' ON DUPLICATE KEY UPDATE ' + keys[0] + '=' + keys[0]
else:
    dku = ''

# One %s placeholder per value; the values themselves are bound at execute time.
placeholders = ','.join(['%s'] * len(a[1::2]))
returning = ' RETURNING ' + ','.join(r) if r else ''
sql = ''.join([
    'INSERT INTO `', table, '` (',
    ','.join(keys),
    ') VALUES (',
    placeholders,
    ')', dku, returning,
])
def str_serialize(x):
    """Return a text representation of *x*.

    bytes are decoded as UTF-8 with undecodable bytes replaced, datetime
    values are converted to UTC and rendered in ISO format with a "Z"
    suffix, and anything else goes through str().
    """
    if isinstance(x, bytes):
        return x.decode('utf-8', "replace")
    if not isinstance(x, datetime):
        return str(x)
    utc_text = x.astimezone(timezone.utc).isoformat()
    return utc_text.replace("+00:00", "Z")
def bin_serialize(x):
    """Return *x* as bytes.

    bytes pass through unchanged. datetime values are converted to UTC and
    rendered in ISO format with a "Z" suffix; these, str values and the
    str() of anything else are encoded as UTF-8.
    """
    if isinstance(x, bytes):
        return x
    if isinstance(x, datetime):
        x = x.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    text = x if isinstance(x, str) else str(x)
    return text.encode('utf-8')
def dump_results(x):
i = -1
for result in x:
i += 1
if result:
match format:
case 'json':
json.dump(result, sys.stdout, default=str_serialize)
print()
case 'value':
if i:
sys.stdout.buffer.write(b'\n')
for v in result:
sys.stdout.buffer.write(bin_serialize(v).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t')+b'\n')
case 'keyvalue':
if i:
sys.stdout.buffer.write(b'\n')
for k, v in result.items():
sys.stdout.buffer.write( bin_serialize(k).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t')
+ b'\t' + bin_serialize(v).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t')
+ b'\n'
)
case 'tsv':
s=''
if not i:
sys.stdout.buffer.write((b'\t'.join(bin_serialize(k).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t') for k in result.keys()))+b'\n')
sys.stdout.buffer.write((b'\t'.join(bin_serialize(v).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t') for v in result.values()))+b'\n')
# Run the INSERT with the value arguments bound as parameters, then print
# whatever rows the statement returned.
with pymysql.connect(**conf) as connection, connection.cursor(cu) as cursor:
    cursor.execute(sql, a[1::2])
    dump_results(cursor.fetchall())
#!/usr/bin/python3
import sys
import json
import pymysql
from datetime import datetime, timezone
mariadb=pymysql
# --- Command line parsing -------------------------------------------------
# Leading options are consumed first; what remains in `a` is positional:
#   host user password db query [param...]
a = [*sys.argv[1:]]
format = 'json'
usage = False

# startswith() instead of a[0][0]: an empty-string argument (for example an
# empty host) must simply end option parsing rather than raise IndexError.
while a and a[0].startswith('-'):
    o = a.pop(0)
    if o == '--':
        # Explicit end of options.
        break
    if o in ('--json', '-j'):
        format = 'json'
    elif o.startswith('--format='):
        format = o[9:]
    else:
        # Unknown option: remember it and print the usage text below.
        usage = True

# Anything after a further "--" is split off into `r`.
# NOTE(review): `r` is not read anywhere else in the visible script, so
# those arguments are effectively dropped - confirm this is intended.
r = None
if '--' in a:
    i = a.index('--')
    r = a[i+1:]
    a = a[0:i]

if usage or len(a) < 5 or format not in ('json', 'keyvalue', 'value', 'tsv'):
    # The usage text lists "tsv", which is the format actually accepted above.
    sys.exit("Usage: "+sys.argv[0]+" [--format=[json|keyvalue|value|tsv]] [--] host user password db query [param...]")
# The first four positional arguments are the connection settings, the fifth
# is the query text; afterwards `a` holds only the query parameters.
conf = dict(
    zip(('host', 'user', 'password', 'database'), a[:4]),
    autocommit=True,
)
sql = a[4]
del a[:5]

# The 'value' format iterates plain row tuples; every other format needs
# column names and therefore dict rows.
cu = pymysql.cursors.Cursor if format == 'value' else pymysql.cursors.DictCursor
def str_serialize(x):
    """Return a text representation of *x*.

    bytes are decoded as UTF-8 with undecodable bytes replaced, datetime
    values are converted to UTC and rendered in ISO format with a "Z"
    suffix, and anything else goes through str().
    """
    if isinstance(x, bytes):
        return x.decode('utf-8', "replace")
    if not isinstance(x, datetime):
        return str(x)
    utc_text = x.astimezone(timezone.utc).isoformat()
    return utc_text.replace("+00:00", "Z")
def bin_serialize(x):
    """Return *x* as bytes.

    bytes pass through unchanged. datetime values are converted to UTC and
    rendered in ISO format with a "Z" suffix; these, str values and the
    str() of anything else are encoded as UTF-8.
    """
    if isinstance(x, bytes):
        return x
    if isinstance(x, datetime):
        x = x.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    text = x if isinstance(x, str) else str(x)
    return text.encode('utf-8')
def dump_results(x):
i = -1
for result in x:
i += 1
if result:
match format:
case 'json':
json.dump(result, sys.stdout, default=str_serialize)
print()
case 'value':
if i:
sys.stdout.buffer.write(b'\n')
for v in result:
sys.stdout.buffer.write(bin_serialize(v).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t')+b'\n')
case 'keyvalue':
if i:
sys.stdout.buffer.write(b'\n')
for k, v in result.items():
sys.stdout.buffer.write( bin_serialize(k).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t')
+ b'\t' + bin_serialize(v).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t')
+ b'\n'
)
case 'tsv':
s=''
if not i:
sys.stdout.buffer.write((b'\t'.join(bin_serialize(k).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t') for k in result.keys()))+b'\n')
sys.stdout.buffer.write((b'\t'.join(bin_serialize(v).replace(b'\\',b'\\\\').replace(b'\n',b'\\n').replace(b'\t',b'\\t') for v in result.values()))+b'\n')
# Connect, drop STRICT_TRANS_TABLES from the session's sql_mode, run the
# user's query with "?" placeholders rewritten to "%s", and print the rows.
# NOTE(review): every "?" is rewritten, including any inside string literals,
# and a literal "%" in the query is presumably still treated as a format
# marker by the driver - confirm against how callers write their queries.
with pymysql.connect(**conf) as connection, connection.cursor(cu) as cursor:
    cursor.execute("SET sql_mode = TRIM(BOTH ',' FROM REPLACE(REPLACE(@@sql_mode,'STRICT_TRANS_TABLES',''),',,',','));")
    query = sql.replace('?', '%s')
    cursor.execute(query, a)
    dump_results(cursor.fetchall())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment