@davideicardi
Last active April 4, 2024 19:00
Running Cloudera with Docker for development/test

Cloudera quick start with docker

This installation of Cloudera is for development only, do not use it in production.

Getting the image

Download cloudera docker image from https://www.cloudera.com/downloads/quickstart_vms/5-13.html

See also: https://www.cloudera.com/documentation/enterprise/5-13-x/topics/quickstart_docker_container.html

Uncompress the file:

tar -xf cloudera-quickstart-vm-5.13.0-0-beta-docker.tar.gz

Import the image into Docker:

docker import cloudera-quickstart-vm-5.13.0-0-beta-docker.tar

Tag the image with a better name:

docker image tag IMAGE_HASH cloudera-5-13
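
As an alternative sketch, `docker import` accepts an optional repository name as a second argument, so the import and tag steps can be combined without copying IMAGE_HASH by hand. The `DOCKER` variable and the `import_quickstart` function name are illustrative assumptions, not part of the original instructions.

```shell
#!/bin/sh
# DOCKER can be overridden (for example DOCKER=echo) to preview the command
# instead of running it. This variable is a hypothetical convenience.
DOCKER=${DOCKER:-docker}

# import_quickstart TARBALL [IMAGE_NAME]
# Imports the tarball and names the resulting image in a single step.
import_quickstart() {
  tarball=$1
  image=${2:-cloudera-5-13}
  "$DOCKER" import "$tarball" "$image"
}

# Usage:
#   import_quickstart cloudera-quickstart-vm-5.13.0-0-beta-docker.tar
```

The default image name matches the tag used in the rest of this guide, so the `docker run` command below works unchanged.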

Run the container

Run the container:

docker run --name quickstart.cloudera \
  --hostname=quickstart.cloudera \
  -d --privileged=true \
  -t -i cloudera-5-13 \
  /usr/bin/docker-quickstart

(The hostname should not be changed, otherwise some services do not start correctly.)

Attach to the container (to run commands inside):

docker attach quickstart.cloudera

To detach the tty without exiting the shell, use the escape sequence Ctrl+p + Ctrl+q

IMPORTANT: On Windows you should also publish one or more ports in the docker run command by adding options such as -p 8888:8888 -p 7180:7180 -p 8181:80 (host_port:container_port). See below for available ports.

NOTE: It takes several minutes for all services to start and begin responding on their HTTP ports.

If you see clock sync problems, try running /etc/init.d/ntpd start manually to sync the time.

To reduce memory consumption, you can edit /usr/bin/docker-quickstart to disable the automatic start of services that you don't use.

Available services

For friendlier usage, consider adding the hostname quickstart.cloudera to /etc/hosts, pointing to your container IP. Get the container IP using:

docker inspect quickstart.cloudera | grep "\"IPAddress\""

IMPORTANT: On Windows use your host IP instead of the container IP.

Then browse to the required service, for example http://quickstart.cloudera:7180/ to connect to Cloudera Manager.
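
The /etc/hosts step can be sketched as a small script. It uses the `-f` template option of `docker inspect` instead of grep, which prints only the address. The function names `container_ip` and `hosts_line` are hypothetical, and the sketch assumes a Linux host with the container on a Docker network that assigns it an IP.

```shell
#!/bin/sh
# container_ip NAME
# Prints the IP address(es) Docker assigned to the container.
container_ip() {
  docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$1"
}

# hosts_line IP [HOSTNAME]
# Prints a line in /etc/hosts format (IP, tab, hostname).
hosts_line() {
  printf '%s\t%s\n' "$1" "${2:-quickstart.cloudera}"
}

# Usage (editing /etc/hosts needs root):
#   hosts_line "$(container_ip quickstart.cloudera)" | sudo tee -a /etc/hosts
```

The container IP can change when the container is recreated, so the entry may need to be refreshed.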

Available services/ports:

In this way you can connect to any service without exposing any ports.

Credentials used inside the Cloudera quick start:

  • username: cloudera
  • password: cloudera

Other info

For an unknown reason the ntpd service sometimes fails to start and you will see clock offset problems. To fix this, run /etc/init.d/ntpd stop and then /etc/init.d/ntpd start inside the container.

For a graceful shutdown, use:

docker stop --time=60 quickstart.cloudera

After starting the image you often need to manually restart all Cloudera services using:

curl -X POST -u "admin:admin" -i http://localhost:7180/api/v18/clusters/Cloudera%20QuickStart/commands/start
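
Because Cloudera Manager needs several minutes before it answers, the start command above may fail if it is sent too early. A minimal sketch of a wait-then-start helper follows. It assumes the same admin:admin credentials and v18 API path as the command above. The `CM_URL` variable and the function names are hypothetical.

```shell
#!/bin/sh
CM_URL=${CM_URL:-http://localhost:7180}

# http_status URL
# Prints the HTTP status code of a GET request ("000" if the connection fails).
http_status() {
  curl -s -o /dev/null -w '%{http_code}' -u "admin:admin" "$1"
}

# wait_for_cm [TRIES] [DELAY_SECONDS]
# Polls the clusters endpoint until it answers 200; fails after TRIES attempts.
wait_for_cm() {
  tries=${1:-60}
  delay=${2:-10}
  i=0
  while [ "$i" -lt "$tries" ]; do
    if [ "$(http_status "$CM_URL/api/v18/clusters")" = "200" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# start_cluster
# Sends the same start command shown above.
start_cluster() {
  curl -X POST -u "admin:admin" -i \
    "$CM_URL/api/v18/clusters/Cloudera%20QuickStart/commands/start"
}

# Usage:
#   wait_for_cm && start_cluster
```

With the defaults this waits up to roughly ten minutes; both values are arbitrary choices and can be tuned.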

Upgrading cluster to Java 8 (1.8)

By default the Cloudera container uses Java 7 (1.7). Here are some guides for upgrading to Java 8:

https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_jdk_installation.html
https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_cm_upgrading_to_jdk8.html

  • Download the latest Java 1.8 SDK (jdk-8u171 at the time of writing). Cloudera suggests using a specific revision, but this does not seem to be necessary.

  • Extract it to /usr/java/jdk1.8.0_171:

    tar -xzf jdk-8u171-linux-x64.tar.gz
    mv jdk1.8.0_171 /usr/java/
    
  • Add or modify the following line inside /etc/profile, /etc/default/cloudera-scm-server and /etc/default/bigtop-utils:

    export JAVA_HOME=/usr/java/jdk1.8.0_171
    
  • Change the symlink:

    rm /usr/bin/java
    ln -s /usr/java/jdk1.8.0_171/jre/bin/java /usr/bin/java
    
  • Change java for Cloudera host:

    • Open the Cloudera Manager Admin Console.
    • In the main navigation bar, click the Hosts tab and optionally click a specific host link.
    • Click the Configuration tab.
    • Select Category > Advanced.
    • Set the Java Home Directory property to the custom location.
    • Click Save Changes.
  • Restart the Docker container
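
The JAVA_HOME step in the list above can be scripted so that it is safe to run more than once. This is a minimal sketch, assuming each file holds at most simple `export JAVA_HOME=...` lines. The `set_java_home` function name is hypothetical.

```shell
#!/bin/sh
# set_java_home FILE JAVA_HOME_PATH
# Removes any existing "export JAVA_HOME=" line from FILE and appends a new one.
set_java_home() {
  file=$1
  java_home=$2
  touch "$file"
  tmp=$(mktemp)
  # Keep every line except a previous JAVA_HOME export.
  grep -v '^export JAVA_HOME=' "$file" > "$tmp" || true
  printf 'export JAVA_HOME=%s\n' "$java_home" >> "$tmp"
  # Write back with cat so the original file keeps its owner and permissions.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage (inside the container, as root):
#   for f in /etc/profile /etc/default/cloudera-scm-server /etc/default/bigtop-utils; do
#     set_java_home "$f" /usr/java/jdk1.8.0_171
#   done
```

Lines that set JAVA_HOME in another form (for example without `export`) are left untouched, so it is worth checking the files afterwards.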

Downgrade to Cloudera CDH 5.11 (from parcel)

CDH 5.11 is required to install Spark 2.1

  • Open Cloudera Manager
  • Click on the Parcels icon
  • Click Configuration and add the following repository:
    • http://archive.cloudera.com/cdh5/parcels/5.11/, CDH 5.11 is required by spark 2.1
  • Click Check for New Parcels
  • Select CDH (5.11)
  • Click Download, Distribute, Activate
  • Redeploy client configuration using Cloudera Manager
  • Restart the cluster/docker (check on CM if all is started correctly)

Upgrade/Add Spark 2 (from parcel)

First downgrade to CDH 5.11 (see above).

Download parcels

  • Open Cloudera Manager
  • Click on the Parcels icon
  • Click Configuration and add the following repositories:
  • Click Check for New Parcels
  • Select Spark2 and click Download, Distribute, Activate
  • Redeploy Client Configuration using Cloudera Manager
  • Restart the cluster/docker

Download CDS (Cloudera Service)

Add service

  • Add the Spark 2 service to your cluster.
    • In step #1, select a dependency option:
      • HDFS, YARN, ZooKeeper: Choose this option if you do not need access to a Hive service.
      • HDFS, Hive, YARN, ZooKeeper: Hive is an optional dependency for the Spark service. If you have a Hive service and want to access Hive tables from your Spark applications, choose this option to include Hive as a dependency and have the Hive client configurations always available to Spark applications.
    • In step #2, when customizing the role assignments for CDS Powered By Apache Spark, add a gateway role to every host.
    • Note that the History Server port is 18089 instead of the usual 18088.
    • Complete the steps to add the Spark 2 service.

IMPORTANT: Use spark2-shell and spark2-submit to use the newly installed Spark 2.

See also:

Install Kafka

Cloudera Kafka 3.1 installs Kafka 1.0.1.

See https://www.cloudera.com/documentation/kafka/1-3-x/topics/kafka_installing.html

  • Download Kafka Parcel
  • Add the service from Cloudera Manager
  • Using Cloudera Manager-> Kafka -> Configurations
    • Increase Broker Heap Size
    • Change default replication factor from 3 to 1 using

Install Kerberos

  • From a terminal inside /home/cloudera, run sudo ./kerberos (the first time I received an error, but running it a second time succeeded)
  • Run the CM Kerberos wizard using the information output by the script (Administration->Security->Enable Kerberos).
  • Select "Manage krb5.conf through Cloudera Manager"

Then, it will prompt you for the following details (accept defaults if not specified here):

KDC Type:                MIT KDC
KDC Server Host:         quickstart.cloudera
Kerberos Security Realm: CLOUDERA

Later, it will prompt you for KDC account manager credentials:

Username: cloudera-scm/admin (@ CLOUDERA)
Password: cloudera

Finally select "Yes, I am ready to restart the cluster now."

To connect a client host to the Cloudera Kerberos server, install the client with apt-get install krb5-user and follow the instructions, entering the required info (ensure that you can resolve the server name, quickstart.cloudera in this case). It is usually recommended to copy /etc/krb5.conf from the server to all clients.

See also:

Grant access to hbase to admin user

The goal is to log in to the hbase shell using the hbase principal and, from there, grant access to another principal, in this case cloudera-scm/admin.

We can use the keytab file already created internally by Cloudera inside /var/run/cloudera-scm-agent/process/, with something similar to:

kinit -k -t /var/run/cloudera-scm-agent/process/180-hbase-MASTER/hbase.keytab hbase/quickstart.cloudera@CLOUDERA 

Check your exact folder name with ls /var/run/cloudera-scm-agent/process.
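
The folder lookup can also be scripted. The sketch below picks the directory with the highest numeric prefix for a given role, on the assumption (not confirmed by this guide) that the agent numbers process directories in increasing order, so the highest number is the current one. The `latest_process_dir` function name is hypothetical.

```shell
#!/bin/sh
# latest_process_dir BASE ROLE
# Prints the BASE/<number>-ROLE directory with the highest number.
# Returns non-zero when no matching directory exists.
latest_process_dir() {
  base=$1
  role=$2
  best=""
  best_n=-1
  for dir in "$base"/*-"$role"; do
    [ -d "$dir" ] || continue
    n=${dir##*/}   # strip the parent path, e.g. 180-hbase-MASTER
    n=${n%%-*}     # keep the numeric prefix, e.g. 180
    case $n in
      ''|*[!0-9]*) continue ;;
    esac
    if [ "$n" -gt "$best_n" ]; then
      best=$dir
      best_n=$n
    fi
  done
  [ -n "$best" ] && printf '%s\n' "$best"
}

# Usage:
#   dir=$(latest_process_dir /var/run/cloudera-scm-agent/process hbase-MASTER)
#   kinit -k -t "$dir/hbase.keytab" hbase/quickstart.cloudera@CLOUDERA
```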

From now on you are authenticated as hbase.

Connect to hbase shell:

hbase shell

Grant admin user access to HBase:

Important: If you are using Kerberos principal names when setting ACLs for users, Hadoop uses only the first (short) part of the Kerberos principal when converting it to the username. Hence, for a principal of the form ann/HOST@REALM, HBase ACLs should only be set for user ann.

grant 'cloudera-scm', 'RWCA'
exit

Test by authenticating again as 'cloudera-scm/admin@CLOUDERA':

kinit cloudera-scm/admin@CLOUDERA

How to create a custom collection inside SolR

Generate a sample configuration file:

solrctl instancedir --generate /davide-test

Inside that directory you will find the conf/schema.xml configuration file with all the fields indexed inside solr. You can update it based on your needs.

Create and upload config to zookeeper:

solrctl instancedir --create davide-test /davide-test

Create core and collection inside SolR:

solrctl core --create davide-test

If you update the local configuration, you can push it to ZooKeeper using:

solrctl instancedir --update davide-test /davide-test
solrctl core --reload davide-test
solrctl collection --reload davide-test

Consider that by default the generated schema.xml and solrconfig.xml are full of examples and should be cleaned up before real-world usage. Here is an example of a more compact configuration: https://gist.github.com/davideicardi/0cf9f08ebc618b8c0aa2b99dae5d7dc4

For more recent versions of SolR you can see the following files: https://github.com/apache/lucene-solr/tree/master/solr/server/solr/configsets/_default/conf

Create a collection with current date as suffix

Bash script to create a collection with the date as a suffix:

DIRECTORY=$(cd "$(dirname "$0")" && pwd)

DATE_FORMAT="%Y-%m-%d"
COLLECTION_PREFIX="my-collection"

TODAY=$(date +"$DATE_FORMAT")
TODAY_COLLECTION_NAME="$COLLECTION_PREFIX-$TODAY"

solrctl instancedir --create "$TODAY_COLLECTION_NAME" "$DIRECTORY/solr-instancedir/"
solrctl core --create "$TODAY_COLLECTION_NAME"

# Delete the collection
# YESTERDAY=`date --date='1 days ago' +$DATE_FORMAT`
# YESTERDAY_COLLECTION_NAME="$COLLECTION_PREFIX-$YESTERDAY"
# solrctl core --unload $YESTERDAY_COLLECTION_NAME

Read a CSV file from HDFS using impala

Import from a CSV file (TSV, tab-separated values), skipping the first header line:

CREATE EXTERNAL TABLE IF NOT EXISTS your_table_name (
    uid INT,
    lastName STRING,
    firstName STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/your_directory/'
TBLPROPERTIES ("skip.header.line.count"="1");

Index a csv to SolR using CrunchIndexerTool

This guide shows you how to index/upload a .csv file to SolR using CrunchIndexerTool. This procedure reads the csv from HDFS and pushes the index into SolR.

See also https://www.cloudera.com/documentation/enterprise/latest/topics/search_spark_index_ref.html .

IMPORTANT: This procedure is quite similar to the one using MapReduceIndexerTool, but instead of writing the index files directly it calls the SolR API, so it can be a little slower. On the other hand it can also update documents, while MapReduceIndexerTool can only insert.

Assuming that you have:

  • a valid cloudera installation (see THIS_IS_YOUR_CLOUDERA_HOST, if using Docker Quickstart it should be quickstart.cloudera)
  • a csv file stored in HDFS (see THIS_IS_YOUR_INPUT_CSV_FILE, like /your-hdfs-dir/your-csv.csv)
  • a valid destination SolR collection with the expected fields already configured (see THIS_IS_YOUR_DESTINATION_COLLECTION)

For some reason search-crunch-1.0.0-cdh5.13.0.jar is not present inside the Cloudera quickstart. I copied it from the Cloudera repository. You can get a copy from https://repository.cloudera.com/content/repositories/releases/com/cloudera/search/search-crunch/1.0.0-cdh5.13.0/ and put it inside /usr/lib/solr/contrib/crunch.

To delete previous data you can use:

solrctl collection --deletedocs THIS_IS_YOUR_DESTINATION_COLLECTION

For this example we will process a TAB-separated file with uid, lastName and firstName columns. The first row contains the headers. The Morphlines configuration file skips the first line, so the actual column names don't matter; the columns are just expected in this order. In SolR the fields should be configured with something similar to:

<field name="_version_" type="long" indexed="true" stored="true" />
<field name="uid" type="string" indexed="true" stored="true" required="true" />
<field name="firstName" type="text_general" indexed="true" stored="true" />
<field name="lastName" type="text_general" indexed="true" stored="true" />
<field name="text" type="text_general" indexed="true" multiValued="true" />

Then you should create a Morphlines configuration file (csv-to-solr-morphline.conf) with the following code:

# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : "THIS_IS_YOUR_DESTINATION_COLLECTION"

  # ZooKeeper ensemble
  zkHost : "THIS_IS_YOUR_CLOUDERA_HOST:2181/solr"
}


# Specify an array of one or more morphlines, each of which defines an ETL
# transformation chain. A morphline consists of one or more potentially
# nested commands. A morphline is a way to consume records such as Flume events,
# HDFS files or blocks, turn them into a stream of records, and pipe the stream
# of records through a set of easily configurable transformations on the way to
# a target application such as Solr.
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]

    commands : [
      {
        readCSV {
          separator : "\t"
          # These columns should match the fields configured in SolR and are expected in this order inside the CSV
          columns : [uid,lastName,firstName]
          ignoreFirstLine : true
          quoteChar : ""
          commentPrefix : ""
          trim : true
          charset : UTF-8
        }
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # This command deletes record fields that are unknown to Solr
      # schema.xml.
      #
      # Recall that Solr throws an exception on any attempt to load a document
      # that contains a field that is not specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }

    ]
  }
]

To import run the following command inside a cluster:

Running using spark-submit

# export myDriverJarDir=/opt/cloudera/parcels/CDH/lib/solr/contrib/crunch # for CDH with parcels
export myDriverJarDir=/usr/lib/solr/contrib/crunch # for CDH with packages
# export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch # for CDH with parcels
export myDependencyJarDir=/usr/lib/search/lib/search-crunch # for CDH with packages
export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1)
export myDriverJar=$(find $myDriverJarDir -maxdepth 1 -name 'search-crunch-*.jar' ! -name '*-job.jar' ! -name '*-sources.jar')

spark-submit \
    --jars $myDependencyJarFiles \
    --class org.apache.solr.crunch.CrunchIndexerTool \
    $myDriverJar \
    -D morphlineVariable.ZK_HOST=THIS_IS_YOUR_CLOUDERA_HOST:2181/solr \
    -D morphlineVariable.COLLECTION=THIS_IS_YOUR_DESTINATION_COLLECTION \
    --morphline-file ./csv-to-solr-morphline.conf \
    --pipeline-type spark \
    hdfs://THIS_IS_YOUR_CLOUDERA_HOST/THIS_IS_YOUR_INPUT_CSV_FILE

WARNING: I was unable to pass morphline variables on the command line, so they are repeated inside the morphline conf file.
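
The jar lists in these commands are built with `find ... | tr '\n' ',' | head -c -1`, where the negative count passed to `head -c` is a GNU extension. A possible alternative that avoids it is `paste -s`, sketched below. The `join_jars` function name is hypothetical.

```shell
#!/bin/sh
# join_jars DIR [SEPARATOR]
# Prints all *.jar files under DIR, sorted and joined by SEPARATOR (default ",").
join_jars() {
  find "$1" -name '*.jar' | sort | paste -s -d "${2:-,}" -
}

# Usage (paths taken from the surrounding commands):
#   export myDependencyJarFiles=$(join_jars /usr/lib/search/lib/search-crunch)
#   export myDependencyJarPaths=$(join_jars /usr/lib/search/lib/search-crunch :)
```

Like the original pipeline, this does not handle file names that contain newlines.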

Running using hadoop

WARNING: NOT WORKING DUE TO A CONNECTION PROBLEM ??

# export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch # for CDH with parcels
export myDependencyJarDir=/usr/lib/search/lib/search-crunch # for CDH with packages
export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1)
export myDependencyJarPaths=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ':' | head -c -1)
export HADOOP_CLASSPATH=$myDependencyJarPaths

hadoop \
    --config /etc/hadoop/conf.cloudera.yarn \
    jar /usr/lib/solr/contrib/crunch/search-crunch-*.jar \
    org.apache.solr.crunch.CrunchIndexerTool \
    --libjars $myDependencyJarFiles \
    -D morphlineVariable.ZK_HOST=THIS_IS_YOUR_CLOUDERA_HOST:2181/solr \
    -D morphlineVariable.COLLECTION=THIS_IS_YOUR_DESTINATION_COLLECTION \
    --morphline-file ./csv-to-solr-morphline.conf \
    --pipeline-type mapreduce \
    hdfs://THIS_IS_YOUR_CLOUDERA_HOST/THIS_IS_YOUR_INPUT_CSV_FILE

Some considerations:

  • By default Cloudera QuickStart has a very small memory and heap configuration. If you get out-of-memory or heap exceptions, I suggest increasing it in Cloudera Manager -> YARN -> Configuration (http://THIS_IS_YOUR_CLOUDERA_HOST:7180/cmf/services/11/config#filterdisplayGroup=Resource+Management ). I used 1 GB of memory and 500 MB of heap for both map and reduce jobs. Consider also changing yarn.app.mapreduce.am.command-opts, mapreduce.map.java.opts, mapreduce.map.memory.mb and mapreduce.reduce.memory.mb inside /etc/hadoop/conf/mapred-site.xml

Morphline manipulation

Before writing to SolR you can add other morphline transformations to manipulate the input records. For example, you can split fields, convert date times and more. See http://kitesdk.org/docs/1.1.0/morphlines/morphlines-reference-guide.html .

If morphline files get too big, you can also split them using the 'include' command.

Other resources:

Index a csv to SolR using MapReduceIndexerTool

This guide shows you how to index/upload a .csv file to SolR using MapReduceIndexerTool. This procedure reads the csv from HDFS and writes the index directly into HDFS.

See also https://www.cloudera.com/documentation/enterprise/latest/topics/search_mapreduceindexertool.html .

IMPORTANT: Repeatedly running the MapReduceIndexerTool on the same set of input files can result in duplicate entries in the Solr collection. This occurs because currently the tool can only insert documents and cannot update or delete existing Solr documents.

IMPORTANT: Batch indexing into offline Solr shards is not supported in environments in which batch indexing into online Solr servers using GoLive occurs.

This method indexes data into SolR by writing the index files directly; it does not invoke the SolR HTTP API.

Assuming that you have:

  • a valid cloudera installation (see THIS_IS_YOUR_CLOUDERA_HOST, if using Docker Quickstart it should be quickstart.cloudera)
  • a csv file stored in HDFS (see THIS_IS_YOUR_INPUT_CSV_FILE, like /your-hdfs-dir/your-csv.csv)
  • a valid destination SolR collection with the expected fields already configured (see THIS_IS_YOUR_DESTINATION_COLLECTION)
    • the output directory will be the SolR configured instanceDir (see THIS_IS_YOUR_CORE_INSTANCEDIR) and should be an HDFS path

To delete previous data you can use:

solrctl collection --deletedocs THIS_IS_YOUR_DESTINATION_COLLECTION

For this example we will process a TAB-separated file with uid, lastName and firstName columns. The first row contains the headers. The Morphlines configuration file skips the first line, so the actual column names don't matter; the columns are just expected in this order. In SolR the fields should be configured with something similar to:

<field name="_version_" type="long" indexed="true" stored="true" />
<field name="uid" type="string" indexed="true" stored="true" required="true" />
<field name="firstName" type="text_general" indexed="true" stored="true" />
<field name="lastName" type="text_general" indexed="true" stored="true" />
<field name="text" type="text_general" indexed="true" multiValued="true" />

Then you should create a Morphlines configuration file (csv-to-solr-morphline.conf) with the following code:

# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
  # Name of solr collection (taken from --collection param)
  collection : "$COLLECTION"

  # ZooKeeper ensemble (taken from --zk-host param)
  zkHost : "$ZK_HOST"
}


# Specify an array of one or more morphlines, each of which defines an ETL
# transformation chain. A morphline consists of one or more potentially
# nested commands. A morphline is a way to consume records such as Flume events,
# HDFS files or blocks, turn them into a stream of records, and pipe the stream
# of records through a set of easily configurable transformations on the way to
# a target application such as Solr.
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]

    commands : [
      {
        readCSV {
          separator : "\t"
          # These columns should match the fields configured in SolR and are expected in this order inside the CSV
          columns : [uid,lastName,firstName]
          ignoreFirstLine : true
          quoteChar : ""
          commentPrefix : ""
          trim : true
          charset : UTF-8
        }
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # This command deletes record fields that are unknown to Solr
      # schema.xml.
      #
      # Recall that Solr throws an exception on any attempt to load a document
      # that contains a field that is not specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }

    ]
  }
]

To import run the following command inside a cluster:

hadoop jar /usr/lib/solr/contrib/mr/search-mr-*-job.jar \
  org.apache.solr.hadoop.MapReduceIndexerTool \
  --output-dir hdfs://quickstart.cloudera/THIS_IS_YOUR_CORE_INSTANCEDIR/  \
  --morphline-file ./csv-to-solr-morphline.conf \
  --zk-host quickstart.cloudera:2181/solr \
  --solr-home-dir /THIS_IS_YOUR_CORE_INSTANCEDIR \
  --collection THIS_IS_YOUR_DESTINATION_COLLECTION \
  --go-live \
  hdfs://THIS_IS_YOUR_CLOUDERA_HOST/THIS_IS_YOUR_INPUT_CSV_FILE

Some considerations:

  • You can use sudo -u hdfs to run the above command, because your own user probably does not have permission to write to the HDFS output directory.

  • By default Cloudera QuickStart has a very small memory and heap configuration. If you get out-of-memory or heap exceptions, I suggest increasing it in Cloudera Manager -> YARN -> Configuration (http://THIS_IS_YOUR_CLOUDERA_HOST:7180/cmf/services/11/config#filterdisplayGroup=Resource+Management ). I used 1 GB of memory and 500 MB of heap for both map and reduce jobs. Consider also changing yarn.app.mapreduce.am.command-opts, mapreduce.map.java.opts, mapreduce.map.memory.mb and mapreduce.reduce.memory.mb inside /etc/hadoop/conf/mapred-site.xml

  • If SolR crashes during the go-live phase, it is probably an out-of-memory problem. Check the SolR logs. In this case increase the SolR heap size in the Cloudera Manager Admin Console configuration.
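
Once the job has finished, one way to confirm that documents reached the collection is to ask SolR for the document count. This is a minimal sketch, assuming SolR listens on port 8983 without authentication (neither is stated in this guide, and a Kerberos-enabled cluster would need extra curl options). The `SOLR_URL` variable and the function names are hypothetical.

```shell
#!/bin/sh
SOLR_URL=${SOLR_URL:-http://THIS_IS_YOUR_CLOUDERA_HOST:8983/solr}

# parse_num_found
# Reads a SolR JSON response on stdin and prints the value of numFound.
parse_num_found() {
  sed -n 's/.*"numFound":[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# count_docs COLLECTION
# Runs a match-all query that returns no rows and prints the document count.
count_docs() {
  curl -s "$SOLR_URL/$1/select?q=*:*&rows=0&wt=json" | parse_num_found
}

# Usage:
#   count_docs THIS_IS_YOUR_DESTINATION_COLLECTION
```

Running it before and after the import also makes duplicate inserts from a repeated run easy to spot.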

Morphline manipulation

Before writing to SolR you can add other morphline transformations to manipulate the input records. For example, you can split fields, convert date times and more. See http://kitesdk.org/docs/1.1.0/morphlines/morphlines-reference-guide.html .

If morphline files get too big, you can also split them using the 'include' command.

Other resources:

Working with hdfs

Copy a file to HDFS

One option is to use the Hue File Browser and its upload feature.

Another option is to copy the file manually.

First copy it into the container:

docker cp YOUR_FILE quickstart.cloudera:/tmp/

Create a directory inside HDFS:

sudo -u hdfs hadoop fs -mkdir /YOUR_DEST_FOLDER

Note that the command must be run with sudo as the hdfs user. Ensure that the hdfs user is able to read that file.

Then from the container upload it to HDFS:

sudo -u hdfs hadoop fs -copyFromLocal /tmp/YOUR_FILE /YOUR_DEST_FOLDER/
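
The three manual steps can be chained from the host with `docker exec`. This is a sketch under stated assumptions: the container is named quickstart.cloudera as in the docker run command earlier, and sudo works without a TTY inside the container (if it does not, run the two hadoop commands from an attached shell instead). The `CONTAINER` and `DRY_RUN` variables and the function names are hypothetical.

```shell
#!/bin/sh
CONTAINER=${CONTAINER:-quickstart.cloudera}

# run CMD...
# Executes the command, or only prints it when DRY_RUN is set.
run() {
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# upload_to_hdfs LOCAL_FILE HDFS_DIR
# Copies the file into the container, creates HDFS_DIR and uploads the file.
upload_to_hdfs() {
  file=$1
  dest=$2
  name=$(basename "$file")
  run docker cp "$file" "$CONTAINER:/tmp/" &&
  run docker exec "$CONTAINER" sudo -u hdfs hadoop fs -mkdir -p "$dest" &&
  run docker exec "$CONTAINER" sudo -u hdfs hadoop fs -copyFromLocal "/tmp/$name" "$dest/"
}

# Usage:
#   DRY_RUN=1 upload_to_hdfs YOUR_FILE /YOUR_DEST_FOLDER   # preview only
#   upload_to_hdfs YOUR_FILE /YOUR_DEST_FOLDER
```

The `-p` flag on `-mkdir` makes the directory step succeed when the folder already exists.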

Allow all users to write inside a directory

WARNING: For production use a more specific permission.

sudo -u hdfs hadoop fs -chmod -R 0777 /YOUR_DEST_FOLDER

Index csv to SolR using Spark and spark-solr

This guide shows you how to index/upload a .csv file to SolR using Spark and the spark-solr library. This procedure reads the csv from HDFS and pushes the data to SolR.

With this solution you can write and update documents inside SolR. Consider that this method invokes SolR directly through its HTTP API.

See also https://github.com/LucidWorks/spark-solr .

Assuming that you have:

  • a valid cloudera installation (see THIS_IS_YOUR_CLOUDERA_HOST, if using Docker Quickstart it should be quickstart.cloudera)
  • a csv file stored in HDFS (see THIS_IS_YOUR_INPUT_CSV_FILE, like /your-hdfs-dir/your-csv.csv)
  • a valid destination SolR collection with the expected fields already configured (see THIS_IS_YOUR_DESTINATION_COLLECTION)

For this example we will process a TAB separated file with UID, FIRST NAME and LAST NAME columns. The first row contains the headers. On SolR we should configure the fields with something similar:

<field name="_version_" type="long" indexed="true" stored="true" />
<field name="uid" type="string" indexed="true" stored="true" required="true" />
<field name="firstName" type="text_general" indexed="true" stored="true" />
<field name="lastName" type="text_general" indexed="true" stored="true" />
<field name="text" type="text_general" indexed="true" multiValued="true" />
<field name="_indexed_at_tdt" type="tdate" indexed="true" stored="true" required="false" multiValued="false" />

NOTE: The SolR field _indexed_at_tdt is automatically generated and required by the spark-solr library.
See https://stackoverflow.com/questions/43371613/lucidworks-save-solr-format-unknown-field#43373502

Create a Scala project (see YOUR_SCALA_PROJECT) with the following files. Remember to use the same Scala version as your Cloudera cluster: Cloudera 5.13 uses Spark 1.6, which is compiled with Scala 2.10.5 (you can check the version using the spark-shell command).

project/assembly.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

build.sbt:

// Config based on:
// http://queirozf.com/entries/creating-scala-fat-jars-for-spark-on-sbt-with-sbt-assembly-plugin
// https://github.com/sbt/sbt-assembly
// https://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea

// build the final jar using:
//  sbt clean assembly

name := "screen-engine-lab-scala"

version := "0.1"

scalaVersion := "2.10.5"

resolvers += "Agile Lab Dev" at "https://dl.bintray.com/agile-lab-dev/Spark-Solr/"
resolvers += "Restlet repo" at "http://maven.restlet.com/"

// marked as provided because is already present in cloudera
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"

libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.5" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"

// Dependency for CSV loading
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.5.0"
// Module found at https://dl.bintray.com/agile-lab-dev/Spark-Solr/
//  see resolvers
//  the default spark-solr is compiled with another spark version
// spark-solr has a dependency on org.restlet.jee#org.restlet;2.1.1 and org.restlet.jee#org.restlet.ext.servlet;2.1.1
//  that is not present in the default repo, you should also add http://maven.restlet.com/
libraryDependencies += "it.agilelab.bigdata.spark" % "spark-solr" % "1.1.0-spark-1.6-scala-2.10"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _ @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

// Exclude scala runtime because already present inside spark
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

We can't use the official LucidWorks spark-solr library because it is compiled with different Spark, SolR and Scala versions. AgileLab has created a custom version compiled with Spark 1.6 and Scala 2.10. It can be found at https://dl.bintray.com/agile-lab-dev/Spark-Solr/. The library is:

    "it.agilelab.bigdata.spark" % "spark-solr" % "1.1.0-spark-1.6-scala-2.10"

Also consider that this library has a dependency on org.restlet, which can be found in the repository http://maven.restlet.com/.

We have also configured a fat/uber jar using sbt-assembly (more info at: https://github.com/sbt/sbt-assembly) to create a standalone jar that can be submitted to Spark. Remember to mark all dependencies supplied by Cloudera/Hadoop/Spark as provided so that they are excluded from the final jar.

SparkSolrApp.scala (main scala file):

    import com.lucidworks.spark.SolrSupport
    import org.apache.solr.common.SolrInputDocument
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.sql.{DataFrame, Row}
    
    object SparkSolrApp {
    
      val zookeeperHost = "THIS_IS_YOUR_CLOUDERA_HOST:2181/solr"
      val inputFile = "THIS_IS_YOUR_INPUT_CSV_FILE"
      val solrCollection = "THIS_IS_YOUR_DESTINATION_COLLECTION"
    
      def main(args: Array[String]) {
    
        println("Welcome to SparkSolrApp")
    
        val conf = new SparkConf().setAppName("SparkSolrApp")
        val sc = new SparkContext(conf)
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
        // Read CSV
        // For more info see: https://github.com/databricks/spark-csv
        val csvData = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "true")
          .option("delimiter", "\t")
          .option("inferSchema", "true")
          .load(inputFile)
    
        println("Importing lines : " + csvData.count)
    
        // Write to Solr
        writeBatchRDD(csvData)
        // or
        // writeDataFrame(csvData)
      }
    
      private def writeBatchRDD(csvData: DataFrame): Unit = {
        val solrDocs = csvData.rdd.map(toSolrDoc)
    
        SolrSupport.indexDocs(
          zookeeperHost,
          solrCollection,
          10,
          JavaRDD.fromRDD(solrDocs))
      }
    
      // TODO Writing using dataframe doesn't work because `solr` data source is not registered in our current spark-solr implementation:
    //  private def writeDataFrame(csvData: DataFrame): Unit = {
    //    csvData.write
    //      .format("solr")
    //      .option("zkhost", zookeeperHost)
    //      .option("collection", solrCollection)
    //      .option("gen_uniq_key", "true")
    //      .mode(org.apache.spark.sql.SaveMode.Overwrite)
    //      .save()
    //  }
    
      private def toSolrDoc(row: Row) = {
        val doc = new SolrInputDocument
    
        doc.addField("uid", row.getAs[String]("UID"))
        doc.addField("firstName", row.getAs[String]("FIRST NAME"))
        doc.addField("lastName", row.getAs[String]("LAST NAME"))
    
        doc
      }
    }

Create the assembly jar (fat/uber jar) using (TODO review compile warnings):

    sbt clean assembly

Copy fat jar to docker container:

    docker cp /home/davide/projects/YOUR_SCALA_PROJECT/target/scala-2.10/YOUR_SCALA_PROJECT-assembly-0.1.jar quickstart.cloudera:/davide-test/

Run the spark job inside the cluster using:

    spark-submit --class SparkSolrApp /davide-test/YOUR_SCALA_PROJECT-assembly-0.1.jar

See also:

@JGustavo0
Great job.
