Skip to content

Instantly share code, notes, and snippets.

@ology
Created September 27, 2022 18:55
Show Gist options
  • Save ology/09528dec47985541c9f83e7f055846d4 to your computer and use it in GitHub Desktop.
Save ology/09528dec47985541c9f83e7f055846d4 to your computer and use it in GitHub Desktop.
Code taken from the athena.pl Paws distro example
package Compliance::Athena;
# Abstract: AWS Athena database connection
use strict;
use warnings;
use Compliance::Config;
use Carp qw/croak/;
use Data::GUID qw/guid_string/;
use DateTime ();
use DateTime::Format::RFC3339 ();
use Log::Any ();
use Paws ();
use Paws::Credential::Explicit ();
use Types::Standard qw/Any Dict Str/;
use Moo;
# Logger used for all diagnostics (defaults to Log::Any->get_logger).
has log => (
    is      => 'ro',
    isa     => Any,
    default => sub { Log::Any->get_logger },
);

# Parser/formatter object used to normalise timestamps in result rows.
has rfc3339 => (
    is      => 'ro',
    isa     => Any,
    default => sub { DateTime::Format::RFC3339->new },
);

# Athena connection settings; by default read from the "database.athena"
# section of the Compliance::Config singleton's content.
has config => (
    is      => 'ro',
    isa     => Dict [
        access_key => Str,
        secret_key => Str,
        region     => Str,
        schema     => Str,
        s3_uri     => Str,
    ],
    default => sub {
        Compliance::Config->instance()->content->{database}->{athena};
    },
);

