MPI


External Resources

The MPI Standard

The MPI standard is a formal specification document that is backed by the MPI Forum, but not by an official standards body (e.g., ISO or ANSI).

Please see this page for all of the MPI standardization documents, including the latest version of the standard, MPI 3.0.

Tutorials

The LLNL tutorial is excellent. The Internet has too many MPI-related tutorials to list them all here. Search engines can help you find some of them.

Books

I personally recommend Using MPI and Using MPI-2 as a means for learning both the basic and advanced features of MPI.

I recall that Peter Pacheco's book was good, but I lost my copy many years ago and can't certify that my recollection is accurate.

Profiling

External:

Internal:

Basic MPI

These are some very simple programs that start with a basic "Hello, world!" program, demonstrate broadcast and reduce, then bring these together to do the (in)famous Monte Carlo computation of Pi.

Makefile

CC      = mpicc
COPT    = -g -O2 -Wall #-std=gnu99

# OpenMP
# modify this as appropriate if your compiler is not GCC
COPT += -fopenmp

LD      = $(CC)
CFLAGS  = $(COPT) -DCHECK_ERRORS

LDFLAGS = $(COPT)
LIBS    = -lm

all: hello.x reduce.x bcast.x montecarlo.x

extra: mpi-omp.x

%.x: %.o
	$(LD) $(LDFLAGS) $< $(LIBS) -o $@

%.o: %.c
	$(CC) $(CFLAGS) -c $< -o $@

clean:
	-rm -f *.o

realclean: clean
	-rm -f *.x

hello.c

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

#define MPI_THREAD_STRING(level)  \
        ( level==MPI_THREAD_SERIALIZED ? "THREAD_SERIALIZED" : \
                ( level==MPI_THREAD_MULTIPLE ? "THREAD_MULTIPLE" : \
                        ( level==MPI_THREAD_FUNNELED ? "THREAD_FUNNELED" : \
                                ( level==MPI_THREAD_SINGLE ? "THREAD_SINGLE" : "THIS_IS_IMPOSSIBLE" ) ) ) )

int main(int argc, char ** argv)
{
    /* These are the desired and available thread support.
       A hybrid code where all MPI calls are made from the main thread can use FUNNELED.
       If threads are making MPI calls, MULTIPLE is appropriate. */
    int requested = MPI_THREAD_FUNNELED, provided;

    /* MPICH2 will be substantially more efficient than Open MPI
       for MPI_THREAD_{FUNNELED,SERIALIZED}, but this is unlikely
       to be a serious bottleneck. */
    MPI_Init_thread(&argc, &argv, requested, &provided);
    if (provided<requested)
    {
        printf("MPI_Init_thread provided %s when %s was requested.  Exiting. \n",
               MPI_THREAD_STRING(provided), MPI_THREAD_STRING(requested) );
        exit(1);
    }

    int world_size, world_rank;

    MPI_Comm_size(MPI_COMM_WORLD,&world_size);
    MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);

    printf("Hello from %d of %d processors\n", world_rank, world_size);

    MPI_Finalize();
    return 0;
}

bcast.c

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>

#define MPI_THREAD_STRING(level)  \
    ( level==MPI_THREAD_SERIALIZED ? "THREAD_SERIALIZED" : \
        ( level==MPI_THREAD_MULTIPLE ? "THREAD_MULTIPLE" : \
            ( level==MPI_THREAD_FUNNELED ? "THREAD_FUNNELED" : \
                ( level==MPI_THREAD_SINGLE ? "THREAD_SINGLE" : "THIS_IS_IMPOSSIBLE" ) ) ) )

#ifdef CHECK_ERRORS
#define CHECK_MPI(rc) \
    do {                                                \
        if (rc!=MPI_SUCCESS) {                          \
            printf("MPI call failed.  Exiting. \n");    \
            exit(1);                                    \
        }                                               \
    } while (0) 
#else
#define CHECK_MPI(rc)
#endif

int main(int argc, char ** argv)
{
    int rc;

    /* These are the desired and available thread support.
       A hybrid code where all MPI calls are made from the main thread can use FUNNELED.
       If threads are making MPI calls, MULTIPLE is appropriate. */
    int requested = MPI_THREAD_FUNNELED, provided;

    /* MPICH2 will be substantially more efficient than Open MPI
       for MPI_THREAD_{FUNNELED,SERIALIZED}, but this is unlikely
       to be a serious bottleneck. */
    rc = MPI_Init_thread(&argc, &argv, requested, &provided); CHECK_MPI(rc);
    if (provided<requested)
    {
        printf("MPI_Init_thread provided %s when %s was requested.  Exiting. \n",
               MPI_THREAD_STRING(provided), MPI_THREAD_STRING(requested) );
        exit(1);
    }

    int world_size, world_rank;

    rc = MPI_Comm_size(MPI_COMM_WORLD,&world_size); CHECK_MPI(rc);
    rc = MPI_Comm_rank(MPI_COMM_WORLD,&world_rank); CHECK_MPI(rc);

    int root = 0, count = 1;

    /* the ternary is often branchless... */
    int i, n = (argc>1 ? atoi(argv[1]) : 1000);
    rc = MPI_Bcast(&n, count, MPI_INT, root, MPI_COMM_WORLD); CHECK_MPI(rc);

    /* seed the RNG with something unique to a rank */
    srand(world_rank);

    double x = 0.0;
    for (i=0;i<n;i++)
        x += (double)rand()/(double)RAND_MAX;

    printf("%d: The sum of %d random numbers is %lf.\n", world_rank, n, x);

    MPI_Finalize();
    return 0;
}

reduce.c

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

#define MPI_THREAD_STRING(level)  \
    ( level==MPI_THREAD_SERIALIZED ? "THREAD_SERIALIZED" : \
        ( level==MPI_THREAD_MULTIPLE ? "THREAD_MULTIPLE" : \
            ( level==MPI_THREAD_FUNNELED ? "THREAD_FUNNELED" : \
                ( level==MPI_THREAD_SINGLE ? "THREAD_SINGLE" : "THIS_IS_IMPOSSIBLE" ) ) ) )

#ifdef CHECK_ERRORS
#define CHECK_MPI(rc) \
    do {                                                \
        if (rc!=MPI_SUCCESS) {                          \
            printf("MPI call failed.  Exiting. \n");    \
            exit(1);                                    \
        }                                               \
    } while (0) 
#else
#define CHECK_MPI(rc)
#endif

int main(int argc, char ** argv)
{
    int rc;

    /* These are the desired and available thread support.
       A hybrid code where all MPI calls are made from the main thread can use FUNNELED.
       If threads are making MPI calls, MULTIPLE is appropriate. */
    int requested = MPI_THREAD_FUNNELED, provided;

    /* MPICH2 will be substantially more efficient than Open MPI
       for MPI_THREAD_{FUNNELED,SERIALIZED}, but this is unlikely
       to be a serious bottleneck. */
    rc = MPI_Init_thread(&argc, &argv, requested, &provided); CHECK_MPI(rc);
    if (provided<requested)
    {
        printf("MPI_Init_thread provided %s when %s was requested.  Exiting. \n",
               MPI_THREAD_STRING(provided), MPI_THREAD_STRING(requested) );
        exit(1);
    }

    int world_size, world_rank;

    rc = MPI_Comm_size(MPI_COMM_WORLD,&world_size); CHECK_MPI(rc);
    rc = MPI_Comm_rank(MPI_COMM_WORLD,&world_rank); CHECK_MPI(rc);

    int root = 0, count = 1;
    int max, min, sum;

    rc = MPI_Reduce(&world_rank, &min, count, MPI_INT, MPI_MIN, root, MPI_COMM_WORLD); CHECK_MPI(rc);
    rc = MPI_Reduce(&world_rank, &max, count, MPI_INT, MPI_MAX, root, MPI_COMM_WORLD); CHECK_MPI(rc);
    rc = MPI_Reduce(&world_rank, &sum, count, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD); CHECK_MPI(rc);

    if (world_rank==0)
        printf("%d: min = %d, max = %d, sum = %d \n", world_rank, min, max, sum);

    MPI_Finalize();
    return 0;
}


montecarlo.c

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>

#define MPI_THREAD_STRING(level)  \
    ( level==MPI_THREAD_SERIALIZED ? "THREAD_SERIALIZED" : \
        ( level==MPI_THREAD_MULTIPLE ? "THREAD_MULTIPLE" : \
            ( level==MPI_THREAD_FUNNELED ? "THREAD_FUNNELED" : \
                ( level==MPI_THREAD_SINGLE ? "THREAD_SINGLE" : "THIS_IS_IMPOSSIBLE" ) ) ) )

#ifdef CHECK_ERRORS
#define CHECK_MPI(rc) \
    do {                                             \
        if (rc!=MPI_SUCCESS) {                       \
            printf("MPI call failed.  Exiting. \n"); \
            exit(1);                                 \
        }                                            \
    } while (0) 
#else
#define CHECK_MPI(rc)
#endif

int main(int argc, char ** argv)
{
    int rc;

    /* These are the desired and available thread support.
       A hybrid code where all MPI calls are made from the main thread can use FUNNELED.
       If threads are making MPI calls, MULTIPLE is appropriate. */
    int requested = MPI_THREAD_FUNNELED, provided;

    /* MPICH2 will be substantially more efficient than Open MPI
       for MPI_THREAD_{FUNNELED,SERIALIZED}, but this is unlikely
       to be a serious bottleneck. */
    rc = MPI_Init_thread(&argc, &argv, requested, &provided); CHECK_MPI(rc);
    if (provided<requested)
    {
        printf("MPI_Init_thread provided %s when %s was requested.  Exiting. \n",
               MPI_THREAD_STRING(provided), MPI_THREAD_STRING(requested) );
        exit(1);
    }

    int world_size, world_rank;

    rc = MPI_Comm_size(MPI_COMM_WORLD,&world_size); CHECK_MPI(rc);
    rc = MPI_Comm_rank(MPI_COMM_WORLD,&world_rank); CHECK_MPI(rc);

    int root = 0, count = 1;

    /* the ternary is often branchless... */
    long i, n = (argc>1 ? atol(argv[1]) : 100000);
    rc = MPI_Bcast(&n, count, MPI_LONG, root, MPI_COMM_WORLD); CHECK_MPI(rc);
    if (world_rank==0)
        printf("%d: using %ld samples.\n", world_rank, world_size*n);

    /* seed the RNG with something unique to a rank */
    srand(world_rank);

    long in = 0, total = 0;
    for (i=0;i<n;i++)
    {
        register double x = (double)rand()/(double)RAND_MAX;
        register double y = (double)rand()/(double)RAND_MAX;
        register double z = x*x + y*y;
        if (z<1.0) in++;
    }

    rc = MPI_Reduce(&in, &total, count, MPI_LONG, MPI_SUM, root, MPI_COMM_WORLD); CHECK_MPI(rc);
    double pi = 4.0*(double)total/(world_size*n);
    if (world_rank==0)
        printf("%d: pi = %12.8lf.\n", world_rank, pi);

    MPI_Finalize();
    return 0;
}

Hybrid MPI

See Challenges for Interoperability of Runtime Systems in Scientific Applications for some commentary on using MPI and threads, among other things.

MPI and threads

MPI and OpenMP

The most common usage of OpenMP is fork-join, in which case MPI_THREAD_FUNNELED is sufficient.

Example Code

Use the Makefile provided above if necessary; its extra target builds mpi-omp.x from this file.

mpi-omp.c

#include <stdio.h>
#include <stdlib.h>

#ifdef _OPENMP
#include <omp.h>
#else
#warning Your compiler does not support OpenMP, at least with the flags you're using.
#endif

#include <mpi.h>

#define MPI_THREAD_STRING(level)  \
        ( level==MPI_THREAD_SERIALIZED ? "THREAD_SERIALIZED" : \
                ( level==MPI_THREAD_MULTIPLE ? "THREAD_MULTIPLE" : \
                        ( level==MPI_THREAD_FUNNELED ? "THREAD_FUNNELED" : \
                                ( level==MPI_THREAD_SINGLE ? "THREAD_SINGLE" : "THIS_IS_IMPOSSIBLE" ) ) ) )

int main(int argc, char ** argv)
{
    /* These are the desired and available thread support.
       A hybrid code where all MPI calls are made from the main thread can use FUNNELED.
       If threads are making MPI calls, MULTIPLE is appropriate. */
    int requested = MPI_THREAD_FUNNELED, provided;

    /* MPICH2 will be substantially more efficient than Open MPI
       for MPI_THREAD_{FUNNELED,SERIALIZED}, but this is unlikely
       to be a serious bottleneck. */
    MPI_Init_thread(&argc, &argv, requested, &provided);
    if (provided<requested)
    {
        printf("MPI_Init_thread provided %s when %s was requested.  Exiting. \n",
               MPI_THREAD_STRING(provided), MPI_THREAD_STRING(requested) );
        exit(1);
    }

    int world_size, world_rank;

    MPI_Comm_size(MPI_COMM_WORLD,&world_size);
    MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);

    printf("Hello from %d of %d processors\n", world_rank, world_size);

#ifdef _OPENMP
    #pragma omp parallel
    {
        int omp_id  = omp_get_thread_num();
        int omp_num = omp_get_num_threads();
        printf("MPI rank = %2d OpenMP thread = %2d of %2d \n", world_rank, omp_id, omp_num);
        fflush(stdout);
    }
#else
    printf("MPI rank = %2d \n", world_rank);
    fflush(stdout);
#endif

    MPI_Finalize();
    return 0;
}

MPI and Pthreads

If multiple threads make MPI calls, MPI_THREAD_SERIALIZED or MPI_THREAD_MULTIPLE is required: the former if the user code implements mutual exclusion between different threads' access to MPI, the latter if the MPI implementation is expected to do this.
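
As a minimal sketch of the SERIALIZED case (not part of the original examples; it uses two Pthreads and self-messages only), a Pthread mutex can provide the mutual exclusion that the user is responsible for:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mpi.h>

/* a user-level lock that serializes access to MPI, which is the
   user's responsibility under MPI_THREAD_SERIALIZED */
static pthread_mutex_t mpi_mutex = PTHREAD_MUTEX_INITIALIZER;

static int world_rank;

void * worker(void * arg)
{
    int tag = *(int*)arg;
    int buf = world_rank;

    /* only one thread may be inside the MPI library at a time */
    pthread_mutex_lock(&mpi_mutex);
    MPI_Sendrecv_replace(&buf, 1, MPI_INT, world_rank, tag,
                         world_rank, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    pthread_mutex_unlock(&mpi_mutex);

    return NULL;
}

int main(int argc, char ** argv)
{
    int i, provided;
    int tags[2] = {0,1};
    pthread_t threads[2];

    MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);
    if (provided<MPI_THREAD_SERIALIZED) exit(1);

    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    for (i=0; i<2; i++)
        pthread_create(&threads[i], NULL, worker, &tags[i]);
    for (i=0; i<2; i++)
        pthread_join(threads[i], NULL);

    printf("%d: both threads communicated safely\n", world_rank);

    MPI_Finalize();
    return 0;
}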

Example code

This is a simple example that prints out information about MPI, Pthreads and OpenMP used together.

You should submit this job with different values of the environment variables POSIX_NUM_THREADS and OMP_NUM_THREADS. On Blue Gene/Q, this test reports the core and hardware thread on which each thread executes; on other systems, these IDs are reported as -1.
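
For example (hypothetical values; the launcher name and its flags vary by system, e.g. runjob on Blue Gene/Q):

POSIX_NUM_THREADS=2 OMP_NUM_THREADS=4 mpiexec -n 2 ./mpi_omp_pthreads.x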

Makefile

CC      = mpicc
COPT    = -g -O2 -std=gnu99 -fopenmp

LD      = $(CC)
CFLAGS  = $(COPT)
LDFLAGS = $(COPT)
LIBS    = -lm -lpthread

all: mpi_omp_pthreads.x

# link the test program together with the BGQ helper object
%.x: %.o bgq_threadid.o
	$(LD) $(LDFLAGS) $^ $(LIBS) -o $@

%.o: %.c
	$(CC) $(CFLAGS) -c $< -o $@

clean:
	$(RM) $(RMFLAGS) *.o *.lst 

realclean: clean
	$(RM) $(RMFLAGS) *.x

bgq_threadid.c

#ifdef __bgq__
#include </bgsys/drivers/ppcfloor/spi/include/kernel/location.h>
#endif

/*=======================================*/
/* routine to return the BGQ core number */
/*=======================================*/
int get_bgq_core(void)
{
#ifdef __bgq__
    //int core = Kernel_PhysicalProcessorID();
    int core = Kernel_ProcessorCoreID();
    return core;
#else
    return -1;
#endif
}

/*==========================================*/
/* routine to return the BGQ hwthread (0-3) */
/*==========================================*/
int get_bgq_hwthread(void)
{
#ifdef __bgq__
    //int hwthread = Kernel_PhysicalHWThreadID();
    int hwthread = Kernel_ProcessorThreadID();
    return hwthread;
#else
    return -1;
#endif
}

/*======================================================*/
/* routine to return the BGQ virtual core number (0-67) */
/*======================================================*/
int get_bgq_vcore(void)
{
#ifdef __bgq__
    int hwthread = Kernel_ProcessorID();
    return hwthread;
#else
    return -1;
#endif
}

mpi_omp_pthreads.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <pthread.h>
#include <omp.h>
#include <mpi.h>

/* these are defined in bgq_threadid.c; they return -1 off Blue Gene/Q */
int get_bgq_core(void);
int get_bgq_hwthread(void);

/* this is to ensure that the threads overlap in time */
#define NAPTIME 3

#define MAX_POSIX_THREADS 64

static pthread_t thread_pool[MAX_POSIX_THREADS];

static int mpi_size, mpi_rank;
static int num_posix_threads;

void* foo(void* dummy)
{
    int i, my_pth = -1;
    pthread_t my_pthread = pthread_self();

    for (i=0 ; i<num_posix_threads ; i++)
        if (my_pthread==thread_pool[i]) my_pth = i;
    
    sleep(NAPTIME);

    int my_core = -1, my_hwth = -1;
    int my_omp, num_omp;
    #pragma omp parallel private(my_core,my_hwth,my_omp,num_omp) shared(my_pth)
    {
        sleep(NAPTIME);

        my_core = get_bgq_core();
        my_hwth = get_bgq_hwthread();

        my_omp  = omp_get_thread_num();
        num_omp = omp_get_num_threads();
        fprintf(stdout,"MPI rank = %2d Pthread = %2d OpenMP thread = %2d of %2d core = %2d:%1d \n",
                       mpi_rank, my_pth, my_omp, num_omp, my_core, my_hwth);
        fflush(stdout);

        sleep(NAPTIME);
    }

    sleep(NAPTIME);

    pthread_exit(0);
}

void bar()
{
    sleep(NAPTIME);

    int my_core = -1, my_hwth = -1;
    int my_omp, num_omp;
    #pragma omp parallel private(my_core,my_hwth,my_omp,num_omp)
    {
        sleep(NAPTIME);

        my_core = get_bgq_core();
        my_hwth = get_bgq_hwthread();

        my_omp  = omp_get_thread_num();
        num_omp = omp_get_num_threads();
        fprintf(stdout,"MPI rank = %2d OpenMP thread = %2d of %2d core = %2d:%1d \n",
                       mpi_rank, my_omp, num_omp, my_core, my_hwth);
        fflush(stdout);

        sleep(NAPTIME);
    }
    sleep(NAPTIME);
}

int main(int argc, char *argv[])
{
    int i, rc;
    int provided;
 
    MPI_Init_thread(&argc,&argv,MPI_THREAD_MULTIPLE,&provided);
    if ( provided != MPI_THREAD_MULTIPLE ) exit(1);
 
    MPI_Comm_size(MPI_COMM_WORLD,&mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD,&mpi_rank);
 
    MPI_Barrier(MPI_COMM_WORLD);
 
    sleep(NAPTIME);

#ifdef __bgq__
    char * env_layout = getenv("BG_THREADLAYOUT");
    int bg_threadlayout = (env_layout!=NULL ? atoi(env_layout) : -1);
    if (mpi_rank==0) fprintf(stdout,"BG_THREADLAYOUT = %2d\n", bg_threadlayout);
#endif

    /* getenv returns NULL if the variable is not set */
    char * env_pth = getenv("POSIX_NUM_THREADS");
    num_posix_threads = (env_pth!=NULL ? atoi(env_pth) : 0);
    if (num_posix_threads<0)                 num_posix_threads = 0;
    if (num_posix_threads>MAX_POSIX_THREADS) num_posix_threads = MAX_POSIX_THREADS;

    if (mpi_rank==0) fprintf(stdout,"POSIX_NUM_THREADS = %2d\n", num_posix_threads);
    if (mpi_rank==0) fprintf(stdout,"OMP_MAX_NUM_THREADS = %2d\n", omp_get_max_threads());
    fflush(stdout);

    if ( num_posix_threads > 0 ) {
        //fprintf(stdout,"MPI rank %2d creating %2d POSIX threads\n", mpi_rank, num_posix_threads); fflush(stdout);
        for (i=0 ; i<num_posix_threads ; i++){
            rc = pthread_create(&thread_pool[i], NULL, foo, NULL);
            assert(rc==0);
        }

        MPI_Barrier(MPI_COMM_WORLD);

        sleep(NAPTIME);

        for (i=0 ; i<num_posix_threads ; i++){
            rc = pthread_join(thread_pool[i],NULL);
            assert(rc==0);
        }
        //fprintf(stdout,"MPI rank %2d joined %2d POSIX threads\n", mpi_rank, num_posix_threads); fflush(stdout);
    } else {
        bar();
    }

    MPI_Barrier(MPI_COMM_WORLD);

    sleep(NAPTIME);

    MPI_Finalize();

    return 0;
}

MPI and PGAS languages

The challenge of interoperability between MPI and PGAS languages such as CAF and UPC is the subject of ongoing research.

One challenge is the mapping between MPI processes (which are not required to be OS processes, but are in most implementations) and CAF images, UPC threads, or SHMEM processing elements. The canonical usage is a 1-to-1 mapping, although Dinan and coworkers have discussed alternative models for MPI-UPC interoperability in their recent paper (need citation).

Intermediate MPI

TODO: Subcommunicators...
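
Until this section is written, here is a minimal sketch (not part of the original page) of creating subcommunicators with MPI_Comm_split, which partitions a parent communicator by a color and orders ranks within each piece by a key:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);

    int world_size, world_rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    /* split MPI_COMM_WORLD into two subcommunicators by rank parity;
       ranks are ordered within each one by their world rank (the key) */
    int color = world_rank % 2;
    MPI_Comm subcomm;
    MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &subcomm);

    int sub_size, sub_rank;
    MPI_Comm_size(subcomm, &sub_size);
    MPI_Comm_rank(subcomm, &sub_rank);

    printf("world rank %d of %d is rank %d of %d in subcomm %d\n",
           world_rank, world_size, sub_rank, sub_size, color);

    MPI_Comm_free(&subcomm);
    MPI_Finalize();
    return 0;
}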

TODO: Datatypes...
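
Likewise, a minimal sketch of a derived datatype (not part of the original page): MPI_Type_vector describes one column of a row-major matrix so that a single collective call can move strided data:

#include <stdio.h>
#include <mpi.h>

#define N 4

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    /* one column of a row-major NxN matrix of doubles:
       N blocks of 1 element, each separated by a stride of N elements */
    MPI_Datatype column;
    MPI_Type_vector(N, 1, N, MPI_DOUBLE, &column);
    MPI_Type_commit(&column);

    double matrix[N][N];
    int i, j;
    for (i=0; i<N; i++)
        for (j=0; j<N; j++)
            matrix[i][j] = (world_rank==0 ? i*N+j : -1.0);

    /* broadcast column 2 from rank 0; only the strided elements
       are overwritten on the receivers */
    MPI_Bcast(&matrix[0][2], 1, column, 0, MPI_COMM_WORLD);

    if (world_rank!=0)
        for (i=0; i<N; i++)
            printf("%d: matrix[%d][2] = %lf\n", world_rank, i, matrix[i][2]);

    MPI_Type_free(&column);
    MPI_Finalize();
    return 0;
}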

One-sided MPI
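
The program below creates windows with MPI_Win_create over MPI_Alloc_mem buffers, initializes them with MPI_Put inside a fence epoch, and then times MPI_Get from each rank to every other rank using passive-target (MPI_Win_lock/MPI_Win_unlock) synchronization.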

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_SINGLE, &provided);
    /* an implementation may provide a higher level than requested */
    assert(provided>=MPI_THREAD_SINGLE);

    int me;
    int nproc;
    MPI_Comm_rank(MPI_COMM_WORLD,&me);
    MPI_Comm_size(MPI_COMM_WORLD,&nproc);

    int status;
    double t0,t1,t2,t3;

    int bufSize = ( argc>1 ? atoi(argv[1]) : 1000000 );
    if (me==0) printf("%d: bufSize = %d doubles\n",me,bufSize);

    /* allocate RMA buffers for windows */
    double* m1;
    double* m2;
    status = MPI_Alloc_mem(bufSize * sizeof(double), MPI_INFO_NULL, &m1);
    status = MPI_Alloc_mem(bufSize * sizeof(double), MPI_INFO_NULL, &m2);

    /* register remote pointers */
    MPI_Win w1;
    MPI_Win w2;
    status = MPI_Win_create(m1, bufSize * sizeof(double), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &w1);
    status = MPI_Win_create(m2, bufSize * sizeof(double), sizeof(double), MPI_INFO_NULL, MPI_COMM_WORLD, &w2);
    MPI_Barrier(MPI_COMM_WORLD);

    /* allocate RMA buffers */
    double* b1;
    double* b2;
    status = MPI_Alloc_mem(bufSize * sizeof(double), MPI_INFO_NULL, &b1);
    status = MPI_Alloc_mem(bufSize * sizeof(double), MPI_INFO_NULL, &b2);

    /* initialize buffers */
    int i;
    for (i=0;i<bufSize;i++) b1[i]=1.0*me;
    for (i=0;i<bufSize;i++) b2[i]=-1.0;

    status = MPI_Win_fence( MPI_MODE_NOPRECEDE | MPI_MODE_NOSTORE , w1 );
    status = MPI_Win_fence( MPI_MODE_NOPRECEDE | MPI_MODE_NOSTORE , w2);
    status = MPI_Put(b1, bufSize, MPI_DOUBLE, me, 0, bufSize, MPI_DOUBLE, w1);
    status = MPI_Put(b2, bufSize, MPI_DOUBLE, me, 0, bufSize, MPI_DOUBLE, w2);
    status = MPI_Win_fence( MPI_MODE_NOSTORE , w1);
    status = MPI_Win_fence( MPI_MODE_NOSTORE , w2);

    int target;
    int j;
    double dt,bw;
    MPI_Barrier(MPI_COMM_WORLD);
    if (me==0){
        printf("MPI_Get performance test for buffer size = %d doubles\n",bufSize);
        printf("  jump    host   target       get (s)       BW (MB/s)\n");
        printf("===========================================================\n");
        fflush(stdout);
    }
    MPI_Barrier(MPI_COMM_WORLD);
    for (j=0;j<nproc;j++){
        target = (me+j) % nproc;
        MPI_Barrier(MPI_COMM_WORLD);
        t0 = MPI_Wtime();
        status = MPI_Win_lock(MPI_LOCK_EXCLUSIVE, target, MPI_MODE_NOCHECK, w1);
        t1 = MPI_Wtime();
        status = MPI_Get(b2, bufSize, MPI_DOUBLE, target, 0, bufSize, MPI_DOUBLE, w1);
        t2 = MPI_Wtime();
        status = MPI_Win_unlock(target, w1);
        t3 = MPI_Wtime();
        for (i=0;i<bufSize;i++) assert( b2[i]==(1.0*target) );
        dt = t3 - t0;
        bw = (double)bufSize*sizeof(double)*(1e-6)/dt;
        printf("%4d     %4d     %4d       %9.6f     %9.3f\n",j,me,target,dt,bw);
        fflush(stdout);
    }
    MPI_Barrier(MPI_COMM_WORLD);

    status = MPI_Win_free(&w2);
    status = MPI_Win_free(&w1);

    status = MPI_Free_mem(b2);
    status = MPI_Free_mem(b1);

    status = MPI_Free_mem(m2);
    status = MPI_Free_mem(m1);

    MPI_Barrier(MPI_COMM_WORLD);

    if (me==0) printf("%d: MPI_Finalize\n",me);
    MPI_Finalize();

    return(0);
}

Advanced MPI

TODO: Generalized requests...
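
In the meantime, here is a minimal sketch (not from the original page) of the generalized-request interface: user callbacks are registered with MPI_Grequest_start and the request is finished with MPI_Grequest_complete, which would normally be called by a worker thread; here the main thread completes it immediately.

#include <stdio.h>
#include <mpi.h>

/* callback invoked by MPI to fill in the status of the request */
static int query_fn(void * extra_state, MPI_Status * status)
{
    /* an empty status for a zero-byte "operation" */
    MPI_Status_set_elements(status, MPI_BYTE, 0);
    MPI_Status_set_cancelled(status, 0);
    status->MPI_SOURCE = MPI_UNDEFINED;
    status->MPI_TAG    = MPI_UNDEFINED;
    return MPI_SUCCESS;
}

/* callback invoked when the request is freed */
static int free_fn(void * extra_state)
{
    return MPI_SUCCESS;
}

/* callback invoked if the request is cancelled */
static int cancel_fn(void * extra_state, int complete)
{
    return MPI_SUCCESS;
}

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);

    MPI_Request req;
    MPI_Grequest_start(query_fn, free_fn, cancel_fn, NULL, &req);

    /* normally another thread does the work and then calls
       MPI_Grequest_complete; here we complete it immediately */
    MPI_Grequest_complete(req);

    MPI_Wait(&req, MPI_STATUS_IGNORE);
    printf("generalized request completed\n");

    MPI_Finalize();
    return 0;
}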

Implementing active-message using MPI and Pthreads

This is a toy version of ARMCI that demonstrates active-message capability using MPI and Pthreads. Obviously, one can statically define new remote methods by extending the enum and the switch statement. A more flexible approach is to implement collective registration of handlers at runtime. If one can assume that function pointers are symmetric across all processes running the program, then this registration need not be collective.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <limits.h>
#include <pthread.h>

#include <mpi.h>

#include "safemalloc.h"

//#define DEBUG

typedef enum
{
    MSG_GET,
    MSG_PUT,
    MSG_ACC,
    MSG_RMW,
    MSG_FENCE,
    MSG_CHT_EXIT
} 
msg_type_e;

typedef enum
{
    MSG_INFO_TAG,
    MSG_FENCE_TAG, 
    MSG_GET_TAG,
    MSG_PUT_TAG,
    MSG_ACC_TAG,
    MSG_RMW_TAG
} 
msg_tag_e;

MPI_Comm MSG_COMM_WORLD;
 
typedef struct
{
    int          count; 
    MPI_Datatype dt;
    MPI_Op       op; /* only used for MSG_ACC */
}
msg_rma_info_t;

typedef union
{
    int           si;
    unsigned int  ui;
    long          sl;
    unsigned long ul;
}
msg_rmw_data_t;

typedef struct
{
    int            rmw_op;
    int            rmw_type;
    msg_rmw_data_t rmw_data;
}
msg_rmw_info_t;

typedef struct
{
    msg_type_e   type;
    void *       address;
    int          count; 
    MPI_Datatype dt;
    MPI_Op       op; /* only used for MSG_ACC */
}
msg_info_t;

typedef struct
{
    MPI_Comm comm;
    void ** base;
    void *  my_base;
}
msg_window_t;

void Poll(void)
{
    int rank;
    int type_size;

    msg_info_t info;
    MPI_Status status;
    MPI_Recv(&info, sizeof(msg_info_t), MPI_BYTE, MPI_ANY_SOURCE, MSG_INFO_TAG, MSG_COMM_WORLD, &status);
    int source = status.MPI_SOURCE;

    switch (info.type)
    {
        case MSG_FENCE:
#ifdef DEBUG
            printf("MSG_FENCE \n");
#endif
            MPI_Send(NULL, 0, MPI_BYTE, source, MSG_FENCE_TAG, MSG_COMM_WORLD);
            break;

        case MSG_GET:
#ifdef DEBUG
            printf("MSG_GET \n");
#endif
            MPI_Send(info.address, info.count, info.dt, source, MSG_GET_TAG, MSG_COMM_WORLD);
            break;

        case MSG_PUT:
#ifdef DEBUG
            printf("MSG_PUT \n");
#endif
            MPI_Recv(info.address, info.count, info.dt, source, MSG_PUT_TAG, MSG_COMM_WORLD, MPI_STATUS_IGNORE);
            break;

        case MSG_ACC: /* TODO: need to do pipelining to limit buffering */
#ifdef DEBUG
            printf("MSG_ACC \n");
#endif
            MPI_Type_size(info.dt, &type_size);
            void * temp = safemalloc(info.count*type_size);
            MPI_Recv(temp, info.count, info.dt, source, MSG_ACC_TAG, MSG_COMM_WORLD, MPI_STATUS_IGNORE);
            MPI_Reduce_local(temp, info.address, info.count, info.dt, info.op);
            free(temp);
            break;

        case MSG_RMW:
#ifdef DEBUG
            printf("MSG_RMW \n");
#endif
            MPI_Comm_rank(MSG_COMM_WORLD, &rank);
            fprintf(stderr, "%d: MSG_RMW not supported yet \n", rank);
            MPI_Abort(MSG_COMM_WORLD, 1);
            break;

        case MSG_CHT_EXIT:
#ifdef DEBUG
            printf("MSG_CHT_EXIT \n");
#endif
            MPI_Comm_rank(MSG_COMM_WORLD, &rank);
            if (rank!=source)
            {
                fprintf(stderr, "%d: CHT received EXIT signal from rank %d \n", rank, source);
                MPI_Abort(MSG_COMM_WORLD, 1);
            }
            pthread_exit(NULL);
            break;

        default:
            MPI_Comm_rank(MSG_COMM_WORLD, &rank);
            fprintf(stderr, "%d: CHT received invalid MSG TAG (%d) \n", rank, info.type);
            MPI_Abort(MSG_COMM_WORLD, 1);
            break;
    }
    return;
}

static void * Progress_function(void * dummy)
{
    while (1) {
        Poll();
        //usleep(500);
    }

    return NULL;
}

void MSG_CHT_Exit(void)
{
    int rank;
    MPI_Comm_rank(MSG_COMM_WORLD, &rank);

    msg_info_t info;
    info.type = MSG_CHT_EXIT;

    MPI_Ssend(&info, sizeof(msg_info_t), MPI_BYTE, rank, MSG_INFO_TAG, MSG_COMM_WORLD);

    return;
}

void MSG_Win_fence(int target)
{
    msg_info_t info;
    info.type = MSG_FENCE;

    MPI_Send(&info, sizeof(msg_info_t), MPI_BYTE, target, MSG_INFO_TAG, MSG_COMM_WORLD);
    MPI_Recv(NULL, 0, MPI_BYTE, target, MSG_FENCE_TAG, MSG_COMM_WORLD, MPI_STATUS_IGNORE);

    return;
}

void MSG_Win_get(int target, msg_window_t * win, size_t offset, int count, MPI_Datatype type, void * buffer)
{
    msg_info_t info;

    info.type     = MSG_GET;
    info.address  = (char*)win->base[target]+offset;
    info.count    = count;
    info.dt       = type;

#ifdef DEBUG
    printf("MSG_Win_get win->base[%d]=%p address=%p count=%d\n", target, win->base[target], info.address, info.count);
    fflush(stdout);
#endif

    MPI_Send(&info, sizeof(msg_info_t), MPI_BYTE, target, MSG_INFO_TAG, MSG_COMM_WORLD);
    MPI_Recv(buffer, info.count, info.dt, target, MSG_GET_TAG, MSG_COMM_WORLD, MPI_STATUS_IGNORE);

    return;
}

void MSG_Win_put(int target, msg_window_t * win, size_t offset, int count, MPI_Datatype type, void * buffer)
{
    msg_info_t info;

    info.type     = MSG_PUT;
    info.address  = (char*)win->base[target]+offset;
    info.count    = count; 
    info.dt       = type;

#ifdef DEBUG
    printf("MSG_Win_put win->base[%d]=%p address=%p count=%d\n", target, win->base[target], info.address, info.count);
    fflush(stdout);
#endif

    MPI_Send(&info, sizeof(msg_info_t), MPI_BYTE, target, MSG_INFO_TAG, MSG_COMM_WORLD);
    MPI_Send(buffer, info.count, info.dt, target, MSG_PUT_TAG, MSG_COMM_WORLD);

    return;
}

void MSG_Win_acc(int target, msg_window_t * win, size_t offset, int count, MPI_Datatype type, MPI_Op op, void * buffer)
{
    msg_info_t info;

    info.type     = MSG_ACC;
    info.address  = (char*)win->base[target]+offset;
    info.count    = count; 
    info.dt       = type;
    info.op       = op;

#ifdef DEBUG
    printf("MSG_Win_acc win->base[%d]=%p address=%p count=%d type=%d\n", target, win->base[target], info.address, info.count, info.dt);
    fflush(stdout);
#endif

    MPI_Send(&info, sizeof(msg_info_t), MPI_BYTE, target, MSG_INFO_TAG, MSG_COMM_WORLD);
    MPI_Send(buffer, info.count, info.dt, target, MSG_ACC_TAG, MSG_COMM_WORLD);

    return;
}

void MSG_Win_allocate(MPI_Comm comm, int bytes, msg_window_t * win)
{
    MPI_Comm_dup(comm, &(win->comm));

    int size;
    MPI_Comm_size(win->comm, &size);

    win->base    = safemalloc( size * sizeof(void *) );
    win->my_base = safemalloc(bytes);

    MPI_Allgather(&(win->my_base), sizeof(void *), MPI_BYTE, win->base, sizeof(void *), MPI_BYTE, comm);

#ifdef DEBUG
    int rank;
    MPI_Comm_rank(win->comm, &rank);

    printf("%d: win->base = %p \n", rank, win->base);
    printf("%d: my_base = %p \n", rank, win->my_base);
    for (int i=0; i<size; i++)
        printf("%d: win->base[%d] = %p \n", rank, i, win->base[i]);

    printf("MSG_Win_allocate finished\n");
    fflush(stdout);
#endif

    return;
}

void MSG_Win_deallocate(msg_window_t * win)
{
    MPI_Barrier(win->comm);

#ifdef DEBUG
    int rank;
    MPI_Comm_rank(win->comm, &rank);

    printf("%d: win->base = %p \n", rank, win->base);
    printf("%d: win->my_base = %p \n", rank, win->my_base);
    printf("%d: win->base[%d] = %p \n", rank, rank, win->base[rank]);
#endif

    free(win->my_base);
    free(win->base);

    MPI_Comm_free(&(win->comm));

#ifdef DEBUG
    printf("MSG_Win_deallocate finished\n");
    fflush(stdout);
#endif

    return;   
}

int main(int argc, char * argv[])
{
    int rc;
    int rank, size;
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided!=MPI_THREAD_MULTIPLE) 
        MPI_Abort(MPI_COMM_WORLD, 1);

    MPI_Comm_dup(MPI_COMM_WORLD, &MSG_COMM_WORLD);

    MPI_Comm_rank(MSG_COMM_WORLD, &rank);
    MPI_Comm_size(MSG_COMM_WORLD, &size);

    pthread_t Progress_thread;
    rc = pthread_create(&Progress_thread, NULL, &Progress_function, NULL);
    if (rc!=0) 
        MPI_Abort(MSG_COMM_WORLD, rc);

    MPI_Barrier(MSG_COMM_WORLD);

    int target = size>1 ? size-1 : 0;

    {
        int bigcount = 1024*1024;

        msg_window_t win;
        MSG_Win_allocate(MSG_COMM_WORLD, bigcount, &win);

        if (rank==0)
        {
            int smallcount = 1024;

            void * in = safemalloc(smallcount);
            void * out = safemalloc(smallcount);
            
            memset(in, '\1', smallcount);
            memset(out, '\0', smallcount);

            MSG_Win_put(target, &win, 0, smallcount, MPI_BYTE, in);
            MSG_Win_fence(target);
            MSG_Win_get(target, &win, 0, smallcount, MPI_BYTE, out);

            int rc = memcmp(in, out, smallcount);

            if (rc!=0)
                printf("FAIL! \n");
            else
                printf("WIN! \n");
            fflush(stdout);

            free(out);
            free(in);
        }

        MSG_Win_deallocate(&win);
    }

    {
        int bigcount = 1024*1024;

        MPI_Op       op   = MPI_SUM;
        MPI_Datatype type = MPI_DOUBLE;

        int type_size;
        MPI_Type_size(type, &type_size);
        
        msg_window_t win;
        MSG_Win_allocate(MSG_COMM_WORLD, bigcount*type_size, &win);

        if (rank==0)
        {
            int smallcount = 1024;

            double * in = safemalloc(smallcount*type_size);
            double * out = safemalloc(smallcount*type_size);
            
            for (int i=0; i<smallcount; i++)
                in[i] = 0.0;

            printf("MSG_Win_put \n");
            fflush(stdout);
            MSG_Win_put(target, &win, 0, smallcount, type, in);
            MSG_Win_fence(target);

            for (int i=0; i<smallcount; i++)
                in[i] = 12.0;

            printf("MSG_Win_acc \n");
            fflush(stdout);
            MSG_Win_acc(target, &win, 0, smallcount, type, op, in);
            MSG_Win_fence(target);
            MSG_Win_acc(target, &win, 0, smallcount, type, op, in);
            MSG_Win_fence(target);
            MSG_Win_acc(target, &win, 0, smallcount, type, op, in);
            MSG_Win_fence(target);

            printf("MSG_Win_get \n");
            fflush(stdout);
            MSG_Win_get(target, &win, 0, smallcount, type, out);

            int errors = 0;
            for (int i=0; i<smallcount; i++)
#if defined(DEBUG) && 0
                printf("%d: out[%d] = %lf  in[%d] = %lf \n", rank, i, out[i], i, in[i]);
#else
                errors += (int)(out[i] != 3*in[i]);
#endif

            if (errors>0)
                printf("FAIL! \n");
            else
                printf("WIN! \n");
            fflush(stdout);

            free(out);
            free(in);
        }

        MSG_Win_deallocate(&win);
    }

    MPI_Barrier(MSG_COMM_WORLD);

    MSG_CHT_Exit();

    void * rv;
    rc = pthread_join(Progress_thread, &rv);
    if (rc!=0) 
        MPI_Abort(MSG_COMM_WORLD, rc);

    printf("all done \n");
    fflush(stdout);

    MPI_Comm_free(&MSG_COMM_WORLD);

    MPI_Finalize();

    return 0;
} 

MPI and C++

https://svn.mpi-forum.org/trac/mpi-forum-web/ticket/340 fixes important issues with the use of C++ datatypes in MPI.

See http://blogs.cisco.com/performance/the-mpi-c-bindings-what-happened-and-why/ and the follow-up for details on what has changed in MPI-3 with respect to C++ support in MPI.

Performance Considerations

See https://github.com/jeffhammond/HPCInfo/wiki/MPI#Performance_Considerations

Implementation Details

Blue Gene

Some of my presentations for the ALCF Getting Started Workshops are relevant here.