parallelization system:
* documented the functions masterParallel, fParallel and struct2local * removed useless comments, and added some comments inside the code git-svn-id: https://www.dynare.org/svn/dynare/trunk@3146 ac1d8469-bf42-47a9-8791-bf33cf982152time-shift
parent
76e691c651
commit
7085060e82
|
@ -96,7 +96,7 @@ localVars.NumberOfLines = NumberOfLines;
|
|||
localVars.time = time;
|
||||
localVars.M_ = M_;
|
||||
|
||||
if isnumeric(options_.parallel),% | isunix, % for the moment exclude unix platform from parallel implementation
|
||||
if isnumeric(options_.parallel),
|
||||
fout = McMCDiagnostics_core(localVars,1,npar,0);
|
||||
UDIAG = fout.UDIAG;
|
||||
clear fout
|
||||
|
@ -108,52 +108,6 @@ else
|
|||
end
|
||||
end
|
||||
|
||||
% for j=1:npar
|
||||
% fprintf(' Parameter %d... ',j);
|
||||
% for b = 1:nblck
|
||||
% startline = 0;
|
||||
% for n = 1:NumberOfMcFilesPerBlock
|
||||
% %load([MhDirectoryName '/' mcfiles(n,1,b).name],'x2');
|
||||
% load([MhDirectoryName '/' M_.fname '_mh',int2str(n),'_blck' int2str(b) ...
|
||||
% '.mat'],'x2');
|
||||
% nx2 = size(x2,1);
|
||||
% tmp((b-1)*NumberOfDraws+startline+(1:nx2),1) = x2(:,j);
|
||||
% % clear x2;
|
||||
% startline = startline + nx2;
|
||||
% end
|
||||
% % $$$ %load([MhDirectoryName '/' mcfiles(NumberOfMcFilesPerBlock,1,b).name],'x2');
|
||||
% % $$$ load([MhDirectoryName '/' M_.fname '_mh',int2str(NumberOfMcFilesPerBlock),'_blck' int2str(b) '.mat'],'x2');
|
||||
% % $$$ tmp((b-1)*NumberOfDraws+startline+1:(b-1)*NumberOfDraws+MAX_nruns*(LastFileNumber-1)+LastLineNumber,1) = x2(:,j);
|
||||
% % $$$ clear x2;
|
||||
% % $$$ startline = startline + LastLineNumber;
|
||||
% end
|
||||
% tmp(:,2) = kron(transpose(1:nblck),ones(NumberOfDraws,1));
|
||||
% tmp(:,3) = kron(ones(nblck,1),time');
|
||||
% tmp = sortrows(tmp,1);
|
||||
% ligne = 0;
|
||||
% for iter = Origin:StepSize:NumberOfDraws
|
||||
% ligne = ligne+1;
|
||||
% linea = ceil(0.5*iter);
|
||||
% n = iter-linea+1;
|
||||
% cinf = round(n*ALPHA/2);
|
||||
% csup = round(n*(1-ALPHA/2));
|
||||
% CINF = round(nblck*n*ALPHA/2);
|
||||
% CSUP = round(nblck*n*(1-ALPHA/2));
|
||||
% temp = tmp(find((tmp(:,3)>=linea) & (tmp(:,3)<=iter)),1:2);
|
||||
% UDIAG(ligne,1,j) = temp(CSUP,1)-temp(CINF,1);
|
||||
% moyenne = mean(temp(:,1));%% Pooled mean.
|
||||
% UDIAG(ligne,3,j) = sum((temp(:,1)-moyenne).^2)/(nblck*n-1);
|
||||
% UDIAG(ligne,5,j) = sum(abs(temp(:,1)-moyenne).^3)/(nblck*n-1);
|
||||
% for i=1:nblck
|
||||
% pmet = temp(find(temp(:,2)==i));
|
||||
% UDIAG(ligne,2,j) = UDIAG(ligne,2,j) + pmet(csup,1)-pmet(cinf,1);
|
||||
% moyenne = mean(pmet,1); %% Within mean.
|
||||
% UDIAG(ligne,4,j) = UDIAG(ligne,4,j) + sum((pmet(:,1)-moyenne).^2)/(n-1);
|
||||
% UDIAG(ligne,6,j) = UDIAG(ligne,6,j) + sum(abs(pmet(:,1)-moyenne).^3)/(n-1);
|
||||
% end
|
||||
% end
|
||||
% fprintf('Done! \n');
|
||||
% end
|
||||
UDIAG(:,[2 4 6],:) = UDIAG(:,[2 4 6],:)/nblck;
|
||||
disp(' ')
|
||||
clear pmet temp moyenne CSUP CINF csup cinf n linea iter tmp;
|
||||
|
|
|
@ -30,12 +30,9 @@ end
|
|||
ALPHA = 0.2; % increase too much with the number of simulations.
|
||||
tmp = zeros(NumberOfDraws*nblck,3);
|
||||
UDIAG = zeros(NumberOfLines,6,npar-fpar+1);
|
||||
% keyboard;
|
||||
|
||||
if whoiam
|
||||
% keyboard;
|
||||
waitbarString = ['Please wait... McMCDiagnostics (' int2str(fpar) 'of' int2str(npar) ')...'];
|
||||
% waitbarTitle=['McMCDiagnostics ',Parallel(ThisMatlab).PcName];
|
||||
if Parallel(ThisMatlab).Local,
|
||||
waitbarTitle=['Local '];
|
||||
else
|
||||
|
@ -89,7 +86,6 @@ for j=fpar:npar,
|
|||
end
|
||||
fprintf('Done! \n');
|
||||
if whoiam,
|
||||
% keyboard;
|
||||
waitbarString = [ 'Parameter ' int2str(j) '/' int2str(npar) ' done.'];
|
||||
fMessageStatus((j-fpar+1)/(npar-fpar+1),whoiam,waitbarString, waitbarTitle, Parallel(ThisMatlab), MasterName, DyMo)
|
||||
end
|
||||
|
|
|
@ -1,4 +1,18 @@
|
|||
function fParallel(fblck,nblck,whoiam,ThisMatlab,fname);
|
||||
function fParallel(fblck,nblck,whoiam,ThisMatlab,fname)
|
||||
% In a parallelization context, this function is launched on slave
|
||||
% machines, and acts as a wrapper around the function containing the
|
||||
% computing task itself.
|
||||
%
|
||||
% INPUTS
|
||||
% fblck [int] index number of the first thread to run in this
|
||||
% MATLAB instance
|
||||
% nblck [int] index number of the first thread to run in this
|
||||
% MATLAB instance
|
||||
% whoiam [int] index number of this CPU among all CPUs in the
|
||||
% cluster
|
||||
% ThisMatlab [int] index number of this slave machine in the cluster
|
||||
% (entry in options_.parallel)
|
||||
% fname [string] function to be run, containing the computing task
|
||||
|
||||
% Copyright (C) 2009 Dynare Team
|
||||
%
|
||||
|
@ -32,8 +46,9 @@ diary( [fname,'_',int2str(whoiam),'.log']);
|
|||
% configure dynare environment
|
||||
dynareroot = dynare_config();
|
||||
|
||||
% Load input data
|
||||
load( [fname,'_input'])
|
||||
% keyboard;
|
||||
|
||||
if exist('fGlobalVar'),
|
||||
globalVars = fieldnames(fGlobalVar);
|
||||
for j=1:length(globalVars),
|
||||
|
@ -42,11 +57,13 @@ if exist('fGlobalVar'),
|
|||
struct2local(fGlobalVar);
|
||||
end
|
||||
|
||||
% On UNIX, mount the master working directory through SSH FS
|
||||
if isunix & Parallel(ThisMatlab).Local==0,
|
||||
system(['mkdir ~/MasterRemoteMirror_',fname,'_',int2str(whoiam)]);
|
||||
system(['sshfs ',Parallel(ThisMatlab).user,'@',fInputVar.MasterName,':/',fInputVar.DyMo,' ~/MasterRemoteMirror_',fname,'_',int2str(whoiam)]);
|
||||
end
|
||||
|
||||
% Special hack for MH directory
|
||||
if isfield(fInputVar,'MhDirectoryName') & Parallel(ThisMatlab).Local==0,
|
||||
if isunix,
|
||||
fInputVar.MhDirectoryName = ['~/MasterRemoteMirror_',fname,'_',int2str(whoiam),'/',fInputVar.MhDirectoryName];
|
||||
|
@ -56,27 +73,24 @@ if isfield(fInputVar,'MhDirectoryName') & Parallel(ThisMatlab).Local==0,
|
|||
end
|
||||
|
||||
fInputVar.Parallel = Parallel;
|
||||
% lounch the routine to be run in parallel
|
||||
% Launch the routine to be run in parallel
|
||||
tic,
|
||||
fOutputVar = feval(fname, fInputVar ,fblck, nblck, whoiam, ThisMatlab);
|
||||
toc,
|
||||
if isfield(fOutputVar,'OutputFileName'),
|
||||
OutputFileName = fOutputVar.OutputFileName;
|
||||
% rmfield(fOutputVar,'OutputFileName');
|
||||
else
|
||||
OutputFileName = '';
|
||||
end
|
||||
|
||||
%%% Sincronismo "Esterno" %%%%%%%%%%%%%
|
||||
%%% Ogni Processo quando ha finito lo notifica cancellando un file ...
|
||||
% keyboard;
|
||||
if(whoiam)
|
||||
|
||||
% Save the output result
|
||||
save([ fname,'_output_',int2str(whoiam),'.mat'],'fOutputVar' )
|
||||
|
||||
|
||||
% Inform the master that the job is finished, and transfer the output data
|
||||
if Parallel(ThisMatlab).Local
|
||||
delete(['P_',fname,'_',int2str(whoiam),'End.txt']);
|
||||
|
||||
else
|
||||
if isunix,
|
||||
for j=1:size(OutputFileName,1),
|
||||
|
|
|
@ -1,4 +1,33 @@
|
|||
function [fOutVar,nBlockPerCPU, totCPU] = masterParallel(Parallel,fBlock,nBlock,NamFileInput,fname,fInputVar,fGlobalVar)
|
||||
% Top-level function called on the master computer when parallelizing a task.
|
||||
%
|
||||
% The number of parallelized threads will be equal to (nBlock-fBlock+1).
|
||||
%
|
||||
% INPUTS
|
||||
% Parallel [struct vector] copy of options_.parallel
|
||||
% fBlock [int] index number of the first thread
|
||||
% (between 1 and nBlock)
|
||||
% nBlock [int] index number of the last thread
|
||||
% NamFileInput [cell array] containins the list of input files to be
|
||||
% copied in the working directory of remote slaves
|
||||
% 2 columns, as many lines as there are files
|
||||
% - first column contains directory paths
|
||||
% - second column contains filenames
|
||||
% fname [string] name of the function to be parallelized, and
|
||||
% which will be run on the slaves
|
||||
% fInputVar [struct] structure containing local variables to be used
|
||||
% by fName on the slaves
|
||||
% fGlobalVar [struct] structure containing global variables to be used
|
||||
% by fName on the slaves
|
||||
%
|
||||
% OUTPUT
|
||||
% fOutVar [struct vector] result of the parallel computation, one
|
||||
% struct per thread
|
||||
% nBlockPerCPU [int vector] for each CPU used, indicates the number of
|
||||
% threads run on that CPU
|
||||
% totCPU [int] total number of CPU used (can be lower than
|
||||
% the number of CPU declared in "Parallel", if
|
||||
% the number of required threads is lower)
|
||||
|
||||
% Copyright (C) 2009 Dynare Team
|
||||
%
|
||||
|
@ -17,9 +46,9 @@ function [fOutVar,nBlockPerCPU, totCPU] = masterParallel(Parallel,fBlock,nBlock,
|
|||
% You should have received a copy of the GNU General Public License
|
||||
% along with Dynare. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
global options_
|
||||
|
||||
totCPU=0;
|
||||
|
||||
% Determine my hostname and my working directory
|
||||
DyMo=pwd;
|
||||
fInputVar.DyMo=DyMo;
|
||||
if isunix,
|
||||
|
@ -30,17 +59,20 @@ end
|
|||
MasterName=deblank(MasterName);
|
||||
fInputVar.MasterName = MasterName;
|
||||
|
||||
% Save input data for use by the slaves
|
||||
if exist('fGlobalVar'),
|
||||
save([fname,'_input.mat'],'fInputVar','fGlobalVar')
|
||||
else
|
||||
save([fname,'_input.mat'],'fInputVar')
|
||||
end
|
||||
save([fname,'_input.mat'],'Parallel','-append')
|
||||
|
||||
% Determine the total number of available CPUs, and the number of threads to run on each CPU
|
||||
for j=1:length(Parallel),
|
||||
nCPU(j)=length(Parallel(j).NumCPU);
|
||||
totCPU=totCPU+nCPU(j);
|
||||
end
|
||||
% keyboard;
|
||||
|
||||
nCPU=cumsum(nCPU);
|
||||
offset0 = fBlock-1;
|
||||
if (nBlock-offset0)>totCPU,
|
||||
|
@ -52,37 +84,11 @@ else
|
|||
totCPU = nBlock-offset0;
|
||||
end
|
||||
|
||||
% if totCPU==1,
|
||||
% if Parallel.Local == 1
|
||||
% State= system (['psexec -W ',DyMo, ' -a ',int2str(Parallel.NumCPU(1)),' -realtime matlab -nosplash -nodesktop -minimize -r fParallel(',int2str(fBlock),',',int2str(nBlock),',1,1,''',fname,''')']);
|
||||
%
|
||||
% else
|
||||
% if ~strcmp(Parallel.PcName,MasterName),
|
||||
% delete(['\\',Parallel(1).PcName,'\',Parallel(1).RemoteDrive,'$\',Parallel(1).RemoteFolder,'\*.*']);
|
||||
% adir=ls(['\\',Parallel(1).PcName,'\',Parallel(1).RemoteDrive,'$\',Parallel(1).RemoteFolder,'\']);
|
||||
% for j=3:size(adir,1)
|
||||
% rmdir(['\\',Parallel(1).PcName,'\',Parallel(1).RemoteDrive,'$\',Parallel(1).RemoteFolder,'\',adir(j,:)],'s')
|
||||
% end
|
||||
% end
|
||||
%
|
||||
% system (['xcopy ',fname,'_input.mat "\\',Parallel.PcName,'\',Parallel.RemoteDrive,'$\',Parallel.RemoteFolder,'" /Y']);
|
||||
% for j=1:size(NamFileInput,1)
|
||||
% copyfile([NamFileInput{j,1},NamFileInput{j,2}],['\\',Parallel(1).PcName,'\',Parallel(1).RemoteDrive,'$\',Parallel(1).RemoteFolder,'\',NamFileInput{j,1}])
|
||||
% end
|
||||
%
|
||||
%
|
||||
% State= system (['psexec \\',Parallel.PcName,' -e -u ',Parallel.user,' -p ',Parallel.passwd,' -W ',Parallel.RemoteDrive,':\',Parallel.RemoteFolder,'\ -a ',int2str(Parallel.NumCPU(1)),' -realtime matlab -nosplash -nodesktop -minimize -r fParallel(',int2str(fBlock),',',int2str(nBlock),',1,1,''',fname,''')']);
|
||||
%
|
||||
% system (['xcopy "\\',Parallel.PcName,'\',Parallel.RemoteDrive,'$\',Parallel.RemoteFolder,'\',fname,'_output_1.mat" /Y']);
|
||||
% for j=1:size(NamFileOutput,1)
|
||||
% copyfile(['\\',Parallel(1).PcName,'\',Parallel(1).RemoteDrive,'$\',Parallel(1).RemoteFolder,'\',NamFileOutput{j,1},NamFileOutput{j,2}],NamFileOutput{j,1})
|
||||
% end
|
||||
% end
|
||||
% load([fname,'_output_1.mat'],'fOutputVar');
|
||||
%
|
||||
% else
|
||||
% Clean up remnants of previous runs
|
||||
mydelete(['comp_status_',fname,'*.mat'])
|
||||
mydelete(['P_',fname,'*End.txt']);
|
||||
|
||||
% Create a shell script containing the commands to launch the required tasks on the slaves
|
||||
fid = fopen('ConcurrentCommand1.bat','w+');
|
||||
for j=1:totCPU,
|
||||
|
||||
|
@ -102,7 +108,6 @@ end
|
|||
if isunix,
|
||||
if exist('OCTAVE_VERSION')
|
||||
command1=['octave --eval fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\) &'];
|
||||
% command1=['ssh localhost "cd ',DyMo, '; ',matlabroot,'/bin/matlab -nosplash -nodesktop -minimize -r fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\);" &'];
|
||||
else
|
||||
command1=['matlab -nosplash -nodesktop -minimize -r fParallel\(',int2str(offset+1),',',int2str(sum(nBlockPerCPU(1:j))),',',int2str(j),',',int2str(indPC),',\''',fname,'\''\) &'];
|
||||
end
|
||||
|
@ -114,7 +119,6 @@ end
|
|||
end
|
||||
end
|
||||
else
|
||||
% keyboard;
|
||||
if isunix,
|
||||
[tempo, RemoteName]=system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' "ifconfig | grep \''inet addr:\''| grep -v \''127.0.0.1\'' | cut -d: -f2 | awk \''{ print $1}\''"']);
|
||||
RemoteName=RemoteName(1:end-1);
|
||||
|
@ -136,7 +140,6 @@ end
|
|||
if isunix,
|
||||
system(['ssh ',Parallel(indPC).user,'@',Parallel(indPC).PcName,' rm -fr ',Parallel(indPC).RemoteFolder,'/*']);
|
||||
else
|
||||
% delete(['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder,'\*.*']);
|
||||
mydelete('*.*',['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder,'\']);
|
||||
adir=dir(['\\',Parallel(indPC).PcName,'\',Parallel(indPC).RemoteDrive,'$\',Parallel(indPC).RemoteFolder,'\']);
|
||||
for jdir=3:length(adir)
|
||||
|
@ -189,32 +192,26 @@ end
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
end
|
||||
fprintf(fid,'%s\n',command1);
|
||||
end
|
||||
|
||||
fclose(fid);
|
||||
|
||||
% Run the slaves
|
||||
if isunix,
|
||||
system('sh ConcurrentCommand1.bat &');
|
||||
pause(1)
|
||||
else
|
||||
system('ConcurrentCommand1.bat');
|
||||
end
|
||||
|
||||
|
||||
% Wait for the slaves to finish their job, and display some progress information meanwhile
|
||||
t0=cputime;
|
||||
t00=cputime;
|
||||
hh=NaN(1,nBlock);
|
||||
if exist('OCTAVE_VERSION'),
|
||||
diary off;
|
||||
% frmt_done = '';
|
||||
% for j=1:totCPU,
|
||||
% frmt_done = [frmt_done,' %3.f%% '];
|
||||
% end
|
||||
else
|
||||
hfigstatus = figure('name',['Parallel ',fname],...
|
||||
'MenuBar', 'none', ...
|
||||
|
@ -235,8 +232,6 @@ end
|
|||
waitbarString = '';
|
||||
statusString = '';
|
||||
pause(1)
|
||||
% keyboard;
|
||||
% if (cputime-t0)>10,
|
||||
stax = dir(['comp_status_',fname,'*.mat']);
|
||||
for j=1:length(stax),
|
||||
|
||||
|
@ -249,23 +244,11 @@ end
|
|||
status_String{j} = waitbarString;
|
||||
status_Title{j} = waitbarTitle;
|
||||
idCPU(j) = njob;
|
||||
% idThisMatlab(j) = ThisMatlab;
|
||||
% idCPU(j) = min((find(cumBlockPerCPU>=njob)));
|
||||
end
|
||||
if prtfrc==1, delete(stax(j).name), end
|
||||
catch
|
||||
|
||||
end
|
||||
% if ~exist('OCTAVE_VERSION'),
|
||||
% if ishandle(hh(njob)),
|
||||
% waitbar(prtfrc,hh(njob),waitbarString);
|
||||
% if prtfrc==1, close(hh(njob)); end
|
||||
% else
|
||||
% hh(njob) = waitbar(0,waitbarString);
|
||||
% set(hh(njob),'Name',['Parallel ',waitbarTitle]);
|
||||
% end
|
||||
% end
|
||||
|
||||
end
|
||||
if exist('OCTAVE_VERSION'),
|
||||
printf([statusString,'\r'], 100 .* pcerdone);
|
||||
|
@ -273,55 +256,34 @@ end
|
|||
figure(hfigstatus),
|
||||
for j=1:length(stax),
|
||||
axes(hstatus(idCPU(j))),
|
||||
% delete(get(gca,'children'))
|
||||
hpat = findobj(hstatus(idCPU(j)),'Type','patch');
|
||||
if ~isempty(hpat),
|
||||
set(hpat,'XData',[0 0 pcerdone(j) pcerdone(j)])
|
||||
else
|
||||
patch([0 0 pcerdone(j) pcerdone(j)],[0 1 1 0],'r','EdgeColor','r')
|
||||
end
|
||||
% xlabel(status_String{j});
|
||||
title([status_Title{j},' - ',status_String{j}]);
|
||||
end
|
||||
end
|
||||
% disp(' ')
|
||||
% t0=cputime;
|
||||
% end
|
||||
if isempty(dir(['P_',fname,'_*End.txt']))
|
||||
mydelete(['comp_status_',fname,'*.mat'])
|
||||
if ~exist('OCTAVE_VERSION'),
|
||||
% for j=1:length(hh),
|
||||
% if ishandle(hh(j)),
|
||||
% close(hh(j))
|
||||
% end
|
||||
% end
|
||||
close(hfigstatus),
|
||||
else
|
||||
printf('\n');
|
||||
diary on;
|
||||
end
|
||||
|
||||
% for j=1:indPC,
|
||||
% if Parallel(j).Local==0 & ~strcmp(Parallel(indPC).PcName,MasterName),
|
||||
% for jfil = 1:size(NamFileOutput,1)
|
||||
% system (['xcopy "\\',Parallel(j).PcName,'\',Parallel(j).RemoteDrive,'$\',Parallel(j).RemoteFolder,'\',NamFileOutput{jfil,1},NamFileOutput{jfil,2},'" ' ,NamFileOutput{jfil,1},' /Y']);
|
||||
% end
|
||||
% system (['xcopy "\\',Parallel(j).PcName,'\',Parallel(j).RemoteDrive,'$\',Parallel(j).RemoteFolder,'\',fname,'_output_*.mat" /Y']);
|
||||
%
|
||||
% end
|
||||
% end
|
||||
break
|
||||
end
|
||||
end
|
||||
delete([fname,'_input.mat'])
|
||||
|
||||
% Create return value
|
||||
for j=1:totCPU,
|
||||
load([fname,'_output_',int2str(j),'.mat'],'fOutputVar');
|
||||
delete([fname,'_output_',int2str(j),'.mat']);
|
||||
fOutVar(j)=fOutputVar;
|
||||
|
||||
|
||||
end
|
||||
|
||||
delete ConcurrentCommand1.bat
|
||||
|
||||
% end
|
||||
% Cleanup
|
||||
delete([fname,'_input.mat'])
|
||||
delete ConcurrentCommand1.bat
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
function struct2local(S),
|
||||
% The argument is a structure possibly containing several fields.
|
||||
% This function will create, in the workspace of the calling function,
|
||||
% as many variables as there are fields in the structure, assigning
|
||||
% them the value of the fields.
|
||||
|
||||
% Copyright (C) 2009 Dynare Team
|
||||
%
|
||||
|
|
Loading…
Reference in New Issue