Last active
December 6, 2021 00:35
-
-
Save rtempleton/d0df58383831cf22f6b9c7b2c46374f0 to your computer and use it in GitHub Desktop.
Source code for WITSML processing in Snowflake, referenced in the accompanying article.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Set up the environment: database, schema, and ingestion plumbing.
create database my_test_db;
create schema my_test_db.witsml;
use schema my_test_db.witsml;

-- Staging table where all WITSML files are landed by Snowpipe; each file
-- arrives as one XML document in a single VARIANT column.
create table witsml_temp (col1 variant);

-- External stage the WITSML files are uploaded to.
-- SECURITY NOTE(review): embedding aws_key_id/aws_secret_key directly in DDL
-- exposes credentials in query history; prefer a STORAGE INTEGRATION.
create or replace stage my_test_db.witsml.witsml_stage
    url = '<PutYourS3BucketPathHere>'
    credentials = (aws_key_id = 'xxxx'
                   aws_secret_key = 'xxxx');

-- Pipe used by Snowpipe to auto-ingest staged XML files into the staging table.
create or replace pipe witsml_pipe as
    copy into witsml_temp
    from @my_test_db.witsml.witsml_stage
    file_format = (type = XML);

-- Stream on the staging table so downstream processing only sees new rows.
create or replace stream witsml_stream on table witsml_temp;
-- WELLS: one row per well parsed from the <wells> WITSML documents.
-- NOTE: Snowflake does not enforce PK constraints; they are metadata only.
create or replace table wells (
    well_uid         string,
    well_name        string,
    time_zone        string,
    creation_time    timestamp_ntz(9),
    last_change_time timestamp_ntz(9),
    constraint well_pk primary key (well_uid)
);
-- WELLBORES: one row per wellbore; each wellbore belongs to exactly one well.
create or replace table wellbores (
    wellbore_uid     string,
    wellbore_name    string,
    well_uid         string,
    is_active        boolean,
    creation_time    timestamp_ntz(9),
    last_change_time timestamp_ntz(9),
    constraint wellbore_pk primary key (wellbore_uid),
    constraint well_fk foreign key (well_uid)
        references my_test_db.witsml.wells (well_uid)
);
-- LOG_HEADER: one row per log; header-level attributes of the <log> element.
create or replace table log_header (
    log_uid                string,
    wellbore_uid           string,
    log_creationdate       timestamp_ntz(9),
    log_direction          string,
    log_enddatetimeindex   timestamp_ntz(9),
    log_indexcurve         string,
    log_indextype          string,
    log_name               string,
    log_namewell           string,
    log_namewellbore       string,
    log_objectgrowing      string,
    log_runnumber          string,
    log_servicecompany     string,
    log_startdatetimeindex timestamp_ntz(9),
    constraint log_header_pk primary key (log_uid),
    constraint wellbores_fk foreign key (wellbore_uid)
        references my_test_db.witsml.wellbores (wellbore_uid)
);
-- LOG_CURVE: one row per curve (logCurveInfo) within a log; composite key of
-- the parent log uid plus the curve uid.
create or replace table log_curve (
    log_uid                       string,
    log_logcurve_uid              string,
    log_logcurve_mnemonic         string,
    log_logcurve_unit             string,
    log_logcurve_mindatetimeindex timestamp_ntz(9),
    log_logcurve_maxdatetimeindex timestamp_ntz(9),
    log_logcurve_curvedescription string,
    log_logcurve_datasource       string,
    log_logcurve_typelogdata      string,
    constraint log_curve_pk primary key (log_uid, log_logcurve_uid),
    constraint log_header_fk foreign key (log_uid)
        references my_test_db.witsml.log_header (log_uid)
);
-- LOG_DATA: one row per <data> element of a log. LOG_DATA holds an OBJECT
-- mapping each curve mnemonic to its value (built by wits_log_combiner).
-- Intentionally no primary key: many data rows per log.
create or replace table log_data (
    log_uid  string,
    log_data object,
    constraint log_data_fk foreign key (log_uid)
        references my_test_db.witsml.log_header (log_uid)
);
-- UDF that zips a comma-separated mnemonic list (A) with a comma-separated
-- data row (B) into an OBJECT of {mnemonic: value}. Pairs are matched by
-- position; duplicate mnemonics keep the last value.
create or replace function wits_log_combiner(A string, B string)
returns object
language javascript
as
$$
    var names = A.split(',');
    var vals = B.split(',');
    var out = {};
    for (var i = 0; i < names.length; i++) {
        out[names[i]] = vals[i];
    }
    return out;
