Created
November 19, 2010 15:05
-
-
Save wchristian/706619 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
use strict; | |
use warnings; | |
BEGIN {
    # Allow the test to be started from the distribution root or from inside
    # t/: when the working directory ends in /t, step up one level so the
    # relative paths used below ('lib', 't/lib', 't/data/...') resolve.
    use Cwd;
    my $started_in_t = getcwd() =~ m{/t$};
    chdir '..' if $started_in_t;
    use lib 'lib';
}
package arguments; | |
use Test::Most; | |
use TFX::ConexConvert::App; | |
use File::Slurp 'read_file'; | |
use Parallel::Downloader; | |
use lib 't/lib'; | |
use Test::BinRegression; | |
# Execute the integration check, emit the TAP plan and leave with status 0.
run();
done_testing();
exit 0;
# Integration test: run the "get" command end to end with the network
# download, the debug response dump and the scan-counter cache stubbed out,
# then compare the produced CSV with the stored regression file.
sub run {
    # Canned download result: a single response whose first element is the
    # XML body (retrieve_xml only reads ->[0] of each response).
    my $fake_download = sub {
        my $data = read_file( "t/data/integration_test.xml" );
        return [$data];
    };

    no warnings 'redefine';
    # TFX::ConexConvert imports async_download at compile time and calls its
    # own copy of the code ref, so replacing only the Parallel::Downloader
    # name would leave the real downloader in place. Stub both names.
    local *Parallel::Downloader::async_download = $fake_download;
    local *TFX::ConexConvert::async_download    = $fake_download;
    # retrieve_xml writes every response below dump/ via the imported
    # write_file; keep the test hermetic.
    local *TFX::ConexConvert::write_file = sub { };
    local *TFX::ConexConvert::ScanCounterCache::update_with = sub { };
    use warnings 'redefine';

    local @ARGV = qw( get --password=vgxysmB4vn --login=033333 --depart_code=FRA --arrive_code=PAR --dates=2010-12-10 );
    my $result = TFX::ConexConvert::App->run;
    ok_regression( sub { $result }, 't/data/integration_result.csv', 'integration test result matches', 'binmode' );
    return;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
package TFX::ConexConvert; | |
# ABSTRACT: lädt Einweg-Flüge von Conex via XML herunter und konvertiert sie zu Sealtix | |
no warnings 'once'; | |
use HTTP::Request::Common 'POST'; | |
use XML::Simple; | |
use Text::CSV::Slurp; | |
use Parallel::Downloader 'async_download'; | |
use List::Util qw( min reduce ); | |
use lib '..'; | |
use TFX::ConexConvert::ScanCounterCache; | |
# Per-host connection limit that retrieve_xml hands to async_download as
# conns_per_host; a package variable, so it can be overridden as
# $TFX::ConexConvert::conn_per_host.
our $conn_per_host = 10;
# Full pipeline for a batch of XML request objects: download the Conex
# answers, parse each one, hand the parsed data to the scan-counter cache and
# return the flights rendered as Sealtix CSV.
sub conex_to_sealtix {
    my ( $self, @xml_requests ) = @_;
    my @parsed = map { $self->parse_xml($_) } $self->retrieve_xml(@xml_requests);
    TFX::ConexConvert::ScanCounterCache->new->update_with(@parsed);
    my $sealtix_csv = $self->half_return_as_sealtix(@parsed);
    return $sealtix_csv;
}
# Render the flights of all given parsed Conex answers as one Sealtix CSV
# document (undef when there are no flights at all).
sub half_return_as_sealtix {
    my ( $self, @half_return_data ) = @_;
    my @flights;
    for my $answer (@half_return_data) {
        push @flights, $self->conex_to_flight_list($answer);
    }
    my $csv_text = $self->flight_list_to_sealtix( \@flights );
    return $csv_text;
}
# Serialize a list of flight hashes into Sealtix CSV text: ';'-separated,
# every field quoted, columns in the fixed order below. Returns nothing
# (undef in scalar context) for an empty list.
sub flight_list_to_sealtix {
    my ( $self, $flights ) = @_;
    return unless @{$flights};

    # Column order of the generated CSV.
    my @field_order = qw(
        data_type table_name version operator flight_type depart_code arrive_code via_codes
        flight_sequence duration_rules min_advance_days depart_date flight_carrier flight_number
        flight_class flight_tariff flight_kind depart_time arrive_time seat_count special_code
        currency season_surcharge price_two_way price_one_way combination_id direction rate1_type
        rate1_min_age rate1_max_age rate1_price rate2_type rate2_min_age rate2_max_age rate2_price
        rate3_type rate3_min_age rate3_max_age rate3_price leg1_flight_carrier leg1_flight_number
        leg1_depart_code leg1_arrive_code leg1_depart_date leg1_depart_time leg1_arrive_time
        leg2_flight_carrier leg2_flight_number leg2_depart_code leg2_arrive_code leg2_depart_date
        leg2_depart_time leg2_arrive_time leg3_flight_carrier leg3_flight_number leg3_depart_code
        leg3_arrive_code leg3_depart_date leg3_depart_time leg3_arrive_time
    );

    my $csv_text = Text::CSV::Slurp->create(
        input        => $flights,
        field_order  => \@field_order,
        sep_char     => ';',
        always_quote => 1,
        quote_null   => 1,
    );
    return $csv_text;
}
# Expand one parsed Conex answer into flight hashes: one per itinerary of
# every half-return fare it contains. Returns an empty list when the answer
# has no fares.
sub conex_to_flight_list {
    my ( $self, $half_return_data ) = @_;
    # The fare list may be missing, or undef (parse_xml uses
    # SuppressEmpty => undef for empty elements); treat both as "no fares"
    # instead of dereferencing undef.
    my $fares = $half_return_data->{halfReturnAvailableFareList} || [];
    return map { $self->half_return_fare_list_to_flights($_) } @{$fares};
}
# Build one flight hash per itinerary of a single half-return fare; a fare
# without an itinerary list yields nothing.
sub half_return_fare_list_to_flights {
    my ( $self, $half_return ) = @_;
    my $itineraries = $half_return->{itineraryList}
        or return;
    return map { $self->itinerary_to_flight( $_, $half_return ) } @{$itineraries};
}
# Build the flat Sealtix flight hash for one itinerary of a half-return fare.
# Price, taxes, stay rules and cabin class come from the fare; departure data
# comes from the itinerary's first segment, and with two or more segments
# every segment is additionally written as legN_* fields (process_multi_legs).
sub itinerary_to_flight {
    my ( $self, $itinerary, $half_return ) = @_;
    my $rules = $half_return->{airRuleSet};
    # The first passenger type's purchasePrice is used; it could also be
    # ticketingPrice.
    my $price = $half_return->{farePassengerTypePriceList}[0]{purchasePrice};

    my %flight;
    $flight{currency}         = $price->{currencyIsoCode};
    $flight{price_one_way}    = $price->{value};
    $flight{combination_id}   = $rules->{openJawCombinationGroup}{name};
    $flight{season_surcharge} = $self->calculate_tax_addition( $half_return->{taxQuoteList} );
    $flight{duration_rules}   = $self->generate_duration_rules($rules);
    $flight{flight_class}     = $self->flight_class( $rules->{cabinClass}{id} );

    my $segments = $itinerary->{segmentList};
    $self->extract_generic_flight_data_from_seg( \%flight, $segments->[0] );
    $self->process_multi_legs( \%flight, @{$segments} );
    $self->add_default_fields( \%flight );
    return \%flight;
}
# Sum the price values of every tax in the first tax quote; 0 when it lists
# no taxes. taxList is presumably keyed by tax code (parse_xml folds <tax>
# elements with KeyAttr 'code'), so only its values are used.
sub calculate_tax_addition {
    my ( $self, $tax_quote_list ) = @_;
    my @tax_entries = values %{$tax_quote_list->[0]{taxList}};
    my $sum = reduce { $a + $b->{price}{value} } 0, @tax_entries;
    return $sum;
}
# Build the comma-separated duration rule string: the minimum-stay code, the
# maximum-stay code and, when the minimum stay carries sundayRule 'true', a
# trailing 'SUN'.
sub generate_duration_rules {
    my ( $self, $rule_set ) = @_;
    my @rules;
    for my $stay_type (qw( minimumStay maximumStay )) {
        push @rules, $self->convert_duration( $rule_set, $stay_type );
    }
    my $sunday_rule = $rule_set->{minimumStay}{sundayRule};
    push @rules, 'SUN' if $sunday_rule && $sunday_rule eq 'true';
    return join ',', @rules;
}
# Translate one stay restriction ($type is 'minimumStay' or 'maximumStay')
# of a rule set into a duration code: the type prefix from
# get_duration_rule_prefix followed by a two-digit day count. Values are
# capped at 99; more than 3 months or at least 1 year also yields "99", and
# months are counted as 30 days.
# Dies when the restriction is missing or its unit id is not handled.
sub convert_duration {
    my ( $self, $rule_set, $type ) = @_;
    my $duration = $rule_set->{$type};
    die "duration type missing: '$type'" if !$duration;
    my $prefix = $self->get_duration_rule_prefix( $type );

    # Clamp a copy: the rule set belongs to the fare and is shared by all of
    # its itineraries, so the parsed input must not be rewritten here.
    my $value   = min( $duration->{value}, 99 );
    my $unit_id = $duration->{unit}{id};

    return $prefix . "99" if $value > 3 and $unit_id eq 'STU_MONTHS';
    return $prefix . "99" if $value >= 1 and $unit_id eq 'STU_YEARS';
    return sprintf( "%s%02d", $prefix, $value ) if $unit_id eq 'STU_DAYS';
    return sprintf( "%s%02d", $prefix, $value * 30 ) if $unit_id eq 'STU_MONTHS';
    die "unknown duration unit id '$unit_id' with value '$value'";
}
# Map a stay restriction type to its one-letter code prefix ('A' for the
# minimum stay, 'B' for the maximum stay); dies for any other type.
sub get_duration_rule_prefix {
    my ( $self, $type ) = @_;
    my %prefix_for = ( minimumStay => 'A', maximumStay => 'B' );
    return $prefix_for{$type} if exists $prefix_for{$type};
    die "unknown duration type";
}
# Stamp the fixed header fields (data_type, table_name, version) onto a
# flight hash, overwriting any existing values for those keys.
sub add_default_fields {
    my ( $self, $flight ) = @_;
    my %defaults = ( data_type => 'F', table_name => 'OWF', version => '3.0' );
    # keys and values of an unmodified hash come back in matching order.
    @{$flight}{ keys %defaults } = values %defaults;
    return;
}
# For itineraries with two or more segments, number the segments (stored as
# _leg_id on each segment hash, starting at 1), copy every segment into
# legN_* fields of the flight and then finalize the flight. Single-segment
# itineraries are left untouched.
sub process_multi_legs {
    my ( $self, $flight, @segments ) = @_;
    return unless @segments > 1;

    # All legs are numbered before any of them is processed.
    my $leg_id = 0;
    for my $segment (@segments) {
        $segment->{_leg_id} = ++$leg_id;
    }
    for my $segment (@segments) {
        $self->add_extra_leg_data( $flight, $segment );
    }
    $self->post_process_multi_leg_flight($flight);
    return;
}
# Finish a multi-segment flight: drop the top-level carrier and flight
# number, join the collected via airports with commas, take arrival time
# and airport from the last leg and remove the temporary _last_leg entry.
sub post_process_multi_leg_flight {
    my ( $self, $flight ) = @_;
    delete @{$flight}{qw( flight_carrier flight_number )};
    $flight->{via_codes} = join ',', @{ $flight->{via_codes} };
    for my $field (qw( arrive_time arrive_code )) {
        $flight->{$field} = $flight->{_last_leg}{$field};
    }
    delete $flight->{_last_leg};
    return;
}
# Copy one numbered segment of a multi-segment flight into legN_* fields of
# the flight (N = the segment's _leg_id). Every leg after the first adds its
# departure airport to via_codes, and the leg's data is kept in _last_leg
# for post_process_multi_leg_flight.
sub add_extra_leg_data {
    my ( $self, $flight, $seg ) = @_;
    my $leg_id = $seg->{_leg_id};
    push @{ $flight->{via_codes} }, $seg->{departure}{code} if $leg_id > 1;

    my %leg;
    $self->extract_generic_flight_data_from_seg( \%leg, $seg );
    for my $field ( keys %leg ) {
        $flight->{"leg${leg_id}_$field"} = $leg{$field};
    }
    $flight->{_last_leg} = \%leg;
    return;
}
# Copy the per-segment fields of a Conex air segment into a flight hash.
# The departure date is rewritten from "Y-M-D" to "D.M.Y", and both
# timestamps are reduced from "...T<hh>:<mm>:..." to "hhmm". Values that do
# not match those patterns are copied unchanged.
sub extract_generic_flight_data_from_seg {
    my ( $self, $flight, $seg ) = @_;
    my $hhmm = qr/^.*T(\d+):(\d+):.*$/;

    $flight->{depart_code}    = $seg->{departure}{code};
    $flight->{flight_carrier} = $seg->{operatingAirline}{code};
    $flight->{flight_number}  = $seg->{flightNumber};

    my $date = $seg->{departureDate};
    $date =~ s:(\d+)-(\d+)-(\d+):$3.$2.$1:;
    $flight->{depart_date} = $date;

    my $depart_time = $seg->{departureTimestamp};
    $depart_time =~ s/$hhmm/$1$2/;
    $flight->{depart_time} = $depart_time;

    my $arrive_time = $seg->{arrivalTimestamp};
    $arrive_time =~ s/$hhmm/$1$2/;
    $flight->{arrive_time} = $arrive_time;

    $flight->{arrive_code} = $seg->{destination}{code};
    return;
}
# Map a Conex cabin class id to its one-letter booking class code; dies for
# ids without a mapping.
sub flight_class {
    my ( $self, $class_id ) = @_;
    my %code_for = ( CAB_ECONOMY => 'Y', CAB_BUSINESS => 'C' );
    my $code = $code_for{$class_id};
    die "Unknown class id: $class_id" if !$code;
    return $code;
}
# Parse one Conex XML answer with XML::Simple's XMLin and return the
# resulting data structure.
# - The listed element names are always arrays (ForceArray).
# - tax, documentType and closedUserGroup entries are keyed by their
#   code/id attribute (KeyAttr).
# - Every "<name>List" wrapper, plus segmentList, is collapsed onto its
#   items (GroupTags).
# - Empty elements become undef (SuppressEmpty => undef).
# Dies when no XML is given.
sub parse_xml {
    my ( $self, $xml ) = @_;
    # XMLin treats an undefined argument as "read <script name>.xml", which
    # would silently parse an unrelated file; refuse instead.
    die "parse_xml: no XML document given" if !defined $xml;

    # Select the parser backend for this call only, instead of overwriting
    # the process-wide XML::Simple setting for every other user.
    local $XML::Simple::PREFERRED_PARSER = 'XML::Parser';

    my @basic_list_items = qw(
        halfReturnAvailableFare itinerary documentType tax taxQuote availableBookingClass farePassengerTypePrice
        closedUserGroup
    );
    my %options = (
        ForceArray => [ qw( airSegment ), @basic_list_items ],
        KeyAttr    => {
            tax             => "code",
            documentType    => 'id',
            closedUserGroup => 'code',
        },
        GroupTags     => { segmentList => 'airSegment', },
        SuppressEmpty => undef,
    );
    $options{GroupTags}{ $_ . "List" } = $_ for @basic_list_items;
    my $half_return_data = XMLin( $xml, %options );
    return $half_return_data;
}
# Download the Conex answers for the given XML request objects in parallel
# (at most $conn_per_host connections per host) and return their bodies.
# Dies when a download returned no body.
sub retrieve_xml {
    my ( $self, @xml_requests ) = @_;
    my @http_reqs = map $self->make_http_request( $_ ), @xml_requests;
    my @responses = async_download( requests => \@http_reqs, conns_per_host => $conn_per_host );

    my @xml;
    for my $response (@responses) {
        my ( $body, $headers ) = @{$response};
        # AnyEvent::HTTP reports a failed request with an undefined body.
        # Passing undef on to XMLin would make it parse "<script name>.xml".
        if ( !defined $body ) {
            my $status = ref $headers eq 'HASH'
                ? join( ' ', grep { defined } @{$headers}{qw( Status Reason )} )
                : 'unknown status';
            die "Conex download failed: $status";
        }
        push @xml, $body;
    }

    # Debug aid: keep a copy of every answer under dump/, but only when that
    # directory exists, so writing the copy can never abort the conversion.
    use File::Slurp;
    if ( -d 'dump' ) {
        write_file( "dump/response_" . time . rand( 999 ) . ".xml", $_ ) for @xml;
    }
    return @xml;
}
# Wrap one XML request object (anything with an as_string method) into an
# HTTP POST to the Conex XML servlet, sending its text as the "request"
# parameter.
sub make_http_request {
    my ( $self, $xml_request ) = @_;
    my $servlet_url = "https://traffics20.flightconex.de/tourconex/servlet/XmlServlet";
    my @form = ( request => $xml_request->as_string );
    my $http_request = POST( $servlet_url, \@form );
    return $http_request;
}

1;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
package Parallel::Downloader; | |

=head1 SYNOPSIS

    use HTTP::Request::Common qw( GET POST );
    use Parallel::Downloader 'async_download';

    # simple example
    my @requests = map GET( "http://google.com" ), ( 1..15 );
    my @responses = async_download( requests => \@requests );

    # complex example
    my @complex_reqs = ( ( map POST( "http://google.com", [ type_id => $_ ] ), ( 1..60 ) ),
                         ( map POST( "http://yahoo.com",  [ type_id => $_ ] ), ( 1..60 ) ) );

    my $downloader = Parallel::Downloader->new(
        requests       => \@complex_reqs,
        workers        => 50,
        conns_per_host => 12,
        aehttp_args    => {
            timeout    => 30,
            on_prepare => sub {
                print "download started ($AnyEvent::HTTP::ACTIVE / $AnyEvent::HTTP::MAX_PER_HOST)\n"
            }
        },
        debug  => 1,
        logger => sub {
            my ( $downloader, $message ) = @_;
            print "downloader sez [$message->{type}]: $message->{msg}\n";
        },
    );
    my @complex_responses = $downloader->run;

Each response is an array reference C<[ $body, $headers, $request ]>. The
responses are returned in completion order, not request order, so use
C<$request> to match a response to the request that produced it.

=cut

use Moose; | |
use MooseX::HasDefaults::RO; | |
# Constructor arguments; all accessors are read-only (MooseX::HasDefaults::RO).
has requests => (
    isa         => 'ArrayRef',
    required    => 1,
    initializer => '_interleave_by_host',    # stored round-robin by host
);
has workers => ( isa => 'Int', default => 10 );    # 0 = one worker per request
has conns_per_host => (
    isa         => 'Int',
    default     => 4,
    initializer => '_init_workers_per_host',    # sets $AnyEvent::HTTP::MAX_PER_HOST
);
has aehttp_args => ( isa => 'HashRef', default => sub { +{} } );    # extra http_request options
has debug       => ( isa => 'Bool',    default => 0 );
has logger      => ( isa => 'CodeRef', default => sub { \&_default_log } );

# Internal state: collected responses and the condvar that run() waits on.
has _responses => ( isa => 'ArrayRef',          default => sub { [] } );
has _cv        => ( isa => 'AnyEvent::CondVar', default => sub { AnyEvent->condvar } );
use AnyEvent::HTTP; | |
use Sub::Exporter -setup => { exports => [ 'async_download' ] }; | |
# Functional interface: construct a downloader from the given named
# arguments, run it and return its list of responses.
sub async_download {
    my @constructor_args = @_;
    my $downloader = __PACKAGE__->new(@constructor_args);
    return $downloader->run;
}
# Moose initializer for conns_per_host. It applies the limit to
# AnyEvent::HTTP's process-wide per-host connection cap
# ($AnyEvent::HTTP::MAX_PER_HOST) and stores it in the attribute.
#   $limit  - the requested per-host connection count
#   $setter - Moose's callback that writes the attribute slot
sub _init_workers_per_host {
    my ( $self, $limit, $setter ) = @_;
    $AnyEvent::HTTP::MAX_PER_HOST = $limit;
    # An initializer replaces Moose's own slot assignment: without calling
    # the setter the conns_per_host reader would always return undef.
    $setter->($limit) if $setter;
    return;
}
# Moose initializer for requests: store the requests reordered round-robin
# by host (first request of each host, then the second of each, ...), so
# that workers spread their load across hosts. Hosts take their turns in
# the order in which they first appear in the input. This keeps the
# schedule deterministic, unlike iterating a hash, whose order varies
# between runs. The caller's array is not modified.
#   $requests - array ref of HTTP::Request-like objects (->uri->host)
#   $setter   - Moose's callback that writes the attribute slot
sub _interleave_by_host {
    my ( $self, $requests, $setter ) = @_;

    my ( %queue_for, @host_order );
    for my $req ( @{$requests} ) {
        my $host_name = $req->uri->host;
        push @host_order, $host_name if !$queue_for{$host_name};
        push @{ $queue_for{$host_name} }, $req;
    }

    my @interleaved_list;
    while (@host_order) {
        push @interleaved_list, shift @{ $queue_for{$_} } for @host_order;
        # Hosts whose queue ran dry drop out of the rotation.
        @host_order = grep { @{ $queue_for{$_} } } @host_order;
    }

    $setter->( \@interleaved_list );
    return;
}
# Start up to sanitize_worker_max workers. Each worker pulls requests until
# the queue is empty. Wait until every worker has ended, then return the
# collected responses.
sub run {
    my ( $self ) = @_;
    my $cv = $self->_cv;

    # Hold one extra begin() while starting the workers. With no requests no
    # worker ever calls end(), and recv() would otherwise wait forever. The
    # guard also keeps an early-finishing worker from releasing recv()
    # before the remaining workers have been started.
    $cv->begin;
    for my $worker_id ( 1 .. $self->sanitize_worker_max ) {
        $cv->begin;
        $self->_log( msg => "$worker_id started", type => "WorkerStart", worker_id => $worker_id );
        $self->add_request( $worker_id );
    }
    $cv->end;

    $cv->recv;
    return @{ $self->_responses };
}
# Hand the next queued request to the given worker, or end the worker when
# the queue is empty. The download runs asynchronously via AnyEvent::HTTP;
# its completion callback (from make_post_download_sub) pulls the following
# request for the same worker.
sub add_request {
    my ( $self, $worker_id ) = @_;
    my $req = shift @{ $self->requests };
    return $self->end_worker( $worker_id ) if !$req;

    # Give AnyEvent::HTTP a plain name => value hash built through the public
    # HTTP::Headers API instead of the request's internal header hash, which
    # may hold bookkeeping entries and array refs for repeated fields.
    # In scalar context, header() joins repeated values with ", ".
    my $headers = $req->headers;
    my %header_for = map { $_ => scalar $headers->header($_) } $headers->header_field_names;

    my $post_download_sub = $self->make_post_download_sub( $worker_id, $req );
    http_request(
        $req->method,
        $req->uri->as_string,
        body    => $req->content,
        headers => \%header_for,
        %{ $self->aehttp_args },
        $post_download_sub
    );
    my $host_name = $req->uri->host;
    $self->_log( msg => "$worker_id accepted new request for $host_name", type => "WorkerRequestAdd", worker_id => $worker_id, req => $req );
    return;
}
# Build the completion callback for one download. It stores
# [ callback arguments..., $req ] in _responses, logs the completion and
# lets the same worker pick up its next request.
sub make_post_download_sub {
    my ( $self, $worker_id, $req ) = @_;
    return sub {
        my @result = @_;
        push @{ $self->_responses }, [ @result, $req ];
        my $host_name = $req->uri->host;
        $self->_log( msg => "$worker_id completed a request for $host_name", type => "WorkerRequestEnd", worker_id => $worker_id, req => $req );
        $self->add_request( $worker_id );
        return;
    };
}
# Retire a worker whose queue ran dry: log it and release its begin() on
# the shared condvar.
sub end_worker {
    my ( $self, $worker_id ) = @_;
    my %entry = ( msg => "$worker_id ended", type => "WorkerEnd", worker_id => $worker_id );
    $self->_log(%entry);
    my $condvar = $self->_cv;
    $condvar->end;
    return;
}
# Number of workers to start. 0 means "as many as there are requests", and
# the count is never more than the number of queued requests. Dies on a
# negative setting.
sub sanitize_worker_max {
    my ( $self ) = @_;
    my $workers = $self->workers;
    die "max should be 0 or more" if $workers < 0;
    my $request_count = scalar @{ $self->requests };
    return ( !$workers || $workers > $request_count ) ? $request_count : $workers;
}
# Pass a log entry (given as key/value pairs) to the configured logger as a
# hash ref, but only when debug is enabled.
sub _log {
    my ( $self, %msg ) = @_;
    if ( $self->debug ) {
        $self->logger->( $self, \%msg );
    }
    return;
}
# Default logger: print the entry's msg text, followed by a newline, to the
# currently selected output handle.
sub _default_log {
    my ( undef, $msg ) = @_;
    print $msg->{msg} . "\n";
    return;
}

1;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment