Skip to content

Instantly share code, notes, and snippets.

@wchristian
Created November 19, 2010 15:05
Show Gist options
  • Save wchristian/706619 to your computer and use it in GitHub Desktop.
Save wchristian/706619 to your computer and use it in GitHub Desktop.
#!/usr/bin/perl
use strict;
use warnings;
# Integration test script: runs TFX::ConexConvert::App with a mocked download
# layer and compares the produced output against a stored regression file.
BEGIN {
use Cwd;
# When started from inside a directory ending in /t, step up one level so the
# relative paths used below ('lib', 't/lib', 't/data/...') resolve.
chdir '..' if getcwd =~ m@/t$@;
# 'lib' is a relative @INC entry, so it is resolved against the directory
# that is current when a module is actually loaded.
use lib 'lib';
}
package arguments;
use Test::Most;
use TFX::ConexConvert::App;
use File::Slurp 'read_file';
use Parallel::Downloader;
use lib 't/lib';
use Test::BinRegression;
# Execute the single test case, finish the plan, and stop before perl runs
# into the sub definition that follows.
run();
done_testing;
exit;
# Runs TFX::ConexConvert::App->run with canned input and checks its result
# against t/data/integration_result.csv.
#
# The network layer and the scan counter cache update are replaced for the
# duration of this sub (via `local`), so no request leaves the machine.
# Returns nothing.
sub run {
    # Stand-in for async_download: yields one response whose first element is
    # the recorded XML, which is the element retrieve_xml reads.
    my $fake_download = sub {
        # note() keeps the trace inside the TAP stream; a bare print to STDOUT
        # would be glued in front of the next "ok" line.
        note( 'mocked async_download called' );
        my $data = read_file( "t/data/integration_test.xml" );
        return [$data];
    };

    no warnings qw( redefine once );
    local *Parallel::Downloader::async_download = $fake_download;
    # TFX::ConexConvert imports async_download by name and calls it
    # unqualified, so the copy in its own symbol table has to be replaced as
    # well; overriding only the Parallel::Downloader glob leaves that copy
    # pointing at the real downloader.
    local *TFX::ConexConvert::async_download = $fake_download;
    local *TFX::ConexConvert::ScanCounterCache::update_with = sub { };
    use warnings;

    # NOTE(review): login/password look like real credentials - confirm they
    # are dummies before publishing this file.
    local @ARGV = qw( get --password=vgxysmB4vn --login=033333 --depart_code=FRA --arrive_code=PAR --dates=2010-12-10 );
    my $result = TFX::ConexConvert::App->run;
    ok_regression( sub { $result }, 't/data/integration_result.csv', 'integration test result matches', 'binmode' );
    return;
}
use strict;
use warnings;
package TFX::ConexConvert;
# ABSTRACT: downloads one-way flights from Conex via XML and converts them to Sealtix
# NOTE(review): presumably silences the "used only once" warning for
# $XML::Simple::PREFERRED_PARSER, which parse_xml assigns - confirm.
no warnings 'once';
use HTTP::Request::Common 'POST';
use XML::Simple;
use Text::CSV::Slurp;
use Parallel::Downloader 'async_download';
use List::Util qw( min reduce );
use lib '..';
use TFX::ConexConvert::ScanCounterCache;
# Passed to async_download as conns_per_host by retrieve_xml.
our $conn_per_host = 10;
# Complete conversion pipeline for a list of XML request objects:
# download the answers, parse each of them, hand the parsed data to
# TFX::ConexConvert::ScanCounterCache->update_with and finally render it
# through half_return_as_sealtix.
#
# Returns whatever half_return_as_sealtix returns (evaluated in scalar
# context).
sub conex_to_sealtix {
    my ( $self, @xml_requests ) = @_;

    my @raw_documents = $self->retrieve_xml( @xml_requests );
    my @half_return_data = map { $self->parse_xml( $_ ) } @raw_documents;

    my $cache = TFX::ConexConvert::ScanCounterCache->new;
    $cache->update_with( @half_return_data );

    my $csv = $self->half_return_as_sealtix( @half_return_data );
    return $csv;
}
# Turns parsed half-return documents into Sealtix output.
#
# Every document is expanded into flight records by conex_to_flight_list;
# the combined list is then rendered by flight_list_to_sealtix, whose
# scalar-context result is returned.
sub half_return_as_sealtix {
    my ( $self, @half_return_data ) = @_;

    my @flight_rows = map { $self->conex_to_flight_list( $_ ) } @half_return_data;
    my $sealtix = $self->flight_list_to_sealtix( \@flight_rows );

    return $sealtix;
}
# Renders a list of flight hashes as CSV via Text::CSV::Slurp.
#
# $flights is an array reference of hash references. Columns are written in
# the fixed order listed below, separated by ';' and always quoted.
# Returns nothing (empty list / undef) when $flights is empty, otherwise the
# value produced by Text::CSV::Slurp->create.
sub flight_list_to_sealtix {
    my ( $self, $flights ) = @_;

    return if !@{$flights};

    my @column_order = qw(
        data_type table_name version operator flight_type depart_code arrive_code via_codes
        flight_sequence duration_rules min_advance_days depart_date flight_carrier flight_number
        flight_class flight_tariff flight_kind depart_time arrive_time seat_count special_code
        currency season_surcharge price_two_way price_one_way combination_id direction rate1_type
        rate1_min_age rate1_max_age rate1_price rate2_type rate2_min_age rate2_max_age rate2_price
        rate3_type rate3_min_age rate3_max_age rate3_price leg1_flight_carrier leg1_flight_number
        leg1_depart_code leg1_arrive_code leg1_depart_date leg1_depart_time leg1_arrive_time
        leg2_flight_carrier leg2_flight_number leg2_depart_code leg2_arrive_code leg2_depart_date
        leg2_depart_time leg2_arrive_time leg3_flight_carrier leg3_flight_number leg3_depart_code
        leg3_arrive_code leg3_depart_date leg3_depart_time leg3_arrive_time
    );

    my @csv_options = (
        input        => $flights,
        field_order  => \@column_order,
        sep_char     => ';',
        always_quote => 1,
        quote_null   => 1,
    );

    my $output = Text::CSV::Slurp->create( @csv_options );
    return $output;
}
# Expands one parsed half-return document into flight records.
#
# Each entry of $half_return_data->{halfReturnAvailableFareList} is passed to
# half_return_fare_list_to_flights and the results are concatenated.
# A document without a fare list yields an empty list instead of failing on
# the dereference, mirroring the guard half_return_fare_list_to_flights
# applies to a missing itineraryList.
sub conex_to_flight_list {
    my ( $self, $half_return_data ) = @_;

    my $fare_list = $half_return_data->{halfReturnAvailableFareList} || [];

    return map { $self->half_return_fare_list_to_flights( $_ ) } @{$fare_list};
}
# Converts one fare entry into flight records, one per itinerary.
#
# Returns nothing when the entry has no (or a false) itineraryList. Otherwise
# returns the results of itinerary_to_flight for every itinerary, each call
# receiving the itinerary and the whole fare entry.
sub half_return_fare_list_to_flights {
    my ( $self, $half_return ) = @_;

    my $itineraries = $half_return->{itineraryList};
    return if !$itineraries;

    my @flights;
    for my $itinerary ( @{$itineraries} ) {
        push @flights, $self->itinerary_to_flight( $itinerary, $half_return );
    }

    return @flights;
}
# Builds one flight record (hash reference) from an itinerary and the fare
# entry it belongs to.
#
# Price and currency come from the first farePassengerTypePriceList entry,
# the combination id and cabin class from the fare's airRuleSet. Surcharge,
# duration rules and flight class are computed by the respective helper
# methods; the first segment supplies the generic flight fields, after which
# process_multi_legs and add_default_fields complete the record.
sub itinerary_to_flight {
    my ( $self, $itinerary, $half_return ) = @_;

    my $rules = $half_return->{airRuleSet};

    # purchasePrice is used here; ticketingPrice would be the alternative
    my $purchase_price = $half_return->{farePassengerTypePriceList}[0]{purchasePrice};

    my $flight = {
        currency       => $purchase_price->{currencyIsoCode},
        price_one_way  => $purchase_price->{value},
        combination_id => $rules->{openJawCombinationGroup}{name},
    };

    $flight->{season_surcharge} = $self->calculate_tax_addition( $half_return->{taxQuoteList} );
    $flight->{duration_rules}   = $self->generate_duration_rules( $rules );
    $flight->{flight_class}     = $self->flight_class( $rules->{cabinClass}{id} );

    my $segment_list = $itinerary->{segmentList};
    my $first_segment = $segment_list->[0];

    $self->extract_generic_flight_data_from_seg( $flight, $first_segment );
    $self->process_multi_legs( $flight, @{ $segment_list } );
    $self->add_default_fields( $flight );

    return $flight;
}
# Sums the price values of all taxes in the first tax quote.
#
# $tax_quote_list is an array reference; only element 0 is looked at, and
# every value of its taxList hash contributes its {price}{value}.
# Returns 0 when there are no taxes.
sub calculate_tax_addition {
    my ( $self, $tax_quote_list ) = @_;

    my @taxes = values %{$tax_quote_list->[0]{taxList}};

    my $total = 0;
    for my $tax (@taxes) {
        $total = $total + $tax->{price}{value};
    }

    return $total;
}
# Produces the comma-separated duration rule string for a rule set.
#
# The minimumStay and maximumStay entries are converted (in that order) by
# convert_duration; 'SUN' is appended when minimumStay's sundayRule is the
# string 'true'.
sub generate_duration_rules {
    my ( $self, $rule_set ) = @_;

    my @parts;
    for my $stay_type (qw( minimumStay maximumStay )) {
        push @parts, $self->convert_duration( $rule_set, $stay_type );
    }

    my $sunday_flag = $rule_set->{minimumStay}{sundayRule};
    if ( $sunday_flag and $sunday_flag eq 'true' ) {
        push @parts, 'SUN';
    }

    return join ',', @parts;
}
# Converts one stay rule of a rule set into its prefixed two-digit form.
#
# $type selects the entry of $rule_set ('minimumStay' / 'maximumStay' are the
# types get_duration_rule_prefix knows). The value is capped at 99; more than
# 3 months or at least 1 year also map to 99, days are used as they are and
# months are multiplied by 30.
#
# The cap is applied to a local copy, so the caller's rule set keeps the
# value that was parsed from the response.
#
# Dies when the rule set has no entry for $type, or when the unit/value
# combination is not covered by the rules above.
sub convert_duration {
    my ( $self, $rule_set, $type ) = @_;

    my $duration = $rule_set->{$type};
    die "duration type missing: '$type'" if !$duration;

    my $prefix = $self->get_duration_rule_prefix( $type );

    my $value = min( $duration->{value}, 99 );
    my $unit  = $duration->{unit}{id};

    return $prefix . "99" if $value > 3  and $unit eq 'STU_MONTHS';
    return $prefix . "99" if $value >= 1 and $unit eq 'STU_YEARS';
    return sprintf( "%s%02d", $prefix, $value )      if $unit eq 'STU_DAYS';
    return sprintf( "%s%02d", $prefix, $value * 30 ) if $unit eq 'STU_MONTHS';

    die "unknown duration unit id '$unit' with value '$value'";
}
# Maps a stay rule type to its one-letter prefix:
# 'minimumStay' => 'A', 'maximumStay' => 'B'. Dies for any other type.
sub get_duration_rule_prefix {
    my ( $self, $type ) = @_;

    my %prefix_for = (
        minimumStay => 'A',
        maximumStay => 'B',
    );

    my $prefix = $prefix_for{$type};
    die "unknown duration type" if !defined $prefix;

    return $prefix;
}
# Stamps the constant record fields onto a flight hash, overwriting any
# existing values: data_type 'F', table_name 'OWF', version '3.0'.
# Modifies $flight in place and returns nothing.
sub add_default_fields {
    my ( $self, $flight ) = @_;

    @{$flight}{qw( data_type table_name version )} = ( 'F', 'OWF', '3.0' );

    return;
}
# Adds per-leg data to a flight that consists of two or more segments.
#
# Does nothing for fewer than two segments. Otherwise every segment is first
# tagged with a 1-based _leg_id, then passed to add_extra_leg_data, and the
# flight is finally cleaned up by post_process_multi_leg_flight.
# Returns nothing.
sub process_multi_legs {
    my ( $self, $flight, @segments ) = @_;

    return if @segments < 2;

    my $leg_number = 0;
    for my $segment (@segments) {
        $leg_number++;
        $segment->{_leg_id} = $leg_number;
    }

    for my $segment (@segments) {
        $self->add_extra_leg_data( $flight, $segment );
    }

    $self->post_process_multi_leg_flight( $flight );

    return;
}
# Final clean-up of a multi-leg flight record, done in place:
#  - removes the flight-level flight_carrier and flight_number,
#  - flattens the via_codes array into a comma-separated string,
#  - takes arrive_time and arrive_code from the _last_leg helper entry,
#  - removes that helper entry again.
# Returns nothing.
sub post_process_multi_leg_flight {
    my ( $self, $flight ) = @_;

    delete @{$flight}{qw( flight_carrier flight_number )};

    my $via_list = $flight->{via_codes};
    $flight->{via_codes} = join ',', @{$via_list};

    my $final_leg = $flight->{_last_leg};
    for my $field (qw( arrive_time arrive_code )) {
        $flight->{$field} = $final_leg->{$field};
    }
    delete $flight->{_last_leg};

    return;
}
# Copies the data of one segment into the flight as "leg<N>_<field>" entries,
# where N is the segment's _leg_id.
#
# For every leg after the first, the segment's departure code is also
# appended to the flight's via_codes array. The extracted leg data is kept
# under _last_leg, so after the final call it refers to the last segment.
# Returns nothing.
sub add_extra_leg_data {
    my ( $self, $flight, $seg ) = @_;

    if ( $seg->{_leg_id} > 1 ) {
        push @{ $flight->{via_codes} }, $seg->{departure}{code};
    }

    my $leg_data = {};
    $self->extract_generic_flight_data_from_seg( $leg_data, $seg );

    for my $field ( keys %{$leg_data} ) {
        my $leg_key = "leg$seg->{_leg_id}_$field";
        $flight->{$leg_key} = $leg_data->{$field};
    }

    $flight->{_last_leg} = $leg_data;

    return;
}
# Fills the generic flight fields of $flight from a single segment.
#
# Codes, carrier and flight number are copied as they are. The departure date
# is rewritten from "Y-M-D" to "D.M.Y"; both timestamps are reduced to the
# hour and minute digits following the "T" (e.g. "...T07:45:00" => "0745").
# Modifies $flight in place and returns nothing.
sub extract_generic_flight_data_from_seg {
    my ( $self, $flight, $seg ) = @_;

    my $date = $seg->{departureDate};
    $date =~ s:(\d+)-(\d+)-(\d+):$3.$2.$1:;

    my $departs_at = $seg->{departureTimestamp};
    $departs_at =~ s/^.*T(\d+):(\d+):.*$/$1$2/;

    my $arrives_at = $seg->{arrivalTimestamp};
    $arrives_at =~ s/^.*T(\d+):(\d+):.*$/$1$2/;

    $flight->{depart_code}    = $seg->{departure}{code};
    $flight->{flight_carrier} = $seg->{operatingAirline}{code};
    $flight->{flight_number}  = $seg->{flightNumber};
    $flight->{depart_date}    = $date;
    $flight->{depart_time}    = $departs_at;
    $flight->{arrive_time}    = $arrives_at;
    $flight->{arrive_code}    = $seg->{destination}{code};

    return;
}
# Translates a cabin class id into its one-letter class code:
# CAB_ECONOMY => 'Y', CAB_BUSINESS => 'C'. Dies for any other id.
sub flight_class {
    my ( $self, $class_id ) = @_;

    my %code_for = (
        CAB_ECONOMY  => 'Y',
        CAB_BUSINESS => 'C',
    );

    my $code = $code_for{$class_id};
    die "Unknown class id: $class_id" if !$code;

    return $code;
}
# Parses one XML response with XML::Simple and returns the resulting data
# structure.
#
# XML::Parser is selected as the preferred parser (a package-wide setting of
# XML::Simple). The elements listed below are always forced into arrays and
# their "<name>List" wrapper elements are folded away via GroupTags;
# segmentList/airSegment gets the same treatment. tax, documentType and
# closedUserGroup are keyed by the attribute given in KeyAttr.
sub parse_xml {
    my ( $self, $xml ) = @_;

    $XML::Simple::PREFERRED_PARSER = 'XML::Parser';

    my @list_elements = qw(
        halfReturnAvailableFare itinerary documentType tax taxQuote availableBookingClass farePassengerTypePrice
        closedUserGroup
    );

    # every "<name>List" wrapper collapses onto its "<name>" children
    my %group_tags = (
        segmentList => 'airSegment',
        map { ( $_ . "List", $_ ) } @list_elements,
    );

    my %key_attributes = (
        tax             => "code",
        documentType    => 'id',
        closedUserGroup => 'code',
    );

    my $parsed = XMLin(
        $xml,
        ForceArray    => [ 'airSegment', @list_elements ],
        KeyAttr       => \%key_attributes,
        GroupTags     => \%group_tags,
        SuppressEmpty => undef,
    );

    return $parsed;
}
# Sends the given XML requests and returns the response bodies.
#
# Each request is wrapped by make_http_request and all of them are handed to
# async_download, limited to $conn_per_host connections per host. The first
# element of every response is taken as the XML body. Every body is also
# written to a file below dump/ whose name combines the current time and a
# random number.
# NOTE(review): the dump is unconditional and write_file is given a path
# inside "dump/" - confirm that directory is expected to exist in production.
sub retrieve_xml {
    my ( $self, @xml_requests ) = @_;
    use File::Slurp;

    my @http_reqs = map { $self->make_http_request( $_ ) } @xml_requests;
    my @responses = async_download( requests => \@http_reqs, conns_per_host => $conn_per_host );

    my @bodies;
    for my $response (@responses) {
        push @bodies, $response->[0];
    }

    for my $body (@bodies) {
        my $dump_file = "dump/response_" . time . rand( 999 ) . ".xml";
        write_file( $dump_file, $body );
    }

    return @bodies;
}
# Wraps an XML request object into an HTTP POST against the Conex XML
# servlet. The serialized request (its as_string form) is sent as the form
# field "request". Returns the request object built by POST.
sub make_http_request {
    my ( $self, $xml_request ) = @_;

    my $endpoint = "https://traffics20.flightconex.de/tourconex/servlet/XmlServlet";
    my @form_fields = ( request => $xml_request->as_string );

    my $http_request = POST( $endpoint, \@form_fields );
    return $http_request;
}
1;
use strict;
use warnings;
package Parallel::Downloader;
=head1 SYNOPSIS
use HTTP::Request::Common qw( GET POST );
use Parallel::Downloader 'async_download';
# simple example
my @requests = map GET( "http://google.com" ), ( 1..15 );
my @responses = async_download( requests => \@requests );
# complex example
my @complex_reqs = ( ( map POST( "http://google.com", [ type_id => $_ ] ), ( 1..60 ) ),
( map POST( "http://yahoo.com", [ type_id => $_ ] ), ( 1..60 ) ) );
my $downloader = Parallel::Downloader->new(
requests => \@complex_reqs,
workers => 50,
conns_per_host => 12,
aehttp_args => {
timeout => 30,
on_prepare => sub {
print "download started ($AnyEvent::HTTP::ACTIVE / $AnyEvent::HTTP::MAX_PER_HOST)\n"
}
},
debug => 1,
logger => sub {
my ( $downloader, $message ) = @_;
print "downloader sez [$message->{type}]: $message->{msg}\n";
},
);
my @complex_responses = $downloader->run;
=cut
use Moose;
# NOTE(review): no attribute below declares `is`; MooseX::HasDefaults::RO
# presumably makes them read-only by default - confirm against its docs.
use MooseX::HasDefaults::RO;
# Request objects to download; the initializer _interleave_by_host regroups
# them by host before they are stored. add_request shifts entries off it.
has requests => ( isa => 'ArrayRef', required => 1, initializer => '_interleave_by_host' );
# Maximum number of parallel workers; 0 means one worker per request
# (see sanitize_worker_max).
has workers => ( isa => 'Int', default => 10 );
# Per-host connection limit; its initializer _init_workers_per_host writes
# the value to $AnyEvent::HTTP::MAX_PER_HOST.
has conns_per_host => ( isa => 'Int', default => 4, initializer => '_init_workers_per_host' );
# Extra key/value arguments spliced into every http_request call.
has aehttp_args => ( isa => 'HashRef', default => sub {{}} );
# When false, _log discards all messages.
has debug => ( isa => 'Bool', default => 0 );
# Callback invoked by _log as $logger->( $downloader, \%message ).
has logger => ( isa => 'CodeRef', default => sub { \&_default_log } );
# Collected results; each entry is [ <http_request callback args>, $request ].
has _responses => ( isa => 'ArrayRef', default => sub {[]} );
# Condition variable: run() calls begin per worker and then recv; end_worker
# calls end.
has _cv => ( isa => 'AnyEvent::CondVar', default => sub { AnyEvent->condvar } );
use AnyEvent::HTTP;
use Sub::Exporter -setup => { exports => [ 'async_download' ] };
# Functional shortcut: builds a downloader from the given constructor
# arguments, runs it and returns what run() returns.
sub async_download {
    my $downloader = __PACKAGE__->new( @_ );

    return $downloader->run;
}
# Initializer for the conns_per_host attribute.
#
# Receives ( $self, $value, $setter, ... ). The limit is applied to
# AnyEvent::HTTP's package-wide $MAX_PER_HOST and, when a setter callback is
# supplied, also stored in the attribute through that callback - an
# initializer that never calls its setter leaves the attribute unset, so the
# conns_per_host reader would return undef.
# Returns nothing.
sub _init_workers_per_host {
    my ( undef, $limit, $setter ) = @_;

    $AnyEvent::HTTP::MAX_PER_HOST = $limit;

    # the setter is optional so that plain two-argument calls keep working
    $setter->( $limit ) if $setter;

    return;
}
# Initializer for the requests attribute.
#
# Groups the requests by the host of their URI and re-emits them round-robin:
# one request per remaining host in each round, keeping the original order
# within a host. The order of hosts inside a round follows hash iteration
# order. The resulting list is stored through $setter. Returns nothing.
sub _interleave_by_host {
    my ( $self, $requests, $setter ) = @_;

    my %queue_for;
    for my $request ( @{$requests} ) {
        my $host_name = $request->uri->host;
        push @{ $queue_for{$host_name} }, $request;
    }

    my @interleaved;
    while ( keys %queue_for ) {
        # take the head of every host queue ...
        for my $queue ( values %queue_for ) {
            push @interleaved, shift @{$queue};
        }

        # ... then drop the hosts that ran dry
        my @exhausted = grep { !@{ $queue_for{$_} } } keys %queue_for;
        delete $queue_for{$_} for @exhausted;
    }

    $setter->( \@interleaved );

    return;
}
# Starts the workers, waits until all of them have finished and returns the
# collected responses as a list.
#
# One worker is started per slot reported by sanitize_worker_max; each worker
# registers itself on the condition variable (begin) and signs off again in
# end_worker (end).
#
# The loop is wrapped in an additional begin/end pair: with an empty request
# list no worker is started at all, and without that guard nothing would ever
# signal the condition variable, so recv would not return.
sub run {
    my ( $self ) = @_;

    my $cv = $self->_cv;

    $cv->begin;    # outer guard, released right after the loop
    for my $worker_id ( 1 .. $self->sanitize_worker_max ) {
        $cv->begin;
        $self->_log( msg => "$worker_id started", type => "WorkerStart", worker_id => $worker_id );
        $self->add_request( $worker_id );
    }
    $cv->end;

    $cv->recv;

    return @{$self->_responses};
}
# Gives the worker $worker_id its next request, or retires the worker when
# the request queue is empty.
#
# The next request is shifted off the requests list and started through
# AnyEvent::HTTP's http_request; the completion callback comes from
# make_post_download_sub. Entries of aehttp_args are passed after body and
# headers, so they can override either.
#
# Headers are read through the request's public header API and passed as a
# plain name => value hash (repeated fields joined into one value), rather
# than handing over the private "_headers" slot of the request object, whose
# layout is an implementation detail of HTTP::Headers.
sub add_request {
    my ( $self, $worker_id ) = @_;

    my $req = shift @{$self->requests};
    return $self->end_worker( $worker_id ) if !$req;

    my $post_download_sub = $self->make_post_download_sub( $worker_id, $req );

    my %headers = map { ( $_ => scalar $req->header( $_ ) ) } $req->headers->header_field_names;

    http_request(
        $req->method,
        $req->uri->as_string,
        body    => $req->content,
        headers => \%headers,
        %{$self->aehttp_args},
        $post_download_sub
    );

    my $host_name = $req->uri->host;
    $self->_log( msg => "$worker_id accepted new request for $host_name", type => "WorkerRequestAdd", worker_id => $worker_id, req => $req );

    return;
}
# Builds the completion callback for one request of one worker.
#
# When invoked, the callback stores [ <its arguments>, $req ] in _responses,
# logs a WorkerRequestEnd message and asks for the worker's next request via
# add_request. The callback itself returns nothing.
sub make_post_download_sub {
    my ( $self, $worker_id, $req ) = @_;

    return sub {
        my @download_result = @_;

        push @{$self->_responses}, [ @download_result, $req ];

        my $host_name = $req->uri->host;
        $self->_log(
            msg       => "$worker_id completed a request for $host_name",
            type      => "WorkerRequestEnd",
            worker_id => $worker_id,
            req       => $req,
        );

        $self->add_request( $worker_id );

        return;
    };
}
# Retires a worker: logs a WorkerEnd message and signs the worker off on the
# condition variable (matching the begin issued when it was started).
# Returns nothing.
sub end_worker {
    my ( $self, $worker_id ) = @_;

    my @event = (
        msg       => "$worker_id ended",
        type      => "WorkerEnd",
        worker_id => $worker_id,
    );
    $self->_log( @event );

    $self->_cv->end;

    return;
}
# Determines how many workers to start.
#
# Dies for a negative workers setting. A setting of 0 means "as many as
# there are requests"; any other setting is capped at the number of queued
# requests.
sub sanitize_worker_max {
    my ( $self ) = @_;

    my $configured = $self->workers;
    die "max should be 0 or more" if $configured < 0;

    my $queued = scalar @{$self->requests};

    # 0 = as many parallel as possible; never more workers than requests
    my $use_queue_size = !$configured || $configured > $queued;

    return $use_queue_size ? $queued : $configured;
}
# Forwards a log message to the configured logger, but only in debug mode.
#
# The key/value pairs following $self form the message; the logger is called
# as $logger->( $self, \%message ). Returns nothing.
sub _log {
    my ( $self, %msg ) = @_;

    if ( $self->debug ) {
        my $logger = $self->logger;
        $logger->( $self, \%msg );
    }

    return;
}
# Default logger: prints the message text followed by a newline to the
# currently selected output handle. Returns nothing.
sub _default_log {
    my ( $self, $msg ) = @_;

    my $line = $msg->{msg} . "\n";
    print $line;

    return;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment