Apache Druid is a high-performance real-time analytics database. Druid is a unique type of database that combines ideas from OLAP/analytic databases, timeseries databases, and search systems to enable new use cases in real-time architectures. For building a framework for time-series trend analysis, prediction models, and anomaly detection, I decided to use Druid. As per the requirements, apart from real-time data ingestion, there is also a need for batch-based data ingestion into Druid. After reading several blogs and articles on setting up a production Druid cluster for handling petabytes of data, I decided to follow the architecture below:
- 2 nodes as Druid masters, which run the Druid Coordinator and Overlord processes. Two nodes are used for high availability.
- 2 nodes as Druid query servers, which run the Druid Broker process. Two nodes are used for high availability and for defining two query tiers, i.e. hot and _default_tier (see the sketch after this list). Additionally, a Router process runs on one node as a single gateway for all Druid API access.
- 3 nodes as Druid data servers, which run the Druid Historical and MiddleManager processes. The three ZooKeeper nodes are also leveraged for running the Druid Historical process. Caffeine is used for caching query results.
- 3 ZooKeeper nodes for managing the current Druid cluster state.
- A Druid metadata database node running Postgres, along with Grafana for visualizing Druid cluster metrics.
- S3 as Druid deep storage and for storing job logs.
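As a rough illustration of the hot/_default_tier split mentioned above: tier assignment is done in the Historical runtime properties, and the Router maps tiers to Broker services. The snippet below is a minimal sketch with illustrative service names, not the exact configuration of this cluster:

# runtime.properties of a Historical node serving the "hot" tier
# (the remaining data nodes stay on _default_tier)
druid.server.tier=hot
druid.server.priority=10

# runtime.properties of the Router: send queries for each tier to a dedicated Broker
# (the Broker service names here are illustrative)
druid.router.defaultBrokerServiceName=druid/broker
druid.router.tierToBrokerMap={"hot":"druid/broker-hot","_default_tier":"druid/broker"}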
The image below depicts the Druid architecture. The image is taken from the article Realtime Fast Data Analytics with Druid by Florian Troßbach.
- Firstly, the S3A connector is the newest S3 connector for Hadoop. As per the Amazon S3 Hadoop wiki, the previous connectors, i.e. S3 and S3N, are now deprecated. With the S3A connector there is no need to explicitly provide AWS_ACCESS_KEY & AWS_SECRET_KEY; S3A determines the credentials and role from the EC2 IAM profile, which makes the Druid common runtime properties files much simpler and more generic.
- Secondly, even if you are not planning to run the Druid indexing job on a Hadoop cluster, ingesting Parquet-format data into Druid requires an index job of type: index_hadoop, and hence a working S3A connector is required. Druid supports the Parquet format through the parquet extension (a sketch of the extension setup follows below). Since in my case the data is in Parquet format, I needed to make this work.
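For reference, enabling Parquet support in Druid 0.12.3 means adding the Parquet extension, which depends on the Avro extension, to the extensions load list in common.runtime.properties. This is a minimal sketch; apart from the two Parquet-related entries, the rest of the list is only an illustrative guess at what this kind of setup would load:

# Parquet ingestion needs both the Avro and Parquet extensions;
# the remaining entries are illustrative for this kind of setup
druid.extensions.loadList=["druid-avro-extensions", "druid-parquet-extensions", "druid-s3-extensions", "druid-hdfs-storage", "postgresql-metadata-storage"]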
I am writing this article from the perspective of Druid version 0.12.3. Druid recently released a new version, but I haven't evaluated it for the S3A connector yet.
- If you try to run an index_hadoop job using a Druid index job spec like the one below
"tuningConfig": {
"type": "hadoop",
"jobProperties": {
"fs.s3a.endpoint": "s3.ca-central-1.amazonaws.com",
"fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
}
}
you will get an exception with a stack trace like the one below:
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287) ~[?:?]
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669) ~[?:?]
at org.apache.hadoop.fs.FileSystem.access00(FileSystem.java:94) ~[?:?]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703) ~[?:?]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685) ~[?:?]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) ~[?:?]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) ~[?:?]
This is caused by incompatible versions of the hadoop-client libraries and the aws-sdk library in the Druid code. The issue can be fixed either by bumping the hadoop.compile.version property from 2.7.3 to 2.8.3 or by downgrading aws.sdk.version to a version lower than 1.10.77 in the Maven pom.xml file. I decided to follow the first option and, after bumping the version, re-built the Druid distribution, as sketched below.
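The pom.xml change is essentially the following (a minimal sketch showing only the property I bumped; the alternative aws.sdk.version downgrade is left as a comment):

<properties>
  <!-- bumped from 2.7.3 so that hadoop-aws matches the AWS SDK classes bundled by Druid -->
  <hadoop.compile.version>2.8.3</hadoop.compile.version>
  <!-- alternative, not taken here: downgrade <aws.sdk.version> to something below 1.10.77 -->
</properties>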
- After fixing the library incompatibility issue, I faced another issue related to the storage path of the segment file. Since Druid defaults to the S3N connector, the default segmentOutputPath value is based on an s3n:// URI instead of s3a://. Below is the generated sample job spec for the index job.
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"paths" : "s3a://experiment-druid/input_data/wikiticker-2015-09-12-sampled.json.gz"
},
"metadataUpdateSpec" : null,
"segmentOutputPath" : "s3n://experiment-druid/deepstorage"
},
As per the hadoop-index documentation, we can provide segmentOutputPath in the ioConfig of the index job spec; however, I was getting an error while providing a segment path. To fix this issue, I found the property useS3aSchema in the class S3DataSegmentPusherConfig in the Druid source code. Setting the property below in the Druid properties fixes this issue.
druid.storage.useS3aSchema=true
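In context, the S3 deep-storage section of the common runtime properties then looks roughly like this (the bucket and base key are illustrative, taken from the sample job spec above):

# S3 deep storage, pushed via the s3a:// scheme
druid.storage.type=s3
druid.storage.bucket=experiment-druid
druid.storage.baseKey=deepstorage
druid.storage.useS3aSchema=true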
- The final issue I faced in making this work was a runtime exception while pushing segments to the S3 deep storage. Below is the stack trace of the exception
java.io.IOException: Output Stream closed
at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83) ~[hadoop-aws-2.8.3.jar:?]
at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89) ~[hadoop-aws-2.8.3.jar:?]
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140) ~[?:1.8.0_191]
at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[?:1.8.0_191]
Thanks to the Druid user discussion thread and a pull request raised by Christoph Hösler, I was able to fix the problem by commenting out the flush statement in the JobHelper class, roughly as sketched below.
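The change is essentially of the following shape (a paraphrased sketch, not the exact Druid diff; fs, outPath, mapper, and descriptor are illustrative stand-ins). The symptom is consistent with the output stream already having been closed before the explicit flush, for example because Jackson's ObjectMapper.writeValue closes its target stream by default, so the subsequent flush hits an already-closed S3AOutputStream:

// Paraphrased sketch of the JobHelper change; names are illustrative, not the exact Druid code
try (final OutputStream out = fs.create(outPath, true)) {
  mapper.writeValue(out, descriptor);  // Jackson may close `out` here (AUTO_CLOSE_TARGET)
  // out.flush();                      // commented out: flushing the already-closed
  //                                   // S3AOutputStream throws "Output Stream closed"
}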
Putting the fixes for all the issues mentioned above together, follow these steps to make the S3A Hadoop connector work with Druid (a shell sketch follows the list):
- Rebuild the Druid 0.12.3 source-code branch, after changing the hadoop.compile.version property value to 2.8.3 and commenting out the flush statement in the JobHelper class, using the command mvn clean package -DskipTests
- Replace druid-hdfs-storage with the newly built artifacts in the Druid extensions folder.
- Copy the 2.8.3 hadoop-client libraries into the Druid hadoop-dependencies folder.
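As a rough shell sketch of those steps (DRUID_HOME and the source/jar paths are illustrative placeholders; adjust them to your checkout and installation):

# 1. Rebuild Druid 0.12.3 after editing pom.xml (hadoop.compile.version -> 2.8.3)
#    and commenting out the flush in JobHelper
mvn clean package -DskipTests

# 2. Replace the druid-hdfs-storage extension with the freshly built artifacts
rm -rf "$DRUID_HOME/extensions/druid-hdfs-storage"
cp -r /path/to/rebuilt/extensions/druid-hdfs-storage "$DRUID_HOME/extensions/"

# 3. Make the 2.8.3 hadoop-client libraries available under hadoop-dependencies
mkdir -p "$DRUID_HOME/hadoop-dependencies/hadoop-client/2.8.3"
cp /path/to/hadoop-client-2.8.3-jars/*.jar "$DRUID_HOME/hadoop-dependencies/hadoop-client/2.8.3/"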
Finally, for testing, you can use the example index jobs published in my GitHub repo.
Happy Druid Exploration!!