Created
September 27, 2022 18:55
-
-
Save ology/09528dec47985541c9f83e7f055846d4 to your computer and use it in GitHub Desktop.
Code taken from the athena.pl Paws distro example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Compliance::Athena; | |
# Abstract: AWS Athena database connection | |
use strict; | |
use warnings; | |
use Compliance::Config; | |
use Carp qw/croak/; | |
use Data::GUID qw/guid_string/; | |
use DateTime (); | |
use DateTime::Format::RFC3339 (); | |
use Log::Any (); | |
use Paws (); | |
use Paws::Credential::Explicit (); | |
use Types::Standard qw/Any Dict Str/; | |
use Moo; | |
# Log::Any logger used by every method of this class.
has log => (
    is      => 'ro',
    isa     => Any,
    default => sub { Log::Any->get_logger },
);

# DateTime::Format::RFC3339 instance used by _to_rfc3339 to parse and
# re-format timestamp strings.
has rfc3339 => (
    is      => 'ro',
    isa     => Any,
    default => sub { DateTime::Format::RFC3339->new },
);

# Athena connection settings: credentials, region, default schema (database)
# and the S3 URI where query output is written. The Dict type constrains the
# hash to these five string keys. Defaults to the {database}{athena} section
# of Compliance::Config->instance->content.
has config => (
    is  => 'ro',
    isa => Dict [
        access_key => Str,
        secret_key => Str,
        region     => Str,
        schema     => Str,
        s3_uri     => Str,
    ],
    default => sub {
        my $content = Compliance::Config->instance->content;
        return $content->{database}{athena};
    },
);

# Paws Athena service client; built on first access by _build_athena
# (Moo's 'lazy' implies the _build_athena builder).
has athena => (
    is  => 'lazy',
    isa => Any,
);
# Builder for the lazy 'athena' attribute.
#
# Creates a Paws Athena service client for the configured region, using
# explicit credentials taken from 'config' and allowing up to 10 attempts
# per request. Logs and croaks if Paws->service returns a false value.
#
# Returns: the Paws Athena service object.
sub _build_athena {
    my ($self) = @_;
    my $log = $self->log;
    my $cfg = $self->config;

    $log->debug('Building Athena...');

    my $credentials = Paws::Credential::Explicit->new(
        access_key => $cfg->{access_key},
        secret_key => $cfg->{secret_key},
    );

    my $service = Paws->service(
        'Athena',
        region       => $cfg->{region},
        credentials  => $credentials,
        max_attempts => 10,
    );

    if ( !$service ) {
        my $msg = sprintf 'Failed to connect to Athena with %s', $cfg->{access_key};
        $log->error($msg);
        croak $msg;
    }

    $log->debug('Athena built');
    return $service;
}
# Run $select_sql on Athena, wait until the query reaches a terminal state,
# and return its rows.
#
# Parameters:
#   $select_sql - the SQL SELECT statement to execute.
# Returns:
#   Arrayref of hashrefs (column name => value), one per data row; the
#   header row is dropped by _build_results.
# Croaks if the query ends in the FAILED or CANCELLED state. Exceptions
# thrown by the Paws API calls propagate unchanged.
sub get_resultset {
    my ( $self, $select_sql ) = @_;
    my $log = $self->log;
    $log->debugf( 'QUERY: %s', $select_sql );

    my $athena = $self->athena;
    my $query  = $athena->StartQueryExecution(
        QueryString           => $select_sql,
        ResultConfiguration   => { OutputLocation => $self->config->{s3_uri} },
        QueryExecutionContext => { Database       => $self->config->{schema} },
        ClientRequestToken    => guid_string(),
    );
    my $execution_id = $query->QueryExecutionId;

    # Poll once a second until _is_query_complete sees a terminal state; no
    # extra sleep once the query has finished.
    # NOTE(review): there is no upper bound on how long this waits.
    my $status;
    while (1) {
        $status = $athena->GetQueryExecution( QueryExecutionId => $execution_id );
        last if _is_query_complete($status);
        sleep 1;
    }

    my $state = $status->QueryExecution->Status;
    my $start = DateTime->from_epoch( epoch => $state->SubmissionDateTime );
    my $end   = DateTime->from_epoch( epoch => $state->CompletionDateTime );
    $log->infof( 'QUERY START TIME: %s', $start );
    $log->infof( 'QUERY END TIME: %s',   $end );

    # A FAILED or CANCELLED query has no usable result set, so stop here with
    # a descriptive error instead of going on to fetch results.
    my $final_state = uc $state->State;
    if ( $final_state eq 'FAILED' ) {
        my $reason = $state->StateChangeReason;
        my $msg    = sprintf 'QUERY FAILED: query_execution_id=%s, query=%s, reason=%s',
            $execution_id, $select_sql, ( defined $reason ? $reason : '' );
        $log->error($msg);
        croak $msg;
    }
    if ( $final_state eq 'CANCELLED' ) {
        my $msg = sprintf 'QUERY CANCELLED: query_execution_id=%s, query=%s',
            $execution_id, $select_sql;
        $log->error($msg);
        croak $msg;
    }

    $log->infof( 'Athena results stored at %s',
        $status->QueryExecution->ResultConfiguration->OutputLocation );

    # GetQueryResults returns one page per call; follow NextToken so rows
    # beyond the first page are not silently dropped.
    # NOTE(review): assumes Athena puts the header row only at the start of
    # the first page (it is the single row _build_results skips) - confirm.
    my ( @columns, @rows, $next_token );
    while (1) {
        my $page = $athena->GetQueryResults(
            QueryExecutionId => $execution_id,
            ( $next_token ? ( NextToken => $next_token ) : () ),
        );
        my $result_set = $page->ResultSet;
        if ( !@columns ) {
            @columns = @{ $result_set->ResultSetMetadata->ColumnInfo || [] };
        }
        push @rows, @{ $result_set->Rows || [] };

        $next_token = $page->NextToken;
        last unless $next_token;
    }

    return $self->_build_results( \@columns, \@rows );
}
# Turn Athena result rows into an arrayref of hashrefs (column name => value).
#
# Parameters:
#   $columns - arrayref of column-info objects; ->Name gives each column name,
#              in the same order as the cells of a row.
#   $rows    - arrayref of row objects; ->Data gives the row's cells and each
#              cell's ->VarCharValue gives its value (undef for NULL).
# Returns:
#   Arrayref of hashrefs, each passed through _fixup_result_row. The first
#   row is treated as the header row and skipped. $rows is not modified.
sub _build_results {
    my ( $self, $columns, $rows ) = @_;

    my @names     = map { $_->Name } @{ $columns || [] };
    my @data_rows = @{ $rows || [] };    # copy so the caller's array is untouched
    shift @data_rows;                    # header row (column names), not data

    my @results;
    for my $row (@data_rows) {
        my @cells = @{ $row->Data || [] };
        my %result;
        # scalar() keeps exactly one value per cell so names and values pair up.
        @result{ @names[ 0 .. $#cells ] } = map { scalar $_->VarCharValue } @cells;
        $self->_fixup_result_row( \%result );
        push @results, \%result;
    }
    return \@results;
}
# Normalise selected columns of a result row in place.
#
# Only columns that are actually present in $result are touched, so rows from
# queries that do not select them (e.g. build_jobs_sql) are not polluted with
# extra keys, and NULL (undef) values do not trigger warnings.
#   start_date, end_date - re-formatted via _to_rfc3339.
#   cancelled_date       - undef if it is the year-9002 placeholder, otherwise
#                          re-formatted via _to_rfc3339.
#   currency_code        - upper-cased when defined.
# Returns nothing.
sub _fixup_result_row {
    my ( $self, $result ) = @_;

    for my $date_key (qw/start_date end_date/) {
        next unless exists $result->{$date_key};
        $result->{$date_key} = $self->_to_rfc3339( $result->{$date_key} );
    }

    if ( exists $result->{cancelled_date} ) {
        my $cancelled = $result->{cancelled_date};
        # dates with year 9002 are CB's default NULL date
        $result->{cancelled_date} =
            ( defined $cancelled && $cancelled =~ m/^9002-/ )
            ? undef
            : $self->_to_rfc3339($cancelled);
    }

    if ( defined $result->{currency_code} ) {
        $result->{currency_code} = uc $result->{currency_code};
    }

    return;
}
# Normalise $timestamp through the rfc3339 formatter: parse it, then format
# it back out. False values (undef, '' or '0') are returned as undef without
# being parsed.
sub _to_rfc3339 {
    my ( $self, $timestamp ) = @_;
    if ( !$timestamp ) {
        # Explicit undef (not a bare return) so list-context callers still
        # receive exactly one value.
        return undef;    ## no critic (ProhibitExplicitReturnUndef)
    }
    my $formatter = $self->rfc3339;
    my $parsed    = $formatter->parse_datetime($timestamp);
    return $formatter->format_datetime($parsed);
}
# Given a GetQueryExecution response $status, return true when the query's
# state is SUCCEEDED, FAILED or CANCELLED (compared case-insensitively) and
# false for any other state.
sub _is_query_complete {
    my ($status) = @_;
    my $current_state = $status->QueryExecution->Status->State;
    my $is_terminal   = $current_state =~ m/\A(?:succeeded|failed|cancelled)\Z/i;
    return $is_terminal;
}
# Build the Athena SELECT that lists job adverts with company, parent group,
# location, apply link and the posting (or, failing that, failure) date key,
# restricted to partitions from the last month.
#
# Parameters:
#   $where    - SQL boolean expression placed after WHERE.
#   $group_by - SQL column list placed after GROUP BY.
# Both are raw SQL fragments interpolated verbatim: they must come from
# trusted code, never from user input.
# Returns: the SQL string.
# Croaks with 'Incomplete SQL' unless both fragments are given, because a
# missing one would produce an invalid statement ("WHERE AND ..." or an empty
# GROUP BY).
sub build_jobs_sql {
    my ( $self, $where, $group_by ) = @_;
    if ( !$where || !$group_by ) {
        my $msg = 'Incomplete SQL';
        $self->log->error($msg);
        croak $msg;
    }

    # Board ids excluded from both postings and failures.
    # NOTE(review): what these board ids represent is not visible here.
    my $excluded_boards = q{'9901','12280','9679','12261','11234','12453','12524','12523'};

    return <<"SQL";
SELECT j.nice_name AS company_nice_name,
       a.company AS company,
       k.name AS parent_name,
       a.jobref AS jobref,
       a.advert_id AS advert_id,
       a.jobtitle AS jobtitle,
       a.brand_id AS brand_id,
       a.brand_name AS brand_nice_name,
       e.raw_location AS raw_location,
       d.level4_name AS location_state,
       d.level6_name AS location_city,
       a.applyonline_url AS apply_link,
       d.level6_latitude AS latitude,
       d.level6_longitude AS longitude,
       coalesce(b.post_datekey, f.failure_datekey) AS status_date_fk
FROM adc_dm.d_advert a
INNER JOIN adc_dm.d_company j
    ON j.name = a.company
    AND j.yyyymmdd_key = CAST(DATE_FORMAT(DATE_ADD('day', -1, a.sys_insert_date), '%Y%m%d') AS INT)
LEFT JOIN adc_dm.f_posting b
    ON b.dw_advert_key = a.dw_advert_key
    AND b.board_id NOT IN ($excluded_boards)
    AND (b.post_datetime BETWEEN DATE_ADD('month', -1, NOW()) AND NOW())
    AND b.p_yearmonth >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMM')
LEFT JOIN adc_dm.d_location_new d
    ON (
        a.dw_advert_location_key = d.dw_location_key
        AND d.level3_name = 'USA'
        AND d.level6_type IS NOT NULL
    )
LEFT JOIN adc_extract.compliance_raw_location e
    ON a.company = e.company
    AND a.advert_id = e.advert_id
    AND e.p_year >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYY')
    AND e.p_date >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMMDD')
LEFT JOIN adc_dm.f_failure f
    ON a.dw_advert_key = f.dw_advert_key
    AND f.board_id NOT IN ($excluded_boards)
    AND f.p_yearmonth >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMM')
LEFT JOIN adc_extract.parent_groups k
    ON k.parent_group_id = j.parent_group_id
WHERE $where
  AND a.p_yearmonth >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMM')
GROUP BY $group_by
SQL
}
1; | |
__END__ | |
=pod | |
=encoding UTF-8 | |
=head1 NAME | |
Compliance::Athena - AWS Athena database connection | |
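=head1 SYNOPSIS

    use Compliance::Athena;

    # Connection settings default to Compliance::Config's database/athena section.
    my $athena = Compliance::Athena->new;
    my $sql    = $athena->build_jobs_sql( $where_clause, $group_by_clause );
    my $rows   = $athena->get_resultset($sql);    # arrayref of hashrefs

    for my $row (@$rows) {
        print $row->{jobtitle}, "\n";
    }
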
=head1 DESCRIPTION | |
Connect to AWS Athena, issue SQL to it, and get results from it. | |
=head1 ATTRIBUTES | |
=head2 log | |
The L<Log::Any> object. | |
=head2 rfc3339 | |
The L<DateTime::Format::RFC3339> object. | |
=head2 config | |
The configuration hash of AWS Athena authorization bits. | |
=head2 athena | |
The L<Paws> C<service> object. | |
=head1 METHODS | |
=head2 new | |
Create a new C<Compliance::Athena> object. | |
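=head2 get_resultset

    my $rows = $athena->get_resultset($select_sql);

Run an SQL select statement on Athena, wait for it to finish, and return its
rows as an arrayref of hashrefs mapping column names to values. The header row
that Athena returns is not included.
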
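=head2 build_jobs_sql

    my $sql = $athena->build_jobs_sql( $where, $group_by );

Construct and return an SQL select statement given B<where> and
B<group_by> clauses. Both are needed to produce a valid statement. They are
raw SQL fragments interpolated verbatim, so they must come from trusted code
and never from user input.
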
=cut |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment