function [fOutVar,nBlockPerCPU, totCPU] = masterParallel(Parallel,fBlock,nBlock,NamFileInput,fname,fInputVar,fGlobalVar,Parallel_info,initialize) % PARALLEL CONTEXT % This is the most important function for the management of DYNARE parallel % computing. % It is the top-level function called on the master computer when parallelizing a task. % % This function has two main computational strategies for managing the % matlab worker (slave process): % % 0 Simple Close/Open Stategy: % In this case the new Matlab instances (slave process) are open when % necessary and then closed. This can happen many times during the % simulation of a model. % % 1 Always Open Strategy: % In this case we have a more sophisticated management of slave processes, % which are no longer closed at the end of each job. The slave processes % wait for a new job (if it exists). If a slave does not receive a new job after a % fixed time it is destroyed. This solution removes the computational % time necessary to Open/Close new Matlab instances. % % The first (point 0) is the default Strategy % i.e.(Parallel_info.leaveSlaveOpen=0). This value can be changed by the % user in xxx.mod file or it is changed by the programmer if it is necessary to % reduce the overall computational time. See for example the % prior_posterior_statistics.m. % % The number of parallelized threads will be equal to (nBlock-fBlock+1). % % Treatment of global variables: % Global variables used within the called function are wrapped and passed by storing their % values at the start of the parallel computation in a file via % storeGlobalVars.m. This file is then loaded in the separate, % independent slave Matlab sessions. By keeping them separate, no % interaction via global variables can take place. % % INPUTS % o Parallel [struct vector] copy of options_.parallel % o fBlock [int] index number of the first thread % (between 1 and nBlock) % o nBlock [int] index number of the last thread % o NamFileInput [cell array] contains the list of input files to be % copied in the working directory of remote slaves % 2 columns, as many lines as there are files % - first column contains directory paths % - second column contains filenames % o fname [string] name of the function to be parallelized, and % which will be run on the slaves % o fInputVar [struct] structure containing local variables to be used % by fName on the slaves % o fGlobalVar [struct] structure containing global variables to be used % by fName on the slaves % o Parallel_info [] % o initialize [] % % OUTPUT % o fOutVar [struct vector] result of the parallel computation, one % struct per thread % o nBlockPerCPU [int vector] for each CPU used, indicates the number of % threads run on that CPU % o totCPU [int] total number of CPUs used (can be lower than % the number of CPUs declared in "Parallel", if % the number of required threads is lower) % Copyright © 2009-2017 Dynare Team % % This file is part of Dynare. % % Dynare is free software: you can redistribute it and/or modify % it under the terms of the GNU General Public License as published by % the Free Software Foundation, either version 3 of the License, or % (at your option) any later version. % % Dynare is distributed in the hope that it will be useful, % but WITHOUT ANY WARRANTY; without even the implied warranty of % MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the % GNU General Public License for more details. % % You should have received a copy of the GNU General Public License % along with Dynare. If not, see . % If islocal==0, create a new directory for remote computation. % This directory is named using current data and time, % is used only one time and then deleted. persistent PRCDir % PRCDir = Present Remote Computational Directory! Strategy=Parallel_info.leaveSlaveOpen; islocal = 1; isHybridMatlabOctave = Parallel_info.isHybridMatlabOctave; for j=1:length(Parallel) islocal=islocal*Parallel(j).Local; end if nargin>8 && initialize==1 if islocal == 0 PRCDir=CreateTimeString(); assignin('base','PRCDirTmp',PRCDir), evalin('base','options_.parallel_info.RemoteTmpFolder=PRCDirTmp;') evalin('base','clear PRCDirTmp,') else % Delete the traces (if existing) of last local session of computations. mydelete(['slaveParallel_input*.mat']); end return end % Determine the total number of available CPUs, and the number of threads % to run on each CPU. [nCPU, totCPU, nBlockPerCPU, totSlaves] = distributeJobs(Parallel, fBlock, nBlock); parallel_recover = 0; if isfield(Parallel_info,'parallel_recover') && Parallel_info.parallel_recover parallel_recover = 1; end if parallel_recover ==0 if isfield(Parallel_info,'local_files') if isempty(NamFileInput) NamFileInput=Parallel_info.local_files; else NamFileInput=[NamFileInput;Parallel_info.local_files]; end end % Deactivate some 'Parallel/Warning' messages in Octave! % Comment the line 'warning('off');' in order to view the warning messages % in Octave! if isoctave warning('off') end % check if there are function_handles in the input or global vars when % octave is used if isHybridMatlabOctave || isoctave fInputNames = fieldnames(fInputVar); for j=1:length(fInputNames) TargetVar = fInputVar.(fInputNames{j}); if isa(TargetVar,'function_handle') TargetVar=func2str(TargetVar); fInputVar.(fInputNames{j})=TargetVar; end end if exist('fGlobalVar','var') && ~isempty(fGlobalVar) fInputNames = fieldnames(fGlobalVar); for j=1:length(fInputNames) TargetVar = fGlobalVar.(fInputNames{j}); if isa(TargetVar,'function_handle') TargetVar=func2str(TargetVar); fGlobalVar.(fInputNames{j})=TargetVar; end end end end % if Strategy==1 % totCPU=0; % end % Determine my hostname and my working directory. DyMo=pwd; % fInputVar.DyMo=DyMo; if ispc [tempo, MasterName]=system('hostname'); MasterName=deblank(MasterName); end % fInputVar.MasterName = MasterName; % Save input data for use by the slaves. switch Strategy case 0 storeGlobalVars([fname,'_input.mat']); save([fname,'_input.mat'],'fInputVar','Parallel','-append') case 1 if exist('fGlobalVar','var') save(['temp_input.mat'],'fInputVar','fGlobalVar') else save(['temp_input.mat'],'fInputVar') end save(['temp_input.mat'],'Parallel','-append') closeSlave(Parallel,PRCDir,-1); end for j=1:totSlaves PRCDirSnapshot{j}={}; end offset0 = fBlock-1; % Clean up remnants of previous runs. mydelete(['comp_status_',fname,'*.mat']); mydelete(['P_',fname,'*End.txt']); mydelete([fname,'_output_*.mat']); mydelete('slaveParallel_break.mat'); dynareParallelDelete([fname,'_output_*.mat'],PRCDir,Parallel); dynareParallelDelete(['comp_status_',fname,'*.mat'],PRCDir,Parallel); dynareParallelDelete('slaveParallel_break.mat',PRCDir,Parallel); % Create a shell script containing the commands to launch the required % tasks on the slaves. fid = fopen('ConcurrentCommand1.bat','w+'); % Create the directory devoted to remote computation. if isempty(PRCDir) && ~islocal error('PRCDir not initialized!') else dynareParallelMkDir(PRCDir,Parallel(1:totSlaves)); end % Testing Zone % 1. Display the User Strategy: % if Strategy==0 % disp('User Strategy Now Is Open/Close (0)'); % else % disp('User Strategy Now Is Always Open (1)'); % end % 2. Display the output of 'NEW' distributeJobs.m: % % fBlock % nBlock % % % nCPU % totCPU % nBlockPerCPU % totSlaves % % keyboard % End for j=1:totCPU if Strategy==1 command1 = ' '; end indPC=min(find(nCPU>=j)); % According to the information contained in configuration file, compThread can limit MATLAB % to a single computational thread. By default, MATLAB makes use of the multithreading % capabilities of the computer on which it is running. Nevertheless % exsperimental results show as matlab native % multithreading limit the performaces when the parallel computing is active. if strcmp('true',Parallel(indPC).SingleCompThread) compThread = '-singleCompThread'; else compThread = ''; end nthreads=Parallel(indPC).NumberOfThreadsPerJob; if indPC>1 nCPU0 = nCPU(indPC-1); else nCPU0=0; end offset = sum(nBlockPerCPU(1:j-1))+offset0; % Create a file used to monitoring if a parallel block (core) % computation is finished or not. fid1=fopen(['P_',fname,'_',int2str(j),'End.txt'],'w+'); fclose(fid1); if Strategy==1 fblck = offset+1; nblck = sum(nBlockPerCPU(1:j)); save temp_input.mat fblck nblck fname -append; copyfile('temp_input.mat',['slaveJob',int2str(j),'.mat']); if Parallel(indPC).Local ==0 fid1=fopen(['stayalive',int2str(j),'.txt'],'w+'); fclose(fid1); dynareParallelSendFiles(['stayalive',int2str(j),'.txt'],PRCDir,Parallel(indPC)); mydelete(['stayalive',int2str(j),'.txt']); end % Wait for possibly local alive CPU to start the new job or close by % internal criteria. pause(1); newInstance = 0; % Check if j CPU is already alive. if isempty(dynareParallelDir(['P_slave_',int2str(j),'End.txt'],PRCDir,Parallel(indPC))) fid1=fopen(['P_slave_',int2str(j),'End.txt'],'w+'); fclose(fid1); if Parallel(indPC).Local==0 dynareParallelSendFiles(['P_slave_',int2str(j),'End.txt'],PRCDir,Parallel(indPC)); delete(['P_slave_',int2str(j),'End.txt']); end newInstance = 1; storeGlobalVars( ['slaveParallel_input',int2str(j),'.mat']); save( ['slaveParallel_input',int2str(j),'.mat'],'Parallel','-append'); % Prepare global vars for Slave. end else % If the computation is executed remotely all the necessary files % are created localy, then copied in remote directory and then % deleted (loacal)! save( ['slaveParallel_input',int2str(j),'.mat'],'j'); if Parallel(indPC).Local==0 dynareParallelSendFiles(['P_',fname,'_',int2str(j),'End.txt'],PRCDir,Parallel(indPC)); delete(['P_',fname,'_',int2str(j),'End.txt']); dynareParallelSendFiles(['slaveParallel_input',int2str(j),'.mat'],PRCDir,Parallel(indPC)); delete(['slaveParallel_input',int2str(j),'.mat']); end end % set affinity range on win CPU's affinity_range = [1:nthreads]+(j-1-nCPU0)*nthreads; my_affinity = int2str(Parallel(indPC).CPUnbr(affinity_range(1))); for jaff=2:length(affinity_range) my_affinity = [my_affinity ',' int2str(Parallel(indPC).CPUnbr(affinity_range(jaff)))]; end % % % int2str(Parallel(indPC).CPUnbr(j-nCPU0)) % DA SINTETIZZARE: %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % The following 'switch - case' code is the core of this function! switch Strategy case 0 if Parallel(indPC).Local == 1 % 0.1 Run on the local machine (localhost). if ~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem) % Hybrid computing Windows <-> Unix! if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! command1=[Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')" &']; else command1=[Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')" &']; end else % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! if Parallel_info.use_psexec token1 = ['psexec -accepteula -d -W "',DyMo, '" -a ',my_affinity,' -low ']; else itmp = intmin('uint64'); cpus = eval([ '[' my_affinity ']' ])+1; for icpu=1:length(cpus) itmp = bitset(itmp,cpus(icpu)); end hex_affinity = dec2hex(itmp); token1 = ['start /B /D "',DyMo, '" /affinity ',hex_affinity,' /LOW ']; end if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') command1=[token1,Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"']; else command1=[token1,Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"']; end end else % 0.2 Parallel(indPC).Local==0: Run using network on remote machine or also on local machine. if j==nCPU0+1 dynareParallelSendFiles([fname,'_input.mat'],PRCDir,Parallel(indPC)); dynareParallelSendFiles(NamFileInput,PRCDir,Parallel(indPC)); end if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) % Hybrid computing Windows <-> Unix! if ispc token='start /B '; else token = ''; end if ~isempty(Parallel(indPC).Port) ssh_token = ['-p ',Parallel(indPC).Port]; else ssh_token = ''; end % To manage the diferences in Unix/Windows OS syntax. remoteFile=['remoteDynare',int2str(j)]; fidRemote=fopen([remoteFile,'.m'],'w+'); if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! remoteString=['default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; command1=[token, 'ssh ',ssh_token,' ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir, '; ',Parallel(indPC).MatlabOctavePath,' -f --eval ',remoteFile,' " &']; else remoteString=['addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')']; command1=[token, 'ssh ',ssh_token,' ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir, '; ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r ',remoteFile,';" &']; end fprintf(fidRemote,'%s\n',remoteString); fclose(fidRemote); dynareParallelSendFiles([remoteFile,'.m'],PRCDir,Parallel(indPC)); delete([remoteFile,'.m']); else if ~strcmpi(Parallel(indPC).ComputerName,MasterName) % 0.3 Run on a remote machine! % Hybrid computing Matlab(Master)-> Octave(Slaves) and Vice Versa! if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"']; else command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"']; end else % 0.4 Run on the local machine via the network % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"']; else command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); fParallel(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',''',fname,''')"']; end end end end case 1 if Parallel(indPC).Local == 1 && newInstance % 1.1 Run on the local machine. if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) % Hybrid computing Windows <-> Unix! if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') % Hybrid computing Matlab(Master)-> Octave(Slaves) and Vice Versa! command1=[Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')" &']; else command1=[Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')" &']; end else % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! if Parallel_info.use_psexec token1 = ['psexec -accepteula -d -W "',DyMo, '" -a ',my_affinity,' -low ']; else itmp = intmin('uint64'); cpus = eval([ '[' my_affinity ']' ])+1; for icpu=1:length(cpus) itmp = bitset(itmp,cpus(icpu)); end hex_affinity = dec2hex(itmp); token1 = ['start /B /D "',DyMo, '" /affinity ',hex_affinity,' /LOW ']; end if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') command1=[token1,Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7'');addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"']; else command1=[token1,Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"']; end end elseif Parallel(indPC).Local==0 % 1.2 Run using network on remote machine or also on local machine. if j==nCPU0+1 dynareParallelSendFiles(NamFileInput,PRCDir,Parallel(indPC)); end dynareParallelSendFiles(['P_',fname,'_',int2str(j),'End.txt'],PRCDir,Parallel(indPC)); delete(['P_',fname,'_',int2str(j),'End.txt']); if newInstance dynareParallelSendFiles(['slaveJob',int2str(j),'.mat'],PRCDir,Parallel(indPC)); delete(['slaveJob',int2str(j),'.mat']); dynareParallelSendFiles(['slaveParallel_input',int2str(j),'.mat'],PRCDir,Parallel(indPC)) if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) % Hybrid computing Windows <-> Unix! if ispc token='start /B '; else token = ''; end if ~isempty(Parallel(indPC).Port) ssh_token = ['-p ',Parallel(indPC).Port]; else ssh_token = ''; end % To manage the diferences in Unix/Windows OS syntax. remoteFile=['remoteDynare',int2str(j)]; fidRemote=fopen([remoteFile,'.m'],'w+'); if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') % Hybrid computing Matlab(Master)-> Octave(Slaves) and Vice Versa! remoteString=['default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),');']; command1=[token, 'ssh ',ssh_token,' ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir '; ',Parallel(indPC).MatlabOctavePath,' -f --eval ',remoteFile,' " &']; else remoteString=['addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),');']; command1=[token, 'ssh ',ssh_token,' ',Parallel(indPC).UserName,'@',Parallel(indPC).ComputerName,' "cd ',Parallel(indPC).RemoteDirectory,'/',PRCDir '; ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r ',remoteFile,';" &']; end fprintf(fidRemote,'%s\n',remoteString); fclose(fidRemote); dynareParallelSendFiles([remoteFile,'.m'],PRCDir,Parallel(indPC)); delete([remoteFile,'.m']); else if ~strcmpi(Parallel(indPC).ComputerName,MasterName) % 1.3 Run on a remote machine. % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7'');addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"']; else command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -u ',Parallel(indPC).UserName,' -p ',Parallel(indPC).Password,' -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"']; end else % 1.4 Run on the local machine via the network. % Hybrid computing Matlab(Master)->Octave(Slaves) and Vice Versa! if regexpi([Parallel(indPC).MatlabOctavePath], 'octave') command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -f --eval "default_save_options(''-v7''); addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"']; else command1=['psexec \\',Parallel(indPC).ComputerName,' -accepteula -d -e -W "',Parallel(indPC).RemoteDrive,':\',Parallel(indPC).RemoteDirectory,'\',PRCDir,'\" -a ',my_affinity, ... ' -low ',Parallel(indPC).MatlabOctavePath,' -nosplash -nodesktop -minimize ',compThread,' -r "addpath(''',Parallel(indPC).DynarePath,'''), dynareroot = dynare_config(); slaveParallel(',int2str(j),',',int2str(indPC),')"']; end end end else % When the user user strategy is equal to 1, you must % do PRCDirSnapshot here to to avoid problems of % synchronization. if isempty(PRCDirSnapshot{indPC}) PRCDirSnapshot(indPC)=dynareParallelSnapshot(PRCDir,Parallel(indPC)); PRCDirSnapshotInit(indPC) = PRCDirSnapshot(indPC); else PRCDirSnapshot(indPC)=dynareParallelGetNewFiles(PRCDir,Parallel(indPC),PRCDirSnapshot(indPC)); end dynareParallelSendFiles(['slaveJob',int2str(j),'.mat'],PRCDir,Parallel(indPC)); delete(['slaveJob',int2str(j),'.mat']); end end end fprintf(fid,'%s\n',command1); end % In This way we are sure that the file 'ConcurrentCommand1.bat' is % closed and then it can be deleted! while (1) StatusOfCC1_bat = fclose(fid); if StatusOfCC1_bat==0 break end end % Snapshot of the contents of all the directories involved in parallel % computing. This is necessary when I want to copy continuously the files produced by % the slaves ... % If the compuation is 'Local' it is not necessary to do it ... if Strategy==0 || newInstance % See above. PRCDirSnapshot=dynareParallelSnapshot(PRCDir,Parallel(1:totSlaves)); PRCDirSnapshotInit = PRCDirSnapshot; % Run the slaves. if ~ispc system('sh ConcurrentCommand1.bat &'); pause(1) else if isoctave % Redirect the standard output to the file 'OctaveStandardOutputMessage.txt'! % This file is saved in the Model directory. system('ConcurrentCommand1.bat > OctaveStandardOutputMessage.txt'); else system('ConcurrentCommand1.bat'); end end end % For matlab enviroment with options_.console_mode = 0: % create a parallel (local/remote) specialized computational status bars! global options_ % Create a parallel (local/remote) specialized computational status bars! if isoctave || options_.console_mode diary off; if isoctave printf('\n'); else fprintf('\n'); end else hfigstatus = figure('name',['Parallel ',fname],... 'DockControls','off', ... 'IntegerHandle','off', ... 'Interruptible','off', ... 'MenuBar', 'none', ... 'NumberTitle','off', ... 'Renderer','Painters', ... 'Resize','off'); ncol = ceil(totCPU/10); hspace = 0.9/ncol; hstatus(1) = axes('position',[0.05/ncol 0.92 0.9/ncol 0.03], ... 'box','on','xtick',[],'ytick',[],'xlim',[0 1],'ylim',[0 1]); set(hstatus(1),'Units','pixels') hpixel = get(hstatus(1),'Position'); hfigure = get(hfigstatus,'Position'); hfigure(4)=hpixel(4)*10/3*min(10,totCPU); set(hfigstatus,'Position',hfigure) set(hstatus(1),'Units','normalized'), vspace = max(0.1,1/totCPU); vstart = 1-vspace+0.2*vspace; for j=1:totCPU jrow = mod(j-1,10)+1; jcol = ceil(j/10); hstatus(j) = axes('position',[0.05/ncol+(jcol-1)/ncol vstart-vspace*(jrow-1) 0.9/ncol 0.3*vspace], ... 'box','on','xtick',[],'ytick',[],'xlim',[0 1],'ylim',[0 1]); hpat(j) = patch([0 0 0 0],[0 1 1 0],'r','EdgeColor','r'); htit(j) = title(['Initialize ...']); end cumBlockPerCPU = cumsum(nBlockPerCPU); end pcerdone = NaN(1,totCPU); idCPU = NaN(1,totCPU); % Wait for the slaves to finish their job, and display some progress % information meanwhile. % Caption for console mode computing ... if options_.console_mode || isoctave if ~isoctave if strcmpi([Parallel(indPC).MatlabOctavePath], 'octave') RjInformation='Hybrid Computing Is Active: Remote jobs are computed by Octave!'; fprintf([RjInformation,'\n\n']); end end fnameTemp=fname; L=length(fnameTemp); PoCo=strfind(fnameTemp,'_core'); for i=PoCo:L if i==PoCo fnameTemp(i)=' '; else fnameTemp(i)='.'; end end for i=1:L if fnameTemp(i)=='_' fnameTemp(i)=' '; end end fnameTemp(L)=''; Information=['Parallel ' fnameTemp ' Computing ...']; if isoctave if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) && (Strategy==0) printf('\n'); pause(2); end printf([Information,'\n\n']); else fprintf([Information,'\n\n']); end end % Testing Zone % Check the new copy file strategy ... global NuoviFilecopiati NuoviFilecopiati=zeros(1,totSlaves); % End ForEver=1; statusString = ''; flag_CloseAllSlaves=0; while (ForEver) waitbarString = ''; statusString0 = repmat('\b',1,length(sprintf(statusString, 100 .* pcerdone))); statusString = ''; pause(1) try if islocal ==0 dynareParallelGetFiles(['comp_status_',fname,'*.mat'],PRCDir,Parallel(1:totSlaves)); end catch end for j=1:totCPU try if ~isempty(['comp_status_',fname,int2str(j),'.mat']) load(['comp_status_',fname,int2str(j),'.mat']); % whoCloseAllSlaves = who(['comp_status_',fname,int2str(j),'.mat','CloseAllSlaves']); if exist('CloseAllSlaves') && flag_CloseAllSlaves==0 flag_CloseAllSlaves=1; whoiamCloseAllSlaves=j; closeSlave(Parallel(1:totSlaves),PRCDir,1); end end pcerdone(j) = prtfrc; idCPU(j) = njob; if isoctave || options_.console_mode if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) statusString = [statusString, int2str(j), ' %3.f%% done! ']; else statusString = [statusString, int2str(j), ' %3.f%% done! ']; end else status_String{j} = waitbarString; status_Title{j} = waitbarTitle; end catch % ME % To define! if isoctave || options_.console_mode if (~ispc || strcmpi('unix',Parallel(indPC).OperatingSystem)) statusString = [statusString, int2str(j), ' %3.f%% done! ']; else statusString = [statusString, int2str(j), ' %3.f%% done! ']; end end end end if isoctave || options_.console_mode if isoctave printf([statusString,'\r'], 100 .* pcerdone); else if ~isempty(statusString) fprintf([statusString0,statusString], 100 .* pcerdone); end end else for j=1:totCPU try set(hpat(j),'XData',[0 0 pcerdone(j) pcerdone(j)]); set(htit(j),'String',[status_Title{j},' - ',status_String{j}]); catch end end end % Check if the slave(s) has generated some new files remotely. % 1. The files .log and .txt are not copied. % 2. The comp_status_*.mat files are managed separately. if isoctave % to avoid synchronism problems try PRCDirSnapshot=dynareParallelGetNewFiles(PRCDir,Parallel(1:totSlaves),PRCDirSnapshot); catch end else PRCDirSnapshot=dynareParallelGetNewFiles(PRCDir,Parallel(1:totSlaves),PRCDirSnapshot); end if isempty(dynareParallelDir(['P_',fname,'_*End.txt'],PRCDir,Parallel(1:totSlaves))) HoTuttiGliOutput=0; for j=1:totCPU % Checking if the remote computation is finished and if we copied all the output here. if ~isempty(dir([fname,'_output_',int2str(j),'.mat'])) HoTuttiGliOutput=HoTuttiGliOutput+1; else indPC=min(find(nCPU>=j)); dynareParallelGetFiles([fname,'_output_',int2str(j),'.mat'],PRCDir,Parallel(indPC)); end end if HoTuttiGliOutput==totCPU mydelete(['comp_status_',fname,'*.mat']); if isoctave || options_.console_mode if isoctave printf('\n'); printf(['End Parallel Session ....','\n\n']); else fprintf('\n'); fprintf(['End Parallel Session ....','\n\n']); end diary on; else close(hfigstatus) end break else disp('Waiting for output files from slaves ...') end end end else for j=1:totSlaves PRCDirSnapshot{j}={}; end flag_CloseAllSlaves = 0; end % Load and format remote output. iscrash = 0; PRCDirSnapshot=dynareParallelGetNewFiles(PRCDir,Parallel(1:totSlaves),PRCDirSnapshot); for j=1:totCPU indPC=min(find(nCPU>=j)); load([fname,'_output_',int2str(j),'.mat'],'fOutputVar'); delete([fname,'_output_',int2str(j),'.mat']); if isfield(fOutputVar,'OutputFileName') && Parallel(indPC).Local==0 % Check if input files have been updated! OutputFileName=fOutputVar.OutputFileName; tmp0=''; for i=1:size(NamFileInput,1) FileList = regexp(strrep(PRCDirSnapshot{indPC},'\','/'),strrep(strrep([NamFileInput{i,:}],'\','/'),'*','(\w*)'),'match'); for k=1:length(FileList) if ~isempty(FileList{k}) if isempty(tmp0) tmp0=FileList{k}{1}; else tmp0=char(tmp0,FileList{k}{1}); end end end end for i=1:size(OutputFileName,1) tmp1=''; FileList = regexp(cellstr(tmp0),strrep(strrep([OutputFileName{i,:}],'\','/'),'*','(\w*)'),'match'); FileList0 = regexp(cellstr(tmp0),strrep([OutputFileName{i,2}],'*','(\w*)'),'match'); for k=1:length(FileList) if ~isempty(FileList{k}) if isempty(tmp1) tmp1=FileList0{k}{1}; else tmp1=char(tmp1,FileList0{k}{1}); end end end for k=1:size(tmp1,1) dynareParallelGetFiles([OutputFileName(i,1) {tmp1(k,:)}],PRCDir,Parallel(indPC)); end end % check if some output file is missing! for i=1:size(OutputFileName,1) tmp1=dynareParallelDir([OutputFileName{i,:}],PRCDir,Parallel(indPC)); tmp1 = regexp(cellstr(tmp1),strrep([OutputFileName{i,2}],'*','(\w*)'),'match'); tmp1 = char(tmp1{:}); tmp2=ls([OutputFileName{i,:}]); for ij=1:size(tmp1,1) icheck = regexp(cellstr(tmp2),tmp1(ij,:),'once'); isOutputFileMissing=1; for ik=1:size(tmp2,1) if ~isempty(icheck{ik}) isOutputFileMissing=0; end end if isOutputFileMissing dynareParallelGetFiles([OutputFileName(i,1) {tmp1(ij,:)}],PRCDir,Parallel(indPC)); end end end end if isfield(fOutputVar,'error') disp(['Job number ',int2str(j),' crashed with error:']); iscrash=1; disp([fOutputVar.error.message]); for jstack=1:length(fOutputVar.error.stack) fOutputVar.error.stack(jstack) end elseif flag_CloseAllSlaves==0 fOutVar(j)=fOutputVar; elseif j==whoiamCloseAllSlaves fOutVar=fOutputVar; end end if flag_CloseAllSlaves==1 closeSlave(Parallel(1:totSlaves),PRCDir,-1); end if iscrash error('Remote jobs crashed'); end pause(1) % Wait for all remote diary off completed % Cleanup. dynareParallelGetFiles('*.log',PRCDir,Parallel(1:totSlaves)); switch Strategy case 0 for indPC=1:min(find(nCPU>=totCPU)) if Parallel(indPC).Local == 0 dynareParallelRmDir(PRCDir,Parallel(indPC)); end if isempty(dir('dynareParallelLogFiles')) [A B C]=rmdir('dynareParallelLogFiles'); mkdir('dynareParallelLogFiles'); end try copyfile('*.log','dynareParallelLogFiles'); mydelete([fname,'*.log']); catch end mydelete(['*_core*_input*.mat']); end delete ConcurrentCommand1.bat case 1 delete(['temp_input.mat']) if newInstance if isempty(dir('dynareParallelLogFiles')) [A B C]=rmdir('dynareParallelLogFiles'); mkdir('dynareParallelLogFiles'); end end copyfile('*.log','dynareParallelLogFiles'); if newInstance delete ConcurrentCommand1.bat end dynareParallelDelete(['comp_status_',fname,'*.mat'],PRCDir,Parallel); for indPC=1:min(find(nCPU>=totCPU)) if Parallel(indPC).Local == 0 dynareParallelDeleteNewFiles(PRCDir,Parallel(indPC),PRCDirSnapshotInit(indPC),'.log'); for ifil=1:size(NamFileInput,1) dynareParallelDelete([NamFileInput{ifil,:}],PRCDir,Parallel(indPC)); end end end end