$$
;
-- Stored proc that generates (and optionally creates) a view over LOG_DATA,
-- pivoting the OBJECT column into one typed column per curve mnemonic.
--   log_uid : uid of the log to build the view for
--   autogen : when true, execute the generated DDL; the DDL text is returned either way
create or replace procedure wits_view_generator(log_uid string, autogen boolean)
returns string
language javascript
comment = 'Generates a view over the LOG_DATA table for a given log_uid'
as
$$
    // View name: the log uid with dashes replaced so it is a legal identifier.
    var name = LOG_UID.replace(/-/g, '_');
    var view = "create or replace view " + name + "_vw as select \n";

    // Bind the uid instead of concatenating it into the SQL text to avoid
    // SQL injection through a crafted log_uid value.
    var res = snowflake.execute({
        sqlText: "select distinct LOG_LOGCURVE_MNEMONIC, LOG_LOGCURVE_TYPELOGDATA " +
                 "from log_curve where log_uid = ? order by LOG_LOGCURVE_MNEMONIC",
        binds: [LOG_UID]
    });

    var found = false;
    while (res.next()) {
        found = true;
        // Sanitize the mnemonic so it is usable both as the OBJECT path and
        // as the generated column alias.
        var nm = res.getColumnValue(1).replace(/[-\(\)]/g, '_');
        var dt = res.getColumnValue(2);
        switch (dt) {
            case 'double':
                // Empty strings are not valid doubles; map them to NULL first.
                view = view.concat(" to_double(iff(log_data:" + nm + "::string='', null, log_data:" + nm + "::string)) " + nm + ", \n");
                break;
            case 'date time':
                view = view.concat(" log_data:" + nm + "::timestamp " + nm + ", \n");
                break;
            default:
                view = view.concat(" log_data:" + nm + "::string " + nm + ", \n");
        }
    }

    // Without this guard an unknown log_uid would produce (and possibly
    // execute) a syntactically invalid view definition.
    if (!found)
        return 'No curves found for log_uid ' + LOG_UID;

    // Drop the trailing ", \n" left by the last column.
    view = view.substring(0, view.length - 3);

    // The uid must be inlined in the view DDL (binds cannot be used there),
    // so escape embedded single quotes.
    var uid_lit = LOG_UID.replace(/'/g, "''");
    view = view.concat("\n from log_data where log_uid = '" + uid_lit + "'");
    if (AUTOGEN)
        snowflake.execute({sqlText: view});
    return view;
$$
;
-- Stored proc that processes newly staged WITSML documents from the stream
-- into the normalized target tables, routed by the file's root element.
-- Resizes the current warehouse up for the heavy XML shredding and always
-- resizes it back down, even when an insert fails.
create or replace procedure witsml_process()
returns string
language javascript
EXECUTE AS CALLER
as
$$
//we have to make multiple passes over the stream data so we first need to copy this off to a temp table
//use the action='INSERT' so we occasionally truncate the stage table without reprocessing the data there
snowflake.execute( {sqlText: `create or replace temp table foo as select col1 from witsml_stream where metadata$action='INSERT'`} );

//capture the session warehouse so it can be resized up and back down
var wh = null;
var res = snowflake.execute( {sqlText: `select current_warehouse()`} );
if (res.next()) {
  wh = res.getColumnValue(1);
}
if (wh) {
  snowflake.execute( {sqlText: `alter warehouse ` + wh + ` set warehouse_size = 'large'`} ); //dynamically resize the cluster up
}
try {
//process the well records
snowflake.execute( {sqlText:`
insert into wells(
with wellrecs as (
select col1 from foo where col1 like '<wells %'
)
SELECT
wells.value:\"@uid\"::string well_uid
,XMLGET(wells.value, 'name'):\"$\"::string well_name
,XMLGET(wells.value, 'timeZone'):\"$\"::string time_zone
,XMLGET(well.value, 'dTimCreation'):\"$\"::datetime creation_time
,XMLGET(well.value, 'dTimLastChange'):\"$\"::datetime last_change_time
FROM wellrecs
,lateral flatten (col1) wells
,lateral flatten(wells.value:\"$\") well
WHERE wells.key = '$'
AND well.value:\"@\" = 'commonData')` } );
//process the wellbore records
snowflake.execute( {sqlText:`
insert into wellbores(
with wellborerecs as (
select col1 from foo where col1 like '<wellbores %'
)
SELECT
wellbores.value:\"@uid\"::string wellbore_uid
,XMLGET(wellbores.value, 'name'):\"$\"::string wellbore_name
,wellbores.value:\"@uidWell\"::string well_uid
,XMLGET(wellbores.value, 'isActive'):\"$\"::boolean is_active
,XMLGET(wellbore.value, 'dTimCreation'):\"$\"::datetime creation_time
,XMLGET(wellbore.value, 'dTimLastChange'):\"$\"::datetime last_change_time
FROM wellborerecs
,lateral flatten (col1) wellbores
,lateral flatten(wellbores.value:\"$\") wellbore
WHERE wellbores.key = '$'
AND wellbore.value:\"@\" = 'commonData')` } );
//process the log_header records
snowflake.execute( {sqlText:`
insert into log_header(
with logrecs as (
select col1 from foo where col1 like '<logs %'
)
SELECT
logs_header.value:\"@uid\"::string log_uid
,logs_header.value:\"@uidWellbore\"::string wellbore_uid
,XMLGET(logs_header.value,'creationDate'):\"$\"::datetime log_creationDate
,XMLGET(logs_header.value,'direction'):\"$\"::string log_direction
,XMLGET(logs_header.value,'endDateTimeIndex'):\"$\"::datetime log_endDateTimeIndex
,XMLGET(logs_header.value,'indexCurve'):\"$\"::string log_indexCurve
,XMLGET(logs_header.value,'indexType'):\"$\"::string log_indexType
,XMLGET(logs_header.value,'name'):\"$\"::string log_name
,XMLGET(logs_header.value,'nameWell'):\"$\"::string log_nameWell
,XMLGET(logs_header.value,'nameWellbore'):\"$\"::string log_nameWellbore
,XMLGET(logs_header.value,'objectGrowing'):\"$\"::string log_objectGrowing
,XMLGET(logs_header.value,'runNumber'):\"$\"::string log_runNumber
,XMLGET(logs_header.value,'serviceCompany'):\"$\"::string log_serviceCompany
,XMLGET(logs_header.value,'startDateTimeIndex'):\"$\"::datetime log_startDateTimeIndex
FROM logrecs
,lateral flatten (col1) logs_header
WHERE logs_header.value:\"@\"::string in ('log'))` } );
//process the log_curve records
snowflake.execute( {sqlText:`
insert into log_curve (
with logrecs as (
select col1 from foo where col1 like '<logs %'
)
SELECT
logs_header.value:\"@uid\"::string log_uid
,log_curve.value:\"@uid\"::string log_LogCurve_uid
,XMLGET(log_curve.value,'mnemonic'):\"$\"::string log_logCurve_mnemonic
,XMLGET(log_curve.value,'unit'):\"$\"::string log_logCurve_unit
,XMLGET(log_curve.value,'minDateTimeIndex'):\"$\"::datetime log_logCurve_minDateTimeIndex
,XMLGET(log_curve.value,'maxDateTimeIndex'):\"$\"::datetime log_logCurve_maxDateTimeIndex
,XMLGET(log_curve.value,'curveDescription'):\"$\"::string log_logCurve_curveDescription
,XMLGET(log_curve.value,'dataSource'):\"$\"::string log_logCurve_dataSource
,XMLGET(log_curve.value,'typeLogData'):\"$\"::string log_logCurve_typeLogData
FROM logrecs
,lateral flatten (col1) logs_header
,lateral flatten (logs_header.value:\"$\") log_curve
WHERE log_curve.value:\"@\"::string in ('logCurveInfo'))` } );
//process the log_data records
snowflake.execute( {sqlText:`
insert into log_data (
with logrecs as (
select col1 from foo where col1 like '<logs %'
)
SELECT
logs_header.value:\"@uid\"::string log_uid
,wits_log_combiner(XMLGET(log_data.value,'mnemonicList'):\"$\"::string, data.value:\"$\"::string) as log_data
FROM logrecs
,lateral flatten (col1) logs_header
,lateral flatten (logs_header.value:\"$\") log_data
,lateral flatten (log_data.value:\"$\") data
WHERE data.value:\"@\"::string in ('data'))` } );
} finally {
//always resize the cluster back down, even if an insert above failed
//NOTE(review): assumes the warehouse's normal size is xsmall — confirm
if (wh) {
  snowflake.execute( {sqlText: `alter warehouse `+ wh +` set warehouse_size = 'xsmall'`} );
}
}
return 'done!'
$$
;
-- Task that runs the processing proc every 5 minutes, but only when the
-- stream actually has new rows to consume.
-- NOTE(review): warehouse ELT_WH is not created in this script — confirm it exists.
create or replace task witsml_task
    warehouse = elt_wh
    schedule = '5 minute'
    when system$stream_has_data('WITSML_STREAM')
as
    call my_test_db.witsml.witsml_process();

-- Tasks are created in a suspended state; resume to activate.
alter task witsml_task resume;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment