Revolution Analytics provides a series of R packages (commonly called RHadoop) for connecting R to Hadoop. rhdfs lets you manipulate HDFS from R, rmr2 and plyrmr let you write MapReduce jobs in R, and rhbase gives access to data stored in HBase.
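To give a flavor of how the pieces fit together before anything is installed, here is a rough sketch (all of these calls are demonstrated in detail later in this post and require the packages and environment set up below; plyrmr appears later):

library(rhdfs)   # HDFS access from R
library(rmr2)    # MapReduce jobs written as R functions
library(rhbase)  # HBase access through its Thrift server

hdfs.init()                # connect to HDFS
hdfs.ls("/user/celest")    # like `hdfs dfs -ls`

small = to.dfs(1:10)       # push an R object to HDFS
from.dfs(mapreduce(input = small,
                   map = function(k, v) keyval(v, 2 * v)))

hb.init()                  # requires `hbase thrift start` beforehand
hb.list.tables()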
Before using these packages in R, we first implement word count with plain Hadoop Streaming. Export the Hadoop home in the terminal with export HADOOP_HOME=/usr/local/hadoop/, then create two R files named mapper.R and reducer.R.
# mapper.R
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    for (w in words)
        cat(w, "\t1\n", sep = "")
}
close(con)
# reducer.R
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitLine <- function(line) {
    val <- unlist(strsplit(line, "\t"))
    list(word = val[1], count = as.integer(val[2]))
}

env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    split <- splitLine(line)
    word <- split$word
    count <- split$count
    if (exists(word, envir = env, inherits = FALSE)) {
        oldcount <- get(word, envir = env)
        assign(word, oldcount + count, envir = env)
    } else
        assign(word, count, envir = env)
}
close(con)

for (w in ls(env, all = TRUE))
    cat(w, "\t", get(w, envir = env), "\n", sep = "")
Using the example data from the previous Hadoop article, run Hadoop Streaming in the terminal:
cd ~/Downloads && mkdir testData && cd testData
wget http://www.gutenberg.org/ebooks/5000.txt.utf-8
cd ..
hdfs dfs -copyFromLocal testData/ /user/celest/
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -files mapper.R,reducer.R,/opt/intel/composer_xe_2013_sp1.3.174/compiler/lib/intel64/libiomp5.so \
    -mapper "mapper.R -m" -reducer "reducer.R -r" \
    -input /user/celest/testData/* -output /user/celest/testData2-output
hdfs dfs -cat /user/celest/testData2-output/part-00000
We obtain the same word-count result as before. Next, we are going to install the following packages:
rhdfs
rmr2
ravro
plyrmr
rhbase
Note that ravro is the Avro data parser for R.
Some system dependencies need to be installed first:
sudo apt-get install libcurl4-openssl-dev git
R CMD javareconf
sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev \
    libboost-system-dev libboost-filesystem-dev libevent-dev automake libtool \
    flex bison pkg-config g++ libssl-dev
sudo apt-get install php5-dev php5-cli phpunit libbit-vector-perl \
    libclass-accessor-class-perl python-all python-all-dev python-all-dbg \
    libglib2.0-dev ruby-full ruby-dev ruby-rspec rake
sudo gem install daemons gem_plugin mongrel
wget http://apache.stu.edu.tw//ant/binaries/apache-ant-1.9.4-bin.tar.gz
tar -zxvf apache-ant-1.9.4-bin.tar.gz
sudo mv apache-ant-1.9.4-bin /usr/local/ant
cd /usr/local
sudo chown -R celest ant
cd ~/Downloads
wget http://apache.stu.edu.tw/thrift/0.9.2/thrift-0.9.2.tar.gz
tar -zxvf thrift-0.9.2.tar.gz
cd thrift-0.9.2
./configure
make
sudo make install
sudo cp /usr/local/lib/libthrift-0.9.2.so /usr/lib/
sudo /sbin/ldconfig /usr/lib/libthrift-0.9.2.so
sudo subl /etc/bash.bashrc    # add the Hadoop-related environment variables (e.g. HADOOP_CMD, HADOOP_STREAMING) here
subl /usr/local/hadoop/etc/hadoop/hadoop-env.sh
source /etc/bash.bashrc
start-dfs.sh
start-yarn.sh
hive --service hiveserver
Install the dependent R packages:
install.packages(c("rJava", "Rcpp", "rjson", "RJSONIO", "bit64", "reshape2",
                   "data.table", "plyr", "dplyr", "digest", "functional",
                   "stringr", "caTools", "lazyeval", "Hmisc", "testthat",
                   "devtools", "iterators", "itertools", "pryr"))
library(devtools)
install_github("RevolutionAnalytics/quickcheck@3.2.0", subdir = "pkg")
install_github("RevolutionAnalytics/memoise")
install_github("RevolutionAnalytics/rhdfs", subdir = "pkg")
install_github("RevolutionAnalytics/ravro", subdir = "pkg/ravro")
install_github("RevolutionAnalytics/rmr2", subdir = "pkg")
install_github("RevolutionAnalytics/plyrmr", subdir = "pkg")
install_github("RevolutionAnalytics/rhbase", subdir = "pkg")
When I install rhbase, I encounter a problem: pkg-config --cflags thrift returns -I/usr/local/include instead of the correct path -I/usr/local/include/thrift, so the compiler cannot find the Thrift headers. I work around it by copying everything from /usr/local/include/thrift into /usr/local/include with cp -R /usr/local/include/thrift/* /usr/local/include/.
When I call hdfs.init() from the rhdfs package, it throws an error message:
sh: /usr/lib/hadoop/bin/: is a directory
Error in .jnew("org/apache/hadoop/conf/Configuration") :
  java.lang.ClassNotFoundException
In addition: Warning message:
running command '/usr/lib/hadoop/bin/ classpath' had status 126
The cause of this problem is a wrong HADOOP_CMD setting; after fixing it, hdfs.init() works.
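A minimal sketch of the fix from inside R, assuming the HADOOP_HOME used earlier in this post (adjust the paths to your installation); HADOOP_STREAMING is set as well because rmr2 needs it:

# paths assumed from this setup: HADOOP_HOME=/usr/local/hadoop
Sys.setenv(HADOOP_CMD = "/usr/local/hadoop/bin/hadoop")
Sys.setenv(HADOOP_STREAMING =
    "/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar")
library(rhdfs)
hdfs.init()   # should now find the hadoop binary and build the classpath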
To make rmr2 ship the extra shared libraries (the Intel runtime and libjvm.so, as in the streaming example above) with every job, clone the source, edit R/streaming.R, and reinstall:

cd ~/Downloads
git clone https://github.com/RevolutionAnalytics/rmr2.git
mv rmr2/pkg pkg
rm -r rmr2
mv pkg rmr2
subl rmr2/R/streaming.R
R CMD INSTALL rmr2
The relevant lines in streaming.R are the following:
paste.options(
  files = paste(
    collapse = ",",
    c(image.files, map.file, reduce.file, combine.file)))
and they can be modified as follows:
paste.options(
  files = paste(
    collapse = ",",
    c(image.files, map.file, reduce.file, combine.file,
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libiomp5.so",
      "/usr/lib/jvm/java-8-oracle/jre/lib/amd64/server/libjvm.so"))),

or, shipping the full set of Intel runtime libraries:

paste.options(
  files = paste(
    collapse = ",",
    c(image.files, map.file, reduce.file, combine.file,
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libiomp5.so",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libifport.so.5",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libifcoremt.so.5",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libimf.so",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libsvml.so",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libirc.so",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libirng.so",
      "/opt/intel/composer_xe_2015.1.133/compiler/lib/intel64/libintlc.so.5",
      "/usr/lib/jvm/java-8-oracle/jre/lib/amd64/server/libjvm.so"))),
Test the RHadoop packages:
library(rhdfs)
library(data.table)
library(dplyr)
library(magrittr)

# simulate a small data set and fit a linear model
N = 300
mydata = replicate(3, rnorm(N)) %>% tbl_dt() %>%
    setnames(paste0("x", 1:3)) %>%
    mutate(y = x1 + 2*x2 + 3*x3 + rnorm(N, 0, 5))
model = lm(y ~ ., mydata)

# write the serialized model to HDFS and read it back
hdfs.init()
modelfilename = "mymodel"
modelfile = hdfs.file(modelfilename, "w")
hdfs.write(model, modelfile)
hdfs.close(modelfile)
model

modelfile = hdfs.file(modelfilename, "r")
m = hdfs.read(modelfile)
model2 = unserialize(m)
hdfs.close(modelfile)
model2

# with a larger model, a single hdfs.read() no longer returns the whole file
N = 3000
mydata = replicate(3, rnorm(N)) %>% tbl_dt() %>%
    setnames(paste0("x", 1:3)) %>%
    mutate(y = x1 + 2*x2 + 3*x3 + rnorm(N, 0, 5))
model = lm(y ~ ., mydata)
modelfilename = "my_smart_unique_name"
modelfile = hdfs.file(modelfilename, "w")
hdfs.write(model, modelfile)
hdfs.close(modelfile)
model

modelfile = hdfs.file(modelfilename, "r")
m = hdfs.read(modelfile)
model2 = unserialize(m)    # fails: only the first 65536 bytes were read
hdfs.close(modelfile)
length(serialize(model, NULL))
length(m)

# read the file in 65536-byte chunks instead
modelfile = hdfs.file(modelfilename, "r")
fileSize = hdfs.file.info(paste0("/user/celest/", modelfilename))$size
m = NULL
i = 1
repeat {
    break_out = 65536*i > fileSize
    size = ifelse(break_out, 65536, fileSize - 65536*(i-1))
    tmp = hdfs.read(modelfile, size, 65536*(i-1))
    m = c(m, tmp)
    i = i + 1
    if (break_out) break
}
model2 = unserialize(m)
hdfs.close(modelfile)
length(m)
model2
Back in the terminal, you can use hdfs to check what R wrote. (rhdfs has functions that do the same thing from within R; see the sketch after the commands below.)
hdfs dfs -ls
hdfs dfs -rm -r my_smart_unique_name
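A rough equivalent inside R (a sketch; hdfs.ls() and hdfs.delete() are the rhdfs counterparts used here):

library(rhdfs)
hdfs.init()
hdfs.ls("/user/celest")               # like `hdfs dfs -ls`
hdfs.delete("my_smart_unique_name")   # like `hdfs dfs -rm -r`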
library(data.table)
library(dplyr)
library(magrittr)
library(rmr2)
library(plyrmr)
library(rhdfs)

# rmr2: push R objects to HDFS and run MapReduce on them
ints = to.dfs(1:100)
calc = mapreduce(input = ints, map = function(k, v) cbind(v, v = 2*v))
output = from.dfs(calc)
output

ints2 = to.dfs(matrix(rnorm(25), 5))
calc2 = mapreduce(input = ints2, map = function(k, v) v %*% v)
output2 = from.dfs(calc2)
output2

# plyrmr: data-frame-style transformations on HDFS data
N = 15
dat = replicate(3, rnorm(N)) %>% tbl_dt() %>%
    setnames(paste0("x", 1:3)) %>%
    mutate(y = x1 + 2*x2 + 3*x3 + rnorm(N, 0, 5)) %>%
    as.data.frame()
hdfs.init()
mydata = to.dfs(dat)
as.data.frame(input(mydata), x1_x2 = x1*x2)
bind.cols(input(mydata), x1_x2 = x1*x2)
output(bind.cols(input(mydata), x1_x2 = x1*x2), "/tmp/mydata2")

Finally, start the HBase Thrift server in the terminal and test rhbase from an R session:

hbase thrift start
R
> library(rhbase)
> hb.init()
> hb.list.tables()
> q("no")
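To close the loop with the streaming example at the beginning, the same word count can be written directly with rmr2. This is only a sketch: it assumes the Gutenberg text is still sitting under /user/celest/testData, and uses the standard rmr2 functions keyval, mapreduce, and from.dfs:

library(rmr2)

# map: emit (word, 1) for every whitespace-separated token in each line
wc.map = function(., lines)
    keyval(unlist(strsplit(lines, "[[:space:]]+")), 1)

# reduce: sum the counts collected for each word
wc.reduce = function(word, counts)
    keyval(word, sum(counts))

wc = mapreduce(input = "/user/celest/testData",
               input.format = "text",
               map = wc.map,
               reduce = wc.reduce,
               combine = TRUE)

out = from.dfs(wc)
head(keys(out))
head(values(out))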