@MuffinLover

Что ему не нравится с моим буффером?

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mpi.h>
#include <math.h>

#define MAX_TASKS 100
#define ITERATIONS_COUNT 100
#define L 2
#define REQUEST_TASK 228
#define STOP_CODE 229
#define TO_RECEIVER 1
#define SEND_WORK_TAG 2

int *tasks;
int size;
int rank;
int offset;
pthread_mutex_t mutex;


void fill_tasks(int iter_count) {
    for(int i = 0; i < size * MAX_TASKS; i++){
        tasks[i] = abs(50 - i % 100) * abs(rank - iter_count % size) * L;
    }
}

int do_tasks() {
    int local_res = 0;
    while(1){
        pthread_mutex_lock(&mutex);
        if(offset != size * MAX_TASKS - 1){
            pthread_mutex_unlock(&mutex);
            break;
        }
        pthread_mutex_unlock(&mutex);
        int weight = tasks[offset];
        for(int j = 0; j < weight; j++){
            local_res += (int)sqrt(j);
        }
        pthread_mutex_lock(&mutex);
        offset++;
        pthread_mutex_unlock(&mutex);

    }


    return local_res;
}

void request_tasks() {
    for(size_t i = 0; i < size; i++){
        if(i == rank) continue;
        int req_code = REQUEST_TASK;
        int help_length;
        MPI_Send(&req_code, 1, MPI_INT, (int)i, TO_RECEIVER, MPI_COMM_WORLD);
        MPI_Recv(&help_length, 1, MPI_INT, (int)i, SEND_WORK_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if(help_length > 0){
            MPI_Recv(tasks, help_length, MPI_INT, (int)i, SEND_WORK_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            pthread_mutex_lock(&mutex);
            offset = 0;
            pthread_mutex_unlock(&mutex);
        }


    }
}

void worker_func(){
    for(size_t i = 0; i < ITERATIONS_COUNT; i++){
        pthread_mutex_lock(&mutex);
        offset = 0;
        fill_tasks((int)i);
        pthread_mutex_unlock(&mutex);
        int res = do_tasks();

        request_tasks();

        if(rank == 0){
            printf("%d\n", res);
        }
    }
    int stop_code = STOP_CODE;
    MPI_Send(&stop_code, 1, MPI_INT, rank, TO_RECEIVER, MPI_COMM_WORLD);
}

void receiver_func(){
    while(1){
        int req_code_buf;
        MPI_Status status_worker_requester;
        MPI_Recv(&req_code_buf, 1, MPI_INT, MPI_ANY_SOURCE, TO_RECEIVER, MPI_COMM_WORLD, &status_worker_requester);
        if(req_code_buf == STOP_CODE) break;
        // new_offset = offset + size * (int)MAX_TASKS * 0.3

        size_t length = size * MAX_TASKS;
        int new_offset = offset + (int)((int)length * 0.3);
        int tasks_length = new_offset - offset;


        MPI_Send(&tasks_length, 1, MPI_INT, status_worker_requester.MPI_SOURCE, SEND_WORK_TAG, MPI_COMM_WORLD);
        if(new_offset < length - 1){
            int old_offset = offset;
            pthread_mutex_lock(&mutex);
            offset = new_offset;
            pthread_mutex_unlock(&mutex);
            MPI_Send(&tasks[old_offset], tasks_length, MPI_INT, status_worker_requester.MPI_SOURCE, SEND_WORK_TAG, MPI_COMM_WORLD);

        }

    }
}

int main(int argc, char* argv[]) {
    MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, NULL);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    tasks = malloc(size * MAX_TASKS * sizeof(int));
    pthread_t worker, receiver;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&worker, NULL, (void *(*)(void *)) worker_func, NULL);
    pthread_create(&receiver, NULL, (void *(*)(void *)) receiver_func, NULL);


    
    MPI_Finalize();
    free(tasks);
    return 0;
}


ошибка

Abort(469901317): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7f070400cd9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7f070400cdb0) failed
internal_Recv(57).: Invalid communicator
Abort(402803205): Fatal error in internal_Send: Invalid communicator, error stack:
internal_Send(120): MPI_Send(buf=0x7f95a87d4d8c, count=1, MPI_INT, 1, 1, MPI_COMM_WORLD) failed
internal_Send(57).: Invalid communicator
Abort(67772421): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7f95a7fd3d9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7f95a7fd3db0) failed
internal_Recv(57).: Invalid communicator
Abort(1006772229): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7ffafe026d9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7ffafe026db0) failed
internal_Recv(57).: Invalid communicator
Abort(670703621): Fatal error in internal_Recv: Invalid communicator, error stack:
internal_Recv(127): MPI_Recv(buf=0x7fbe329e5d9c, count=1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, status=0x7fbe329e5db0) failed
internal_Recv(57).: Invalid communicator

a.out:118415 terminated with signal 11 at PC=7fbe3a7f608f SP=7fbe331e6b60. Backtrace:
Abort(403327493): Fatal error in internal_Send: Invalid communicator, error stack:
internal_Send(120): MPI_Send(buf=0x7f070480dd8c, count=1, MPI_INT, 0, 1, MPI_COMM_WORLD) failed
internal_Send(57).: Invalid communicator
Abort(134892037): Fatal error in internal_Send: Invalid communicator, error stack:
internal_Send(120): MPI_Send(buf=0x7ffafe827d8c, count=1, MPI_INT, 0, 1, MPI_COMM_WORLD) failed
internal_Send(57).: Invalid communicator

  • Вопрос задан
  • 271 просмотр
Решения вопроса 1
shurshur
@shurshur
Сисадмин, просто сисадмин...
Думаю, ошибка тут в том, что в процессе выполнения receiver_func значение offset также успевает измениться внутри worker_func, что приводит к выходу за пределы буфера, несмотря на явную проверку offset.

Вообще, надо всегда помнить, что полноценная синхронизация тредов только на mutex невозможна. Между двумя блокировками mutex одним тредом другой тред может успеть выполнить несколько блокировок. Нужно также использовать condvar.

В целом кажется, что этот код написан в попытках сделать хоть что-то параллельно, а не для решения какой-то реальной задачи.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы