Commit 9d0f0007 authored by Nathan's avatar Nathan

ENH: Added MapReduce-MPI (http://www.cs.sandia.gov/~sjplimp/download.html) library

parent fcfbdd62
.git* export-ignore
*.sh crlf=input
newalpha crlf=input
newversion crlf=input
......@@ -13,6 +13,7 @@ VTK_THIRD_PARTY_SUBDIR(LIBXML2 vtklibxml2)
VTK_THIRD_PARTY_SUBDIR(DICOMParser DICOMParser)
VTK_THIRD_PARTY_SUBDIR(MATERIALLIBRARY MaterialLibrary)
VTK_THIRD_PARTY_SUBDIR(LIBPROJ4 vtklibproj4)
VTK_THIRD_PARTY_SUBDIR(MRMPI mrmpi)
# Force build options for verdict
SET( VERDICT_USE_FLOAT OFF CACHE BOOL "VTK requires doubles" FORCE )
......
PROJECT (MAPREDUCE)
IF (VTK_USE_MPI)
ADD_DEFINITIONS("-DMPICH_IGNORE_CXX_SEEK")
IF (MPI_LIBRARIES)
SET (mrmpi_LIBS ${mrmpi_LIBS} "${MPI_LIBRARIES}")
ELSE (MPI_LIBRARIES)
IF (MPI_LIBRARY)
SET (mrmpi_LIBS ${mrmpi_LIBS} "${MPI_LIBRARY}")
ELSE (MPI_LIBRARY)
MESSAGE("Could not find the required MPI libraries")
ENDIF (MPI_LIBRARY)
IF (MPI_EXTRA_LIBRARY)
SET(mrmpi_LIBS ${mrmpi_LIBS} "${MPI_EXTRA_LIBRARY}")
ENDIF (MPI_EXTRA_LIBRARY)
ENDIF (MPI_LIBRARIES)
ELSE (VTK_USE_MPI)
SET (mrmpi_LIBS mpi)
INCLUDE_DIRECTORIES(mrmpi ${VTK_SOURCE_DIR}/Utilities/mrmpi/mpistubs)
ADD_SUBDIRECTORY (mpistubs)
ENDIF (VTK_USE_MPI)
ADD_SUBDIRECTORY (src)
#ADD_SUBDIRECTORY (examples)
Program: MapReduce-MPI (MR-MPI) Library
Copyright (2009) Sandia Corporation. Under the terms of Contract
DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
certain rights in this software.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of Sandia Corporation nor the names of contributors
to this software may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
This is the MapReduce-MPI (MR-MPI) library, version 13 April 2009.
MapReduce is the operation popularized by Google for computing on
large distributed data sets. See the Wikipedia entry on MapReduce for
an overview of what a MapReduce is.
The MR-MPI library is a simple, portable implementation of MapReduce
that runs on any serial desktop machine or large parallel machine
using MPI message passing.
As a user, you write a program which calls the MR-MPI library and you
provide functions that operate on your data such as a map() and a
reduce(). These functions are invoked by the library on single
processors, so that you typically do not need to write any parallel
code to perform a MapReduce.
The library is written in C++ and can be called from C++ or from C or
other hi-level languages such as Fortran or a scripting language. A
Python wrapper for the library is provided. If you want to run on a
single processor, a dummy MPI library is provided to link against. To
perform MapReduces in parallel, you need to link against an installed
MPI library.
The MR-MPI library is licensed under the Berkeley Software Distribution
(BSD) License, which basically means it can be used by anyone for any
purpose. See the LICENSE file in this directory for details.
The most current version of the library including all bug fixes and
new featues can be downloaded at
www.cs.sandia.gov/~sjplimp/download.html.
The author of the library is Steve Plimpton at Sandia National
Laboratories who can be contacted at sjplimp at sandia.gov. Or see
www.cs.sandia.gov/~sjplimp.
This MR-MPI distribution includes the following files and directories:
README this file
LICENSE the BSD License
doc documentation
examples simple example MapReduce programs
mpistubs dummy MPI library
python Python wrapper files
src library source files
user MapReduce programs
Point your browser at doc/mapreduce.html to get started.
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
This diff is collapsed.
SET (example_LIBS mrmpi)
SET (wordfreq_SOURCES wordfreq.cpp)
ADD_EXECUTABLE (wordfreq ${wordfreq_SOURCES})
TARGET_LINK_LIBRARIES (wordfreq ${example_LIBS})
SET (rmat_SOURCES rmat.cpp)
ADD_EXECUTABLE (rmat ${rmat_SOURCES})
INCLUDE_DIRECTORIES(rmat ${VTK_SOURCE_DIR}/Utilities/mrmpi/src)
TARGET_LINK_LIBRARIES (rmat ${example_LIBS})
# Targets
all: wordfreq cwordfreq rmat crmat
wordfreq: wordfreq.o $(LIB)
$(LINK) $(LINKFLAGS) wordfreq.o $(USRLIB) $(SYSLIB) -o wordfreq
cwordfreq: cwordfreq.o $(LIB)
$(LINK) $(LINKFLAGS) cwordfreq.o $(USRLIB) $(SYSLIB) -o cwordfreq
rmat: rmat.o $(LIB)
$(LINK) $(LINKFLAGS) rmat.o $(USRLIB) $(SYSLIB) -o rmat
crmat: crmat.o $(LIB)
$(LINK) $(LINKFLAGS) crmat.o $(USRLIB) $(SYSLIB) -o crmat
clean:
rm *.o wordfreq cwordfreq rmat crmat
# Rules
%.o:%.cpp
$(CPP) $(CCFLAGS) -c $<
%.o:%.c
$(CC) $(CCFLAGS) -c $<
# Linux/MPI Makefile for MapReduce examples, g++, MPI
CC = gcc
CPP = g++
CCFLAGS = -g -O -I../src -DMPICH_IGNORE_CXX_SEEK
LINK = g++
LINKFLAGS = -g -O -L../src
USRLIB = -lmrmpi -lmpich
SYSLIB = -lpthread
LIB = ../src/libmrmpi.a
include Makefile.common
# MPI-based makefile using mpic++ and mpicc.
CC = mpicc -m64
CPP = mpic++ -m64
CCFLAGS = -g -O -I../src
LINK = mpic++
LINKFLAGS = -g -O
#USRLIB = ../src/libmrmpi.a
USRLIB = -L../src -lmrmpi
SYSLIB =
LIB = ../src/libmrmpi.a
include Makefile.common
# MPI-based makefile using mpic++ and mpicc.
CC = mpicc
CPP = mpic++
CCFLAGS = -g -O -I../src
LINK = mpic++
LINKFLAGS = -g -O
#USRLIB = ../src/libmrmpi.a
USRLIB = -L../src -lmrmpi
SYSLIB =
LIB = ../src/libmrmpi.a
include Makefile.common
# Serial Makefile for MapReduce examples, g++, no MPI
CC = gcc
CPP = g++
CCFLAGS = -g -O -I../src -I../mpistubs
LINK = g++
LINKFLAGS = -g -O
USRLIB = ../src/libmrmpi.a ../mpistubs/libmpi.a
SYSLIB =
LIB = ../src/libmrmpi.a
include Makefile.common
# RedStorm (Cray XT3) Makefile for MapReduce examples, CC, MPI
CC = CC
CCFLAGS = -fastsse -I../src -DMPICH_IGNORE_CXX_SEEK
LINK = CC
LINKFLAGS = -O -L../src
USRLIB = -lmrmpi
SYSLIB =
LIB = ../src/libmrmpi.a
include Makefile.common
/* ----------------------------------------------------------------------
MR-MPI = MapReduce-MPI library
http://www.cs.sandia.gov/~sjplimp/mapreduce.html
Steve Plimpton, sjplimp@sandia.gov, Sandia National Laboratories
Copyright (2009) Sandia Corporation. Under the terms of Contract
DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
certain rights in this software. This software is distributed under
the modified Berkeley Software Distribution (BSD) License.
See the README file in the top-level MapReduce directory.
------------------------------------------------------------------------- */
/*
MapReduce random RMAT matrix generation example in C++
Syntax: rmat N Nz a b c d frac seed {outfile}
2^N = # of rows in RMAT matrix
Nz = non-zeroes per row
a,b,c,d = RMAT params (must sum to 1.0)
frac = RMAT randomization param (frac < 1, 0 = no randomization)
seed = RNG seed (positive int)
outfile = output RMAT matrix to this filename (optional)
*/
#include "mpi.h"
#include "math.h"
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "cmapreduce.h"
void generate(int, void *, void *);
void cull(char *, int, char *, int, int *, void *, void *);
void output(char *, int, char *, int, int *, void *, void *);
void nonzero(char *, int, char *, int, int *, void *, void *);
void degree(char *, int, char *, int, int *, void *, void *);
void histo(char *, int, char *, int, int *, void *, void *);
int ncompare(char *, int, char *, int);
void stats(int, char *, int, char *, int, void *, void *);
typedef struct { // RMAT params
int nlevels,order;
int nnonzero;
int ngenerate;
double a,b,c,d,fraction;
char *outfile;
FILE *fp;
} RMAT;
typedef int VERTEX; // vertex ID
typedef struct { // edge = 2 vertices
VERTEX vi,vj;
} EDGE;
/* ---------------------------------------------------------------------- */
int main(int narg, char **args)
{
int me,nprocs;
MPI_Init(&narg,&args);
MPI_Comm_rank(MPI_COMM_WORLD,&me);
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
// parse command-line args
if (narg != 9 && narg != 10) {
if (me == 0) printf("Syntax: rmat N Nz a b c d frac seed {outfile}\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
RMAT rmat;
rmat.nlevels = atoi(args[1]);
rmat.nnonzero = atoi(args[2]);
rmat.a = atof(args[3]);
rmat.b = atof(args[4]);
rmat.c = atof(args[5]);
rmat.d = atof(args[6]);
rmat.fraction = atof(args[7]);
int seed = atoi(args[8]);
if (narg == 10) {
int n = strlen(args[9]) + 1;
rmat.outfile = (char *) malloc(n*sizeof(char));
strcpy(rmat.outfile,args[9]);
} else rmat.outfile = NULL;
if (rmat.a + rmat.b + rmat.c + rmat.d != 1.0) {
if (me == 0) printf("ERROR: a,b,c,d must sum to 1\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
if (rmat.fraction >= 1.0) {
if (me == 0) printf("ERROR: fraction must be < 1\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
srand48(seed+me);
rmat.order = 1 << rmat.nlevels;
void *mr = MR_create(MPI_COMM_WORLD);
// loop until desired number of unique nonzero entries
MPI_Barrier(MPI_COMM_WORLD);
double tstart = MPI_Wtime();
int niterate = 0;
int ntotal = (1 << rmat.nlevels) * rmat.nnonzero;
int nremain = ntotal;
while (nremain) {
niterate++;
rmat.ngenerate = nremain/nprocs;
if (me < nremain % nprocs) rmat.ngenerate++;
MR_map_add(mr,nprocs,&generate,&rmat,1);
int nunique = MR_collate(mr,NULL);
if (nunique == ntotal) break;
MR_reduce(mr,&cull,&rmat);
nremain = ntotal - nunique;
}
MPI_Barrier(MPI_COMM_WORLD);
double tstop = MPI_Wtime();
// output matrix if requested
if (rmat.outfile) {
char fname[128];
sprintf(fname,"%s.%d",rmat.outfile,me);
rmat.fp = fopen(fname,"w");
if (rmat.fp == NULL) {
printf("ERROR: Could not open output file\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
void *mr2 = MR_copy(mr);
MR_reduce(mr2,&output,&rmat);
fclose(rmat.fp);
MR_destroy(mr2);
}
// stats to screen
// include stats on number of nonzeroes per row
if (me == 0) {
printf("%d rows in matrix\n",rmat.order);
printf("%d nonzeroes in matrix\n",ntotal);
}
MR_reduce(mr,&nonzero,NULL);
MR_collate(mr,NULL);
MR_reduce(mr,&degree,NULL);
MR_collate(mr,NULL);
MR_reduce(mr,&histo,NULL);
MR_gather(mr,1);
MR_sort_keys(mr,&ncompare);
int total = 0;
MR_map_kv(mr,mr,&stats,&total);
if (me == 0) printf("%d rows with 0 nonzeroes\n",rmat.order-total);
if (me == 0)
printf("%g secs to generate matrix on %d procs in %d iterations\n",
tstop-tstart,nprocs,niterate);
// clean up
MR_destroy(mr);
free(rmat.outfile);
MPI_Finalize();
}
/* ----------------------------------------------------------------------
generate RMAT matrix entries
emit one KV per edge: key = edge, value = NULL
------------------------------------------------------------------------- */
void generate(int itask, void *kv, void *ptr)
{
RMAT *rmat = (RMAT *) ptr;
int nlevels = rmat->nlevels;
int order = rmat->order;
int ngenerate = rmat->ngenerate;
double a = rmat->a;
double b = rmat->b;
double c = rmat->c;
double d = rmat->d;
double fraction = rmat->fraction;
int i,j,ilevel,delta,m;
double a1,b1,c1,d1,total,rn;
EDGE edge;
for (m = 0; m < ngenerate; m++) {
delta = order >> 1;
a1 = a; b1 = b; c1 = c; d1 = d;
i = j = 0;
for (ilevel = 0; ilevel < nlevels; ilevel++) {
rn = drand48();
if (rn < a1) {
} else if (rn < a1+b1) {
j += delta;
} else if (rn < a1+b1+c1) {
i += delta;
} else {
i += delta;
j += delta;
}
delta /= 2;
if (fraction > 0.0) {
a1 += a1*fraction * (drand48() - 0.5);
b1 += b1*fraction * (drand48() - 0.5);
c1 += c1*fraction * (drand48() - 0.5);
d1 += d1*fraction * (drand48() - 0.5);
total = a1+b1+c1+d1;
a1 /= total;
b1 /= total;
c1 /= total;
d1 /= total;
}
}
edge.vi = i;
edge.vj = j;
MR_kv_add(kv,(char *) &edge,sizeof(EDGE),NULL,0);
}
}
/* ----------------------------------------------------------------------
eliminate duplicate edges
input: one KMV per edge, MV has multiple entries if duplicates exist
output: one KV per edge: key = edge, value = NULL
------------------------------------------------------------------------- */
void cull(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, void *kv, void *ptr)
{
MR_kv_add(kv,key,keybytes,NULL,0);
}
/* ----------------------------------------------------------------------
write edges to a file unique to this processor
------------------------------------------------------------------------- */
void output(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, void *kv, void *ptr)
{
RMAT *rmat = (RMAT *) ptr;
EDGE *edge = (EDGE *) key;
fprintf(rmat->fp,"%d %d 1\n",edge->vi+1,edge->vj+1);
}
/* ----------------------------------------------------------------------
enumerate nonzeroes in each row
input: one KMV per edge
output: one KV per edge: key = row I, value = NULL
------------------------------------------------------------------------- */
void nonzero(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, void *kv, void *ptr)
{
EDGE *edge = (EDGE *) key;
MR_kv_add(kv,(char *) &edge->vi,sizeof(VERTEX),NULL,0);
}
/* ----------------------------------------------------------------------
count nonzeroes in each row
input: one KMV per row, MV has entry for each nonzero
output: one KV: key = # of nonzeroes, value = NULL
------------------------------------------------------------------------- */
void degree(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, void *kv, void *ptr)
{
MR_kv_add(kv,(char *) &nvalues,sizeof(int),NULL,0);
}
/* ----------------------------------------------------------------------
count rows with same # of nonzeroes
input: one KMV per nonzero count, MV has entry for each row
output: one KV: key = # of nonzeroes, value = # of rows
------------------------------------------------------------------------- */
void histo(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, void *kv, void *ptr)
{
MR_kv_add(kv,key,keybytes,(char *) &nvalues,sizeof(int));
}
/* ----------------------------------------------------------------------
compare two counts
order values by count, largest first
------------------------------------------------------------------------- */
int ncompare(char *p1, int len1, char *p2, int len2)
{
int i1 = *(int *) p1;
int i2 = *(int *) p2;
if (i1 > i2) return -1;
else if (i1 < i2) return 1;
else return 0;
}
/* ----------------------------------------------------------------------
print # of rows with a specific # of nonzeroes
------------------------------------------------------------------------- */
void stats(int itask, char *key, int keybytes, char *value,
int valuebytes, void *kv, void *ptr)
{
int *total = (int *) ptr;
int nnz = *(int *) key;
int ncount = *(int *) value;
*total += ncount;
printf("%d rows with %d nonzeroes\n",ncount,nnz);
}
/* ----------------------------------------------------------------------
MR-MPI = MapReduce-MPI library
http://www.cs.sandia.gov/~sjplimp/mapreduce.html
Steve Plimpton, sjplimp@sandia.gov, Sandia National Laboratories
Copyright (2009) Sandia Corporation. Under the terms of Contract
DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
certain rights in this software. This software is distributed under
the modified Berkeley Software Distribution (BSD) License.
See the README file in the top-level MapReduce directory.
------------------------------------------------------------------------- */
/*
MapReduce word frequency example in C
Syntax: cwordfreq file1 file2 ...
(1) reads all files, parses into words separated by whitespace
(2) counts occurrence of each word in all files
(3) prints top 10 words
*/
#include "mpi.h"
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "sys/stat.h"
#include "cmapreduce.h"
void fileread(int, void *, void *);
void sum(char *, int, char *, int, int *, void *, void *);
int ncompare(char *, int, char *, int);
void output(int, char *, int, char *, int, void *, void *);
typedef struct {
int n,limit,flag;
} Count;
/* ---------------------------------------------------------------------- */
int main(int narg, char **args)
{
int me,nprocs;
int nwords,nunique;
double tstart,tstop;
Count count;
MPI_Init(&narg,&args);
MPI_Comm_rank(MPI_COMM_WORLD,&me);
MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
if (narg <= 1) {
if (me == 0) printf("Syntax: cwordfreq file1 file2 ...\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
void *mr = MR_create(MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
tstart = MPI_Wtime();
nwords = MR_map(mr,narg-1,&fileread,&args[1]);
MR_collate(mr,NULL);
nunique = MR_reduce(mr,&sum,NULL);
MPI_Barrier(MPI_COMM_WORLD);
tstop = MPI_Wtime();
MR_sort_values(mr,&ncompare);
count.n = 0;
count.limit = 10;
count.flag = 0;
MR_map_kv(mr,&output,&count);
MR_gather(mr,1);
MR_sort_values(mr,&ncompare);
count.n = 0;
count.limit = 10;
count.flag = 1;
MR_map_kv(mr,&output,&count);
MR_destroy(mr);
if (me == 0) {
printf("%d total words, %d unique words\n",nwords,nunique);
printf("Time to wordcount %d files on %d procs = %g (secs)\n",
narg-1,nprocs,tstop-tstart);
}
MPI_Finalize();
}
/* ----------------------------------------------------------------------
read a file
for each word in file, emit key = word, value = NULL
------------------------------------------------------------------------- */
void fileread(int itask, void *kv, void *ptr)
{
// filesize = # of bytes in file
char **files = (char **) ptr;
struct stat stbuf;
int flag = stat(files[itask],&stbuf);
if (flag < 0) {
printf("ERROR: Could not query file size\n");
MPI_Abort(MPI_COMM_WORLD,1);
}
int filesize = stbuf.st_size;
FILE *fp = fopen(files[itask],"r");
char text[filesize+1];
int nchar = fread(text,1,filesize,fp);
text[nchar] = '\0';
fclose(fp);
char *whitespace = " \t\n\f\r\0";
char *word = strtok(text,whitespace);
while (word) {
MR_kv_add(kv,word,strlen(word)+1,NULL,0);
word = strtok(NULL,whitespace);
}
}
/* ----------------------------------------------------------------------
count word occurrence
emit key = word, value = # of multi-values
------------------------------------------------------------------------- */
void sum(char *key, int keybytes, char *multivalue,
int nvalues, int *valuebytes, void *kv, void *ptr)
{
MR_kv_add(kv,key,keybytes,(char *) &nvalues,sizeof(int));
}
/* ----------------------------------------------------------------------
compare two counts
order values by count, largest first
------------------------------------------------------------------------- */
int ncompare(char *p1, int len1, char *p2, int len2)
{
int i1 = *(int *) p1;
int i2 = *(int *) p2;
if (i1 > i2) return -1;
else if (i1 < i2) return 1;
else return 0;
}
/* ----------------------------------------------------------------------