# References:
# https://www.elastic.co/guide/en/logstash/current/plugins-inputs-s3.html
# https://www.elastic.co/blog/logstash-lines-inproved-resilience-in-S3-input
# https://www.elastic.co/guide/en/logstash/6.3/installing-logstash.html
# https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html
# https://www.garron.me/en/bits/curl-delete-request.html
sudo yum update -y
sudo yum install -y java-1.8.0-openjdk
java -version
# Logstash requires Java 8
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
sudo vi /etc/yum.repos.d/logstash.repo
# Insert the following as the file contents (without the leading "# "):
# [logstash-6.x]
# name=Elastic repository for 6.x packages
# baseurl=https://artifacts.elastic.co/packages/6.x/yum
# gpgcheck=1
# gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
# enabled=1
# autorefresh=1
# type=rpm-md
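# Alternative sketch: write the same repo file non-interactively with tee and a heredoc
sudo tee /etc/yum.repos.d/logstash.repo > /dev/null <<'EOF'
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF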
# Now install Logstash
sudo yum install -y logstash
sudo systemctl start logstash
sudo systemctl stop logstash
# Ensure that Logstash starts on boot
sudo systemctl enable logstash
# The S3 Logstash plugins should be present by default; otherwise you will need to install them
sudo yum install -y mlocate
sudo updatedb
cd /usr/share/logstash
bin/logstash-plugin list
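# Sketch: if logstash-input-s3 or logstash-output-elasticsearch is missing from the list above, install it (run from /usr/share/logstash)
sudo bin/logstash-plugin install logstash-input-s3
sudo bin/logstash-plugin install logstash-output-elasticsearch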
# Config files are stored here:
# /etc/logstash/conf.d/*.conf
cd /etc/logstash/conf.d/
sudo vi s3_input.conf
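# Optional sketch: validate the new config before starting Logstash (assumes the RPM install layout used above)
sudo /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit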
sudo systemctl start logstash
# Now watch the Logstash log file: tail -f /var/log/logstash/logstash-plain.log
# Sample Logstash configuration for creating a simple
# AWS S3 -> Logstash -> Elasticsearch pipeline.
# References:
# https://www.elastic.co/guide/en/logstash/current/plugins-inputs-s3.html
# https://www.elastic.co/blog/logstash-lines-inproved-resilience-in-S3-input
# https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html
input {
  s3 {
    #"access_key_id" => "your_access_key_id"
    #"secret_access_key" => "your_secret_access_key"
    "region" => "us-west-2"
    "bucket" => "testlogstashbucket1"
    "prefix" => "Logs"
    "interval" => "10"
    "additional_settings" => {
      "force_path_style" => true
      "follow_redirects" => false
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://vpc-test-3ozy7xpvkyg2tun5noua5v2cge.us-west-2.es.amazonaws.com:80"]
    index => "logs-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
}
Hi @stoufa
I want to help you as much as I can but I haven't worked on this project for a while.
TL;DR
If you are working in biotech you should send me an email so I can give you much more modern strategies for ETL (Extract, Transform, and Load) in AWS. Native tools are better on AWS.
This approach with Logstash is cool, but you end up using EC2 inefficiently, since the instance needs to run all the time while the compute work in Logstash may only run for a fraction of that time. This plugin will grab all the data for the prefix you configure, and it will keep looking for new files in the bucket path. So you don't have to regenerate the file each day; you should determine the widest window of data to pull into Logstash for loading into another destination.
From what I remember, the plugin will always look for new files to pull from S3; it probably uses the list-bucket or head-object feature to check against the objects it has already processed. Since it has this feature, you can just pull all of your data and then delete the data you don't want in Elasticsearch (or the destination you pick).
I would recommend pulling all of the data and then using something like Curator or the Index Management plugin in OpenSearch:
https://curator.readthedocs.io/en/latest/
https://opensearch.org/docs/latest/im-plugin/index/
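For example, a rough sketch of removing one unwanted daily index with a plain DELETE request (the endpoint is a placeholder; the index name follows the logs-%{+YYYY.MM.dd} pattern from the config above):
curl -XDELETE "http://YOUR_ES_ENDPOINT:80/logs-2020.07.03"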
If you really need to specify 1 day at a time, you should write it out explicitly as you suggested and update the file each day.
Example:
2020/07/03/
Keep in mind that S3 is not a filesystem, so the / in the path is just a nice way to represent folders to us humans. Under the hood of Amazon S3 (behind the web tier), the storage system will create new partitions to support the rate of requests to the data. S3 can handle far more TPS than you think ;-)
Good luck and I hope this helps.
Hi again Adrian (@drumadrian),
Thank you for the swift reply. We found a solution for the dynamic prefix value.
Now, I'm curious to know what other tools you recommend we use besides Logstash.
Email sent.
Thank you so much for your attention and participation.
And that's exactly what we did!
Thanks again for the great advice.
What was the solution you figured out for the dynamic prefix?
Hi @ganeshk-nd,
Before switching to AWS Kinesis Firehose, we used to generate the date in the required format and inject it into a template config file.
Here are some snippets from both the template file and the Python script.
input {
  s3 {
    "region" => "REGION_PLACEHOLDER"
    "bucket" => "BUCKET_PLACEHOLDER"
    "prefix" => "PREFIX_PLACEHOLDER"
    "interval" => "10"
    "additional_settings" => {
      "force_path_style" => true
      "follow_redirects" => false
    }
  }
}
...
from datetime import date

# generating the folder having today's files;
# format: yyyy/mm/dd/ e.g. 2022/07/05/
today = date.today()
PREFIX = f'{today.year:4}/{today.month:02}/{today.day:02}/'
...
data = {
    'REGION_PLACEHOLDER': args.region,
    'BUCKET_PLACEHOLDER': BUCKET_NAME,
    'PREFIX_PLACEHOLDER': PREFIX,
    # if no environment is set, use dev by default
    'ENVIRONMENT_PLACEHOLDER': 'prod' if args.prod else 'dev'
}
with open(f'templates/pipeline_{context}.conf') as f:
    template = f.read()
result = template
for placeholder, value in data.items():
    result = result.replace(placeholder, value)
# saving results to a file
output_file_path = '/path/to/logstash-x.y.z/config/pipeline.conf'
with open(output_file_path, 'w') as f:
    f.write(result)
I hope this helps.
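If you keep this daily-prefix approach, one option is to schedule the generator script so the config is regenerated and Logstash restarted once a day. A rough sketch of a root crontab entry (the script path is hypothetical, and it assumes Logstash runs as the systemd service installed above):
0 0 * * * /usr/bin/python3 /path/to/generate_pipeline.py && systemctl restart logstash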
Hi Adrian (@drumadrian),
Thanks for sharing. ✨
I'm tackling a slightly more challenging task where our S3 bucket has the following structure:
In this case, what should we put as a prefix? A "."? A "/"? An empty string ("")? Or should we write a script that generates a new configuration file each day with the prefix set to "<year>/<month>/<day>"?
I really appreciate any help you can provide.