Last active
October 14, 2022 07:54
-
-
Save lefred/73fa7ed87acf3c9c4510c21a63850ef5 to your computer and use it in GitHub Desktop.
Fn application that stores the MySQL slow query log in Object Storage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import json | |
import logging | |
import oci | |
import base64 | |
from mysql.connector import connection | |
from datetime import datetime | |
from fdk import response | |
def handler(ctx, data: io.BytesIO = None):
    """Export the MySQL statement history to Object Storage as a JSON file.

    Steps, in order:

    1. Read the OCI settings from the function configuration.
    2. Read the MySQL connection parameters from the JSON request body.
    3. Write an OCI config file and the decoded API key under ``/tmp``.
    4. Enable the ``events_statements_histo%`` consumers, dump
       ``performance_schema.events_statements_history_long`` (plus derived
       timing columns) to JSON, then truncate that table.
    5. Upload the dump as ``slow_<mds_name>_<YYYYmmddHHMM>.json``.

    Args:
        ctx: Invocation context. ``ctx.Config()`` must provide the keys
            ``bucket``, ``namespace``, ``oci_fingerprint``, ``oci_region``,
            ``oci_user``, ``oci_tenancy`` and ``oci_key`` (the base64-encoded
            content of the key file).
        data: Request body: a JSON object with ``mds_host``, ``mds_port``,
            ``mds_user``, ``mds_password`` and ``mds_name``.

    Returns:
        A ``response.Response`` whose JSON body names the stored object.

    Raises:
        Exception: Whatever the failing step raised, unchanged. Failures
            while reading the configuration, parsing the parameters, writing
            the OCI config, connecting, or enabling the consumers are logged
            before being re-raised.
    """
    logger = logging.getLogger()

    try:
        cfg = ctx.Config()
        obs_bucket = cfg["bucket"]
        # "namespace" remains a required key for backward compatibility; the
        # namespace used for the upload is fetched from the service below.
        _ = cfg["namespace"]
        oci_fingerprint = cfg["oci_fingerprint"]
        oci_region = cfg["oci_region"]
        oci_user = cfg["oci_user"]
        oci_tenancy = cfg["oci_tenancy"]
        oci_key = cfg["oci_key"]
    except Exception as ex:
        # The exception is a %-argument, so the message needs a placeholder;
        # without one the logging module fails to format the record.
        logger.error("ERROR: Missing configuration key: %s", ex)
        raise

    try:
        body = json.loads(data.getvalue())
        mds_host = body.get("mds_host")
        mds_port = body.get("mds_port")
        mds_user = body.get("mds_user")
        mds_pwd = body.get("mds_password")
        mds_name = body.get("mds_name")
        # Dots and spaces in the instance name are replaced in the object name.
        safe_name = mds_name.replace(".", "_").replace(" ", "_")
        slowlog_name = "slow_{}_{}.json".format(
            safe_name, datetime.utcnow().strftime("%Y%m%d%H%M")
        )
    except Exception as ex:
        logger.error("ERROR: Missing parameter: %s", ex)
        raise

    try:
        with open("/tmp/.oci_config", "w") as f:
            f.write("[DEFAULT]\n")
            f.write("user={}\n".format(oci_user))
            f.write("fingerprint={}\n".format(oci_fingerprint))
            f.write("tenancy={}\n".format(oci_tenancy))
            f.write("region={}\n".format(oci_region))
            f.write("key_file=/tmp/key.pem\n")
        with open("/tmp/key.pem", "w") as g:
            g.write(base64.b64decode(oci_key.encode("ascii")).decode("ascii"))
    except Exception as ex:
        logger.error("ERROR: Problem creating OCI config: %s", ex)
        raise

    logger.info("Inside Python MySQL Slow Log function")

    cnx = None
    cursor = None
    try:
        try:
            cnx = connection.MySQLConnection(
                user=mds_user,
                password=mds_pwd,
                host=mds_host,
                port=mds_port,
                database="performance_schema",
            )
            cursor = cnx.cursor(dictionary=True)
        except Exception as ex:
            logger.error("ERROR: Problem connecting to MySQL: %s", ex)
            raise

        query = "update performance_schema.setup_consumers set ENABLED='YES' where NAME like 'events_statements_histo%'"
        logger.info("Enabling consumers")
        try:
            cursor.execute(query)
        except Exception as ex:
            logger.error("ERROR: Problem executing the query: %s", ex)
            raise

        # Derived columns are wrapped in concat() so they come back as strings.
        query = """select *, concat(date_sub(now(),
            INTERVAL (
            select VARIABLE_VALUE from performance_schema.global_status
            where variable_name='UPTIME')-TIMER_START*10e-13 second)) start_time,
            concat(timer_wait/1e+9) timer_wait_ms, concat(round(timer_wait/1e+12,6)) timer_wait_s,
            concat(round(lock_time/1e+12,6)) lock_time_s,
            format_pico_time(timer_wait) wait_human,
            concat(round(unix_timestamp(date_sub(now(),INTERVAL (
            select VARIABLE_VALUE from performance_schema.global_status
            where variable_name='UPTIME')-TIMER_START*10e-13 second)))) timestamp_rnd,
            concat(unix_timestamp(date_sub(now(),INTERVAL (
            select VARIABLE_VALUE from performance_schema.global_status
            where variable_name='UPTIME')-TIMER_START*10e-13 second))) timestamp
            from performance_schema.events_statements_history_long"""
        cursor.execute(query)
        logger.info("Query to retrieve all statements")
        content_log = json.dumps(cursor.fetchall())
        logger.info("Data fetched from Performance_Schema... let's truncate it now")
        # NOTE: the history is truncated before the upload, so a failed upload
        # loses this batch.
        query = "truncate table performance_schema.events_statements_history_long"
        cursor.execute(query)
    finally:
        # Best-effort cleanup: a close() failure must not mask the real error.
        for resource in (cursor, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as ex:
                logger.warning("Problem closing MySQL resource: %s", ex)

    config = oci.config.from_file("/tmp/.oci_config")
    logger.info("Connecting to Object Storage")
    object_storage_client = oci.object_storage.ObjectStorageClient(config)
    namespace = object_storage_client.get_namespace().data
    logger.info("Storing to Object Storage %s", slowlog_name)
    object_storage_client.put_object(namespace, obs_bucket, slowlog_name, content_log)
    return response.Response(
        ctx,
        response_data=json.dumps(
            {"message": "MySQL Slow Log saved: {}".format(slowlog_name)}
        ),
        headers={"Content-Type": "application/json"},
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import json | |
import logging | |
import oci | |
import base64 | |
from mysql.connector import connection | |
from datetime import datetime | |
from fdk import response | |
def _yes_no(counter):
    """Return "yes" when *counter* converts to a positive int, else "no"."""
    return "yes" if int(counter) > 0 else "no"


def _format_slow_log_entry(row):
    """Render one statement-history row (a dict) as a slow-query-log entry."""
    log_time = datetime.strptime(row["start_time"], "%Y-%m-%d %H:%M:%S.%f")
    lines = [
        "# Time: {}Z\n".format(log_time.isoformat("T")),
        "# User@Host: n/a [] @ n/a [] Id: {}\n".format(row["THREAD_ID"]),
        "# Query_time: {} Lock_time: {} Rows_sent: {} Rows_examined: {} Rows_affected: {}\n".format(
            row["timer_wait_s"],
            row["lock_time_s"],
            row["ROWS_SENT"],
            row["ROWS_EXAMINED"],
            row["ROWS_AFFECTED"],
        ),
        "# Bytes_sent: n/a Tmp_tables: {} Tmp_disk_tables: {} Tmp_table_sizes: n/a\n".format(
            row["CREATED_TMP_TABLES"], row["CREATED_TMP_DISK_TABLES"]
        ),
        "# Full_scan: {} Full_join: {} Tmp_table: {} Tmp_table_on_disk: {}\n".format(
            _yes_no(row["SELECT_SCAN"]),
            _yes_no(row["SELECT_FULL_JOIN"]),
            _yes_no(row["CREATED_TMP_TABLES"]),
            _yes_no(row["CREATED_TMP_DISK_TABLES"]),
        ),
        "# Merge_passes: {} Execution_engine: {}\n".format(
            row["SORT_MERGE_PASSES"], row["EXECUTION_ENGINE"]
        ),
        "# No_index_used: {} Cpu_time: {}\n".format(
            _yes_no(row["NO_INDEX_USED"]), row["CPU_TIME"]
        ),
    ]
    # This column is only emitted when the result set contains it.
    if "MAX_TOTAL_MEMORY" in row:
        lines.append("# Max_memory: {}\n".format(row["MAX_TOTAL_MEMORY"]))
    lines.append("SET timestamp={};\n".format(row["timestamp_rnd"]))
    lines.append("{};\n".format(row["SQL_TEXT"]))
    return "".join(lines)


def handler(ctx, data: io.BytesIO = None):
    """Export the MySQL statement history to Object Storage as a slow-log file.

    Steps, in order:

    1. Read the OCI settings from the function configuration.
    2. Read the MySQL connection parameters from the JSON request body.
    3. Write an OCI config file and the decoded API key under ``/tmp``.
    4. Enable the ``events_statements_histo%`` consumers, fetch
       ``performance_schema.events_statements_history_long`` (plus derived
       timing columns), render every row that has SQL text as a
       slow-query-log entry, then truncate that table.
    5. Upload the text as ``slow_<mds_name>_<YYYYmmddHHMM>.log``.

    Args:
        ctx: Invocation context. ``ctx.Config()`` must provide the keys
            ``bucket``, ``namespace``, ``oci_fingerprint``, ``oci_region``,
            ``oci_user``, ``oci_tenancy`` and ``oci_key`` (the base64-encoded
            content of the key file).
        data: Request body: a JSON object with ``mds_host``, ``mds_port``,
            ``mds_user``, ``mds_password`` and ``mds_name``.

    Returns:
        A ``response.Response`` whose JSON body names the stored object.

    Raises:
        Exception: Whatever the failing step raised, unchanged. Failures
            while reading the configuration, parsing the parameters, writing
            the OCI config, connecting, or enabling the consumers are logged
            before being re-raised.
    """
    logger = logging.getLogger()

    try:
        cfg = ctx.Config()
        obs_bucket = cfg["bucket"]
        # "namespace" remains a required key for backward compatibility; the
        # namespace used for the upload is fetched from the service below.
        _ = cfg["namespace"]
        oci_fingerprint = cfg["oci_fingerprint"]
        oci_region = cfg["oci_region"]
        oci_user = cfg["oci_user"]
        oci_tenancy = cfg["oci_tenancy"]
        oci_key = cfg["oci_key"]
    except Exception as ex:
        # The exception is a %-argument, so the message needs a placeholder;
        # without one the logging module fails to format the record.
        logger.error("ERROR: Missing configuration key: %s", ex)
        raise

    try:
        body = json.loads(data.getvalue())
        mds_host = body.get("mds_host")
        mds_port = body.get("mds_port")
        mds_user = body.get("mds_user")
        mds_pwd = body.get("mds_password")
        mds_name = body.get("mds_name")
        # Dots and spaces in the instance name are replaced in the object name.
        safe_name = mds_name.replace(".", "_").replace(" ", "_")
        slowlog_name = "slow_{}_{}.log".format(
            safe_name, datetime.utcnow().strftime("%Y%m%d%H%M")
        )
    except Exception as ex:
        logger.error("ERROR: Missing parameter: %s", ex)
        raise

    try:
        with open("/tmp/.oci_config", "w") as f:
            f.write("[DEFAULT]\n")
            f.write("user={}\n".format(oci_user))
            f.write("fingerprint={}\n".format(oci_fingerprint))
            f.write("tenancy={}\n".format(oci_tenancy))
            f.write("region={}\n".format(oci_region))
            f.write("key_file=/tmp/key.pem\n")
        with open("/tmp/key.pem", "w") as g:
            g.write(base64.b64decode(oci_key.encode("ascii")).decode("ascii"))
    except Exception as ex:
        logger.error("ERROR: Problem creating OCI config: %s", ex)
        raise

    logger.info("Inside Python MySQL Slow Log function")

    cnx = None
    cursor = None
    try:
        try:
            cnx = connection.MySQLConnection(
                user=mds_user,
                password=mds_pwd,
                host=mds_host,
                port=mds_port,
                database="performance_schema",
            )
            cursor = cnx.cursor(dictionary=True)
        except Exception as ex:
            logger.error("ERROR: Problem connecting to MySQL: %s", ex)
            raise

        query = "update performance_schema.setup_consumers set ENABLED='YES' where NAME like 'events_statements_histo%'"
        logger.info("Enabling consumers")
        try:
            cursor.execute(query)
        except Exception as ex:
            logger.error("ERROR: Problem executing the query: %s", ex)
            raise

        # Derived columns are wrapped in concat() so they come back as strings.
        query = """select *, concat(date_sub(now(),
            INTERVAL (
            select VARIABLE_VALUE from performance_schema.global_status
            where variable_name='UPTIME')-TIMER_START*10e-13 second)) start_time,
            concat(timer_wait/1e+9) timer_wait_ms, concat(round(timer_wait/1e+12,6)) timer_wait_s,
            concat(round(lock_time/1e+12,6)) lock_time_s,
            format_pico_time(timer_wait) wait_human,
            concat(round(unix_timestamp(date_sub(now(),INTERVAL (
            select VARIABLE_VALUE from performance_schema.global_status
            where variable_name='UPTIME')-TIMER_START*10e-13 second)))) timestamp_rnd,
            concat(unix_timestamp(date_sub(now(),INTERVAL (
            select VARIABLE_VALUE from performance_schema.global_status
            where variable_name='UPTIME')-TIMER_START*10e-13 second))) timestamp
            from performance_schema.events_statements_history_long"""
        cursor.execute(query)
        logger.info("Query to retrieve all statements")
        rows = cursor.fetchall()

        # Format before truncating so a formatting error (for example a
        # missing column) does not discard the fetched history. Rows without
        # SQL text are skipped.
        content_log = "".join(
            _format_slow_log_entry(row) for row in rows if row["SQL_TEXT"]
        )

        logger.info("Data fetched from Performance_Schema... let's truncate it now")
        # NOTE: the history is truncated before the upload, so a failed upload
        # loses this batch.
        query = "truncate table performance_schema.events_statements_history_long"
        cursor.execute(query)
    finally:
        # Best-effort cleanup: a close() failure must not mask the real error.
        for resource in (cursor, cnx):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as ex:
                logger.warning("Problem closing MySQL resource: %s", ex)

    config = oci.config.from_file("/tmp/.oci_config")
    logger.info("Connecting to Object Storage")
    object_storage_client = oci.object_storage.ObjectStorageClient(config)
    namespace = object_storage_client.get_namespace().data
    logger.info("Storing to Object Storage %s", slowlog_name)
    object_storage_client.put_object(namespace, obs_bucket, slowlog_name, content_log)
    return response.Response(
        ctx,
        response_data=json.dumps(
            {"message": "MySQL Slow Log saved: {}".format(slowlog_name)}
        ),
        headers={"Content-Type": "application/json"},
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment