# Finding mutual friends with OpenMPI and map-reduce

1. Preface

Embedded devices such as the Raspberry Pi don’t have enough power to run Hadoop jobs; Hadoop is a really complex piece of software. Instead of trying to adapt it, I decided to use a more “lightweight” solution: the MapReduce-MPI (MR-MPI) library running on top of OpenMPI. MPI was designed for distributed computation, so why not run a map-reduce framework on it? Let’s do it!

2. Goal

Using Twitter user data, we’re going to find the mutual friends (or mutual followers) for each pair of users. To make it faster we will use the MR-MPI map-reduce framework on top of OpenMPI. The cluster consists of 4 Odroid U3 machines, each with 8 GB of flash (eMMC) storage.

3. OpenMPI cluster

Repeat these steps on each node:

1. Install OpenMPI (could take ~30min):
wget http://www.open-mpi.org/software/ompi/v1.8/downloads/openmpi-1.8.1.tar.gz
tar -zxvf openmpi-1.8.1.tar.gz
cd openmpi-1.8.1
./configure --prefix=/usr/local \
--enable-mpirun-prefix-by-default \
--enable-static
make all
sudo make install
2. Install MR-MPI (MapReduce-MPI):
wget http://www.sandia.gov/~sjplimp/tars/mrmpi.tar.gz
tar xzvf mrmpi.tar.gz
cd mrmpi-*
cd src
make mpicc
make install
3. Enable access via SSH

On master node (odroid0):

ssh-keygen -t dsa
ssh-copy-id mpiuser@odroid1 # Slave 1
ssh-copy-id mpiuser@odroid2 # Slave 2
...
4. Log in to each machine to confirm that everything is set up correctly. A minimal sanity-check program is sketched below.
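
Before moving on, it can be worth verifying that both OpenMPI and MR-MPI actually work on every node. Below is a minimal sanity-check program (my own sketch, not part of the original setup) that only initializes MPI and creates an MR-MPI MapReduce object. Compile it the same way as mfriends in section 7 and run it with mpirun; if every rank prints its line, the toolchain is fine.

#include "mpi.h"
#include "stdio.h"
#include "mapreduce.h"

using namespace MAPREDUCE_NS;

// sanity.cpp – if this compiles, links against libmrmpi and runs on all ranks,
// the OpenMPI + MR-MPI installation on the node is working.
int main(int narg, char **args) {
    MPI_Init(&narg, &args);

    int me, nprocs;
    MPI_Comm_rank(MPI_COMM_WORLD, &me);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    MapReduce *mr = new MapReduce(MPI_COMM_WORLD); // linking fails here if libmrmpi is missing
    printf("rank %d of %d: MR-MPI object created OK\n", me, nprocs);
    delete mr;

    MPI_Finalize();
    return 0;
}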

4. Twitter user data

First, download and extract the data:

wget https://snap.stanford.edu/data/twitter_combined.txt.gz
gunzip -d twitter_combined.txt.gz

This file contains 1768149 edges and 81306 nodes. It would be wasteful to read the whole file every time the data is needed. Here is how HDFS stores a single file:

(Figure: HDFS data distribution of a single file across nodes.)

A single file is split into parts and stored across the nodes; the segments are also replicated for redundancy.
Since we operate on low-end embedded ARM devices, we won’t use HDFS. Instead we can split the file into N parts and copy them to all of the nodes, using scp, rsync or even NFS. Here is how to split the file:

split -l 10000 twitter_combined.txt tsplit
mkdir data
mv tsplit* data/

In the end, each node (odroid0-3) should have access to a “data” directory containing N files of 10000 lines each.

5. Logic: How it’s done

You can find tons of articles about map-reduce on Google. The simplest explanation of how it works is presented in the picture below. (image source)
(Figure: MapReduce work structure.)

Using map and reduce operations we are going to bring the data into the form (A,B) -> D,Z,F,G, where D,Z,F,G are the mutual friends of the pair (A,B). In order to do that, follow these steps:

1. Transform the data into the form A -> friend list (e.g. D,Z,E)
2. Perform Cartesian product:
(A, D) -> D, Z, E
(A, Z) -> D, Z, E
(A, E) -> D, Z, E

3. Reduce all tuples to form (A, D) -> (D,Z,E)(Z,E,F,G)
4. Intersect the friend lists: (A, D) -> (Z,E) (see the sketch after this list)
5. Save output
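
To make step 4 concrete, here is a minimal standalone sketch of intersecting two sorted friend lists (the integer IDs are made up for illustration). The real implementation below does the same thing with a hand-written two-pointer loop in find_intersection, after sorting the lists with qsort.

#include <algorithm>
#include <cstdio>
#include <iterator>
#include <vector>

// intersect_demo.cpp – step 4 in isolation: the mutual friends of a pair (A, D)
// are simply the intersection of their (sorted) friend lists.
int main() {
    std::vector<int> friends_a = {2, 5, 7, 9};      // friends of A (sorted)
    std::vector<int> friends_d = {3, 5, 9, 11, 14}; // friends of D (sorted)

    std::vector<int> mutual;
    std::set_intersection(friends_a.begin(), friends_a.end(),
                          friends_d.begin(), friends_d.end(),
                          std::back_inserter(mutual));

    printf("(A, D) -> ");
    for (int f : mutual) printf("%d ", f);          // prints: 5 9
    printf("\n");
    return 0;
}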

IMHO the Hadoop API is far simpler to use. MR-MPI adds extra operations (in addition to map, reduce and sort) which have to be explained:

  • collate – a method of a MapReduce object which aggregates a KeyValue object across processors and converts it into a KeyMultiValue object,
  • gather – a method of a MapReduce object which collects the key/value pairs of a KeyValue object spread across all processors to form a new KeyValue object on a subset (nprocs) of processors.
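
As a rough illustration of how these operations fit together (my own minimal sketch, not taken from the MR-MPI documentation), the snippet below emits a key/value pair from each map task, uses collate to group the values by key across processors, reduces each group to a count, and gathers the final pairs onto a single processor. The callback signatures mirror the ones used in the full program in section 6.

#include "mpi.h"
#include "stdio.h"
#include "string.h"
#include "mapreduce.h"
#include "keyvalue.h"

using namespace MAPREDUCE_NS;

// collate_gather_demo.cpp – each map task emits ("even"/"odd", itask);
// collate() groups all values per key; reduce() counts them; gather(1)
// moves the resulting pairs onto one processor, where they are printed.
void emit(int itask, KeyValue *kv, void *ptr) {
    const char *key = (itask % 2 == 0) ? "even" : "odd";
    kv->add((char *) key, strlen(key) + 1, (char *) &itask, sizeof(int));
}

void count(char *key, int keybytes, char *multivalue, int nvalues,
           int *valuebytes, KeyValue *kv, void *ptr) {
    // multivalue holds every value emitted for this key, across all procs
    kv->add(key, keybytes, (char *) &nvalues, sizeof(int));
}

void print(uint64_t itask, char *key, int keybytes, char *value,
           int valuebytes, KeyValue *kv, void *ptr) {
    printf("%s -> %d tasks\n", key, *(int *) value);
}

int main(int narg, char **args) {
    MPI_Init(&narg, &args);
    MapReduce *mr = new MapReduce(MPI_COMM_WORLD);

    mr->map(100, emit, NULL);   // 100 map tasks spread over all procs
    mr->collate(NULL);          // KeyValue -> KeyMultiValue, grouped by key
    mr->reduce(count, NULL);    // one output pair per unique key
    mr->gather(1);              // collect all pairs onto 1 processor
    mr->map(mr, print, NULL);   // iterate over the gathered pairs

    delete mr;
    MPI_Finalize();
}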

6. Implementation

A few notes about the code:
1. Input data is in form:
1 2
1 3
1 4
2 1
2 3

2. FriendTuple is a structure used as the key
3. The value is formed as an array of integers, where value[0] == arr_length (see the packing sketch below)
4. Output may be written to a file (OUTPUT_TO_FILE = 1) or sent to memcached (OUTPUT_TO_FILE = 0)
5. To enable printing keys on screen, set PRINT_OUTPUT = 1
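
The length-prefixed value layout from note 3 is used throughout the pipeline, so here is a small standalone sketch (illustrative only) of how such a block is packed before kv->add() and unpacked again inside a map/reduce callback.

#include <cstdio>
#include <cstring>

// Value layout used by the program below:
//   block = [ len, f_0, f_1, ..., f_{len-1} ]   stored as (len + 1) ints
int main() {
    int friends[] = {2, 3, 4};   // friends of user 1 in the sample input above
    int len = 3;

    // pack: (len + 1) ints, with value[0] == arr_length as in note 3
    int block[4];
    block[0] = len;
    memcpy(&block[1], friends, len * sizeof(int));
    // kv->add(key, keybytes, (char *) block, (len + 1) * sizeof(int));

    // unpack, exactly as the callbacks below do from a char* value
    char *value = (char *) block;
    int *t_block = (int *) value;
    int *t_len = &t_block[0];
    int *t_friends = &t_block[1];
    for (int i = 0; i < *t_len; i++) printf("%d ", t_friends[i]);  // 2 3 4
    printf("\n");
    return 0;
}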

#include "mpi.h"
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "sys/stat.h"
#include "mapreduce.h"
#include "keyvalue.h"
#include <sstream>
#include <string> // for std::string and std::to_string
#include <libmemcached/memcached.h>

#define MAX(a,b) ((a) > (b) ? a : b)
#define MIN(a,b) ((a) < (b) ? a : b)
#define PRINT_OUTPUT 0
#define OUTPUT_TO_FILE 0

using namespace MAPREDUCE_NS;
typedef struct {
    int f1, f2;
} FriendTuple;

void fileread(int, char *, KeyValue *, void *);
void transform_user_friends(char *, int, char *, int, int *, KeyValue *, void *);
void transform_intersect(char *, int, char *, int, int *, KeyValue *, void *);
void first_map(uint64_t, char *, int, char *, int, KeyValue *, void *);
void swap(int*, int*);
int cmpfunc (const void*, const void*);
int find_intersection(int[], int, int[], int, int[]);
void output(uint64_t, char *, int, char *, int, KeyValue *, void *);
void set_key(char*, const char*);
void set_key(FriendTuple*, int*, int);

memcached_st *memc;
void initialize_memcached(){
    memcached_server_st *servers = NULL;
    memcached_return rc;

    memc = memcached_create(NULL);

    servers = memcached_server_list_append(servers, "192.168.4.106", 11211, &rc);
    rc = memcached_server_push(memc, servers);

    if (rc != MEMCACHED_SUCCESS)
        fprintf(stderr,"Couldn't add server: %s\n",memcached_strerror(memc, rc));
}

int main(int narg, char **args) {
    MPI_Init(&narg,&args);

    int me,nprocs;
    MPI_Comm_rank(MPI_COMM_WORLD,&me);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);

    if (narg <= 1) {
        if (me == 0) printf("Syntax: mfriends file1 file2 ...\n");
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    initialize_memcached();

    MapReduce *mr = new MapReduce(MPI_COMM_WORLD);
    mr->verbosity = 1;
    mr->timer = 1;
    //mr->memsize = 1;
    //mr->outofcore = 1;

    MPI_Barrier(MPI_COMM_WORLD);
    double tstart = MPI_Wtime();

    if(me == 0) printf("1. Map to form A -> B C D\n");
    mr->map(narg-1,&args[1],0,1,0,fileread,NULL);
    int nfiles = mr->mapfilecount;
    mr->collate(NULL);
    mr->reduce(transform_user_friends,NULL);

    MPI_Barrier(MPI_COMM_WORLD);
    double tstop = MPI_Wtime();

    if(me == 0) printf("2. Map to form (A,B) -> B C D\n");
    mr->map(mr,first_map, NULL);

    if(me == 0) printf("3. Reduce to form (A,B) -> (B C D)(B D E)\n");
    mr->collate(NULL);
    mr->reduce(transform_intersect,NULL);

    if(OUTPUT_TO_FILE){
        char fname[] = "output.txt";
        FILE* fp = fopen(fname,"w");
        if (fp == NULL) {
            printf("ERROR: Could not open output file");
            MPI_Abort(MPI_COMM_WORLD,1);
        }

        mr->gather(1);
        mr->map(mr, output, (void*)fp);
        fclose(fp);
    }

    delete mr;
    if (me == 0) {
        printf("Time to process %d files on %d procs = %g (secs)\n",
                nfiles,nprocs,tstop-tstart);
    }

    MPI_Finalize();
}

void fileread(int itask, char *fname, KeyValue *kv, void *ptr){
    struct stat stbuf;
    int flag = stat(fname,&stbuf);
    if (flag < 0) {
        printf("ERROR: Could not query file size\n");
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    int filesize = stbuf.st_size;

    FILE *fp = fopen(fname,"r");
    char *text = new char[filesize+1];
    int nchar = fread(text,1,filesize,fp);
    text[nchar] = '\0';
    fclose(fp);

    const char *whitespace = " \t\n\f\r\0";
    char *id = strtok(text,whitespace);
    char *value = strtok(NULL,whitespace);
    int id1;
    int id2;
    while (id) {
        id1 = atoi(id);
        id2 = atoi(value);
        kv->add((char*)&id1,sizeof(int),(char*)&id2,sizeof(int));
        id = strtok(NULL,whitespace);
        value = strtok(NULL,whitespace);
    }

    delete [] text;
}

void transform_user_friends(char *key, int keybytes, char *multivalue,
        int nvalues, int *valuebytes, KeyValue *kv, void *ptr) {

    size_t t_block_len = nvalues + 1; // ints: [len, f_0, ..., f_{nvalues-1}]
    int* t_block = new int[t_block_len];
    int* t_len = &t_block[0];
    int* t_friends = &t_block[1];
    *t_len = nvalues;

    int* values = (int*) multivalue;
    for(int i = 0; i < *t_len; i++){
        t_friends[i] = values[i];
    }

    kv->add(key,keybytes, (char*) t_block, t_block_len*sizeof(int));
    delete [] t_block;
}

void set_key(char* key, const char* value){
    memcached_return rc;
    rc = memcached_set(memc, key, strlen(key), value, strlen(value), (time_t)0, (uint32_t)0);

    if (rc != MEMCACHED_SUCCESS)
        fprintf(stderr,"Couldn't store key: %s\n",memcached_strerror(memc, rc)); 

    if(PRINT_OUTPUT) printf("M_SET KEY: %s, VALUE: %s\n", key, value);
}

void set_key(FriendTuple* ft, int* fl, int len){
    char key[100];
    int cx = snprintf(key, 100, "m_%d_%d", ft->f1, ft->f2);

    if (cx>=0 && cx<100){
        std::stringstream s;
        for(int i = 0; i < len; i++){
            s << std::to_string(fl[i]);
            if(i+1 < len) s << ",";
        }

        if(len == 0) s << "11111"; // sentinel stored when the intersection is empty
        std::string value = s.str(); // keep the string alive; c_str() on a temporary would dangle
        set_key(key, value.c_str());
    } else {
        fprintf(stderr,"Couldn't add key: m_%d_%d\n", ft->f1, ft->f2);
    }
}

void transform_intersect(char *key, int keybytes, char *multivalue,
        int nvalues, int *valuebytes, KeyValue *kv, void *ptr) {
    FriendTuple* ft = (FriendTuple*) key;

    int* t_block;
    int* t_len;
    int* t_len2;
    int* t_friends;
    int* t_friends2;

    if(nvalues > 1){ // If <= 1 then there is no user where u1 has u2 && u2 has u1
        t_block = (int*) (multivalue);
        t_len = &t_block[0];
        t_friends = &t_block[1];

        t_block = (int*) (multivalue+valuebytes[0]);
        t_len2 = &t_block[0];
        t_friends2 = &t_block[1];

        int* res_block = new int[MIN(*t_len, *t_len2)+1]; // arr[Length, intersection]
        int* intersection_array = &res_block[1];
        int* intersection_array_len = &res_block[0];

        *intersection_array_len = find_intersection(t_friends, *t_len, t_friends2,
                *t_len2, intersection_array);

        if(OUTPUT_TO_FILE){
            kv->add(key, keybytes, (char*) res_block, (*intersection_array_len+1)*(sizeof(int)));
        }else {
            set_key(ft, intersection_array, *intersection_array_len);
        }

        delete [] res_block;
    }
}

void output(uint64_t itask, char *key, int keybytes, char *value,
        int valuebytes, KeyValue *kv, void *ptr) {
    FILE* fp = (FILE*) ptr;
    FriendTuple* ft = (FriendTuple*) key;

    int* t_block = (int*) (value);
    int* t_len = &t_block[0];
    int* t_friends = &t_block[1]; // mutual friends

    fprintf(fp, "[%d, %d]: ",ft->f1, ft->f2);
    for(int j = 0; j < *t_len; j++){
        fprintf(fp, "%d ", t_friends[j]);
    }
    fprintf(fp, "\n");

    if(PRINT_OUTPUT){
        printf("[%d, %d]: ",ft->f1, ft->f2);
        for(int j = 0; j < *t_len; j++){
            printf("%d ", t_friends[j]);
        }
        printf("\n");
    }
}

int find_intersection(int a1[], int s1, int a2[], int s2, int iarr[]){
    int i = 0, j = 0, k = 0;
    while ((i < s1) && (j < s2)){
        if (a1[i] < a2[j]){
            i++;
        } else if (a1[i] > a2[j]) {
            j++;
        } else {
            iarr[k] = a1[i];
            i++;
            j++;
            k++;
        }
    }

    return(k);
}

int cmpfunc (const void * a, const void * b) {
    return ( *(int*)a - *(int*)b );
}

void swap(int* a, int* b){
    if(*a > *b){
        int tmp = *a;
        *a = *b;
        *b = tmp;
    }
}

//Pair (A,B) -> (B,C,D)
void first_map(uint64_t itask, char *key, int keybytes, char *value,
        int valuebytes, KeyValue *kv, void *ptr) {

    int* t_block = (int*) value;
    int* t_len = &t_block[0];
    int* t_friends = &t_block[1];

    qsort(t_friends, *t_len, sizeof(int), cmpfunc); // Executed on diff procs
    FriendTuple ft;
    memset(&ft,0,sizeof(FriendTuple));

    for(int i = 0; i < *t_len; i++){
        ft.f1 = *((int*) key);
        ft.f2 = t_friends[i];
        swap(&ft.f1, &ft.f2);

        kv->add((char*)&ft, sizeof(FriendTuple),(char*) t_block, valuebytes);
    }
}
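
When OUTPUT_TO_FILE is 0, the results end up in memcached under keys of the form m_<id1>_<id2> (see set_key above). As a rough sketch of how one might read an entry back, assuming the same server address as in initialize_memcached and using the pair (1, 2) from the sample input in note 1 purely as an illustration:

#include <libmemcached/memcached.h>
#include <cstdio>
#include <cstring>
#include <cstdlib>

// read_result.cpp – fetch one mutual-friends entry written by mfriends.
int main() {
    memcached_return rc;
    memcached_st *memc = memcached_create(NULL);
    memcached_server_st *servers =
        memcached_server_list_append(NULL, "192.168.4.106", 11211, &rc);
    memcached_server_push(memc, servers);

    size_t value_length;
    uint32_t flags;
    char *value = memcached_get(memc, "m_1_2", strlen("m_1_2"),
                                &value_length, &flags, &rc);

    if (rc == MEMCACHED_SUCCESS && value != NULL) {
        printf("mutual friends of (1, 2): %s\n", value); // e.g. "3" for the sample input
        free(value);
    } else {
        fprintf(stderr, "get failed: %s\n", memcached_strerror(memc, rc));
    }

    memcached_free(memc);
    return 0;
}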

7. Makefile

CC =            mpicc #-m64
CPP =           mpic++ #-m64
CCFLAGS =       -g -O -std=gnu++11 -Wall -I../src
LINK =          mpic++
LINKFLAGS =     -g -O -lmemcached
USRLIB =        ../src/libmrmpi_mpicc.a
SYSLIB =

all: mfriends
mfriends: mfriends.o $(USRLIB)
        $(LINK) $(LINKFLAGS) mfriends.o $(USRLIB) $(SYSLIB) -o mfriends
clean:
        rm *.o mfriends

%.o:%.cpp
        $(CPP) $(CCFLAGS) -c $<

8. Compile and run

– np – number of processes
– host – hosts or IPs of target machines

make
mpirun --allow-run-as-root -np 16 --host odroid0,odroid1,odroid2,odroid3 ~/mfriends ~/data/

9. Summary

The test was conducted on 4 machines (Odroid U3), each running at most 4 processes.
The results are:

procs  time (s)
1 623.784
2 562.679
3 202.538
4 179.679
5 197.639
6 262.464
7 56.529
8 34.286
9 31.052
10 31.041
11 28.649
12 26.256
13 23.924
14 23.61
15 22.775
16 21.538

(Figure: processing time vs. number of processes.)

The total time to process 421000 tuples on 16 cores (4 machines) was 21.538 seconds. The results of the last reduce operation were collected on the master node and saved to disk. IMHO this is quite a good result; I had expected considerably worse performance.

10. Demo
