Tutorial

This is a set of small tutorials on how to convert pthreads applications to run on ArgoDSM. We will start with a very simple example, and then move to real pthreads applications.

We assume that you have already compiled the ArgoDSM libraries and installed them in a local user directory. We also assume familiarity with the Pthreads library and programming model, as well as a very basic knowledge of MPI. If you are not familiar with these, you might find this tutorial confusing.

The Simple Example

This example is meant as an introduction to ArgoDSM and to how it differs from non-distributed pthreads applications. It is written in C++. For brevity, we assume that the size of the input data is evenly divisible by the number of threads, and that the number of threads is evenly divisible by the number of nodes. We also omit error-checking code.

Just Pthreads

#include <cassert>
#include <limits>
#include <iostream>
#include <vector>

#include <pthread.h>

struct thread_args {
	int data_begin;
	int data_end;
};

int *data;
int max = std::numeric_limits<int>::min();

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void* parmax(void* argptr) {
	// Get the arguments
	thread_args* args = static_cast<thread_args*>(argptr);

	// Let's declutter the code a bit, while also avoiding unnecessary global
	// memory accesses
	int data_begin = args->data_begin;
	int data_end = args->data_end;

	// Find the local maximum
	int local_max = std::numeric_limits<int>::min();
	for (int i = data_begin; i < data_end; ++i) {
		if (data[i] > local_max) {
			local_max = data[i];
		}
	}

	// Change the global maximum (if necessary)
	pthread_mutex_lock(&lock);
	if (local_max > max)
		max = local_max;
	pthread_mutex_unlock(&lock);

	return nullptr;
}

int main(int argc, char* argv[]) {
	int data_length = 160000;
	int num_threads = 16;

	// Allocate the array
	data = new int[data_length];

	// Initialize the input data
	for (int i = 0; i < data_length; ++i)
		data[i] = i * 11 + 3;

	// Start the threads
	int chunk = data_length / num_threads;
	std::vector<pthread_t> threads(num_threads);
	std::vector<thread_args> args(num_threads);
	for (int i = 0; i < num_threads; ++i) {
		args[i].data_begin = i * chunk;
		args[i].data_end = args[i].data_begin + chunk;
		pthread_create(&threads[i], nullptr, parmax, &args[i]);
	}
	// Join the threads
	for (auto& t : threads)
		pthread_join(t, nullptr);

	delete[] data;
	// Print the result
	std::cout << "Max found to be " << max << std::endl;
	assert(max == ((data_length - 1) * 11 + 3));
}

The example finds the largest number in an integer array. The astute reader might notice that the largest number is always the same and can be determined with a very simple calculation, but this is, after all, just an example. The main thread initializes the array and then starts the worker threads. Each worker thread receives as its argument a part of the array, in the form of a half-open index range [first, last). While the worker threads are running, the main thread waits for them to finish. Once they are done, the main thread reads the max variable, prints it, and checks that the result is correct.

Each worker thread works only on its own part of the array. After finding its local maximum, it checks and, if necessary, updates the global maximum (max). Since multiple threads might access this variable at the same time, it is protected by a mutex.

Scaling it up with ArgoDSM

The first step is adding the ArgoDSM initialization and finalization functions. Specifically, argo::init must be called in main before any other ArgoDSM function, and argo::finalize should be the last ArgoDSM function called before main returns. In the current version of ArgoDSM, argo::init takes two optional arguments: the amount of shared memory (in bytes) that ArgoDSM should allocate, and the desired local cache size. Values passed to argo::init are always used. If one or both are omitted, the value (in bytes) of the environment variable ARGO_MEMORY_SIZE (for the shared memory size) or ARGO_CACHE_SIZE (for the local cache size) is used instead. If these environment variables are not set either, default values are used; at the time of writing, these are 8GB for the shared memory and 1GB for the local cache.

In addition to adding the ArgoDSM initialization and finalization functions, there are four main tasks that need to be done to convert a pthreads application to ArgoDSM.

  1. Remove data races.
  2. Allocate shared (ArgoDSM) data using the ArgoDSM allocators.
  3. Replace any Pthreads synchronization primitives with corresponding primitives provided by ArgoDSM.
  4. Take care of the changes introduced by the new multiple process model.

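Removing data races (1) is unnecessary in this case, since the pthreads application is standard-compliant (both POSIX and C++) and therefore already data-race free: the workers only read data, which the main thread initialized before creating them, every concurrent access to max is protected by the mutex, and the main thread reads max only after joining the workers.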

To allocate all the shared data in global memory (2), we need to make two changes. First, we need to replace any calls to new with calls to the ArgoDSM allocators. Specifically, we replace new int[data_length] with argo::conew_array<int>(data_length). The allocation function argo::conew_array is a collective call: it is run on all the nodes and returns the same pointer on each of them, thus initializing the data variable everywhere. So this:

data = new int[data_length];

needs to become this:

data = argo::conew_array<int>(data_length);

Second, we need to move the max variable into shared memory. ArgoDSM currently does not support mapping statically allocated variables into global memory, so we convert max into a pointer and allocate memory for it in global memory. We use the argo::conew_<int> function for this, passing as its initialization argument the same initial value as in the pthreads example. This means changing this:

int max = std::numeric_limits<int>::min();

to this:

int *max;
... // Somewhere inside main
max = argo::conew_<int>(std::numeric_limits<int>::min());

Note that the ArgoDSM functions can only be used after argo::init has been called, so we need to move the allocation and initialization of max inside the main function.

In addition to the argo::conew_ family of functions, which must be called by all the nodes at the same time, there is also the argo::new_ family, which behaves like the normal new but allocates globally shared memory.

We can now move on to the synchronization primitives (3). For this example we use argo::globallock::global_tas_lock, which is available in ArgoDSM; once argo::simple_lock becomes available, it should be used instead. The global_tas_lock constructor takes a pointer to a boolean flag to spin on, which of course must itself be allocated in global memory.

lock_field = argo::conew_<argo::globallock::global_tas_lock::internal_field_type>();
lock = new argo::globallock::global_tas_lock(lock_field);

Finally, we need to take into account that ArgoDSM runs multiple processes (and therefore multiple main threads) from the very beginning of program execution (4). Essentially, each node runs one process, which has one main thread. Each of those processes has its own copy of the non-shared global variables, such as the data and max pointers, and must initialize them individually (this is why we use the collective argo::conew_ calls). At the same time, we want only one node to initialize the shared global data: if multiple nodes were to initialize the same data at the same time, we would have a data race. Usually, the simplest approach is to have node 0 initialize all the data and then synchronize with all the other nodes. We achieve this with an if statement combined with argo::barrier. The same applies to any work the application does after the threads have finished: work that should be done by only one thread must also be done by only one node.

if (argo::node_id() == 0) {
	for (int i = 0; i < data_length; ++i)
		data[i] = i * 11 + 3;
}
argo::barrier();

We also need to remember that each node starts its own threads, so we must change either how many threads each node starts or how we partition the shared data. In this example, we simply split the threads evenly among the available nodes; each thread then computes its global thread ID from its node ID and its local index.

local_num_threads = num_threads / argo::number_of_nodes();
...
int chunk = data_length / num_threads;
std::vector<pthread_t> threads(local_num_threads);
std::vector<thread_args> args(local_num_threads);
for (int i = 0; i < local_num_threads; ++i) {
	int global_tid = (argo::node_id() * local_num_threads) + i;
	args[i].data_begin = global_tid * chunk;
	args[i].data_end = args[i].data_begin + chunk;
	pthread_create(&threads[i], nullptr, parmax, &args[i]);
}

The final result of all the changes is this:

#include <cassert>
#include <limits>
#include <iostream>
#include <vector>

#include <pthread.h>

#include "argo.hpp"

struct thread_args {
	int data_begin;
	int data_end;
};

int *data;
int *max;

argo::globallock::global_tas_lock::internal_field_type *lock_field;
argo::globallock::global_tas_lock *lock;

void* parmax(void* argptr) {
	// Get the arguments
	thread_args* args = static_cast<thread_args*>(argptr);

	// Let's declutter the code a bit, while also avoiding unnecessary global
	// memory accesses
	int data_begin = args->data_begin;
	int data_end = args->data_end;

	// Find the local maximum
	int local_max = std::numeric_limits<int>::min();
	for (int i = data_begin; i < data_end; ++i) {
		if (data[i] > local_max) {
			local_max = data[i];
		}
	}

	// Change the global maximum (if necessary)
	lock->lock();
	if (local_max > *max)
		*max = local_max;
	lock->unlock();

	return nullptr;
}

int main(int argc, char* argv[]) {
	int data_length = 160000;
	int num_threads = 16;
	int local_num_threads;

	// We totally need 1GB for this application
	argo::init(1*1024*1024*1024UL);

	local_num_threads = num_threads / argo::number_of_nodes();

	// Initialize the lock
	lock_field = argo::conew_<argo::globallock::global_tas_lock::internal_field_type>();
	lock = new argo::globallock::global_tas_lock(lock_field);
	// Allocate the array
	data = argo::conew_array<int>(data_length);
	max = argo::conew_<int>(std::numeric_limits<int>::min());

	// Initialize the input data
	if (argo::node_id() == 0) {
		for (int i = 0; i < data_length; ++i)
			data[i] = i * 11 + 3;
	}
	
	// Make sure initialization is done and distribute the changes
	argo::barrier();

	// Start the threads
	int chunk = data_length / num_threads;
	std::vector<pthread_t> threads(local_num_threads);
	std::vector<thread_args> args(local_num_threads);
	for (int i = 0; i < local_num_threads; ++i) {
		int global_tid = (argo::node_id() * local_num_threads) + i;
		args[i].data_begin = global_tid * chunk;
		args[i].data_end = args[i].data_begin + chunk;
		pthread_create(&threads[i], nullptr, parmax, &args[i]);
	}
	// Join the threads
	for (auto &t : threads)
		pthread_join(t, nullptr);

	// Make sure everyone is done and get the changes
	argo::barrier();

	argo::codelete_array(data);
	delete lock;
	argo::codelete_(lock_field);
	// Print the result
	if (argo::node_id() == 0)
		std::cout << "Max found to be " << *max << std::endl;
	assert(*max == ((data_length - 1) * 11 + 3));

	argo::finalize();
}

We can now compile the application. We assume that ${ARGO_INSTALL_DIRECTORY} is where you installed ArgoDSM. If the ArgoDSM directories are already in your LIBRARY_PATH, CPLUS_INCLUDE_PATH, and LD_LIBRARY_PATH, you can skip the -L..., -I..., and -Wl,-rpath,... switches. If you have no idea what we are talking about, you should ask your system administrator.

mpic++ -O3 -std=c++17 -o argo_example            \
       -L${ARGO_INSTALL_DIRECTORY}/lib           \
       -I${ARGO_INSTALL_DIRECTORY}/include/argo  \
       -Wl,-rpath,${ARGO_INSTALL_DIRECTORY}/lib  \
       argo_example.cpp -largo -largobackend-mpi

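This should produce an executable that can be run with MPI. For instructions on how to run MPI applications on your system, contact your local support office. A generic example with Open MPI on a cluster managed by Slurm might look like the following, where --map-by ppr:1:node starts one process per node, matching the one-process-per-node model described above: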

mpirun --map-by ppr:1:node  \
       --mca pml ucx        \
       --mca osc ucx        \
       ./argo_example