make mh_recover robust to crashed parallel jobs

time-shift
Marco Ratto 2019-07-24 16:16:03 +02:00
parent 6b308ccbd8
commit da4baa5d50
4 changed files with 158 additions and 78 deletions

View File

@ -107,6 +107,18 @@ if nargin>8 && initialize==1
return
end
% Determine the total number of available CPUs, and the number of threads
% to run on each CPU.
[nCPU, totCPU, nBlockPerCPU, totSlaves] = distributeJobs(Parallel, fBlock, nBlock);
parallel_recover = 0;
if isfield(Parallel_info,'parallel_recover') && Parallel_info.parallel_recover
parallel_recover = 1;
end
if parallel_recover ==0
if isfield(Parallel_info,'local_files')
if isempty(NamFileInput)
NamFileInput=Parallel_info.local_files;
@ -147,9 +159,9 @@ if isHybridMatlabOctave || isoctave
end
end
if Strategy==1
totCPU=0;
end
% if Strategy==1
% totCPU=0;
% end
% Determine my hostname and my working directory.
@ -179,11 +191,6 @@ switch Strategy
closeSlave(Parallel,PRCDir,-1);
end
% Determine the total number of available CPUs, and the number of threads
% to run on each CPU.
[nCPU, totCPU, nBlockPerCPU, totSlaves] = distributeJobs(Parallel, fBlock, nBlock);
for j=1:totSlaves
PRCDirSnapshot{j}={};
end
@ -768,8 +775,13 @@ while (ForEver)
end
end
else
for j=1:totSlaves
PRCDirSnapshot{j}={};
end
flag_CloseAllSlaves = 0;
end
% Load and format remote output.
iscrash = 0;
PRCDirSnapshot=dynareParallelGetNewFiles(PRCDir,Parallel(1:totSlaves),PRCDirSnapshot);
@ -902,4 +914,4 @@ switch Strategy
end
end
end
end
end

View File

@ -117,7 +117,7 @@ end
% User doesn't want to use parallel computing, or wants to compute a
% single chain compute sequentially.
if isnumeric(options_.parallel) || (nblck-fblck)==0
if isnumeric(options_.parallel) || (~isempty(fblck) && (nblck-fblck)==0)
fout = posterior_sampler_core(localVars, fblck, nblck, 0);
record = fout.record;
% Parallel in Local or remote machine.
@ -142,6 +142,11 @@ else
end
% from where to get back results
% NamFileOutput(1,:) = {[M_.dname,'/metropolis/'],'*.*'};
if options_.mh_recover && isempty(fblck)
% here we just need to retrieve the output of the completed remote jobs
fblck=1;
options_.parallel_info.parallel_recover = 1;
end
[fout, nBlockPerCPU, totCPU] = masterParallel(options_.parallel, fblck, nblck,NamFileInput,'posterior_sampler_core', localVars, globalVars, options_.parallel_info);
for j=1:totCPU
offset = sum(nBlockPerCPU(1:j-1))+fblck-1;
@ -151,7 +156,7 @@ else
record.FunctionEvalPerIteration(offset+1:sum(nBlockPerCPU(1:j)))=fout(j).record.FunctionEvalPerIteration(offset+1:sum(nBlockPerCPU(1:j)));
record.LastSeeds(offset+1:sum(nBlockPerCPU(1:j)))=fout(j).record.LastSeeds(offset+1:sum(nBlockPerCPU(1:j)));
end
options_.parallel_info.parallel_recover = 0;
end
irun = fout(1).irun;

View File

