Hadoop

Cloudera Hadoop (CDH-5.12) with YARN (2.6, MapReduce v2) and Spark (1.6).

Access

Access to the Hadoop cluster is limited. Faculty wishing to use it for research or teaching must first submit requests and plans for their usage to the ECSC for consideration.

Instructions for submitting such requests can be found here.

Availability

The Hadoop client software is available on any Design Center Linux system. (Direct remote access to the Hadoop cluster is not available; you'll need to SSH or FreeNX into one of those Linux systems first, then connect from there.)

Preparation

To use Hadoop, you must first have a Hadoop/HDFS account created for you; simply having a Design Center account does not give you access to Hadoop. Generally, your professor will submit a class roster for processing to automatically create your account. If you believe this has not happened, contact your professor and ask them to submit a request for you.

Basic Info

  • Your Hadoop username is identical to your Design Center username.
  • Your HDFS home directory is: /user/<username> (This is where all non-absolute paths you use will begin)
  • Your HDFS space quota is set upon account creation. To see it, run the command get-hdfs-quota, which lists your usable HDFS quota. (This figure assumes the default replication level of 3: if the script reports 100GB of quota, you actually have 300GB of raw quota, 100GB of which is usable)
  • Your HDFS file quota is 50,000 files. (Large numbers of files negatively impact HDFS performance. Hadoop is designed to handle large files, not large numbers of small files)
  • Data stored in HDFS is NOT backed up, so accidentally deleted files cannot be recovered. The default replication level of 3 does help protect against data loss due to hardware failure, but overriding the replication level downwards puts your data at risk. (See the example after this list.)
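
For reference, the replication factor of an individual file can be checked and changed with standard HDFS commands. This is only a sketch (<path> is a placeholder for a file in your HDFS home directory); remember that lowering replication saves quota but reduces protection against hardware failure:

$ hadoop fs -stat %r <path>
$ hadoop fs -setrep -w 2 <path>

The first command prints the file's current replication factor; the second lowers it to 2 and waits for the change to complete.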

Running

To run the Hadoop client software, you must first run the setup script:

$ setup cdh-5.12

which puts the Hadoop binaries on your PATH and sets several associated environment variables.
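
To confirm the setup worked, you can check that the client is now on your PATH and that the environment variables are populated (the exact output will vary with the CDH build):

$ which hadoop
$ hadoop version
$ echo $HADOOP_PREFIX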

Examples

The following examples can be run on the Hadoop cluster:

MapReduce

WordCount

Hadoop contains a very simple word count example. It divides a text file into "words" based on whitespace, then totals the number of times each unique "word" appears in the file. ("Word" is in quotes because the split is purely on whitespace, so "here", "here,", and "here." are counted as three different words.)

  • Upload the text of War and Peace into your HDFS home directory:
$ hadoop fs -copyFromLocal /opt/cdh-5.12/data/war-and-peace.txt .
  • Run the WordCount example on it:
$ hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples*.jar wordcount war-and-peace.txt wordcount-out
  • View some of the results (a sorted top-20 listing is sketched after this list):
$ hadoop fs -cat wordcount-out/part-r-00000 | less
  • (OPTIONAL) When done, remove the generated data:
$ hadoop fs -rm -r -f wordcount-out
Since we didn't specify -skipTrash here, the data gets moved to the trash for a bit rather than being immediately deleted.
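
Before removing the output, you can also post-process it with ordinary Unix tools, since each line is a tab-separated "word<TAB>count" pair. For example, a sketch that lists the 20 most frequent words (the part-r-* glob covers the case where more than one reducer ran):

$ hadoop fs -cat wordcount-out/part-r-* | sort -k2 -nr | head -20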

TeraSort

Hadoop comes with a standard sorting benchmark suite. Data to be sorted is created by TeraGen, then passed to TeraSort for sorting.

What follows is adapted from: http://www.michael-noll.com/blog/2011/04/09/benchmarking-and-stress-testing-an-hadoop-cluster-with-terasort-testdfsio-nnbench-mrbench/

  • Generate 10GB of data (with replication level 2, so 20GB of quota consumed) with TeraGen, putting the output into the directory "teragen" inside your HDFS home directory
$ hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples*.jar teragen -Ddfs.replication=2 -Dmapred.map.tasks=24 100000000 teragen
  • To make sure the data is there, you can run:
$ hadoop fs -ls teragen
  • Sort the previously created data (with replication level 1, so only 10GB of additional quota consumed, for a total of 30GB so far)
$ hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples*.jar terasort -Ddfs.replication=1 -Dmapred.reduce.tasks=24 teragen terasort
  • Check your disk usage:
$ hadoop fs -du -s -h .
which should tell you you've used 20GB. That's the size of the actual data in your HDFS home directory, ignoring replication. (The "." specifies the current directory; remember that any non-absolute path in HDFS starts in your home directory, so this is equivalent to "/user/<username>".)
  • Check your disk quota:
$ hadoop fs -count -q .
This will output several numbers in the following order:
  1. File Count Quota
  2. File Count Quota Remaining
  3. Disk Space Quota
  4. Disk Space Quota Remaining
  5. Directory Count
  6. File Count
  7. Content Size (the amount of data present, ignoring replication)
  8. Path Name
  • Remove data when done. TeraGen and TeraSort will fail if their output directories already exist, so it's a good idea to remove them when you're done with them. (If you'd like to verify the sort first, see the TeraValidate sketch after this list.)
$ hadoop fs -rm -r -f -skipTrash terasort teragen
The "-skipTrash" option permanently deletes data rather than putting it in the trash. We do that with the teragen/terasort data since it's very easy to recreate if necessary. Leaving this flag off will move deleted items to the HDFS trash, where they will reside for a short time before being permanently deleted. (Thus allowing you to restore them if you accidentally deleted something)

Listing Running MapReduce Jobs

For a list of your MapReduce jobs currently running on the cluster, you can run:

$ mapred job -list

To see the status of a particular job:

$ mapred job -status <jobID>
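
If one of your jobs is misbehaving or was started by mistake, you can also kill it by ID:

$ mapred job -kill <jobID>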

Mahout

K-means

Adapted from this example: https://cwiki.apache.org/confluence/display/MAHOUT/Quick+tour+of+text+analysis+using+the+Mahout+command+line

  • Uncompress the example data into your local Unix home directory:
$ cd ~
$ mkdir reuters-out
$ cd reuters-out
$ tar xzvf /opt/cdh-5.12/data/reuters21578.tar.gz
$ cd ..
  • Convert the data from SGML to plain-text: (this will take a couple minutes)
$ mahout org.apache.lucene.benchmark.utils.ExtractReuters ~/reuters-out ~/reuters-text
  • Copy the data from your local Unix home directory to your HDFS home directory: (this will take around 10 minutes)
$ hadoop fs -copyFromLocal ~/reuters-text/ ./reuters-text
  • (OPTIONAL) You can now remove the intermediate data from your Unix home directory if desired:
$ rm -rf ~/reuters-out/ ~/reuters-text/
  • Now turn the raw text you uploaded into Mahout sequence files:
$ mahout seqdirectory -c UTF-8 -i ./reuters-text -o ./reuters-seqfiles
  • Examine the sequence files with seqdumper:
$ mahout seqdumper -i reuters-seqfiles/chunk-0 | less
  • Convert the sequence files into sparse TF-IDF vectors:
$ mahout seq2sparse -i reuters-seqfiles/ -o reuters-vectors/ -ow -chunk 100 -x 90 -seq -ml 50 -n 2 -nv
  • Run kmeans:
$ mahout kmeans -i reuters-vectors/tfidf-vectors/ -c reuters-kmeans-centroids -cl -o reuters-kmeans-clusters -k 20 -ow -x 10 -dm org.apache.mahout.common.distance.CosineDistanceMeasure
  • Examine the clusters (if clusters-2-final does not exist, see the note after this list):
$ mahout clusterdump -d reuters-vectors/dictionary.file-0 -dt sequencefile -i reuters-kmeans-clusters/clusters-2-final -n 20 -b 100 -o cdump.txt -p reuters-kmeans-clusters/clusteredPoints/
$ less ~/cdump.txt
  • (OPTIONAL) Clean up intermediate files:
$ hadoop fs -rm -r -f -skipTrash reuters-kmeans-* reuters-vectors reuters-seqfiles
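
If the clusterdump step above complains that reuters-kmeans-clusters/clusters-2-final does not exist, the k-means run simply converged after a different number of iterations. List the output directory and substitute whichever directory name ends in "-final":

$ hadoop fs -ls reuters-kmeans-clusters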

Spark

SparkPi

SparkPi is a simple example that estimates the value of pi. The output of this estimate is buried among a lot of other output when you run these commands, but will look like this: "Pi is roughly X.XXXXX"

  • Running on YARN in client mode (still runs on cluster, output goes to stdout)
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-client \
  $SPARK_HOME/examples/lib/spark-examples*.jar 10
  • Running on YARN in cluster mode (output ends up in HDFS) (FIXME: Where? See the note after this list for one way to retrieve it)
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  $SPARK_HOME/examples/lib/spark-examples*.jar 10
  • Running on local client only (not using the cluster at all)
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local \
  $SPARK_HOME/examples/lib/spark-examples*.jar 10
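
For the cluster-mode run above, the driver's output (including the "Pi is roughly" line) ends up in the YARN application logs rather than on your terminal. Assuming log aggregation is enabled on the cluster, you can usually retrieve them with yarn logs, using the application ID that spark-submit prints:

$ yarn logs -applicationId <applicationID>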

spark-shell

This example assumes you've already copied "war-and-peace.txt" into your HDFS home directory as directed in the wordcount example above.
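
If you're not sure whether the file is still there, you can check before starting the shell:

$ hadoop fs -ls war-and-peace.txt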

$ spark-shell --master yarn
<output omitted>
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0-cdh5.6.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
<output omitted>
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4ef1926f

scala> val warandpeace = sc.textFile("war-and-peace.txt")
warandpeace: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala> warandpeace.count
res1: Long = 65007

scala> exit
$ 

pyspark

pyspark works very similarly to spark-shell; however, you interact with Spark in Python rather than Scala.

$ pyspark --master yarn
<output omitted>
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0-cdh5.6.0
      /_/

Using Python version 2.6.6 (r266:84292, Jul 23 2015 15:22:56)
SparkContext available as sc, HiveContext available as sqlContext.
>>> sc
<pyspark.context.SparkContext object at 0x217d750>
>>> exit()

Getting Large Data into HDFS

An NFS-HDFS proxy exists to allow the upload of large data sets directly into HDFS without needing to stage them somewhere on the normal ECC unix storage system first. This simplifies the upload process, since you're not constrained by the 5GB production storage quota.

You can interact with your HDFS home directory directly from any ECC Linux workstation at the path

/HADOOP/hdfs/user/<username>

With a few limitations, you can write (and read) data in this directory using any normal Unix application. (Files must be written sequentially; random writes inside a single file are not supported. This is generally not a problem.)

(NOTE: The NFS-HDFS proxy technology is new and somewhat fragile. It is not intended as the primary means to access/manipulate data in HDFS. For that see the hadoop fs and hdfs dfs subcommands)
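
For example, an ordinary copy into the proxy path lands the file in your HDFS home directory (mydata.csv is just an illustrative name):

$ cp ~/mydata.csv /HADOOP/hdfs/user/<username>/
$ hadoop fs -ls mydata.csv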

Examples

If you have downloaded a large, compressed archive that you want to upload and uncompress for use in HDFS, you have a few options. For this example, we'll call the file you've downloaded 'example.txt.gz'. In each example, we'll uncompress the file as 'example.txt' in your HDFS home directory.

From Linux to HDFS via NFS-HDFS proxy

$ gzip -cd /home/username/example.txt.gz > /HADOOP/hdfs/user/username/example.txt

Uncompressing file already on HDFS via NFS-HDFS proxy

$ gzip -cd /HADOOP/hdfs/user/username/example.txt.gz > /HADOOP/hdfs/user/username/example.txt

The HDFS-NFS proxy is experimental, and may not always work. Here's how to do the same operations without it:

From Linux to HDFS without NFS-HDFS proxy

$ gzip -cd /home/username/example.txt.gz | hdfs dfs -put - /user/username/example.txt

Uncompressing file already on HDFS without NFS-HDFS proxy

$ hdfs dfs -cat /user/username/example.txt.gz | gzip -cd - | hdfs dfs -put - /user/username/example.txt
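
Whichever method you use, you can confirm that the uncompressed file arrived with:

$ hdfs dfs -ls -h /user/username/example.txt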

Weka

Although Weka is often used alongside Mahout on Hadoop, Weka does NOT actually run inside Hadoop, nor is it able to access data in HDFS. To support the large datasets Weka processes, we therefore provide the temporary filesystem:

/local/weka/

This filesystem is 400GB in size and is configured to operate similarly to /tmp. This directory is where Weka datasets (and only Weka datasets) should be stored.

Note that this directory is local to each Linux workstation, so data written here is only accessible on the workstation where it was written; unlike your home directory, it will not follow you to a different workstation. If you need to log out between downloading and processing your dataset with Weka, make note of the system you downloaded it on (you can use the "hostname" command), since you will need to connect to the same system when you log back in.
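
A typical session, then, is to note the workstation's hostname and stage the dataset under /local/weka (the directory and file names here are just illustrations):

$ hostname
$ mkdir -p /local/weka/<username>
$ cp ~/Downloads/mydataset.arff /local/weka/<username>/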

NOTE: Files/directories in /local/weka are DELETED once they are 10 days old. This directory is intended for short-term use and immediate processing in Weka. Do NOT store any results or data that must persist in this directory or it will be deleted with no means to recover it.
!!! DO NOT STORE ANYTHING OTHER THAN EASILY REDOWNLOADABLE DATASETS IN /local/weka !!!
!!! YOU MUST PROCESS ALL WEKA DATASETS WITHIN 48 HOURS OF DOWNLOADING THEM !!!

Troubleshooting

Mahout jobs fail during map with a "Java heap space" error

You need to set the memory limits appropriately for your job; what those limits should be depends on the job and how you've structured it. Two key settings for the memory limits of map tasks are mapreduce.map.memory.mb and mapreduce.map.java.opts. The first specifies the total memory allowed for each map process, while the second specifies its Java heap limit. (The first needs to be bigger than the second, since the process needs some memory for non-heap usage.)

If you wanted to set a heap limit of 1GB with a total limit of 1.5GB, you'd do the following:

$ mahout <your existing arguments> -Dmapreduce.map.memory.mb=1536 -Dmapreduce.map.java.opts=-Xmx1024m

If you're seeing a similar failure during "reduce" instead, the settings you want are mapreduce.reduce.memory.mb and mapreduce.reduce.java.opts. (These settings apply to any job that ultimately runs on MapReduce, not just Mahout.)
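
For example, the reduce-side equivalent of the command above would be:

$ mahout <your existing arguments> -Dmapreduce.reduce.memory.mb=1536 -Dmapreduce.reduce.java.opts=-Xmx1024m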

Cluster Info

Overall Cluster

  • Environment: Cloudera CDH-5.12 - YARN (MapReduce v2) and Spark (1.6)
  • Worker Nodes: 24
  • Cores: 96
  • Threads: 192
  • RAM: 768GB
  • HDFS Storage (Raw): 261TB
  • HDFS Storage (Usable): 80TB (After factoring replication overhead)

Specific Nodes

  • NameNode
    • Hostname: name1.hadoop.dc.engr.scu.edu
    • Cores: 4
    • RAM: 32GB
  • SecondaryNameNode
    • Hostname: name2.hadoop.dc.engr.scu.edu
    • Cores: 4
    • RAM: 32GB
  • 24x Worker Nodes (DataNode/NodeManager)
    • Hostname: worker-M-NN.hadoop.dc.engr.scu.edu (Where M is 1-2 and NN is 01-12)
    • Cores: 4
    • Threads: 8
    • RAM: 32GB
    • Storage: 12TB
    • Bandwidth: 1Gbit
