# RHadoop source code from Adler, R in a Nutshell, pp. 560 to 566.
# Not all of this code has been tested.

# This schema is used for parsing the fixed-width input data instead of
# read.csv or read.table, because those functions are not accessible
# from within RHadoop map and reduce functions. Each entry gives a
# field name and its width in characters; names beginning with ".X"
# mark filler columns that are skipped during parsing.
mort.schema <- c(
  .X0=10, ResidentStatus=1, .X1=40, Education1989=2, Education2003=1,
  EducationFlag=1, MonthOfDeath=2, .X2=2, Sex=1, Agedetail=4,
  AgeSubstitution=1, AgeRecode52=2, AgeRecode27=2, AgeRecode12=2,
  AgeRecodeInfant22=2, PlaceOfDeath=1, MaritalStatus=1,
  DayOfWeekofDeath=1, .X3=16, CurrentDataYear=4, injuryAtWork=1,
  MannerOfdeath=1, MethodDisposition=1, Autopsy=1, .X4=34,
  ActivityCode=1, PlaceOfInjury=1, ICDCode=4, CauseRecode358=3, .X5=1,
  CauseRecode113=3, CauseRecode130=3, CauseRecode39=2, .X6=1,
  Conditions=261, .X8=1, race=2, BridgeRaceFlag=1, RaceimputationFlag=1,
  RaceRecode3=1, RaceRecode5=1, .X9=33, HispanicOrigin=3, .X10=1,
  HispanicOriginRecode=1)

# Unpack one fixed-width line into a named list, using the widths in
# the schema and dropping the ".X" filler fields.
unpack.line <- function(data, schema) {
  filter.func <- function(x) { substr(x, 1, 2) != ".X" }
  data.pointer <- 1
  output.data <- list()
  for (i in seq_along(schema)) {
    if (filter.func(names(schema)[i])) {
      output.data[[names(schema)[i]]] <- type.convert(
        substr(data, data.pointer, data.pointer + schema[i] - 1),
        as.is=TRUE)
    }
    data.pointer <- data.pointer + schema[i]
  }
  output.data
}

# Test script for the unpack.line function. (The expected result of
# this test is sketched at the end of this file.)
test.data <- c("A11a 8", "B22b09", "C33C10")
test.schema <- c(one=1, .X1=2, three=1, four=2)
t(sapply(test.data, FUN=function(x) unpack.line(x, test.schema)))

# Map function that counts the number of deaths by sex. Note that
# type.convert parses the one-character Sex code "F" as the logical
# value FALSE (while "M" remains a character string), so comparing the
# unpacked field against FALSE identifies female records.
sex.map.fn <- function(k, v) {
  record <- unpack.line(v, mort.schema)
  key <- ifelse(record[["Sex"]] == FALSE, "female", "male")
  # Create a keyval pair with key "female" or "male" and value 1.
  keyval(key, 1)
}

# Reduce function that counts the number of values for each key.
count.records.reduce.fn <- function(k, v) {
  keyval(k, length(v))
}

# Use mapreduce to run the two preceding functions over the input data
# on the Hadoop cluster and summarize the results.
sex.counts <- mapreduce(
  input="mort09",
  map=sex.map.fn,
  reduce=count.records.reduce.fn,
  combine=NULL,
  input.format="text",
  output.format="csv")

# If the run is successful, output like the following will be produced:
#   12/08/01 21:11:50 mapred.FileInputFormat: Total input paths to process : 4
#   ...
#   12/08/01 21:12:07 streaming.StreamJob: map 0% reduce 0%
#   12/08/01 21:12:07 streaming.StreamJob: map 1% reduce 0%
#   12/08/01 21:12:12 streaming.StreamJob: map 2% reduce 0%
#   12/08/01 21:12:18 streaming.StreamJob: map 3% reduce 0%
#   ...
#   12/08/01 21:26:08 streaming.StreamJob: map 100% reduce 67%
#   12/08/01 21:28:33 streaming.StreamJob: map 100% reduce 83%
#   12/08/01 21:31:00 streaming.StreamJob: map 100% reduce 100%
#   12/08/01 21:31:00 streaming.StreamJob: Job complete: job_201208012345_0001
#   12/08/01 21:31:00 streaming.StreamJob: Output:

# Display the output:
#   > from.dfs(sex.counts, format="csv", structured=TRUE)
#         V1      V2
#   1   male 1220120
#   2 female 1221099
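
# Supplementary sketch: the approximate result of the unpack.line test
# above. Each 6-character test string is sliced by test.schema; the
# "11", "22", and "33" columns are dropped because their field name
# (.X1) starts with ".X", and the "four" column is converted to
# integers by type.convert. Something like:
#
#        one three four
# A11a 8 "A" "a"   8
# B22b09 "B" "b"   9
# C33C10 "C" "C"   10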
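
# A minimal sanity check of the type.convert behavior that sex.map.fn
# depends on: the single character "F" parses as the logical FALSE,
# while "M" stays a character string, so comparing against FALSE
# separates the two sex codes.
type.convert("F", as.is=TRUE) == FALSE  # TRUE  -> mapped to "female"
type.convert("M", as.is=TRUE) == FALSE  # FALSE -> mapped to "male"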
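
# A hypothetical local dry run of the map and reduce functions, with no
# Hadoop cluster involved. The record below is a synthetic fixed-width
# line, blank except for an "F" in the Sex position (character 60 under
# mort.schema); it is not a real mortality record, and keyval is
# assumed to come from an attached rmr package.
fake.record <- paste0(strrep(" ", 59), "F",
                      strrep(" ", sum(mort.schema) - 60))
sex.map.fn(NULL, fake.record)                 # keyval("female", 1)
count.records.reduce.fn("female", rep(1, 7))  # keyval("female", 7)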
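
# And a rough local sketch of what the whole job computes: derive a key
# per record, group the keys, and count each group, which is what the
# Hadoop shuffle plus count.records.reduce.fn do at scale. Three copies
# of the synthetic record above stand in for the real "mort09" input.
local.records <- c(fake.record, fake.record, sub("F", "M", fake.record))
keys <- sapply(local.records,
               function(r) ifelse(unpack.line(r, mort.schema)[["Sex"]] == FALSE,
                                  "female", "male"))
table(keys)
# keys
# female   male
#      2      1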