--Source code for WITSML processing in Snowflake
--set up your environment
create database my_test_db;
create schema my_test_db.witsml;
use schema my_test_db.witsml;
--create the staging table that Snowpipe loads all WITSML files into
create table witsml_temp (col1 variant);
--define the stage object where the WITSML files will be uploaded
CREATE OR REPLACE STAGE my_test_db.witsml.witsml_stage
URL='<PutYourS3BucketPathHere>'
CREDENTIALS=(aws_key_id='xxxx'
aws_secret_key='xxxx');
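--once the stage is created, you can sanity-check the connection by listing its contents
--(assumes at least one file has already been uploaded to the bucket)
list @my_test_db.witsml.witsml_stage;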
--define the pipe object used in conjunction with Snowpipe to load the file
create or replace pipe witsml_pipe as copy into witsml_temp from @my_test_db.witsml.witsml_stage FILE_FORMAT = (type = XML);
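--optionally verify the pipe; SYSTEM$PIPE_STATUS reports its current execution state.
--note: as defined, this pipe is triggered via the Snowpipe REST API or a manual
--ALTER PIPE ... REFRESH; add AUTO_INGEST = TRUE to the definition if you want S3 event
--notifications to trigger loads automatically.
select system$pipe_status('my_test_db.witsml.witsml_pipe');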
--create a stream on the staging table
create or replace stream witsml_stream on table witsml_temp;
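--to see what the stream is tracking, query it directly; the metadata columns show the change type.
--selecting from a stream does not advance its offset; only consuming it in a DML statement does.
select col1, metadata$action, metadata$isupdate from witsml_stream limit 10;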
--create the WELLS table
create or replace TABLE WELLS (
    WELL_UID STRING,
    WELL_NAME STRING,
    TIME_ZONE STRING,
    CREATION_TIME TIMESTAMP_NTZ(9),
    LAST_CHANGE_TIME TIMESTAMP_NTZ(9),
    constraint WELL_PK primary key (WELL_UID)
);
--create the WELLBORES table
create or replace TABLE WELLBORES (
    WELLBORE_UID STRING,
    WELLBORE_NAME STRING,
    WELL_UID STRING,
    IS_ACTIVE BOOLEAN,
    CREATION_TIME TIMESTAMP_NTZ(9),
    LAST_CHANGE_TIME TIMESTAMP_NTZ(9),
    constraint WELLBORE_PK primary key (WELLBORE_UID),
    constraint WELL_FK foreign key (WELL_UID) references MY_TEST_DB.WITSML.WELLS(WELL_UID)
);
--create the LOG_HEADER table
create or replace TABLE LOG_HEADER (
    LOG_UID STRING,
    WELLBORE_UID STRING,
    LOG_CREATIONDATE TIMESTAMP_NTZ(9),
    LOG_DIRECTION STRING,
    LOG_ENDDATETIMEINDEX TIMESTAMP_NTZ(9),
    LOG_INDEXCURVE STRING,
    LOG_INDEXTYPE STRING,
    LOG_NAME STRING,
    LOG_NAMEWELL STRING,
    LOG_NAMEWELLBORE STRING,
    LOG_OBJECTGROWING STRING,
    LOG_RUNNUMBER STRING,
    LOG_SERVICECOMPANY STRING,
    LOG_STARTDATETIMEINDEX TIMESTAMP_NTZ(9),
    constraint LOG_HEADER_PK primary key (LOG_UID),
    constraint WELLBORES_FK foreign key (WELLBORE_UID) references MY_TEST_DB.WITSML.WELLBORES(WELLBORE_UID)
);
--create LOG_CURVE table
create or replace TABLE LOG_CURVE (
    LOG_UID STRING,
    LOG_LOGCURVE_UID STRING,
    LOG_LOGCURVE_MNEMONIC STRING,
    LOG_LOGCURVE_UNIT STRING,
    LOG_LOGCURVE_MINDATETIMEINDEX TIMESTAMP_NTZ(9),
    LOG_LOGCURVE_MAXDATETIMEINDEX TIMESTAMP_NTZ(9),
    LOG_LOGCURVE_CURVEDESCRIPTION STRING,
    LOG_LOGCURVE_DATASOURCE STRING,
    LOG_LOGCURVE_TYPELOGDATA STRING,
    constraint LOG_CURVE_PK primary key (LOG_UID, LOG_LOGCURVE_UID),
    constraint LOG_HEADER_FK foreign key (LOG_UID) references MY_TEST_DB.WITSML.LOG_HEADER(LOG_UID)
);
--create LOG_DATA table
create or replace TABLE LOG_DATA (
    LOG_UID STRING,
    LOG_DATA OBJECT,
    constraint LOG_DATA_FK foreign key (LOG_UID) references MY_TEST_DB.WITSML.LOG_HEADER(LOG_UID)
);
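--note: Snowflake records primary and foreign key constraints as metadata only and does not
--enforce them, so the PK/FK definitions above serve as documentation and for tooling.
--an example of the kind of query the model supports, assuming some files have been processed:
select w.well_name, wb.wellbore_name, lh.log_name, lh.log_startDateTimeIndex
from wells w
join wellbores wb on wb.well_uid = w.well_uid
join log_header lh on lh.wellbore_uid = wb.wellbore_uid;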
--create the UDF we use to pair each LOG_DATA value with its mnemonic in a single object
create or replace function wits_log_combiner(A string, B string)
returns object
language javascript
as
$$
//split the comma-delimited mnemonic list and data row into parallel arrays
var keys = A.split(',');
var values = B.split(',');
//zip them into a single object keyed by mnemonic
return keys.reduce((obj, key, index) => ({ ...obj, [key]: values[index] }), {});
$$
;
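--an example call, using a made-up mnemonic list and data row for illustration;
--the result is an OBJECT like {"TIME":"2021-12-01T00:00:00.000Z","DEPTH":"1234.5","ROP":"18.2"}
select wits_log_combiner('TIME,DEPTH,ROP', '2021-12-01T00:00:00.000Z,1234.5,18.2');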
--create the stored proc to generate the views over the LOG_DATA table
create or replace procedure wits_view_generator(log_uid string, autogen boolean)
returns string
language javascript
comment = 'Generates a view over the LOG_DATA table for a given log_uid'
as
$$
    var name = LOG_UID.replace(/-/g,'_');
    var view = "create or replace view "+name+"_vw as select \n";
    var res = snowflake.execute( {sqlText: "select distinct LOG_LOGCURVE_MNEMONIC, LOG_LOGCURVE_TYPELOGDATA from log_curve where log_uid = '"+LOG_UID+"' order by LOG_LOGCURVE_MNEMONIC"} );
    while (res.next()) {
        //replace characters that are not legal in an unquoted column name
        var nm = res.getColumnValue(1).replace(/[-\(\)]/g,'_');
        var dt = res.getColumnValue(2);
        //cast each curve to a column of the appropriate type based on its typeLogData
        switch (dt) {
            case 'double':
                view = view.concat(" to_double(iff(log_data:"+nm+"::string='', null, log_data:"+nm+"::string)) "+nm+", \n");
                break;
            case 'date time':
                view = view.concat(" log_data:"+nm+"::timestamp "+nm+", \n");
                break;
            default:
                view = view.concat(" log_data:"+nm+"::string "+nm+", \n");
        }
    }
    //trim the trailing comma and append the FROM clause
    view = view.substring(0, view.length-3);
    view = view.concat("\n from log_data where log_uid = '"+LOG_UID+"'");
    if (AUTOGEN)
        snowflake.execute( {sqlText: view} );
    return view;
$$
;
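--example invocation; the log_uid below is a placeholder, substitute one from your LOG_HEADER table.
--pass autogen = false to preview the generated view DDL without executing it, or true to create the view in one step.
call wits_view_generator('<your-log-uid-here>', false);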
--create the stored proc to automate the processing of the staging data into each target table by file type
create or replace procedure witsml_process()
returns string
language javascript
EXECUTE AS CALLER
as
$$
//we have to make multiple passes over the stream data, so we first copy it off to a temp table.
//filtering on metadata$action='INSERT' lets us occasionally truncate the staging table without reprocessing the data there
snowflake.execute( {sqlText: `create or replace temp table foo as select col1 from witsml_stream where metadata$action='INSERT'`} );
//capture the current warehouse name so we can resize it back down when we finish
var res = snowflake.execute( {sqlText: `select current_warehouse()`} );
res.next();
var wh = res.getColumnValue(1);
snowflake.execute( {sqlText: `alter warehouse ` + wh + ` set warehouse_size = 'large'`} ); //dynamically resize the warehouse up
//process the well records
snowflake.execute( {sqlText:`
insert into wells(
with wellrecs as (
select col1 from foo where col1 like '<wells %'
)
SELECT
wells.value:\"@uid\"::string well_uid
,XMLGET(wells.value, 'name'):\"$\"::string well_name
,XMLGET(wells.value, 'timeZone'):\"$\"::string time_zone
,XMLGET(well.value, 'dTimCreation'):\"$\"::datetime creation_time
,XMLGET(well.value, 'dTimLastChange'):\"$\"::datetime last_change_time
FROM wellrecs
,lateral flatten (col1) wells
,lateral flatten(wells.value:\"$\") well
WHERE wells.key = '$'
AND well.value:\"@\" = 'commonData')` } );
//process the wellbore records
snowflake.execute( {sqlText:`
insert into wellbores(
with wellborerecs as (
select col1 from foo where col1 like '<wellbores %'
)
SELECT
wellbores.value:\"@uid\"::string wellbore_uid
,XMLGET(wellbores.value, 'name'):\"$\"::string wellbore_name
,wellbores.value:\"@uidWell\"::string well_uid
,XMLGET(wellbores.value, 'isActive'):\"$\"::boolean is_active
,XMLGET(wellbore.value, 'dTimCreation'):\"$\"::datetime creation_time
,XMLGET(wellbore.value, 'dTimLastChange'):\"$\"::datetime last_change_time
FROM wellborerecs
,lateral flatten (col1) wellbores
,lateral flatten(wellbores.value:\"$\") wellbore
WHERE wellbores.key = '$'
AND wellbore.value:\"@\" = 'commonData')` } );
//process the log_header records
snowflake.execute( {sqlText:`
insert into log_header(
with logrecs as (
select col1 from foo where col1 like '<logs %'
)
SELECT
logs_header.value:\"@uid\"::string log_uid
,logs_header.value:\"@uidWellbore\"::string wellbore_uid
,XMLGET(logs_header.value,'creationDate'):\"$\"::datetime log_creationDate
,XMLGET(logs_header.value,'direction'):\"$\"::string log_direction
,XMLGET(logs_header.value,'endDateTimeIndex'):\"$\"::datetime log_endDateTimeIndex
,XMLGET(logs_header.value,'indexCurve'):\"$\"::string log_indexCurve
,XMLGET(logs_header.value,'indexType'):\"$\"::string log_indexType
,XMLGET(logs_header.value,'name'):\"$\"::string log_name
,XMLGET(logs_header.value,'nameWell'):\"$\"::string log_nameWell
,XMLGET(logs_header.value,'nameWellbore'):\"$\"::string log_nameWellbore
,XMLGET(logs_header.value,'objectGrowing'):\"$\"::string log_objectGrowing
,XMLGET(logs_header.value,'runNumber'):\"$\"::string log_runNumber
,XMLGET(logs_header.value,'serviceCompany'):\"$\"::string log_serviceCompany
,XMLGET(logs_header.value,'startDateTimeIndex'):\"$\"::datetime log_startDateTimeIndex
FROM logrecs
,lateral flatten (col1) logs_header
WHERE logs_header.value:\"@\"::string in ('log'))` } );
//process the log_curve records
snowflake.execute( {sqlText:`
insert into log_curve (
with logrecs as (
select col1 from foo where col1 like '<logs %'
)
SELECT
logs_header.value:\"@uid\"::string log_uid
,log_curve.value:\"@uid\"::string log_LogCurve_uid
,XMLGET(log_curve.value,'mnemonic'):\"$\"::string log_logCurve_mnemonic
,XMLGET(log_curve.value,'unit'):\"$\"::string log_logCurve_unit
,XMLGET(log_curve.value,'minDateTimeIndex'):\"$\"::datetime log_logCurve_minDateTimeIndex
,XMLGET(log_curve.value,'maxDateTimeIndex'):\"$\"::datetime log_logCurve_maxDateTimeIndex
,XMLGET(log_curve.value,'curveDescription'):\"$\"::string log_logCurve_curveDescription
,XMLGET(log_curve.value,'dataSource'):\"$\"::string log_logCurve_dataSource
,XMLGET(log_curve.value,'typeLogData'):\"$\"::string log_logCurve_typeLogData
FROM logrecs
,lateral flatten (col1) logs_header
,lateral flatten (logs_header.value:\"$\") log_curve
WHERE log_curve.value:\"@\"::string in ('logCurveInfo'))` } );
//process the log_data records
snowflake.execute( {sqlText:`
insert into log_data (
with logrecs as (
select col1 from foo where col1 like '<logs %'
)
SELECT
logs_header.value:\"@uid\"::string log_uid
,wits_log_combiner(XMLGET(log_data.value,'mnemonicList'):\"$\"::string, data.value:\"$\"::string) as log_data
FROM logrecs
,lateral flatten (col1) logs_header
,lateral flatten (logs_header.value:\"$\") log_data
,lateral flatten (log_data.value:\"$\") data
WHERE data.value:\"@\"::string in ('data'))` } );
snowflake.execute( {sqlText: `alter warehouse `+ wh +` set warehouse_size = 'xsmall'`} ); //dynamically resize the warehouse back down
return 'done!';
$$
;
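--the proc can also be run by hand to test the pipeline end to end
--(EXECUTE AS CALLER means it resizes whatever warehouse your session is using):
call witsml_process();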
--create a task to run the stored proc
create or replace task witsml_task
warehouse = elt_wh
schedule = '5 minute'
when SYSTEM$STREAM_HAS_DATA('WITSML_STREAM')
as call my_test_db.witsml.witsml_process();
--activate the task
alter task witsml_task resume;
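--tasks are created in a suspended state; after resuming, you can confirm the task is scheduled
--and check whether the stream currently has data waiting for the next run:
show tasks like 'WITSML_TASK';
select system$stream_has_data('WITSML_STREAM');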