// MPQueue, version 4
// copyright by John Neuberger, Nandor Sieben and James Swift

#include <mpi.h>
#include <queue>
#include <string>
#include <fstream>
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <ctime>
#include "MPQueue.h"

#define LOAD_DIAGRAM            // comment it out if a load diagram is not needed

using namespace std;

// MPI message tags
enum { BOSS, REPORT_TAG, PRE_JOB_TAG, JOB_TAG, PRE_RESULT_TAG, RESULT_TAG,
  PMSG_SUBMIT, PMSG_RESULT, PMSG_RUN, PMSG_INFO
};

queue<int> idle;                // ranks of the workers currently waiting for a job

#ifdef LOAD_DIAGRAM
ofstream loadstream ("load.txt");
time_t start_time;
#endif

int MPIsize;
int MPIrank;

// called by a worker to submit a new job to the currently running job queue
void MPQsubmit (const Tjob & job)
{
  char outca[job.data.size () + 1];
  strcpy (outca, job.data.c_str ());
  int sizemsg[3];
  sizemsg[0] = PMSG_SUBMIT;
  sizemsg[1] = job.data.size () + 1;    // size already contains the trailing 0
  sizemsg[2] = job.type;
  MPI_Send (sizemsg, 3, MPI_INT, BOSS, PRE_RESULT_TAG, MPI_COMM_WORLD);
  MPI_Send (outca, sizemsg[1], MPI_CHAR, BOSS, RESULT_TAG, MPI_COMM_WORLD);
}

// worker asks the boss to run a job immediately and waits for the result
void MPQtask (Tjob & job)
{
  char outca[job.data.size () + 1];
  strcpy (outca, job.data.c_str ());
  int sizemsg[3];
  sizemsg[0] = PMSG_RUN;
  sizemsg[1] = job.data.size () + 1;
  sizemsg[2] = job.type;
  MPI_Send (sizemsg, 3, MPI_INT, BOSS, PRE_RESULT_TAG, MPI_COMM_WORLD);
  MPI_Send (outca, sizemsg[1], MPI_CHAR, BOSS, RESULT_TAG, MPI_COMM_WORLD);
  int premsg[2];
  MPI_Status status;
  MPI_Recv (premsg, 2, MPI_INT, BOSS, PRE_JOB_TAG, MPI_COMM_WORLD, &status);
  char msg[premsg[0]];
  MPI_Recv (msg, premsg[0], MPI_CHAR, BOSS, JOB_TAG, MPI_COMM_WORLD, &status);
  job.data = msg;
}

// worker asks the boss for the number of queued jobs and of jobs sent out
void MPQinfo (int &queuesize, int &sent)
{
  int sizemsg[3];
  sizemsg[0] = PMSG_INFO;
  sizemsg[1] = 0;
  sizemsg[2] = 0;
  MPI_Send (sizemsg, 3, MPI_INT, BOSS, PRE_RESULT_TAG, MPI_COMM_WORLD);
  int premsg[2];
  MPI_Status status;
  MPI_Recv (premsg, 2, MPI_INT, BOSS, PRE_JOB_TAG, MPI_COMM_WORLD, &status);
  queuesize = premsg[0];
  sent = premsg[1];
}

// boss sends every worker the stop signal: a pre-job message of all zeros
void MPQreleaseworkers ()
{
  int msg[2];
  msg[0] = msg[1] = 0;
  for (int i = 1; i < MPIsize; i++)
    MPI_Send (msg, 2, MPI_INT, i, PRE_JOB_TAG, MPI_COMM_WORLD);
}

// boss broadcasts a job to every worker; the job type is sent negated so
// that workers can tell shared data from an ordinary job
void MPQsharedata (const Tjob & job)
{
  int info[2];
  info[0] = job.data.size () + 1;
  info[1] = -job.type;
  char outca[info[0]];
  strcpy (outca, job.data.c_str ());
  for (int i = 1; i < MPIsize; i++)
    MPI_Send (info, 2, MPI_INT, i, PRE_JOB_TAG, MPI_COMM_WORLD);
  MPI_Bcast (outca, info[0], MPI_CHAR, BOSS, MPI_COMM_WORLD);
}

#ifdef LOAD_DIAGRAM
// append a line to the load diagram: elapsed seconds, jobs in the system,
// jobs still queued, jobs sent out, wall-clock time
void printtime (size_t quel, int sent)
{
  time_t curtime = time (NULL);
  time_t rawtime = curtime - start_time;
  loadstream << rawtime << "\t" << quel + sent << "\t" << quel << "\t"
    << sent << "\t" << ctime (&curtime) << flush;
}
#endif

// boss loop: hand out the jobs of inqueue to idle workers and collect the
// results into outqueue; returns when the queue is empty and all workers idle
void MPQrunjobs (Tjobqueue & inqueue, Tjobqueue & outqueue)
{
  MPI_Status status, status2;
  Tjob job;
  int info[2];
  int idlesize = idle.size ();
  while (!inqueue.empty () || (int) idle.size () < idlesize) {
    if (!inqueue.empty () && !idle.empty ()) {  // send a job to an idle worker
#ifdef LOAD_DIAGRAM
      printtime (inqueue.size (), MPIsize - 1 - idle.size ());
#endif
      job = inqueue.front ();
      inqueue.pop ();
      info[0] = job.data.size () + 1;
      info[1] = job.type;
      char outca[info[0]];
      strcpy (outca, job.data.c_str ());
      MPI_Send (info, 2, MPI_INT, idle.front (), PRE_JOB_TAG, MPI_COMM_WORLD);
      MPI_Send (outca, info[0], MPI_CHAR, idle.front (), JOB_TAG, MPI_COMM_WORLD);
      idle.pop ();
    }
    else {                      // wait for results
      int premsg[3];
      MPI_Recv (premsg, 3, MPI_INT, MPI_ANY_SOURCE, PRE_RESULT_TAG,
        MPI_COMM_WORLD, &status);
      int length = premsg[1];
      char msg[length];
      int worker = status.MPI_SOURCE;
      if (premsg[0] != PMSG_INFO) {
        MPI_Recv (msg, length, MPI_CHAR, worker, RESULT_TAG, MPI_COMM_WORLD,
          &status2);
        job.data = msg;
        job.type = premsg[2];
#ifdef LOAD_DIAGRAM
        printtime (inqueue.size (), MPIsize - 1 - idle.size ());
#endif
      }
      switch (premsg[0]) {
      case PMSG_SUBMIT:         // new submit
        inqueue.push (job);
        break;
      case PMSG_RESULT:         // finished with result
        if (length > 1)
          outqueue.push (job);
        idle.push (worker);
        break;
      case PMSG_RUN:{           // run the job here, send the result back
          MPQswitch (job);
          char outca[job.data.size () + 1];
          strcpy (outca, job.data.c_str ());
          info[0] = job.data.size () + 1;
          info[1] = job.type;
          MPI_Send (info, 2, MPI_INT, worker, PRE_JOB_TAG, MPI_COMM_WORLD);
          MPI_Send (outca, info[0], MPI_CHAR, worker, JOB_TAG, MPI_COMM_WORLD);
        }
        break;
      case PMSG_INFO:           // report queue size and number of busy workers
        info[0] = inqueue.size ();
        info[1] = idlesize - idle.size ();
        MPI_Send (info, 2, MPI_INT, worker, PRE_JOB_TAG, MPI_COMM_WORLD);
        break;
      }
    }
  }
}

// worker loop: receive jobs, run them through the user-supplied MPQswitch
// and send back the results, until the boss sends the stop signal
void worker ()
{
  int premsg[2];
  MPI_Status status;
  while (1) {
    MPI_Recv (premsg, 2, MPI_INT, BOSS, PRE_JOB_TAG, MPI_COMM_WORLD, &status);
    if (premsg[0] == 0) {       // stop signal
      MPI_Finalize ();
      exit (0);
    }
    char msg[premsg[0]];
    if (premsg[1] < 0) {        // negative type marks shared data
      MPI_Bcast (msg, premsg[0], MPI_CHAR, BOSS, MPI_COMM_WORLD);
      Tjob job;
      job.type = -premsg[1];
      job.data = msg;
      MPQswitch (job);
      continue;
    }
    MPI_Recv (msg, premsg[0], MPI_CHAR, BOSS, JOB_TAG, MPI_COMM_WORLD, &status);
    Tjob job;
    job.type = premsg[1];
    job.data = msg;
    MPQswitch (job);
    char outca[job.data.size () + 1];
    strcpy (outca, job.data.c_str ());
    int sizemsg[3];
    sizemsg[0] = PMSG_RESULT;
    sizemsg[1] = job.data.size () + 1;
    sizemsg[2] = premsg[1];
    MPI_Send (sizemsg, 3, MPI_INT, BOSS, PRE_RESULT_TAG, MPI_COMM_WORLD);
    MPI_Send (outca, sizemsg[1], MPI_CHAR, BOSS, RESULT_TAG, MPI_COMM_WORLD);
  }
}

void MPQinit (int argc, char *argv[])
{
  MPI_Init (&argc, &argv);
  MPI_Comm_rank (MPI_COMM_WORLD, &MPIrank);
  MPI_Comm_size (MPI_COMM_WORLD, &MPIsize);
}

// workers enter the worker loop and never return; the boss marks every
// worker as idle and returns to the caller
void MPQstart ()
{
  if (MPIrank != 0)
    worker ();
#ifdef LOAD_DIAGRAM
  start_time = time (NULL);
  printtime (0, 0);
#endif
  for (int i = 1; i < MPIsize; i++)
    idle.push (i);
}
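
/* A minimal usage sketch, not part of the MPQueue source: a hypothetical
   driver showing how the functions above fit together.  It assumes that
   MPQueue.h declares a struct Tjob with an int member `type` and a string
   member `data`, a Tjobqueue (a queue of Tjob), and the user-supplied
   dispatcher MPQswitch; the SQUARE job type and the number formatting are
   invented for the example (sprintf needs <cstdio>).  The block is guarded
   with #if 0 so the library file above still compiles unchanged. */
#if 0
enum { SQUARE = 1 };            // hypothetical job type

// user-supplied dispatcher: replaces job.data with the result, in place
void MPQswitch (Tjob & job)
{
  if (job.type == SQUARE) {
    int n = atoi (job.data.c_str ());
    char buf[32];
    sprintf (buf, "%d", n * n);
    job.data = buf;
  }
}

int main (int argc, char *argv[])
{
  MPQinit (argc, argv);         // MPI_Init plus rank and size
  MPQstart ();                  // workers block in worker(); only the boss returns

  Tjobqueue inqueue, outqueue;
  for (int n = 1; n <= 100; n++) {
    Tjob job;
    job.type = SQUARE;
    char buf[32];
    sprintf (buf, "%d", n);
    job.data = buf;
    inqueue.push (job);
  }

  MPQrunjobs (inqueue, outqueue);   // boss farms the jobs out to the workers
  MPQreleaseworkers ();             // workers call MPI_Finalize and exit

  while (!outqueue.empty ()) {
    cout << outqueue.front ().data << "\n";
    outqueue.pop ();
  }
  MPI_Finalize ();
  return 0;
}
#endif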