#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <mpi.h>

/* Message tags */
#define tWorkRequest 15
#define tWorkOrder   16
#define tWorkDone    17
#define tHeartBeat   18
#define tQuit        19

/* Per-command status */
#define sWaiting 0
#define sWorking 1
#define sDone    2

/* Send a NUL-terminated string: first its length (including the NUL), then the characters. */
int send_text(char *str, int dest, int tag)
{
    int size = (int)strlen(str) + 1;   /* int, to match MPI_INT */

    MPI_Send(&size, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
    return MPI_Send(str, size, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
}

/* Receive a string sent by send_text(); the caller must free(*str). */
int recv_text(char **str, int src, int tag)
{
    int size;
    MPI_Status status;

    MPI_Recv(&size, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status);
    *str = (char *)malloc(size);
    if (*str == NULL)
        MPI_Abort(MPI_COMM_WORLD, 1);
    return MPI_Recv(*str, size, MPI_CHAR, src, tag, MPI_COMM_WORLD, &status);
}

/* Truncate the string at the first newline or carriage return. */
void clean_str(char *str)
{
    str[strcspn(str, "\r\n")] = '\0';
}

/* Set from a signal handler, so it must be volatile sig_atomic_t. */
volatile sig_atomic_t quit = 0;

void catch_int(int sig_num)
{
    static const char msg[] = "Caught Quit Signal\n";

    /* re-install the handler for the signal that fired, for next time */
    signal(sig_num, catch_int);
    quit = 1;
    /* write() is async-signal-safe; fprintf() is not */
    write(STDERR_FILENO, msg, sizeof(msg) - 1);
}

int main(int argc, char **argv)
{
    int group_size = 1, group_rank = 0;
    FILE *work_file, *cp_file;
    char buffer[4048], **command_strings, *command_status, *command_buffer;
    long write_pos, data_size;
    int command_count, i, commands_left, open_requests;   /* int, to match MPI_INT */
    int flag, msg_src, msg_tag, tmp_int = 0;
    MPI_Status status;

    signal(SIGINT, catch_int);
    signal(SIGTERM, catch_int);

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &group_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &group_rank);

    if (group_rank == 0) {
        fprintf(stderr, "Entering Master Section (pid: %d)\n", (int)getpid());

        if (group_size < 2) {
            fprintf(stderr, "Need at least 2 MPI processes (1 master + workers)\n");
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        /* The master loads the work file: one shell command per line. */
        if (argc < 2 || (work_file = fopen(argv[1], "r")) == NULL) {
            fprintf(stderr, "usage: %s WORK_FILE\n", argv[0]);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        /* First pass: count the commands and the bytes needed to store them. */
        command_count = 0;
        data_size = 0;
        while (fgets(buffer, sizeof(buffer), work_file)) {
            clean_str(buffer);
            command_count++;
            data_size += strlen(buffer) + 1;
        }

        /* +1 so an empty work file still yields valid allocations */
        command_strings = (char **)malloc(sizeof(char *) * (command_count + 1));
        command_strings[0] = (char *)malloc(data_size + 1);
        command_status = (char *)malloc(command_count + 1);

        /* Second pass: copy the commands into one contiguous block. */
        fseek(work_file, 0, SEEK_SET);
        i = 0;
        write_pos = 0;
        while (i < command_count && fgets(buffer, sizeof(buffer), work_file)) {
            clean_str(buffer);
            command_strings[i] = command_strings[0] + write_pos;
            memcpy(command_strings[i], buffer, strlen(buffer) + 1);
            write_pos += strlen(buffer) + 1;
            i++;
        }
        fclose(work_file);

        for (i = 0; i < command_count; i++)
            command_status[i] = sWaiting;
        commands_left = command_count;

        /* Resume from the checkpoint file, if present: it lists finished command indices. */
        snprintf(buffer, sizeof(buffer), "%s.check", argv[1]);
        if ((cp_file = fopen(buffer, "r")) != NULL) {
            fprintf(stderr, "Resuming work\n");
            while (fgets(buffer, sizeof(buffer), cp_file)) {
                tmp_int = atoi(buffer);
                /* skip out-of-range and duplicate entries */
                if (tmp_int >= 0 && tmp_int < command_count && command_status[tmp_int] != sDone) {
                    commands_left--;
                    command_status[tmp_int] = sDone;
                }
            }
            fclose(cp_file);
        }

        open_requests = 0;
        do {
            /* Hand a waiting command to every worker that has asked for one. */
            do {
                MPI_Iprobe(MPI_ANY_SOURCE, tWorkRequest, MPI_COMM_WORLD, &flag, &status);
                if (flag) {
                    msg_src = status.MPI_SOURCE;
                    msg_tag = status.MPI_TAG;
                    MPI_Recv(&tmp_int, 1, MPI_INT, msg_src, msg_tag, MPI_COMM_WORLD, &status);
                    for (i = 0; i < command_count; i++) {
                        if (command_status[i] == sWaiting) {
                            /* command text, then its index; MPI keeps same-tag order */
                            send_text(command_strings[i], msg_src, tWorkOrder);
                            MPI_Send(&i, 1, MPI_INT, msg_src, tWorkOrder, MPI_COMM_WORLD);
                            command_status[i] = sWorking;
                            commands_left--;
                            open_requests++;
                            fprintf(stderr, "Assigning %d of %d\n",
                                    command_count - commands_left, command_count);
                            break;
                        }
                    }
                }
            } while (flag);

            /* Collect completion notices. */
            do {
                MPI_Iprobe(MPI_ANY_SOURCE, tWorkDone, MPI_COMM_WORLD, &flag, &status);
                if (flag) {
                    msg_src = status.MPI_SOURCE;
                    msg_tag = status.MPI_TAG;
                    MPI_Recv(&tmp_int, 1, MPI_INT, msg_src, msg_tag, MPI_COMM_WORLD, &status);
                    if (tmp_int >= 0 && tmp_int < command_count)
                        command_status[tmp_int] = sDone;
                    open_requests--;
                }
            } while (flag);

            sleep(1);
            if (commands_left <= 0 && open_requests <= 0)
                quit = 1;
        } while (!quit);

        /* Write the checkpoint file so a later run can resume. */
        snprintf(buffer, sizeof(buffer), "%s.check", argv[1]);
        if ((cp_file = fopen(buffer, "w")) != NULL) {
            for (i = 0; i < command_count; i++) {
                if (command_status[i] == sDone)
                    fprintf(cp_file, "%d\n", i);
            }
            fclose(cp_file);
        } else {
            perror(buffer);
        }

        /* Tell every worker to stop. */
        tmp_int = 0;
        for (i = 1; i < group_size; i++)
            MPI_Send(&tmp_int, 1, MPI_INT, i, tQuit, MPI_COMM_WORLD);

        free(command_strings[0]);
        free(command_strings);
        free(command_status);
    } else {
        fprintf(stderr, "Entering Slave Section (pid: %d)\n", (int)getpid());

        /* The worker: request a job, run it, report back, repeat until told to quit. */
        MPI_Send(&tmp_int, 1, MPI_INT, 0, tWorkRequest, MPI_COMM_WORLD);
        do {
            MPI_Iprobe(0, tWorkOrder, MPI_COMM_WORLD, &flag, &status);
            if (flag) {
                fprintf(stderr, "Howdy\n");
                msg_src = status.MPI_SOURCE;
                msg_tag = status.MPI_TAG;
                recv_text(&command_buffer, msg_src, msg_tag);
                MPI_Recv(&tmp_int, 1, MPI_INT, msg_src, msg_tag, MPI_COMM_WORLD, &status);
                fprintf(stderr, "Doing work: %s\n", command_buffer);
                system(command_buffer);
                free(command_buffer);
                MPI_Send(&tmp_int, 1, MPI_INT, 0, tWorkDone, MPI_COMM_WORLD);
                MPI_Send(&tmp_int, 1, MPI_INT, 0, tWorkRequest, MPI_COMM_WORLD);
            }
            MPI_Iprobe(0, tQuit, MPI_COMM_WORLD, &flag, &status);
            if (flag) {
                /* consume the quit message so nothing is left pending at MPI_Finalize */
                MPI_Recv(&tmp_int, 1, MPI_INT, 0, tQuit, MPI_COMM_WORLD, &status);
                quit = 1;
            }
        } while (!quit);
    }

    MPI_Finalize();
    return 0;
}