Monday, April 17, 2017

Multiple Inputs, Outputs and Personalized Datatype in Hadoop

Wow, nearly two months since my last post!! My thesis needed more time than I expected, but now it's over, so I'll probably have more time to dedicate to the blog (hopefully).

In this post, I'm going to show some basic options that Hadoop offers for improving a MapReduce job, as always using Kotlin as the programming language. This time, the example is the analysis of sensor-generated data, available in two different formats. Moreover, we will produce two different kinds of output: one with the sensor readings that reported a temperature over a certain threshold, and one where the temperature is under that threshold. As always, all the code is available in this repository.

To sum up, these are the concepts presented in this post:
  1. Map-only Jobs.
  2. How to handle Multiple Inputs and Multiple Outputs in Hadoop.
  3. The setup() and cleanup() methods of a Mapper.
  4. How to define personalized datatypes as Mapper/Reducer Values.
Let's start with the first two points. A Map-only Job is a MapReduce program where the Reduce phase is missing, and the output of the job is the output of the Map phase. To have a Map-only Job, it is enough to set the number of reducers to 0, as in line 27 of SensorsDriver.kt.
Handling multiple inputs is quite simple: the extension method addMultipleInputPath() accepts the input path of a file, or a folder, and the types of InputFormat and Mapper that handle that input.
Multiple outputs are, instead, a bit more complicated. A named output does not represent a different folder where the output files are put, but the prefix used to identify a certain kind of output. First, it is necessary to call the extension method addMultipleNamedOutput() to set the prefix of the output files to create, the type of OutputFormat to use and its Key and Value types (see lines 31 and 32 of SensorsDriver.kt). A sketch of such a driver follows.
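To make the driver configuration concrete, here is a minimal sketch that uses the plain Hadoop MultipleInputs/MultipleOutputs API instead of the repository's extension methods. The input/output paths, the named outputs "overThreshold"/"underThreshold" and the use of TextInputFormat/TextOutputFormat are illustrative assumptions; SensorsMapperType1, SensorsMapperType2 and SensorData are the classes mentioned in this post. The sketch also shows the reducer count set to 0, which turns this into a Map-only Job.

```kotlin
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import kotlin.system.exitProcess

fun main(args: Array<String>) {
    val job = Job.getInstance()
    job.setJarByClass(SensorsMapperType1::class.java)

    // Map-only job: no reduce phase, the map output is the job output.
    job.numReduceTasks = 0

    // One input path (file or folder) per sensor data format,
    // each with its own InputFormat and Mapper.
    MultipleInputs.addInputPath(job, Path(args[0]), TextInputFormat::class.java, SensorsMapperType1::class.java)
    MultipleInputs.addInputPath(job, Path(args[1]), TextInputFormat::class.java, SensorsMapperType2::class.java)

    // Named outputs: "overThreshold" and "underThreshold" become the
    // prefixes of the files written inside the single output folder.
    MultipleOutputs.addNamedOutput(job, "overThreshold", TextOutputFormat::class.java, Text::class.java, SensorData::class.java)
    MultipleOutputs.addNamedOutput(job, "underThreshold", TextOutputFormat::class.java, Text::class.java, SensorData::class.java)

    job.setOutputKeyClass(Text::class.java)
    job.setOutputValueClass(SensorData::class.java)
    FileOutputFormat.setOutputPath(job, Path(args[2]))

    exitProcess(if (job.waitForCompletion(true)) 0 else 1)
}
```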

SensorsMapperType1 and SensorsMapperType2 are the two mappers used to handle the two types of sensor data expected. Point 3 is connected to the Multiple Outputs concept of point 2. Specifically, what matters is that the setup() and cleanup() methods are called exactly once for each mapper: the first is called right after the mapper is instantiated, while the second is called after the mapper has processed its last record. What makes these methods special is that, for both of them, an instance of Context is available when they are called. In our example, the setup() method is used to create an instance of MultipleOutputs for writing to the two named outputs defined in the Driver class, while the cleanup() method is used to close the output streams.
The MultipleOutputs.write() method works exactly like the Context.write() method used in previous examples, except that it also requires the name (the file prefix) of the output to write to.
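Below is a hedged sketch of how one of the two mappers could look. The comma-separated field layout, the 25.0 threshold and the SensorData constructor are assumptions made for illustration, not the actual code of the repository; the named outputs match the driver sketch above.

```kotlin
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

// Hypothetical version of one of the two sensor mappers.
class SensorsMapperType1 : Mapper<LongWritable, Text, Text, SensorData>() {

    private lateinit var multipleOutputs: MultipleOutputs<Text, SensorData>

    // Called once per mapper instance, before any map() call: the Context
    // is already available, so MultipleOutputs can be created here.
    override fun setup(context: Context) {
        multipleOutputs = MultipleOutputs(context)
    }

    override fun map(key: LongWritable, value: Text, context: Context) {
        // Assumed input line format: "<sensorId>,<temperature>".
        val fields = value.toString().split(",")
        val sensorId = fields[0]
        val temperature = fields[1].toDouble()

        // Same signature as Context.write(), plus the named output (file prefix).
        val namedOutput = if (temperature > 25.0) "overThreshold" else "underThreshold"
        multipleOutputs.write(namedOutput, Text(sensorId), SensorData(sensorId, temperature))
    }

    // Called once per mapper instance, after the last map() call.
    override fun cleanup(context: Context) {
        multipleOutputs.close()
    }
}
```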

SensorData addresses point #4. To define serializable classes that can be used by a Hadoop program, one of these two interfaces must be implemented: Writable or WritableComparable. The only difference between the two is that the second also allows custom classes to be used as keys, requiring them to be comparable. What is important is that the readFields() method, for deserialization, and the write() method, for serialization, are implemented correctly; in particular, the order in which the fields are written (serialized) must be the same order in which they are read (deserialized).
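As a minimal sketch of this serialization contract, here is a hypothetical SensorKey class (not the post's SensorData) implementing WritableComparable, so it could also be used as a key:

```kotlin
import org.apache.hadoop.io.WritableComparable
import java.io.DataInput
import java.io.DataOutput

// Hypothetical key type: a sensor id plus a timestamp. The default values
// give the no-arg constructor that Hadoop needs to instantiate it by reflection.
class SensorKey(var sensorId: String = "", var timestamp: Long = 0L) : WritableComparable<SensorKey> {

    // Serialization: the order of the writes...
    override fun write(output: DataOutput) {
        output.writeUTF(sensorId)
        output.writeLong(timestamp)
    }

    // ...must match the order of the reads exactly.
    override fun readFields(input: DataInput) {
        sensorId = input.readUTF()
        timestamp = input.readLong()
    }

    // Required by WritableComparable, so the class can be used as a key.
    override fun compareTo(other: SensorKey): Int =
        compareValuesBy(this, other, { it.sensorId }, { it.timestamp })
}
```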

Conclusions
Before ending this post, I want to share some thoughts about using Kotlin for this example:
  • I was hoping to use addMultipleNamedOutput<TextOutputFormat, Text, SensorData>, but Kotlin doesn't allow it. If you are interested, I asked on Stack Overflow why I can't do that.
  • Kotlin data classes are good candidates for implementing the Writable and WritableComparable interfaces, since they provide the equals()/hashCode(), toString() and copy() methods for free. The only limitation is that the attributes can't be immutable: the object is actually populated by the readFields() method, so the attributes can't be declared as val. However, in my opinion, the advantage of getting those method implementations for free outweighs the loss of immutability, making data classes preferable to traditional classes (see the sketch after this list).
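For example, a hypothetical data-class Writable could look like the following sketch; the SensorReading name and its fields are made up for illustration:

```kotlin
import org.apache.hadoop.io.Writable
import java.io.DataInput
import java.io.DataOutput

// The attributes must be var, because Hadoop creates the instance empty and
// fills it through readFields(); in exchange, equals()/hashCode(), toString()
// and copy() come for free from the data class.
data class SensorReading(var sensorId: String = "", var temperature: Double = 0.0) : Writable {

    override fun write(output: DataOutput) {
        output.writeUTF(sensorId)
        output.writeDouble(temperature)
    }

    override fun readFields(input: DataInput) {
        sensorId = input.readUTF()
        temperature = input.readDouble()
    }
}
```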

With these conclusions, I end my third blog post about Hadoop using Kotlin. As always, if you have suggestions or criticisms, don't hesitate to comment here or to write to me privately.