A functional map is a function that applies a callback function to each element in an array and returns a list of callback return values. For example, in pseudocode, map(["hello", "world"], fn(x) => x + " meow")
would return ["hello meow", "world meow"]
Since function pointers can be passed as parameters in C, it is possible to implement a functional map like below:
void** fp_map(void** array, size_t len, void* (*execute)(void*))
{
// Allocate memory for return items
void** returns = malloc(sizeof(void*) * len);
if (returns == NULL) err(42, "Malloc failed, buy more ram");
// Map values
for (int i = 0; i < len; ++i)
returns[i] = execute(array[i]);
return returns;
}
If I write the following anonymous function in my main method, it would map ["hello", "world"]
to ["hello meow", "world meow"]
:
int main() {
char* arr[] = {"hello", "world"};
char** arr2 = fp_map((void**) arr, 2, ({ void* _func_ (void* x) {
char* buf = malloc(sizeof(char) * (strlen(x) + 7));
strcpy(buf, x);
strcat(buf, " meow");
return buf;
}; _func_; }));
for (int i = 0; i < 3; ++i)
printf("%s\n", arr2[i]);
}
Now, I want to implement a parallel map to speed things up. Since this is purely functional, calls to the callback function with the same parameters would return the same return values. How can I use multithreading so that each call to execute()
runs on a different thread, but still have the results return in an ordered array?
I have written the following code, in which I create a context for the thread, then for every calculation I spawn a separate thread. Join all the threads and return the value.
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>
#define ERRORON(expr) \
do { \
if (expr) { \
fprintf(stderr, "ERROR: %s\n", #expr); \
exit(1); \
} \
} while (0)
#define ARRLEN(x) (sizeof(x) / sizeof(*x))
struct mythread_context {
void **returns;
void *(*execute)(void *);
void **array;
size_t i;
};
int mythread(void *arg) {
const struct mythread_context *ctx = arg;
// Execute the stuff to execute.
ctx->returns[ctx->i] = ctx->execute(ctx->array[ctx->i]);
return 0;
}
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
// Allocate memory for return items
void **returns = malloc(sizeof(*returns) * len);
ERRORON(!returns);
// Allocate memory for threads and contextes.
thrd_t *threads = malloc(sizeof(*threads) * len);
ERRORON(!threads);
struct mythread_context *ctxs = malloc(sizeof(*ctxs) * len);
ERRORON(!ctxs);
for (size_t i = 0; i < len; ++i) {
const struct mythread_context thisctx = {
.returns = returns,
.execute = execute,
.array = array,
.i = i,
};
ctxs[i] = thisctx;
// Start a thread for every returns, execute and array index.
int ret = thrd_create(&threads[i], mythread, &ctxs[i]);
ERRORON(ret != thrd_success);
}
for (size_t i = 0; i < len; ++i) {
// Join all threads. They will assing to returns separately concurrently.
int ret = thrd_join(threads[i], NULL);
ERRORON(ret != thrd_success);
}
free(threads);
free(ctxs);
return returns;
}
void *appnend_to_char(void *x) {
char *buf = malloc(sizeof(char) * (strlen(x) + 7));
strcpy(buf, x);
strcat(buf, " meow");
return buf;
}
int main() {
const char *arr[] = {"hello", "world"};
char **arr2 = (char **)fp_map((void **)arr, ARRLEN(arr), appnend_to_char);
for (size_t i = 0; i < ARRLEN(arr); ++i) {
printf("%s\n", arr2[i]);
}
// free memory
for (size_t i = 0; i < ARRLEN(arr); ++i) {
free(arr2[i]);
}
free(arr2);
}
Alternatively, you can just seamlessly integrate with OpenMP, with just:
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
void **returns = malloc(sizeof(*returns) * len);
ERRORON(!returns);
size_t i;
#pragma omp parallel for
for (size_t i = 0; i < len; ++i) {
returns[i] = execute(array[i]);
}
return returns;
}
Notes:
({
is a GCC extension, not part of C language. There are no lambdas or anonymous functions in C programming langauge.malloc
coudl also be optimized.