Big Data on a Raspberry Pi cluster, have you heard about it?
Who hasn’t? The internet is swamped with tutorials explaining how to set up a Hadoop cluster on a bunch of ARM prototype boards. The idea itself is super cool, but instead of the intended HPC cluster, we often end up with a set of resource-starved machines. It’s hard to do anything “big” with 1-2 GB of RAM, and it gets even worse when the Java process keeps draining the remaining scraps of memory…
Are you saying we can’t have a Hadoop ARM cluster?
No. Rather, I am questioning its capabilities and, by extension, its purpose.
Instead of going with the mainstream solution, I wanted to pick the right tool for the job. The job: run distributed map-reduce tasks on low-end devices while keeping resource utilization at a reasonable level. To that end, I came up with a not-so-artificial problem: finding mutual friends in the public Twitter dataset. But instead of the ever-hungry Hadoop, I used a lightweight OpenMPI map-reduce library.
Did it work? Well, try it yourself (GitHub) or check out the rest of the post!
This experiment aims to boost data processing speed by spreading the workload across a few low-end ARM boards. I'd recommend setting up a few Raspberry Pis on the same local network. If you can't, OpenMPI also allows local execution (handy for testing).
My setup consists of 4x Odroid U3 boards connected through a cheap 1 Gbps network switch. The eMMC speed is a clear advantage: the workload depends heavily on data access speed, so the Odroid board looks like a good fit.
Accessing Twitter's data
In a real-world big data cluster, the data would likely be stored on a distributed filesystem that provides faster access and data redundancy. As an example, the figure below explains how HDFS stores files.
Because we operate on low-end ARM devices, we won't use HDFS. Instead, we split the file into N parts and copy them across the nodes.
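The block-and-replica idea behind HDFS fits in a few lines of Python. A file is cut into fixed-size blocks, and each block is copied to several nodes. HDFS defaults to 128 MB blocks and a replication factor of 3, but its real placement policy is rack-aware. The `place_blocks` helper and its ring-style placement below are a hypothetical simplification, not HDFS code.

```python
from typing import Dict, List

def place_blocks(file_size: int, block_size: int, nodes: List[str],
                 replication: int = 3) -> Dict[int, List[str]]:
    """Map each block index to the nodes that hold a copy of it.

    Simplified: replicas go to consecutive nodes in a ring,
    not HDFS's rack-aware placement policy.
    """
    if replication > len(nodes):
        raise ValueError("replication factor exceeds the number of nodes")
    num_blocks = -(-file_size // block_size)  # ceiling division; the last block may be partial
    return {
        block: [nodes[(block + r) % len(nodes)] for r in range(replication)]
        for block in range(num_blocks)
    }

if __name__ == "__main__":
    MB = 1024 * 1024
    nodes = ["odroid0", "odroid1", "odroid2", "odroid3"]
    for block, holders in place_blocks(300 * MB, 128 * MB, nodes).items():
        print(f"block {block}: {holders}")
```

Losing any single node still leaves two copies of every block. That redundancy is what the simple split-and-copy approach used here gives up.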
$ wget https://snap.stanford.edu/data/twitter_combined.txt.gz
$ gunzip -d twitter_combined.txt.gz
$ split -l 10000 twitter_combined.txt tsplit
$ mkdir -p data
$ mv tsplit* data/
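If you'd rather script the preparation, the same idea can be sketched in Python. Cut the edge list into chunks of a fixed number of lines, then deal the chunks out to the nodes round-robin. This is a minimal sketch under my own assumptions. `split_lines` and `assign_to_nodes` are hypothetical helpers, and the actual copying to each board (for example with scp or rsync) is left out.

```python
import tempfile
from pathlib import Path
from typing import Dict, List

def split_lines(src: Path, out_dir: Path, lines_per_part: int) -> List[Path]:
    """Cut `src` into files of at most `lines_per_part` lines, like `split -l`."""
    out_dir.mkdir(parents=True, exist_ok=True)
    parts: List[Path] = []
    buffer: List[str] = []

    def flush() -> None:
        # Numeric suffixes here; GNU split uses aa, ab, ... by default.
        part = out_dir / f"tsplit{len(parts):04d}"
        part.write_text("".join(buffer))
        parts.append(part)
        buffer.clear()

    with src.open() as f:
        for line in f:
            buffer.append(line)
            if len(buffer) == lines_per_part:
                flush()
    if buffer:  # leftover lines form a shorter final chunk
        flush()
    return parts

def assign_to_nodes(parts: List[Path], nodes: List[str]) -> Dict[str, List[Path]]:
    """Deal the chunks out round-robin so every node gets a similar share."""
    plan: Dict[str, List[Path]] = {node: [] for node in nodes}
    for i, part in enumerate(parts):
        plan[nodes[i % len(nodes)]].append(part)
    return plan

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / "edges.txt"
        src.write_text("".join(f"{i} {i + 1}\n" for i in range(25)))
        parts = split_lines(src, Path(tmp) / "data", 10)
        for node, files in assign_to_nodes(parts, ["odroid0", "odroid1"]).items():
            print(node, [p.name for p in files])
```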
The OpenMPI library takes care of message passing but doesn't store the intermediate results of the map-reduce steps, so we need to spin up a remote key-value store, e.g., Memcached:
$ docker run --network host --name my-memcache -d memcached
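The post doesn't show how the intermediate results are laid out in Memcached, so the following is only a sketch under assumed conventions. Each user pair gets a hypothetical `pair:<lo>:<hi>` key, and the value is a space-separated list of IDs. A plain dict stands in for the server so the code runs without one.

```python
from typing import Dict, Iterable, List

MAX_KEY_BYTES = 250  # memcached's key length limit

def pair_key(a: int, b: int) -> str:
    """Hypothetical key for a user pair; IDs are ordered so (a, b) and (b, a) share a key."""
    lo, hi = sorted((a, b))
    key = f"pair:{lo}:{hi}"
    # Text-protocol keys may not exceed 250 bytes or contain whitespace/control chars.
    if len(key.encode()) > MAX_KEY_BYTES or any(c.isspace() or ord(c) < 32 for c in key):
        raise ValueError(f"invalid memcached key: {key!r}")
    return key

def encode_friends(friends: Iterable[int]) -> bytes:
    """Serialize a friend list as space-separated IDs (memcached treats values as opaque bytes)."""
    return " ".join(str(f) for f in friends).encode()

def decode_friends(value: bytes) -> List[int]:
    """Inverse of encode_friends."""
    return [int(tok) for tok in value.decode().split()]

if __name__ == "__main__":
    cache: Dict[str, bytes] = {}  # stand-in for a real memcached client
    cache[pair_key(99712896, 17711130)] = encode_friends([21447363, 22462180])
    print(decode_friends(cache["pair:17711130:99712896"]))  # [21447363, 22462180]
```

Against a live server, a client library such as pymemcache could replace the dict. Its `Client(("localhost", 11211))` offers `set` and `get`, and 11211 is Memcached's default port.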
Running on multiple machines (via ssh):
$ mpirun --allow-run-as-root -np 16 --host odroid0,odroid1,odroid2,odroid3 -x MEMCACHED_SERVER=<host> ~/friends ~/data/
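With `-np 16` spread over four boards, each board runs four processes, and the chunk files have to be shared among them. This post doesn't describe how the `friends` binary actually divides the files among ranks. The strided split below is only an illustration of one common static scheme. `files_for_rank` and `host_of_rank` are hypothetical, and `host_of_rank` assumes ranks are placed by slot, with four consecutive ranks per board.

```python
from typing import List

def files_for_rank(files: List[str], rank: int, nprocs: int) -> List[str]:
    """Strided split: rank r reads files r, r + nprocs, r + 2 * nprocs, ..."""
    return files[rank::nprocs]

def host_of_rank(rank: int, hosts: List[str], slots_per_host: int) -> str:
    """Host of a rank, assuming ranks fill each host's slots in order."""
    return hosts[rank // slots_per_host]

if __name__ == "__main__":
    hosts = ["odroid0", "odroid1", "odroid2", "odroid3"]
    files = [f"tsplit{i:03d}" for i in range(40)]  # hypothetical chunk names
    nprocs = 16
    for rank in range(nprocs):
        host = host_of_rank(rank, hosts, nprocs // len(hosts))
        print(rank, host, files_for_rank(files, rank, nprocs))
```

Every file lands on exactly one rank, and no coordination is needed because each rank can compute its own share from `rank` and `nprocs`.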
Testing on a single node:
$ mkdir build && cd build
$ cmake ../
$ make
$ MEMCACHED_SERVER=localhost ./friends ../data/twitter_combined.txt
The results can optionally be dumped to a file and queried, for example:
$ grep -F "[17711130, 99712896]" output.txt
[17711130, 99712896]: 21447363 22462180 34428380 36475508 43003845 107512718 126391227 222851879 223990701
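To query the dump programmatically rather than with grep, a small parser is enough. It assumes every line follows the `[a, b]: f1 f2 ...` shape of the sample above, a format inferred from that one line. `parse_result` and `load_results` are hypothetical helpers.

```python
import re
from pathlib import Path
from typing import Dict, List, Tuple

# Assumed line shape, inferred from the sample: "[a, b]: f1 f2 ... fn"
LINE_RE = re.compile(r"^\[(\d+), (\d+)\]:((?: \d+)*)\s*$")

def parse_result(line: str) -> Tuple[Tuple[int, int], List[int]]:
    """Split one output line into its user pair and the list of mutual friends."""
    match = LINE_RE.match(line)
    if match is None:
        raise ValueError(f"unexpected line: {line!r}")
    pair = (int(match.group(1)), int(match.group(2)))
    mutual = [int(tok) for tok in match.group(3).split()]
    return pair, mutual

def load_results(path: Path) -> Dict[Tuple[int, int], List[int]]:
    """Index the whole dump by user pair, skipping blank lines."""
    with path.open() as f:
        return dict(parse_result(line) for line in f if line.strip())

if __name__ == "__main__":
    sample = ("[17711130, 99712896]: 21447363 22462180 34428380 36475508 "
              "43003845 107512718 126391227 222851879 223990701")
    pair, mutual = parse_result(sample)
    print(pair, len(mutual))  # (17711130, 99712896) 9
```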
How does it work?
Map-reduce tutorials are abundant on the internet, so I'll skip the deep dive and point you to the figure below, which briefly explains the overall flow.
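An example run for users A and D, who have Z and E as mutual friends (A's friends are D, Z, E; D's friends are Z, E, F, G):
- Transform the data into the form A -> D, Z, E (a friend list).
- Perform a Cartesian product of the user with their friends, attaching the full friend list to every pair:
  (A, D) -> D, Z, E
  (A, Z) -> D, Z, E
  (A, E) -> D, Z, E
- Reduce all tuples by pair, e.g., (A, D) -> (D, Z, E)(Z, E, F, G), where the second list is D's friend list. Each pair is written in a fixed order, so A's and D's records end up under the same key.
- Intersect the friend lists: (A, D) -> (Z, E)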
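The four steps above can be sketched as a single-process Python program. It shows the data flow only; it is not the distributed OpenMPI implementation. It assumes edge lines of the form `a b` and treats every edge as a two-way friendship, so each pair receives exactly two friend lists. For that reason, the sample edges include A in D's list, which lets D also emit the (A, D) pair. The function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

Pair = Tuple[str, str]

def build_adjacency(edges: Iterable[str]) -> Dict[str, Set[str]]:
    """Step 1: turn "a b" edge lines into user -> friend set.

    Assumption: every edge is treated as a two-way friendship.
    """
    adjacency: Dict[str, Set[str]] = defaultdict(set)
    for line in edges:
        a, b = line.split()
        adjacency[a].add(b)
        adjacency[b].add(a)
    return adjacency

def map_phase(user: str, friends: Set[str]) -> Iterable[Tuple[Pair, Set[str]]]:
    """Step 2: pair the user with each friend, emitting the whole friend list."""
    for friend in friends:
        a, b = sorted((user, friend))  # fixed order so both sides share one key
        yield (a, b), friends

def mutual_friends(edges: Iterable[str]) -> Dict[Pair, Set[str]]:
    grouped: Dict[Pair, List[Set[str]]] = defaultdict(list)
    for user, friends in build_adjacency(edges).items():
        for key, friend_set in map_phase(user, friends):
            grouped[key].append(friend_set)  # Step 3: group tuples by pair
    # Step 4: intersect the two lists gathered for each pair.
    return {pair: lists[0] & lists[1] for pair, lists in grouped.items() if len(lists) == 2}

if __name__ == "__main__":
    edges = ["A D", "A Z", "A E", "D Z", "D E", "D F", "D G"]
    result = mutual_friends(edges)
    print(sorted(result[("A", "D")]))  # ['E', 'Z']
```

In the distributed version, step 3 is where data moves between machines. All lists for a given pair must meet on one process before they can be intersected.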
Compared to the (IMHO) far simpler Hadoop API, the OpenMPI map-reduce library adds a few new operations:
- collate: a method of the MapReduce object that aggregates a KeyValue object across processors and converts it into a KeyMultiValue object.
- gather: a method of the MapReduce object that collects the key/value pairs of a KeyValue object spread across all processors to form a new KeyValue object on a subset (nprocs) of processors.
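To make the two operations more concrete, here is a toy single-process Python simulation in which each inner list stands for one processor's KeyValue object. It is not the library's code. The `collate`, `gather` and `owner` functions are hypothetical. Picking a key's owner with a CRC32 hash, and dealing gathered pairs out by `rank % nprocs`, are assumptions that stand in for whatever the library does internally.

```python
import zlib
from collections import defaultdict
from typing import Any, Dict, List, Tuple

KeyValue = List[Tuple[str, Any]]      # one processor's key/value pairs
KeyMultiValue = Dict[str, List[Any]]  # key -> all values for that key

def owner(key: str, nprocs: int) -> int:
    """Deterministic hash so every processor agrees on where a key lives."""
    return zlib.crc32(key.encode()) % nprocs

def collate(per_proc: List[KeyValue]) -> List[KeyMultiValue]:
    """Send each pair to its key's owner, then group the values per key."""
    nprocs = len(per_proc)
    grouped: List[Dict[str, List[Any]]] = [defaultdict(list) for _ in range(nprocs)]
    for kv in per_proc:
        for key, value in kv:
            grouped[owner(key, nprocs)][key].append(value)
    return [dict(g) for g in grouped]

def gather(per_proc: List[KeyValue], nprocs_out: int) -> List[KeyValue]:
    """Move all pairs onto the first `nprocs_out` processors without grouping them."""
    out: List[KeyValue] = [[] for _ in range(nprocs_out)]
    for rank, kv in enumerate(per_proc):
        out[rank % nprocs_out].extend(kv)
    return out

if __name__ == "__main__":
    per_proc = [[("a", 1), ("b", 2)], [("a", 3)], [("c", 4)]]
    print(collate(per_proc))
    print(gather(per_proc, 1))  # [[('a', 1), ('b', 2), ('a', 3), ('c', 4)]]
```

Collate is the step the mutual-friends job needs before the intersection, because it brings every friend list for a pair onto the same processor. Gather only relocates pairs and does no grouping.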
The OpenMPI version of map-reduce is not as straightforward as Hadoop: it requires a few extra steps and a bit more coding. Resource-wise, however, it's a better fit for a cluster of ARM prototype boards.
Here are the results of the test I ran on 4x Odroid U3 boards (16 processes):
It took 21.538 s to process 421,000 tuples on 16 cores (4 machines), whereas on 1 core it took 623.784 s. Is that satisfactory? To me, yes... somewhat. I expected worse performance.
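The two timings are enough to compute the usual scaling metrics: speedup S = T1 / T16 and parallel efficiency E = S / 16. Here is a minimal Python sketch using the numbers above. The helper names are hypothetical.

```python
def speedup(t_serial: float, t_parallel: float) -> float:
    """How many times faster the parallel run is than the serial baseline."""
    return t_serial / t_parallel

def efficiency(t_serial: float, t_parallel: float, cores: int) -> float:
    """Speedup per core; 1.0 would be perfect linear scaling."""
    return speedup(t_serial, t_parallel) / cores

if __name__ == "__main__":
    t1, t16, cores, tuples = 623.784, 21.538, 16, 421_000
    print(f"speedup:    {speedup(t1, t16):.2f}x")            # 28.96x
    print(f"efficiency: {efficiency(t1, t16, cores):.2f}")   # 1.81
    print(f"throughput: {tuples / t16:,.0f} tuples/s on 16 cores")
```

An efficiency above 1.0 means the 16-core run scaled better than linearly. That often hints that the single-core baseline was limited by something other than CPU, such as memory or I/O, though the measurements above don't show which.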