# Paws Athena service object, built on first use by _build_athena.
has athena => (
    is  => 'lazy',
    isa => Any,
);
# Builder for the lazy "athena" attribute.
#
# Creates explicit Paws credentials from the configured access/secret key
# pair and asks Paws for an Athena service object in the configured region
# (max_attempts => 10).
#
# Returns: the Paws Athena service object.
# Dies:    croaks (after logging an error) if Paws->service returns a false
#          value.
sub _build_athena {
    my ($self) = @_;

    my $log = $self->log;
    $log->debug('Building Athena...');

    my $config      = $self->config;
    my $access_key  = $config->{access_key};
    my $credentials = Paws::Credential::Explicit->new(
        access_key => $access_key,
        secret_key => $config->{secret_key},
    );

    my $service = Paws->service(
        'Athena',
        region       => $config->{region},
        credentials  => $credentials,
        max_attempts => 10,
    );

    if ( !$service ) {
        my $failure = sprintf 'Failed to connect to Athena with %s', $access_key;
        $log->error($failure);
        croak $failure;
    }

    $log->debug('Athena built');
    return $service;
}
# Run a SQL statement on Athena and return its rows.
#
# Arguments:
#   $select_sql - SQL text submitted as the QueryString.
#
# Returns: an arrayref of hashrefs (column name => value), as produced by
#          _build_results.
# Dies:    croaks when the query ends in the FAILED or CANCELLED state;
#          exceptions thrown by the Paws calls propagate unchanged.
sub get_resultset {
    my ( $self, $select_sql ) = @_;

    my $log = $self->log;
    $log->debugf( 'QUERY: %s', $select_sql );

    my $athena = $self->athena;
    my $config = $self->config;

    my $query = $athena->StartQueryExecution(
        QueryString           => $select_sql,
        ResultConfiguration   => { OutputLocation => $config->{s3_uri} },
        QueryExecutionContext => { Database => $config->{schema} },
        ClientRequestToken    => guid_string(),
    );
    my $query_id = $query->QueryExecutionId;

    # Poll until the query reaches a terminal state. Sleep only between
    # polls so a finished query is not delayed by an extra second.
    my $status;
    while (1) {
        $status = $athena->GetQueryExecution( QueryExecutionId => $query_id );
        last if _is_query_complete($status);
        sleep 1;
    }

    my $state = $status->QueryExecution->Status;
    my $start = DateTime->from_epoch( epoch => $state->SubmissionDateTime );
    my $end   = DateTime->from_epoch( epoch => $state->CompletionDateTime );
    $log->infof( 'QUERY START TIME: %s', $start );
    $log->infof( 'QUERY END TIME: %s',   $end );

    # A query that did not succeed has no results to fetch; stop here with
    # a clear error instead of letting GetQueryResults fail later.
    if ( $state->State eq 'FAILED' ) {
        my $reason = $state->StateChangeReason;
        $log->errorf(
            'QUERY FAILED: query_execution_id=%s, query=%s, reason=%s',
            $query_id, $select_sql, $reason
        );
        croak sprintf 'Athena query %s failed: %s',
            $query_id, ( defined $reason ? $reason : 'unknown reason' );
    } elsif ( $state->State eq 'CANCELLED' ) {
        $log->debugf(
            'QUERY CANCELLED: query_execution_id=%s, query=%s',
            $query_id, $select_sql
        );
        croak sprintf 'Athena query %s was cancelled', $query_id;
    }

    $log->infof( 'Athena results stored at %s',
        $status->QueryExecution->ResultConfiguration->OutputLocation
    );

    # GetQueryResults is paginated: keep following NextToken so that large
    # result sets are not silently truncated to the first page.
    my @columns;
    my @rows;
    my $next_token;
    do {
        my $page = $athena->GetQueryResults(
            QueryExecutionId => $query_id,
            ( defined $next_token ? ( NextToken => $next_token ) : () ),
        );
        my $result_set = $page->ResultSet;
        @columns = @{ $result_set->ResultSetMetadata->ColumnInfo }
            unless @columns;
        push @rows, @{ $result_set->Rows };
        $next_token = $page->NextToken;
    } while ( defined $next_token && length $next_token );

    # The header row is the first row of the first page, so it is still
    # the first element of the combined list handed to _build_results.
    return $self->_build_results( \@columns, \@rows );
}
# Return an arrayref of hashrefs of column => value pairs
#
# Arguments:
#   $columns - arrayref of column-info objects (each provides ->Name).
#   $rows    - arrayref of row objects (each provides ->Data, an arrayref of
#              cells providing ->VarCharValue). The first row is treated as
#              the header row and skipped.
#
# Each resulting hashref is passed through _fixup_result_row before being
# collected. The caller's $rows array is left untouched.
sub _build_results {
    my ( $self, $columns, $rows ) = @_;

    # Column names are the same for every row, so resolve them once.
    my @column_names = map { $_->Name } @{$columns};

    # Skip the header row without modifying the caller's array; the range
    # is empty when there are no rows at all.
    my @data_rows = @{$rows}[ 1 .. $#{$rows} ];

    my @results;
    for my $row (@data_rows) {
        my %result;
        my $cells = $row->Data;
        for my $index ( 0 .. $#{$cells} ) {
            $result{ $column_names[$index] } = $cells->[$index]->VarCharValue;
        }
        $self->_fixup_result_row( \%result );
        push @results, \%result;
    }

    return \@results;
}
# Normalise well-known columns of a result row in place.
#
# Arguments:
#   $result - hashref of column => value pairs; modified in place.
#
# start_date, end_date and cancelled_date are passed through _to_rfc3339;
# currency_code is upper-cased. Missing or NULL values are handled without
# emitting "uninitialized" warnings: dates become undef and currency_code
# becomes the empty string.
#
# Returns: nothing.
sub _fixup_result_row {
    my ( $self, $result ) = @_;

    $result->{start_date} = $self->_to_rfc3339( $result->{start_date} );
    $result->{end_date}   = $self->_to_rfc3339( $result->{end_date} );

    # dates with year 9002 are CB's default NULL date
    my $cancelled = $result->{cancelled_date};
    $result->{cancelled_date} =
        ( defined $cancelled && $cancelled =~ m/^9002-/ )
        ? undef
        : $self->_to_rfc3339($cancelled);

    # uc(undef) would warn and yield ''; produce the same value quietly.
    my $currency = $result->{currency_code};
    $result->{currency_code} = defined $currency ? uc $currency : '';

    return;
}
# Round-trip a timestamp through the rfc3339 parser/formatter.
#
# Arguments:
#   $timestamp - timestamp string; any false value short-circuits.
#
# Returns: the formatter's rendering of the parsed timestamp, or undef when
#          $timestamp is false.
sub _to_rfc3339 {
    my ( $self, $timestamp ) = @_;

    if ( !$timestamp ) {
        return undef;
    }

    my $formatter = $self->rfc3339;
    my $parsed    = $formatter->parse_datetime($timestamp);
    return $formatter->format_datetime($parsed);
}
# Tell whether a GetQueryExecution response describes a finished query.
#
# Arguments:
#   $status - object whose ->QueryExecution->Status->State yields the
#             query state string.
#
# Returns: true when the state is succeeded, failed or cancelled (matched
#          case-insensitively), false otherwise.
sub _is_query_complete {
    my ($status) = @_;

    my $current_state = $status->QueryExecution->Status->State;
    return scalar( $current_state =~ m/^(?:succeeded|failed|cancelled)$/i );
}
# Construct the jobs SELECT statement.
#
# Arguments:
#   $where    - SQL condition placed directly after WHERE.
#   $group_by - SQL expression list placed directly after GROUP BY.
#
# Returns: the SQL text as a string.
# Dies:    croaks with 'Incomplete SQL' (after logging an error) unless both
#          $where and $group_by are true; either one missing would produce
#          an invalid statement.
#
# Both arguments are interpolated verbatim into the statement, so they must
# come from trusted code, never from user input.
#
# NOTE(review): the statement pins a.jobref to a single literal value, which
# looks like a leftover debugging filter - confirm whether it should stay.
sub build_jobs_sql {
    my ( $self, $where, $group_by ) = @_;

    unless ( $where && $group_by ) {
        my $msg = 'Incomplete SQL';
        $self->log->error($msg);
        croak $msg;
    }

    return <<"SQL";
SELECT j.nice_name AS company_nice_name,
a.company AS company,
k.name AS parent_name,
a.jobref AS jobref,
a.advert_id AS advert_id,
a.jobtitle AS jobtitle,
a.brand_id AS brand_id,
a.brand_name AS brand_nice_name,
e.raw_location AS raw_location,
d.level4_name AS location_state,
d.level6_name AS location_city,
a.applyonline_url AS apply_link,
d.level6_latitude AS latitude,
d.level6_longitude AS longitude,
coalesce(b.post_datekey, f.failure_datekey) AS status_date_fk
FROM adc_dm.d_advert a
INNER JOIN adc_dm.d_company j
ON j.name = a.company
AND j.yyyymmdd_key = CAST(DATE_FORMAT(DATE_ADD('day', -1, a.sys_insert_date), '%Y%m%d') AS INT)
LEFT JOIN adc_dm.f_posting b
ON b.dw_advert_key = a.dw_advert_key
AND b.board_id NOT IN ('9901','12280','9679','12261','11234','12453','12524','12523')
AND (b.post_datetime BETWEEN DATE_ADD('month', -1, NOW()) AND NOW())
AND b.p_yearmonth >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMM')
LEFT JOIN adc_dm.d_location_new d
ON (
a.dw_advert_location_key = d.dw_location_key
AND d.level3_name = 'USA'
AND d.level6_type IS NOT NULL
)
LEFT JOIN adc_extract.compliance_raw_location e
ON a.company = e.company
AND a.advert_id = e.advert_id
AND e.p_year >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYY')
AND e.p_date >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMMDD')
LEFT JOIN adc_dm.f_failure f
ON a.dw_advert_key = f.dw_advert_key
AND f.board_id NOT IN ('9901','12280','9679','12261','11234','12453','12524','12523')
AND f.p_yearmonth >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMM')
LEFT JOIN adc_extract.parent_groups k
ON k.parent_group_id = j.parent_group_id
WHERE $where
AND a.p_yearmonth >= FORMAT_DATETIME(DATE_ADD('month', -1, NOW()), 'YYYYMM')
AND a.jobref = '28671/37427/201639'
GROUP BY $group_by
SQL
}
1;
__END__
=pod

=encoding UTF-8

=head1 NAME

Compliance::Athena - AWS Athena database connection

=head1 SYNOPSIS

  use Compliance::Athena;

  my $athena = Compliance::Athena->new;
  my $sql    = $athena->build_jobs_sql( $where, $group_by );
  my $rows   = $athena->get_resultset($sql);

=head1 DESCRIPTION

Connect to AWS Athena, issue SQL to it, and get results from it.

=head1 ATTRIBUTES

=head2 log

The L<Log::Any> object.

=head2 rfc3339

The L<DateTime::Format::RFC3339> object.

=head2 config

The configuration hash of AWS Athena authorization bits.

=head2 athena

The L<Paws> C<service> object.

=head1 METHODS

=head2 new

Create a new C<Compliance::Athena> object.

=head2 get_resultset

Return the results of an SQL select statement as an array reference of
hash references, one per row, mapping column names to values.

=head2 build_jobs_sql

Construct and return an SQL select statement given B<where> and
B<group_by> clauses.

=cut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment