Spark with QFS
1) Set the following environment variables:
# set the environment variable for the Hadoop-QFS interface
$ export SPARK_CLASSPATH=/home/ubuntu/qfs-1.1.4/build/java/hadoop-qfs/hadoop-2.5.1-qfs-.jar:/home/ubuntu/qfs-1.1.4/build/java/qfs-access/qfs-access-.jar
# set the environment variable for the qfs_access library (if it is not set, qfs_access cannot be loaded)
$ export LD_LIBRARY_PATH=/home/ubuntu/qfs-1.1.4/build/release/lib/
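Note: newer Spark releases deprecate SPARK_CLASSPATH; the same settings can go in conf/spark-defaults.conf instead (a sketch reusing the paths above, with the jar version suffixes elided as in the original):
# conf/spark-defaults.conf equivalents of the exports above
spark.driver.extraClassPath    /home/ubuntu/qfs-1.1.4/build/java/hadoop-qfs/hadoop-2.5.1-qfs-.jar:/home/ubuntu/qfs-1.1.4/build/java/qfs-access/qfs-access-.jar
spark.driver.extraLibraryPath  /home/ubuntu/qfs-1.1.4/build/release/lib/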
2) Start the sample QFS chunk servers and metaserver via the Python script:
$ ./examples/sampleservers/sample_setup.py -a install
and you should then see the QFS web UI at http://localhost:22000
3) Echo some text into a QFS path:
$ echo 'Hello World' | cptoqfs -s localhost -p 20000 -S -r 1 -k /qfs/tmp/helloworld -d -
and cat the file back from the distributed file system:
$ qfscat -s localhost -p 20000 /qfs/tmp/helloworld
If this works, QFS is functioning properly.
4) Start Spark:
$ spark-shell
5) Set the Hadoop configuration on the Spark context:
sc.hadoopConfiguration.set("fs.qfs.impl", "com.quantcast.qfs.hadoop.QuantcastFileSystem");
sc.hadoopConfiguration.set("fs.defaultFS", "qfs://localhost:20000");
sc.hadoopConfiguration.set("fs.qfs.metaServerHost", "localhost");
sc.hadoopConfiguration.set("fs.qfs.metaServerPort", "20000");
6) Load the file from QFS:
> val file = sc.textFile("/qfs/tmp/helloworld")
> file.count() // 1
> file.toArray().foreach(line => println(line)) // Hello World
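Note that RDD.toArray() is deprecated (the warning shows up in the log below); collect() is the current equivalent:
> file.collect().foreach(println) // Hello World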
====== Logging ======
scala> val file = sc.textFile("/qfs/tmp/helloworld")
15/12/15 16:24:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/12/15 16:24:49 INFO MemoryStore: ensureFreeSpace(110008) called with curMem=0, maxMem=560497950
15/12/15 16:24:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.4 KB, free 534.4 MB)
15/12/15 16:24:49 INFO MemoryStore: ensureFreeSpace(12789) called with curMem=110008, maxMem=560497950
15/12/15 16:24:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.5 KB, free 534.4 MB)
15/12/15 16:24:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:46341 (size: 12.5 KB, free: 534.5 MB)
15/12/15 16:24:49 INFO SparkContext: Created broadcast 0 from textFile at <console>:24
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:24
scala> file.count()
15/12/15 16:25:52 INFO FileInputFormat: Total input paths to process : 1
15/12/15 16:25:52 INFO SparkContext: Starting job: count at <console>:27
15/12/15 16:25:52 INFO DAGScheduler: Got job 0 (count at <console>:27) with 1 output partitions
15/12/15 16:25:52 INFO DAGScheduler: Final stage: ResultStage 0(count at <console>:27)
15/12/15 16:25:52 INFO DAGScheduler: Parents of final stage: List()
15/12/15 16:25:52 INFO DAGScheduler: Missing parents: List()
15/12/15 16:25:52 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at <console>:24), which has no missing parents
15/12/15 16:25:52 INFO MemoryStore: ensureFreeSpace(2864) called with curMem=122797, maxMem=560497950
15/12/15 16:25:52 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.8 KB, free 534.4 MB)
15/12/15 16:25:52 INFO MemoryStore: ensureFreeSpace(1700) called with curMem=125661, maxMem=560497950
15/12/15 16:25:52 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1700.0 B, free 534.4 MB)
15/12/15 16:25:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:46341 (size: 1700.0 B, free: 534.5 MB)
15/12/15 16:25:52 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
15/12/15 16:25:52 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at textFile at <console>:24)
15/12/15 16:25:52 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/12/15 16:25:52 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 2068 bytes)
15/12/15 16:25:52 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/12/15 16:25:52 INFO HadoopRDD: Input split: qfs://localhost:20000/qfs/tmp/helloworld:0+12
15/12/15 16:25:53 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/12/15 16:25:53 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/12/15 16:25:53 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/12/15 16:25:53 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/12/15 16:25:53 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/12/15 16:25:53 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2082 bytes result sent to driver
15/12/15 16:25:53 INFO DAGScheduler: ResultStage 0 (count at <console>:27) finished in 0.219 s
15/12/15 16:25:53 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 195 ms on localhost (1/1)
15/12/15 16:25:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/12/15 16:25:53 INFO DAGScheduler: Job 0 finished: count at <console>:27, took 0.455395 s
res4: Long = 1
scala> file.toArray().foreach(line => println(line))
15/12/15 16:25:59 INFO ContextCleaner: Cleaned accumulator 1
15/12/15 16:25:59 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:46341 in memory (size: 1700.0 B, free: 534.5 MB)
warning: there was one deprecation warning; re-run with -deprecation for details
15/12/15 16:26:00 INFO SparkContext: Starting job: toArray at <console>:27
15/12/15 16:26:00 INFO DAGScheduler: Got job 1 (toArray at <console>:27) with 1 output partitions
15/12/15 16:26:00 INFO DAGScheduler: Final stage: ResultStage 1(toArray at <console>:27)
15/12/15 16:26:00 INFO DAGScheduler: Parents of final stage: List()
15/12/15 16:26:00 INFO DAGScheduler: Missing parents: List()
15/12/15 16:26:00 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at textFile at <console>:24), which has no missing parents
15/12/15 16:26:00 INFO MemoryStore: ensureFreeSpace(3016) called with curMem=122797, maxMem=560497950
15/12/15 16:26:00 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.9 KB, free 534.4 MB)
15/12/15 16:26:00 INFO MemoryStore: ensureFreeSpace(1731) called with curMem=125813, maxMem=560497950
15/12/15 16:26:00 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1731.0 B, free 534.4 MB)
15/12/15 16:26:00 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:46341 (size: 1731.0 B, free: 534.5 MB)
15/12/15 16:26:00 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
15/12/15 16:26:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[1] at textFile at <console>:24)
15/12/15 16:26:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/12/15 16:26:00 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 2068 bytes)
15/12/15 16:26:00 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/12/15 16:26:00 INFO HadoopRDD: Input split: qfs://localhost:20000/qfs/tmp/helloworld:0+12
15/12/15 16:26:00 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2058 bytes result sent to driver
15/12/15 16:26:00 INFO DAGScheduler: ResultStage 1 (toArray at <console>:27) finished in 0.020 s
15/12/15 16:26:00 INFO DAGScheduler: Job 1 finished: toArray at <console>:27, took 0.053142 s
Hello World
scala> 15/12/15 16:26:00 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 36 ms on localhost (1/1)
15/12/15 16:26:00 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
aorjoa commented Dec 15, 2015

=== Updated ===

  1. Edit conf/spark-env.sh (make sure you chmod +x spark-env.sh):
export LD_LIBRARY_PATH='/home/ubuntu/qfs-1.1.4/build/release/lib/'
export SPARK_CLASSPATH='/home/ubuntu/qfs-1.1.4/build/java/hadoop-qfs/hadoop-2.5.1-qfs-.jar:/home/ubuntu/qfs-1.1.4/build/java/qfs-access/qfs-access-.jar:/home/ubuntu/Documents/spark-1.5.0/dist/conf/core-site.xml'
  2. Add the file conf/core-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Setting for QFS-->

<configuration>

<property>
  <name>fs.qfs.impl</name>
  <value>com.quantcast.qfs.hadoop.QuantcastFileSystem</value>
</property>

<property>
  <name>fs.defaultFS</name>
  <value>qfs://localhost:20000</value>
</property>

<property>
  <name>fs.qfs.metaServerHost</name>
  <value>localhost</value>
</property>

<property>
  <name>fs.qfs.metaServerPort</name>
  <value>20000</value>
</property>

</configuration>
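With core-site.xml supplying the QFS settings, the manual sc.hadoopConfiguration.set calls from step 5 should no longer be needed. One caveat: Hadoop's Configuration loads core-site.xml from classpath directories and jars, so if the file is not picked up, try putting the conf directory itself on the classpath rather than the XML file. A quick round-trip to verify (a sketch; the output path is made up for illustration):
val data = sc.parallelize(Seq("Hello", "QFS"))
data.saveAsTextFile("/qfs/tmp/spark-roundtrip") // hypothetical path; must not already exist
sc.textFile("/qfs/tmp/spark-roundtrip").collect().foreach(println)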
