/*
    MPI master script.

    Reads a list of input paths (one per line) and dispatches one run of
    emelineV2.5.bf per path to the MPI worker nodes (ranks 1..MPI_NODE_COUNT-1).
    Each worker receives a self-contained command string that fills a
    stdinRedirect list with the analysis options and then executes the
    analysis script with those options as its answers.

    Globals shared between the top-level script and SendAJob:
        paths           - matrix of lines read from the job list file
        _settings       - analysis options, keyed "0".."3" (see below)
        MPINodeStatus   - one entry per worker; -1 means idle, otherwise the
                          index of the job most recently sent to that worker
        jobsInProgress  - number of dispatched jobs whose result has not yet
                          been received by the master
*/

/* Utility library. NOTE(review): nothing in the visible code calls into it any
   more; it is still loaded in case other parts of the pipeline rely on it. */
path = HYPHY_BASE_DIRECTORY + "TemplateBatchFiles" + DIRECTORY_SEPARATOR + "Utility" + DIRECTORY_SEPARATOR + "ReadDelimitedFiles.bf";
ExecuteAFile (path);

/* Job list: every line of this file is the path of one input to analyse. */
fscanf ("/home/art/wip/swenson/dutch_v3/prelim/_remote.txt", "Lines", paths);

/*
    Analysis settings (hard-coded here rather than read from a settings file):
        "0" - alignment mode; "Automatic" selects the short option list in
              SendAJob, any other value selects the list that also passes
              a reference sequence
        "1" - reference sequence (only used when mode is not "Automatic")
        "2" - substitution matrix
        "3" - filtering option
*/
_settings = {};
_settings["0"] = "Custom";
_settings["1"] = "CTGAACACATCTGTAGAAATTAATTGTACAAGACCCAACAACAATACAAGAAAAAGAATCCGTATCCAGAGAGGACCAGGGAGAGCATTTGTTACAATAGGAAAAATAGGAAATATGAGACAAGCACATTGTAACATTAGTAGAGCAAAATGGAATAACACTTTAAAACAGATAGCTAGCAAATTAAGAGAACAATTTGGAAATAATAAAACAATAATCTTTAAGCAATCCTCAGGAGGGGACCCAGAAATTGTAACGCACAGT";
_settings["2"] = "HIV 25%";
_settings["3"] = "All";

if (MPI_NODE_COUNT < 2)
{
    /* Without at least one worker SendAJob would find no idle node and then
       block forever in MPIReceive, so refuse to dispatch anything. */
    fprintf (stdout, "ERROR: this script requires at least one MPI worker node (MPI_NODE_COUNT = ", MPI_NODE_COUNT, ").\n");
}
else
{
    /* reset semaphore: every worker starts out idle */
    MPINodeStatus = {MPI_NODE_COUNT-1,1}["-1"];

    jobsInProgress = Columns(paths);

    for (_job = 0; _job < Columns(paths); _job = _job+1)
    {
        /* SendAJob returns 1 when it had to collect a finished job before it
           could dispatch this one, i.e. one fewer result is outstanding. */
        received = SendAJob (_job);
        if (received)
        {
            jobsInProgress = jobsInProgress - 1;
        }
    }

    fprintf (stdout, "Master node is waiting for ", jobsInProgress, " more jobs to finish.\n");

    /* Drain the results of the jobs that are still running. */
    for (_wait = 0; _wait < jobsInProgress; _wait = _wait+1)
    {
        MPIReceive (-1, fromNode, theJob);
        fprintf (stdout, "Received job from node ", fromNode, "\n");
    }

    fprintf (stdout, "Analysis complete.\n");
}


/*
    Build the worker command for job number `job` and send it to a worker.

    job     - index into the global `paths` matrix

    If a worker is idle the command is sent to it immediately. Otherwise the
    call blocks until any worker reports back and reuses that worker.

    Returns 1 if a finished job was received while dispatching (the caller
    must count one fewer job in progress), 0 otherwise.
*/
function SendAJob (job)
{
    /* input path with all spaces removed */
    target = paths[job]^{{" "}{""}};

    cmdString = "";
    cmdString * 256;

    cmdString * "stdinRedirect = {};\n";
    cmdString * "stdinRedirect[\"00\"] = \"Universal\";\n";

    cmd = "stdinRedirect[\"01\"] = \""+target+"\";\n";
    cmdString * cmd;

    /* alignment mode (automatic or select from list) */
    cmd = "stdinRedirect[\"02\"] = \""+_settings[0]+"\";\n";
    cmdString * cmd;

    if (_settings[0] == "Automatic")
    {
        /* substitution matrix */
        cmd = "stdinRedirect[\"03\"] = \""+_settings[2]+"\";\n";
        cmdString * cmd;

        /* filtering option */
        cmd = "stdinRedirect[\"04\"] = \""+_settings[3]+"\";\n";
        cmdString * cmd;

        /* output format and output file */
        cmdString * "stdinRedirect[\"05\"] = \"CSV\";\n";
        cmd = "stdinRedirect[\"06\"] = \"" + target + ".csv\";\n";
        cmdString * cmd;
    }
    else
    {
        /* reference sequence */
        cmd = "stdinRedirect[\"03\"] = \""+_settings[1]+"\";\n";
        cmdString * cmd;

        /* substitution matrix */
        cmd = "stdinRedirect[\"04\"] = \""+_settings[2]+"\";\n";
        cmdString * cmd;

        /* filtering option */
        cmd = "stdinRedirect[\"05\"] = \""+_settings[3]+"\";\n";
        cmdString * cmd;

        /* output format and output file */
        cmdString * "stdinRedirect[\"06\"] = \"CSV\";\n";
        cmd = "stdinRedirect[\"07\"] = \"" + target + ".csv\";\n";
        cmdString * cmd;
    }

    cmdString * "ExecuteAFile (\"/home/art/wip/swenson/dutch_v3/scripts/emelineV2.5.bf\", stdinRedirect);";
    cmdString * 0;

    /* MPINodeStatus index i corresponds to MPI rank i+1 (rank 0 is the master). */
    for (nodeID = 0; nodeID < MPI_NODE_COUNT-1; nodeID = nodeID + 1)
    {
        if (MPINodeStatus[nodeID] < 0) /* semaphore indicates available node */
        {
            fprintf (stdout, "Sending job ", job, " to idle node ", nodeID+1, "\n");
            MPISend (nodeID+1, cmdString);
            MPINodeStatus[nodeID] = job;
            break;
        }
    }

    if (nodeID == MPI_NODE_COUNT-1) /* looped through semaphore with no idle nodes */
    {
        /* wait until one of the nodes completes its job, then reuse it */
        MPIReceive (-1, fromNode, theJob);
        fprintf (stdout, "Sending job ", job, " to node ", fromNode, " that just finished job\n");
        MPISend (fromNode, cmdString);
        MPINodeStatus[fromNode-1] = job;
        return 1; /* indicate that job is received */
    }

    return 0;
}