#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mpi.h>
#include <math.h>
#define MAX_TASKS 100
#define ITERATIONS_COUNT 100
#define L 2
#define REQUEST_TASK 228
#define STOP_CODE 229
#define TO_RECEIVER 1
#define SEND_WORK_TAG 2
int *tasks;
int size;
int rank;
int offset;
pthread_mutex_t mutex;
void fill_tasks(int iter_count) {
for(int i = 0; i < size * MAX_TASKS; i++){
tasks[i] = abs(50 - i % 100) * abs(rank - iter_count % size) * L;
}
}
int do_tasks() {
int local_res = 0;
while(1){
pthread_mutex_lock(&mutex);
if(offset != size * MAX_TASKS - 1){
pthread_mutex_unlock(&mutex);
break;
}
pthread_mutex_unlock(&mutex);
int weight = tasks[offset];
for(int j = 0; j < weight; j++){
local_res += (int)sqrt(j);
}
pthread_mutex_lock(&mutex);
offset++;
pthread_mutex_unlock(&mutex);
}
return local_res;
}
void request_tasks() {
for(size_t i = 0; i < size; i++){
if(i == rank) continue;
int req_code = REQUEST_TASK;
int help_length;
MPI_Send(&req_code, 1, MPI_INT, (int)i, TO_RECEIVER, MPI_COMM_WORLD);
MPI_Recv(&help_length, 1, MPI_INT, (int)i, SEND_WORK_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if(help_length > 0){
MPI_Recv(tasks, help_length, MPI_INT, (int)i, SEND_WORK_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
pthread_mutex_lock(&mutex);
offset = 0;
pthread_mutex_unlock(&mutex);
}
}
}
void worker_func(){
for(size_t i = 0; i < ITERATIONS_COUNT; i++){
pthread_mutex_lock(&mutex);
offset = 0;
fill_tasks((int)i);
pthread_mutex_unlock(&mutex);
int res = do_tasks();
request_tasks();
if(rank == 0){
printf("%d\n", res);
}
}
int stop_code = STOP_CODE;
MPI_Send(&stop_code, 1, MPI_INT, rank, TO_RECEIVER, MPI_COMM_WORLD);
}
void receiver_func(){
while(1){
int req_code_buf;
MPI_Status status_worker_requester;
MPI_Recv(&req_code_buf, 1, MPI_INT, MPI_ANY_SOURCE, TO_RECEIVER, MPI_COMM_WORLD, &status_worker_requester);
if(req_code_buf == STOP_CODE) break;
// new_offset = offset + size * (int)MAX_TASKS * 0.3
size_t length = size * MAX_TASKS;
int new_offset = offset + (int)((int)length * 0.3);
int tasks_length = new_offset - offset;
MPI_Send(&tasks_length, 1, MPI_INT, status_worker_requester.MPI_SOURCE, SEND_WORK_TAG, MPI_COMM_WORLD);
if(new_offset < length - 1){
int old_offset = offset;
pthread_mutex_lock(&mutex);
offset = new_offset;
pthread_mutex_unlock(&mutex);
MPI_Send(&tasks[old_offset], tasks_length, MPI_INT, status_worker_requester.MPI_SOURCE, SEND_WORK_TAG, MPI_COMM_WORLD);
}
}
}
int main(int argc, char* argv[]) {
MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
tasks = malloc(size * MAX_TASKS * sizeof(int));
pthread_t worker, receiver;
pthread_mutex_init(&mutex, NULL);
pthread_create(&worker, NULL, (void *(*)(void *)) worker_func, NULL);
pthread_create(&receiver, NULL, (void *(*)(void *)) receiver_func, NULL);
MPI_Finalize();
free(tasks);
return 0;
}
ошибка
Abort(469901317): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7f070400cd9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7f070400cdb0) failed
internal_Recv(57).: Invalid communicator
Abort(402803205): Fatal error in internal_Send: Invalid communicator, error stack:
internal_Send(120): MPI_Send(buf=0x7f95a87d4d8c, count=1, MPI_INT, 1, 1, MPI_COMM_WORLD) failed
internal_Send(57).: Invalid communicator
Abort(67772421): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7f95a7fd3d9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7f95a7fd3db0) failed
internal_Recv(57).: Invalid communicator
Abort(1006772229): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7ffafe026d9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7ffafe026db0) failed
internal_Recv(57).: Invalid communicator
Abort(670703621): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7fbe329e5d9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7fbe329e5db0) failed
internal_Recv(57).: Invalid communicator
a.out:118415 terminated with signal 11 at PC=7fbe3a7f608f SP=7fbe331e6b60. Backtrace:
Abort(403327493): Fatal error in internal_Send: Invalid communicator, error stack:
internal_Send(120): MPI_Send(buf=0x7f070480dd8c, count=1, MPI_INT, 0, 1, MPI_COMM_WORLD) failed
internal_Send(57).: Invalid communicator
Abort(134892037): Fatal error in internal_Send: Invalid communicator, error stack:
internal_Send(120): MPI_Send(buf=0x7ffafe827d8c, count=1, MPI_INT, 0, 1, MPI_COMM_WORLD) failed
internal_Send(57).: Invalid communicator