# Finding mutual friends with OpenMPI and map-reduce

Intro

Embedded devices such as the Raspberry Pi don't have enough power to run Hadoop jobs, and Hadoop itself is a complex piece of software. Instead of trying to adapt it, I decided to use a more "lightweight" solution: map-reduce on top of OpenMPI. MPI was designed for distributed computation, so why not run a map-reduce framework on it? Let's do it!

Goal

Using Twitter sample data, we're going to find the mutual friends (or mutual subscribers) of each pair of users. To speed things up, we use the MR-MPI map-reduce framework (built on OpenMPI) on a cluster of 4 Odroid U3 machines (8 GB eMMC flash).

OpenMPI cluster

Repeat these steps on each node:

Install OpenMPI (this can take ~30 minutes)

wget http://www.open-mpi.org/software/ompi/v1.8/downloads/openmpi-1.8.1.tar.gz
tar -zxvf openmpi-1.8.1.tar.gz

cd openmpi-1.8.1
./configure --prefix=/usr/local \
    --enable-mpirun-prefix-by-default \
    --enable-static

make all
sudo make install

Install the MR-MPI library

wget http://www.sandia.gov/~sjplimp/tars/mrmpi.tar.gz
tar xzvf mrmpi.tar.gz
cd mrmpi-*
cd src
make mpicc
make install

Enable SSH access

On master node (odroid0):

ssh-keygen -t dsa
ssh-copy-id mpiuser@odroid1 # Slave 1
ssh-copy-id mpiuser@odroid2 # Slave 2
...

Then, log in to each machine to confirm that everything is set up correctly.

Twitter user data

First, download and extract the data:

wget https://snap.stanford.edu/data/twitter_combined.txt.gz
gunzip -d twitter_combined.txt.gz

This file contains 1768149 edges and 81306 nodes. It would be wasteful to read the whole file every time the data is needed. Here is how HDFS stores a single file:

HDFS data distribution

The file is split into parts stored across the nodes, and each segment is replicated for redundancy.

Since we operate on low-end embedded ARM devices, we would rather not use HDFS. Instead, we can split the file into N parts and copy them to all of the nodes, using scp, rsync, or even NFS. Here is how to split the file:

split -l 10000 twitter_combined.txt tsplit
mkdir data
mv tsplit* data/

In the end, each node (odroid0-3) should have access to a "data" directory containing N files of 10000 lines each.
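The chunking can be sanity-checked locally before copying anything to the cluster. This is a hypothetical dry run on a tiny stand-in file, not on the real dataset:

```shell
# Dry run of the split step: 25 lines, 10 per chunk -> 3 parts
seq 25 > edges.txt            # stand-in for twitter_combined.txt
split -l 10 edges.txt tsplit  # produces tsplitaa, tsplitab, tsplitac
mkdir -p data && mv tsplit* data/
ls data | wc -l               # number of chunks
```

From there, a simple loop over `scp -r data/ mpiuser@odroidN:` (or the rsync equivalent) pushes the same directory to every node.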

How it’s done

You can find plenty of articles about map-reduce on Google; the simplest explanation of how it works is shown in the picture below.
Map Reduce structure

Using map and reduce operations, we are going to put the data in the form (A,B) -> D,Z,F,G, where D,Z,F,G are the mutual friends of the pair (A,B). To do that, follow these steps:

  1. Transform the data into the form A -> friend list (e.g. D,Z,E)

  2. Perform Cartesian product:
    (A, D) -> D, Z, E
    (A, Z) -> D, Z, E
    (A, E) -> D, Z, E

  3. Reduce all tuples to form (A, D) -> (D,Z,E)(Z,E,F,G)

  4. Intersect friends lists (A, D) -> (Z,E)

  5. Save output

IMHO the Hadoop API is far simpler to use. MR-MPI adds extra operations (on top of map, reduce, and sort) that need explaining:

  • collate – a method of the MapReduce object that aggregates a KeyValue object across processors and converts it into a KeyMultiValue object.
  • gather – a method of the MapReduce object that collects the key/value pairs of a KeyValue object spread across all processors to form a new KeyValue object on a subset (nprocs) of processors.

Implementation

A few notes about the code:

  1. Input data is in form:
    1 2
    1 3
    1 4
    2 1
    2 3

  2. FriendTuple is a structure used as the key

  3. The value is an array of integers, where value[0] holds the array length

  4. Output may be written to a file (OUTPUT_TO_FILE = 1) or sent to memcached (OUTPUT_TO_FILE = 0)

  5. To print keys to the screen, set PRINT_OUTPUT = 1

#include "mpi.h"
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "sys/stat.h"
#include "mapreduce.h"
#include "keyvalue.h"
#include <sstream>
#include <libmemcached/memcached.h>

#define MAX(a,b) ((a) > (b) ? a : b)
#define MIN(a,b) ((a) < (b) ? a : b)
#define PRINT_OUTPUT 0
#define OUTPUT_TO_FILE 0

using namespace MAPREDUCE_NS;
typedef struct {
    int f1, f2;
} FriendTuple;

void fileread(int, char *, KeyValue *, void *);
void transform_user_friends(char *, int, char *, int, int *, KeyValue *, void *);
void transform_intersect(char *, int, char *, int, int *, KeyValue *, void *);
void first_map(uint64_t, char *, int, char *, int, KeyValue *, void *);
void swap(int*, int*);
int cmpfunc (const void*, const void*);
int find_intersection(int[], int, int[], int, int[]);
void output(uint64_t, char *, int, char *, int, KeyValue *, void *);
void set_key(char*, const char*);
void set_key(FriendTuple*, int*, int);

memcached_st *memc;
void initialize_memcached(){
    memcached_server_st *servers = NULL;
    memcached_return rc;

    memc = memcached_create(NULL);

    servers= memcached_server_list_append(servers, "192.168.4.106", 11211, &rc);
    rc= memcached_server_push(memc, servers);

    if (rc != MEMCACHED_SUCCESS)
        fprintf(stderr,"Couldn't add server: %s\n",memcached_strerror(memc, rc));
}

int main(int narg, char **args) {
    MPI_Init(&narg,&args);

    int me,nprocs;
    MPI_Comm_rank(MPI_COMM_WORLD,&me);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);

    if (narg <= 1) {
        if (me == 0) printf("Syntax: mfriends file1 file2 ...\n");
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    initialize_memcached();

    MapReduce *mr = new MapReduce(MPI_COMM_WORLD);
    mr->verbosity = 1;
    mr->timer = 1;
    //mr->memsize = 1;
    //mr->outofcore = 1;

    MPI_Barrier(MPI_COMM_WORLD);
    double tstart = MPI_Wtime();

    if(me == 0) printf("1. Map to form A -> B C D\n");
    mr->map(narg-1,&args[1],0,1,0,fileread,NULL);
    int nfiles = mr->mapfilecount;
    mr->collate(NULL);
    mr->reduce(transform_user_friends,NULL);

    MPI_Barrier(MPI_COMM_WORLD);
    double tstop = MPI_Wtime();

    if(me == 0) printf("2. Map to form (A,B) -> B C D\n");
    mr->map(mr,first_map, NULL);

    if(me == 0) printf("3. Reduce to form (A,B) -> (B C D)(B D E)\n");
    mr->collate(NULL);
    mr->reduce(transform_intersect,NULL);

    if(OUTPUT_TO_FILE){
        char fname[] = "output.txt";
        FILE* fp = fopen(fname,"w");
        if (fp == NULL) {
            printf("ERROR: Could not open output file\n");
            MPI_Abort(MPI_COMM_WORLD,1);
        }

        mr->gather(1);
        mr->map(mr, output, (void*)fp);
        fclose(fp);
    }

    delete mr;
    if (me == 0) {
        printf("Time to process %d files on %d procs = %g (secs)\n",
                nfiles,nprocs,tstop-tstart);
    }

    MPI_Finalize();
}

void fileread(int itask, char *fname, KeyValue *kv, void *ptr){
    struct stat stbuf;
    int flag = stat(fname,&stbuf);
    if (flag < 0) {
        printf("ERROR: Could not query file size\n");
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    int filesize = stbuf.st_size;

    FILE *fp = fopen(fname,"r");
    char *text = new char[filesize+1];
    int nchar = fread(text,1,filesize,fp);
    text[nchar] = '\0';
    fclose(fp);

    const char *whitespace = " \t\n\f\r\0";
    char *id = strtok(text,whitespace);
    char *value = strtok(NULL,whitespace);
    int id1;
    int id2;
    while (id) {
        id1 = atoi(id);
        id2 = atoi(value);
        kv->add((char*)&id1,sizeof(int),(char*)&id2,sizeof(int));
        id = strtok(NULL,whitespace);
        value = strtok(NULL,whitespace);
    }

    delete [] text;
}

void transform_user_friends(char *key, int keybytes, char *multivalue,
        int nvalues, int *valuebytes, KeyValue *kv, void *ptr) {

    size_t t_block_len = nvalues + 1; // one length word + nvalues friend ids
    int* t_block = new int[t_block_len];
    int* t_len = &t_block[0];
    int* t_friends = &t_block[1];
    *t_len = nvalues;

    int* values = (int*) multivalue;
    for(int i = 0; i < *t_len; i++){
        t_friends[i] = values[i];
    }

    kv->add(key,keybytes, (char*) t_block, t_block_len*sizeof(int));
    delete [] t_block;
}

void set_key(char* key, const char* value){
    memcached_return rc;
    rc = memcached_set(memc, key, strlen(key), value, strlen(value), (time_t)0, (uint32_t)0);

    if (rc != MEMCACHED_SUCCESS)
        fprintf(stderr,"Couldn't store key: %s\n",memcached_strerror(memc, rc)); 

    if(PRINT_OUTPUT) printf("M_SET KEY: %s, VALUE: %s\n", key, value);
}

void set_key(FriendTuple* ft, int* fl, int len){
    char key[100];
    int cx = snprintf(key, 100, "m_%d_%d", ft->f1, ft->f2);

    if (cx>=0 && cx < 100){
        std::stringstream s;
        for(int i = 0; i < len; i++){
            s << std::to_string(fl[i]);
            if(i+1 < len) s << ",";
        }

        if(len == 0) s << "11111";
        std::string value = s.str(); // keep the string alive; c_str() on a temporary dangles
        set_key(key, value.c_str());
    } else {
        fprintf(stderr,"Couldn't add key: m_%d_%d\n", ft->f1, ft->f2);
    }
}

void transform_intersect(char *key, int keybytes, char *multivalue,
        int nvalues, int *valuebytes, KeyValue *kv, void *ptr) {
    FriendTuple* ft = (FriendTuple*) key;

    int* t_block;
    int* t_len;
    int* t_len2;
    int* t_friends;
    int* t_friends2;

    if(nvalues > 1){ // If <= 1 then there is no user where u1 has u2 && u2 has u1
        t_block = (int*) (multivalue);
        t_len = &t_block[0];
        t_friends = &t_block[1];

        t_block = (int*) (multivalue+valuebytes[0]);
        t_len2 = &t_block[0];
        t_friends2 = &t_block[1];

        int* res_block = new int[MIN(*t_len, *t_len2)+1]; // arr[Length, intersection]
        int* intersection_array = &res_block[1];
        int* intersection_array_len = &res_block[0];

        *intersection_array_len = find_intersection(t_friends, *t_len, t_friends2,
                *t_len2, intersection_array);

        if(OUTPUT_TO_FILE){
            kv->add(key, keybytes, (char*) res_block, (*intersection_array_len+1)*(sizeof(int)));
        }else {
            set_key(ft, intersection_array, *intersection_array_len);
        }

        delete [] res_block;
    }
}

void output(uint64_t itask, char *key, int keybytes, char *value,
        int valuebytes, KeyValue *kv, void *ptr) {
    FILE* fp = (FILE*) ptr;
    FriendTuple* ft = (FriendTuple*) key;

    int* t_block = (int*) (value);
    int* t_len = &t_block[0];
    int* t_friends = &t_block[1]; // mutual friends

    fprintf(fp, "[%d, %d]: ",ft->f1, ft->f2);
    for(int j = 0; j < *t_len; j++){
        fprintf(fp, "%d ", t_friends[j]);
    }
    fprintf(fp, "\n");

    if(PRINT_OUTPUT){
        printf("[%d, %d]: ",ft->f1, ft->f2);
        for(int j = 0; j < *t_len; j++){
            printf("%d ", t_friends[j]);
        }
        printf("\n");
    }
}

int find_intersection(int a1[], int s1, int a2[], int s2, int iarr[]){
    int i = 0, j = 0, k = 0;
    while ((i < s1) && (j < s2)){
        if (a1[i] < a2[j]){
            i++;
        } else if (a1[i] > a2[j]) {
            j++;
        } else {
            iarr[k] = a1[i];
            i++;
            j++;
            k++;
        }
    }

    return(k);
}

int cmpfunc (const void * a, const void * b) {
    return ( *(int*)a - *(int*)b );
}

void swap(int* a, int* b){
    if(*a > *b){
        int tmp = *a;
        *a = *b;
        *b = tmp;
    }
}

//Pair (A,B) -> (B,C,D)
void first_map(uint64_t itask, char *key, int keybytes, char *value,
        int valuebytes, KeyValue *kv, void *ptr) {

    int* t_block = (int*) value;
    int* t_len = &t_block[0];
    int* t_friends = &t_block[1];

    qsort(t_friends, *t_len, sizeof(int), cmpfunc); // Executed on diff procs
    FriendTuple ft;
    memset(&ft,0,sizeof(FriendTuple));

    for(int i = 0; i < *t_len; i++){
        ft.f1 = *((int*) key);
        ft.f2 = t_friends[i];
        swap(&ft.f1, &ft.f2);

        kv->add((char*)&ft, sizeof(FriendTuple),(char*) t_block, valuebytes);
    }
}

Makefile

CC =            mpicc #-m64
CPP =           mpic++ #-m64
CCFLAGS =       -g -O -std=gnu++11 -Wall -I../src
LINK =          mpic++
LINKFLAGS =     -g -O -lmemcached
USRLIB =        ../src/libmrmpi_mpicc.a
SYSLIB =

all: mfriends
mfriends: mfriends.o $(USRLIB)
        $(LINK) $(LINKFLAGS) mfriends.o $(USRLIB) $(SYSLIB) -o mfriends
clean:
        rm *.o mfriends

%.o:%.cpp
        $(CPP) $(CCFLAGS) -c $<

Compile and run

np - the number of processes
host - hostnames or IPs of the target machines

make
mpirun --allow-run-as-root -np 16 --host odroid0,odroid1,odroid2,odroid3 ~/mfriends ~/data/

Summary

The test was conducted on 4 machines (Odroid U3), each running at most 4 processes.
The results:

procs  time (s)
1 623.784
2 562.679
3 202.538
4 179.679
5 197.639
6 262.464
7 56.529
8 34.286
9 31.052
10 31.041
11 28.649
12 26.256
13 23.924
14 23.61
15 22.775
16 21.538

OpenMPI map reduce results

The total time to process 421000 tuples on 16 cores (4 machines) was 21.538 seconds, roughly a 29x speedup over a single process. The results of the last reduce operation were collected on the master node and saved to disk. IMHO this is quite a good result; I expected somewhat worse performance.

Demo