@ -137,14 +137,37 @@ for curr_block = fblck:nblck
% If the state set by master is incompatible with the slave, we only reseed
set_dynare_seed(options_.DynareRandomStreams.seed+curr_block);
end
mh_recover_flag=0;
if (options_.load_mh_file~=0) && (fline(curr_block)>1) && OpenOldFile(curr_block) %load previous draws and likelihood
load([BaseName '_mh' int2str(NewFile(curr_block)) '_blck' int2str(curr_block) '.mat'])
x2 = [x2;zeros(InitSizeArray(curr_block)-fline(curr_block)+1,npar)];
logpo2 = [logpo2;zeros(InitSizeArray(curr_block)-fline(curr_block)+1,1)];
OpenOldFile(curr_block) = 0;
else
x2 = zeros(InitSizeArray(curr_block),npar);
logpo2 = zeros(InitSizeArray(curr_block),1);
if options_.mh_recover && exist([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat'],'file')==2
load([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat']);
draw_iter = size(neval_this_chain,2)+1;
draw_index_current_file = draw_iter;
feval_this_chain = sum(sum(neval_this_chain));
feval_this_file = sum(sum(neval_this_chain));
if feval_this_chain>draw_iter-1
% non Metropolis type of sampler
accepted_draws_this_chain = draw_iter-1;
accepted_draws_this_file = draw_iter-1;
else
accepted_draws_this_chain = 0;
accepted_draws_this_file = 0;
end
mh_recover_flag=1;
set_dynare_random_generator_state(LastSeeds.(['file' int2str(NewFile(curr_block))]).Unifor, LastSeeds.(['file' int2str(NewFile(curr_block))]).Normal);
last_draw(curr_block,:)=x2(draw_iter-1,:);
last_posterior(curr_block)=logpo2(draw_iter-1);
else
x2 = zeros(InitSizeArray(curr_block),npar);
logpo2 = zeros(InitSizeArray(curr_block),1);
end
end
%Prepare waiting bars
if whoiam
@ -158,13 +181,15 @@ for curr_block = fblck:nblck
hh = dyn_waitbar(0,[bar_title ' (' int2str(curr_block) '/' int2str(options_.mh_nblck) ')...']);
set(hh,'Name',bar_title);
end
accepted_draws_this_chain = 0;
accepted_draws_this_file = 0;
feval_this_chain = 0;
feval_this_file = 0;
draw_index_current_file = fline(curr_block); %get location of first draw in current block
draw_iter = 1;
if mh_recover_flag==0
accepted_draws_this_chain = 0;
accepted_draws_this_file = 0;
feval_this_chain = 0;
feval_this_file = 0;
draw_iter = 1;
draw_index_current_file = fline(curr_block); %get location of first draw in current block
end
sampler_options.curr_block = curr_block;
while draw_iter <= nruns(curr_block)
[par, logpost, accepted, neval] = posterior_sampler_iteration(TargetFun, last_draw(curr_block,:), last_posterior(curr_block), sampler_options,dataset_,dataset_info,options_,M_,estim_params_,bayestopt_,mh_bounds,oo_);
@ -173,6 +198,7 @@ for curr_block = fblck:nblck
last_draw(curr_block,:) = par;
logpo2(draw_index_current_file) = logpost;
last_posterior(curr_block) = logpost;
neval_this_chain(:, draw_iter) = neval;
feval_this_chain = feval_this_chain + sum(neval);
feval_this_file = feval_this_file + sum(neval);
accepted_draws_this_chain = accepted_draws_this_chain + accepted;
@ -187,7 +213,7 @@ for curr_block = fblck:nblck
end
if save_tmp_file
[LastSeeds.(['file' int2str(NewFile(curr_block))]).Unifor, LastSeeds.(['file' int2str(NewFile(curr_block))]).Normal] = get_dynare_random_generator_state();
save([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds');
save([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds','neval_this_chain','accepted_draws_this_chain','accepted_draws_this_file','feval_this_chain','feval_this_file');
end
end
if (draw_index_current_file == InitSizeArray(curr_block)) || (draw_iter == nruns(curr_block)) % Now I save the simulations, either because the current file is full or the chain is done
@ -195,7 +221,7 @@ for curr_block = fblck:nblck
if save_tmp_file
delete([BaseName '_mh_tmp_blck' int2str(curr_block) '.mat']);
end
save([BaseName '_mh' int2str(NewFile(curr_block)) '_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds');
save([BaseName '_mh' int2str(NewFile(curr_block)) '_blck' int2str(curr_block) '.mat'],'x2','logpo2','LastSeeds','accepted_draws_this_chain','accepted_draws_this_file','feval_this_chain','feval_this_file');
fidlog = fopen([MetropolisFolder '/metropolis.log'],'a');
fprintf(fidlog,['\n']);
fprintf(fidlog,['%% Mh' int2str(NewFile(curr_block)) 'Blck' int2str(curr_block) ' (' datestr(now,0) ')\n']);
@ -239,10 +265,12 @@ for curr_block = fblck:nblck
draw_iter=draw_iter+1;
draw_index_current_file = draw_index_current_file + 1;
end% End of the simulations for one mh-block.
record.AcceptanceRatio(curr_block) = accepted_draws_this_chain/(draw_iter-1);
record.FunctionEvalPerIteration(curr_block) = feval_this_chain/(draw_iter-1);
dyn_waitbar_close(hh);
[record.LastSeeds(curr_block).Unifor, record.LastSeeds(curr_block).Normal] = get_dynare_random_generator_state();
if nruns(curr_block)
record.AcceptanceRatio(curr_block) = accepted_draws_this_chain/(draw_iter-1);
record.FunctionEvalPerIteration(curr_block) = feval_this_chain/(draw_iter-1);
[record.LastSeeds(curr_block).Unifor, record.LastSeeds(curr_block).Normal] = get_dynare_random_generator_state();
end
OutputFileName(block_iter,:) = {[MetropolisFolder,filesep], [ModelName '_mh*_blck' int2str(curr_block) '.mat']};
end% End of the loop over the mh-blocks.
@ -250,4 +278,4 @@ end% End of the loop over the mh-blocks.
myoutput.record = record;
myoutput.irun = draw_index_current_file;
myoutput.NewFile = NewFile;
myoutput.OutputFileName = OutputFileName;
myoutput.OutputFileName = OutputFileName;

View File

@ -376,28 +376,30 @@ elseif options_.mh_recover
ExpectedNumberOfMhFiles = ExpectedNumberOfMhFilesPerBlock*NumberOfBlocks;
% How many mh files do we actually have ?
AllMhFiles = dir([BaseName '_mh*_blck*.mat']);
TotalNumberOfMhFiles = length(AllMhFiles);
TotalNumberOfMhFiles = length(AllMhFiles)-length(dir([BaseName '_mh_tmp*_blck*.mat']));
% Quit if no crashed mcmc chain can be found as there are as many files as expected
if (TotalNumberOfMhFiles==ExpectedNumberOfMhFiles)
disp('Estimation::mcmc: It appears that you don''t need to use the mh_recover option!')
disp(' You have to edit the mod file and remove the mh_recover option')
disp(' in the estimation command')
error('Estimation::mcmc: mh_recover option not required!')
if (TotalNumberOfMhFiles==ExpectedNumberOfMhFiles)
if isnumeric(options_.parallel)
disp('Estimation::mcmc: It appears that you don''t need to use the mh_recover option!')
disp(' You have to edit the mod file and remove the mh_recover option')
disp(' in the estimation command')
error('Estimation::mcmc: mh_recover option not required!')
end
end
% 2. Something needs to be done; find out what
% Count the number of saved mh files per block.
NumberOfMhFilesPerBlock = zeros(NumberOfBlocks,1);
for b = 1:NumberOfBlocks
BlckMhFiles = dir([BaseName '_mh*_blck' int2str(b) '.mat']);
NumberOfMhFilesPerBlock(b) = length(BlckMhFiles);
NumberOfMhFilesPerBlock(b) = length(BlckMhFiles)-length(dir([BaseName '_mh_tmp*_blck' int2str(b) '.mat']));
end
% Find FirstBlock (First block), an integer targeting the crashed mcmc chain.
FirstBlock = 1; %initialize
FBlock = zeros(NumberOfBlocks,1);
while FirstBlock <= NumberOfBlocks
if NumberOfMhFilesPerBlock(FirstBlock) < ExpectedNumberOfMhFilesPerBlock
disp(['Estimation::mcmc: Chain ' int2str(FirstBlock) ' is not complete!'])
break
% The mh_recover session will start from chain FirstBlock.
FBlock(FirstBlock)=1;
else
disp(['Estimation::mcmc: Chain ' int2str(FirstBlock) ' is complete!'])
end
@ -406,51 +408,84 @@ elseif options_.mh_recover
%% 3. Overwrite default settings for
% How many mh-files are saved in this block?
NumberOfSavedMhFilesInTheCrashedBlck = NumberOfMhFilesPerBlock(FirstBlock);
ExistingDrawsInLastMCFile=0; %initialize: no MCMC draws of current MCMC are in file from last run
% Check whether last present file is a file included in the last MCMC run
if ~LastFileFullIndicator
if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only that last file exists, but no files from current MCMC
loaded_results=load([BaseName '_mh' int2str(NewFile(FirstBlock)) '_blck' int2str(FirstBlock) '.mat']);
%check whether that last file was filled
if size(loaded_results.x2,1)==MAX_nruns %file is full
NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one
FirstLine(FirstBlock) = 1; %use first line of next file
ExistingDrawsInLastMCFile=MAX_nruns-record.MhDraws(end-1,3);
else
ExistingDrawsInLastMCFile=0;
ExistingDrawsInLastMCFile=zeros(NumberOfBlocks,1); %initialize: no MCMC draws of current MCMC are in file from last run
% Check whether last present file is a file included in the last MCMC run
update_record=0;
for k=1:NumberOfBlocks
FirstBlock = k;
if FBlock(k)
NumberOfSavedMhFilesInTheCrashedBlck=NumberOfMhFilesPerBlock(k);
if ~LastFileFullIndicator
if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only that last file exists, but no files from current MCMC
loaded_results=load([BaseName '_mh' int2str(NewFile(FirstBlock)) '_blck' int2str(FirstBlock) '.mat']);
%check whether that last file was filled
if size(loaded_results.x2,1)==MAX_nruns %file is full
NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one
FirstLine(FirstBlock) = 1; %use first line of next file
ExistingDrawsInLastMCFile(FirstBlock)=MAX_nruns-record.MhDraws(end-1,3);
else
ExistingDrawsInLastMCFile(FirstBlock)=0;
end
end
elseif LastFileFullIndicator
ExistingDrawsInLastMCFile(FirstBlock)=0;
if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only the last file exists, but no files from current MCMC
NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one
end
end
% % Correct the number of saved mh files if the crashed Metropolis was not the first session (so
% % that NumberOfSavedMhFilesInTheCrashedBlck is the number of saved mh files in the crashed chain
% % of the current session).
% if OldMhExists
% NumberOfSavedMhFilesInTheCrashedBlck = NumberOfSavedMhFilesInTheCrashedBlck - LastFileNumberInThePreviousMh;
% end
% NumberOfSavedMhFiles = NumberOfSavedMhFilesInTheCrashedBlck+LastFileNumberInThePreviousMh;
% Correct initial conditions.
if NumberOfSavedMhFilesInTheCrashedBlck>0 && NumberOfSavedMhFilesInTheCrashedBlck<ExpectedNumberOfMhFilesPerBlock
loaded_results=load([BaseName '_mh' int2str(NumberOfSavedMhFilesInTheCrashedBlck) '_blck' int2str(FirstBlock) '.mat']);
ilogpo2(FirstBlock) = loaded_results.logpo2(end);
ix2(FirstBlock,:) = loaded_results.x2(end,:);
nruns(FirstBlock)=nruns(FirstBlock)-ExistingDrawsInLastMCFile(FirstBlock)-(NumberOfSavedMhFilesInTheCrashedBlck-LastFileNumberInThePreviousMh)*MAX_nruns;
%reset seed if possible
if isfield(loaded_results,'LastSeeds')
record.InitialSeeds(FirstBlock).Unifor=loaded_results.LastSeeds.(['file' int2str(NumberOfSavedMhFilesInTheCrashedBlck)]).Unifor;
record.InitialSeeds(FirstBlock).Normal=loaded_results.LastSeeds.(['file' int2str(NumberOfSavedMhFilesInTheCrashedBlck)]).Normal;
else
fprintf('Estimation::mcmc: You are trying to recover a chain generated with an older Dynare version.\n')
fprintf('Estimation::mcmc: I am using the default seeds to continue the chain.\n')
end
if update_record
update_last_mh_history_file(MetropolisFolder, ModelName, record);
else
write_mh_history_file(MetropolisFolder, ModelName, record);
end
update_record=1;
end
end
elseif LastFileFullIndicator
ExistingDrawsInLastMCFile=0;
if NumberOfSavedMhFilesInTheCrashedBlck==NewFile(FirstBlock) %only the last file exists, but no files from current MCMC
NewFile(FirstBlock)=NewFile(FirstBlock)+1; %set first file to be created to next one
end
end
% % Correct the number of saved mh files if the crashed Metropolis was not the first session (so
% % that NumberOfSavedMhFilesInTheCrashedBlck is the number of saved mh files in the crashed chain
% % of the current session).
% if OldMhExists
% NumberOfSavedMhFilesInTheCrashedBlck = NumberOfSavedMhFilesInTheCrashedBlck - LastFileNumberInThePreviousMh;
% end
% NumberOfSavedMhFiles = NumberOfSavedMhFilesInTheCrashedBlck+LastFileNumberInThePreviousMh;
% Correct initial conditions.
if NumberOfSavedMhFilesInTheCrashedBlck<ExpectedNumberOfMhFilesPerBlock
loaded_results=load([BaseName '_mh' int2str(NumberOfSavedMhFilesInTheCrashedBlck) '_blck' int2str(FirstBlock) '.mat']);
ilogpo2(FirstBlock) = loaded_results.logpo2(end);
ix2(FirstBlock,:) = loaded_results.x2(end,:);
nruns(FirstBlock)=nruns(FirstBlock)-ExistingDrawsInLastMCFile-(NumberOfSavedMhFilesInTheCrashedBlck-LastFileNumberInThePreviousMh)*MAX_nruns;
%reset seed if possible
if isfield(loaded_results,'LastSeeds')
record.InitialSeeds(FirstBlock).Unifor=loaded_results.LastSeeds.(['file' int2str(NumberOfSavedMhFilesInTheCrashedBlck)]).Unifor;
record.InitialSeeds(FirstBlock).Normal=loaded_results.LastSeeds.(['file' int2str(NumberOfSavedMhFilesInTheCrashedBlck)]).Normal;
else
fprintf('Estimation::mcmc: You are trying to recover a chain generated with an older Dynare version.\n')
fprintf('Estimation::mcmc: I am using the default seeds to continue the chain.\n')
loaded_results=load([BaseName '_mh' int2str(ExpectedNumberOfMhFilesPerBlock) '_blck' int2str(FirstBlock) '.mat']);
ilogpo2(FirstBlock) = loaded_results.logpo2(end);
ix2(FirstBlock,:) = loaded_results.x2(end,:);
nruns(FirstBlock)=0;
%reset seed if possible
if isfield(loaded_results,'LastSeeds')
record.LastSeeds(FirstBlock).Unifor=loaded_results.LastSeeds.(['file' int2str(ExpectedNumberOfMhFilesPerBlock)]).Unifor;
record.LastSeeds(FirstBlock).Normal=loaded_results.LastSeeds.(['file' int2str(ExpectedNumberOfMhFilesPerBlock)]).Normal;
else
fprintf('Estimation::mcmc: You are trying to recover a chain generated with an older Dynare version.\n')
fprintf('Estimation::mcmc: I am using the default seeds to continue the chain.\n')
end
if isfield(loaded_results,'accepted_draws_this_chain')
record.AcceptanceRatio(FirstBlock)=loaded_results.accepted_draws_this_chain/record.MhDraws(end,1);
record.FunctionEvalPerIteration(FirstBlock)=loaded_results.accepted_draws_this_chain/record.MhDraws(end,1);
end
record.LastLogPost(FirstBlock)=loaded_results.logpo2(end);
record.LastParameters(FirstBlock,:)=loaded_results.x2(end,:);
update_last_mh_history_file(MetropolisFolder, ModelName, record);
end
write_mh_history_file(MetropolisFolder, ModelName, record);
end
FirstBlock = find(FBlock==1,1);
end
function [d,bayestopt_]=set_proposal_density_to_previous_value(record,options_,bayestopt_,d)