use_dll: rewrite the parallel compilation engine with a fixed number of threads

The previous system would spawn as many threads as there are object files to be
compiled (which could lead to hundreds of threads for large block-decomposed
models). This could pose a memory usage problem (even when not just waiting,
threads require memory for their own stack).
master
Sébastien Villemot 2022-10-14 14:52:11 +02:00
parent e801f1a862
commit fd9902e87b
No known key found for this signature in database
GPG Key ID: 2CECE9350ECEBE4A
3 changed files with 100 additions and 48 deletions

View File

@ -23,6 +23,8 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include <regex> #include <regex>
#include <thread>
#include <algorithm>
#include <cstdlib> #include <cstdlib>
@ -494,6 +496,9 @@ main(int argc, char **argv)
if (use_dll) if (use_dll)
mod_file->use_dll = true; mod_file->use_dll = true;
if (mod_file->use_dll)
ModelTree::initializeMEXCompilationWorkers(max(jthread::hardware_concurrency(), 1U));
if (json == JsonOutputPointType::parsing) if (json == JsonOutputPointType::parsing)
mod_file->writeJsonOutput(basename, json, json_output_mode, onlyjson); mod_file->writeJsonOutput(basename, json, json_output_mode, onlyjson);
@ -528,7 +533,8 @@ main(int argc, char **argv)
compilation (and is not printed in case of compilation failure); also compilation (and is not printed in case of compilation failure); also
avoids potential issues with destroying the thread synchronization avoids potential issues with destroying the thread synchronization
mechanism too soon. */ mechanism too soon. */
ModelTree::joinMEXCompilationThreads(); if (mod_file->use_dll)
ModelTree::terminateMEXCompilationWorkers();
cout << "Preprocessing completed." << endl; cout << "Preprocessing completed." << endl;
return EXIT_SUCCESS; return EXIT_SUCCESS;

View File

@ -37,10 +37,10 @@
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
vector<jthread> ModelTree::mex_compilation_threads {}; vector<jthread> ModelTree::mex_compilation_workers {};
condition_variable ModelTree::mex_compilation_cv; condition_variable ModelTree::mex_compilation_cv;
mutex ModelTree::mex_compilation_mut; mutex ModelTree::mex_compilation_mut;
unsigned int ModelTree::mex_compilation_available_processors {max(jthread::hardware_concurrency(), 1U)}; vector<tuple<filesystem::path, set<filesystem::path>, string>> ModelTree::mex_compilation_queue;
set<filesystem::path> ModelTree::mex_compilation_done; set<filesystem::path> ModelTree::mex_compilation_done;
void void
@ -1626,6 +1626,8 @@ ModelTree::findGccOnMacos(const string &mexext)
filesystem::path filesystem::path
ModelTree::compileMEX(const filesystem::path &output_dir, const string &output_basename, const string &mexext, const vector<filesystem::path> &input_files, const filesystem::path &matlabroot, const filesystem::path &dynareroot, bool link) const ModelTree::compileMEX(const filesystem::path &output_dir, const string &output_basename, const string &mexext, const vector<filesystem::path> &input_files, const filesystem::path &matlabroot, const filesystem::path &dynareroot, bool link) const
{ {
assert(!mex_compilation_workers.empty());
const string opt_flags = "-O3 -g0 --param ira-max-conflict-table-size=1 -fno-forward-propagate -fno-gcse -fno-dce -fno-dse -fno-tree-fre -fno-tree-pre -fno-tree-cselim -fno-tree-dse -fno-tree-dce -fno-tree-pta -fno-gcse-after-reload"; const string opt_flags = "-O3 -g0 --param ira-max-conflict-table-size=1 -fno-forward-propagate -fno-gcse -fno-dce -fno-dse -fno-tree-fre -fno-tree-pre -fno-tree-cselim -fno-tree-dse -fno-tree-dce -fno-tree-pta -fno-gcse-after-reload";
filesystem::path compiler; filesystem::path compiler;
@ -1769,37 +1771,10 @@ ModelTree::compileMEX(const filesystem::path &output_dir, const string &output_b
return p.extension() == ".o"; return p.extension() == ".o";
}); });
// std::ostringstream is not copyable, so capture a std::string unique_lock<mutex> lk {mex_compilation_mut};
string cmd_str { cmd.str() }; mex_compilation_queue.emplace_back(output_filename, prerequisites, cmd.str());
mex_compilation_threads.emplace_back([cmd_str, output_filename, prerequisites] lk.unlock();
{ mex_compilation_cv.notify_one();
/* Wait until a logical processor becomes available and all prerequisites
are done */
unique_lock<mutex> lk {mex_compilation_mut};
mex_compilation_cv.wait(lk, [prerequisites]
{
return mex_compilation_available_processors > 0 &&
includes(mex_compilation_done.begin(), mex_compilation_done.end(),
prerequisites.begin(), prerequisites.end());
});
// Signal to other threads that we have grabbed a logical processor
mex_compilation_available_processors--;
lk.unlock();
// Effectively compile
if (system(cmd_str.c_str()))
{
cerr << "Compilation failed" << endl;
exit(EXIT_FAILURE);
}
/* Signal to other threads that we have freed a logical processor and
completed a possible prerequisite */
lk.lock();
mex_compilation_available_processors++;
mex_compilation_done.insert(output_filename);
mex_compilation_cv.notify_all();
});
return output_filename; return output_filename;
} }
@ -1906,9 +1881,74 @@ ModelTree::writeBlockBytecodeAdditionalDerivatives([[maybe_unused]] BytecodeWrit
} }
void void
ModelTree::joinMEXCompilationThreads() ModelTree::initializeMEXCompilationWorkers(int numworkers)
{ {
for (auto &it : mex_compilation_threads) assert(numworkers > 0);
assert(mex_compilation_workers.empty());
cout << "Spawning " << numworkers << " threads for compiling MEX files." << endl;
for (int i {0}; i < numworkers; i++)
mex_compilation_workers.emplace_back([](stop_token stoken)
{
unique_lock<mutex> lk {mex_compilation_mut};
look_for_job:
for (auto it {mex_compilation_queue.begin()}; it != mex_compilation_queue.end(); ++it)
{
/* The following is a copy and not a reference, because we need it
after erasing it, and also after releasing the lock (at which
point the mex_compilation_queue may be modified by others). */
const auto [output, prerequisites, cmd] {*it};
if (includes(mex_compilation_done.begin(), mex_compilation_done.end(),
prerequisites.begin(), prerequisites.end()))
{
mex_compilation_queue.erase(it);
lk.unlock(); // After that point, the iterator may become invalid
if (system(cmd.c_str()))
{
cerr << "Compilation failed" << endl;
exit(EXIT_FAILURE);
}
lk.lock();
mex_compilation_done.insert(output);
/* The object just compiled may be a prerequisite for several
other objects, so notify all waiting workers. Also needed to
notify the main thread when in
ModelTree::terminateMEXCompilationWorkers(). */
mex_compilation_cv.notify_all();
goto look_for_job;
}
}
if (stoken.stop_requested())
return;
mex_compilation_cv.wait(lk);
goto look_for_job;
});
}
void
ModelTree::terminateMEXCompilationWorkers()
{
// Wait until the queue is empty
unique_lock<mutex> lk {mex_compilation_mut};
mex_compilation_cv.wait(lk, [] { return mex_compilation_queue.empty(); });
/* Request stop while still holding the lock, so we are sure that workers are
either compiling or waiting right now. Otherwise there could theoretically
be a race condition where the condition variable is notified just after
the thread has checked for its stoken, and just before it begins waiting;
this would be deadlock. */
for (auto &it : mex_compilation_workers)
it.request_stop();
lk.unlock();
mex_compilation_cv.notify_all();
for (auto &it : mex_compilation_workers)
it.join(); it.join();
} }

View File

@ -339,16 +339,19 @@ private:
/*! Maps endogenous type specific IDs to equation numbers */ /*! Maps endogenous type specific IDs to equation numbers */
vector<int> endo2eq; vector<int> endo2eq;
// Stores threads for compiling MEX files in parallel // Stores workers used for compiling MEX files in parallel
static vector<jthread> mex_compilation_threads; static vector<jthread> mex_compilation_workers;
/* The following variables implement the thread synchronization mechanism for /* The following variables implement the thread synchronization mechanism for
limiting the number of concurrent GCC processes and tracking dependencies limiting the number of concurrent GCC processes and tracking dependencies
between object files. */ between object files. */
static condition_variable mex_compilation_cv; static condition_variable mex_compilation_cv;
static mutex mex_compilation_mut; static mutex mex_compilation_mut;
static unsigned int mex_compilation_available_processors; /* Object/MEX files waiting to be compiled (with their prerequisites as 2nd
static set<filesystem::path> mex_compilation_done; // Object/MEX files already compiled element and compilation command as the 3rd element) */
static vector<tuple<filesystem::path, set<filesystem::path>, string>> mex_compilation_queue;
// Object/MEX files already compiled
static set<filesystem::path> mex_compilation_done;
/* Compute a pseudo-Jacobian whose all elements are either zero or one, /* Compute a pseudo-Jacobian whose all elements are either zero or one,
depending on whether the variable symbolically appears in the equation */ depending on whether the variable symbolically appears in the equation */
@ -499,12 +502,11 @@ private:
static string findGccOnMacos(const string &mexext); static string findGccOnMacos(const string &mexext);
#endif #endif
/* Compiles a MEX file (if link=true) or an object file to be linked later /* Compiles a MEX file (if link=true) or an object file to be linked later
into a MEX file (if link=false). The compilation is done in a separate into a MEX file (if link=false). The compilation is done in separate
asynchronous thread, so the call to this function is not blocking. The worker threads working in parallel, so the call to this function is not
number of concurrently running GCC processes is dynamically limited to the blocking. The dependency of a linked MEX file upon intermediary objects is
number of available logical processors. The dependency of a linked MEX nicely handled. Returns the name of the output file (to be reused later as
file upon intermediary objects is nicely handled. Returns the name of the input file if link=false). */
output file (to be reused later as input file if link=false). */
filesystem::path compileMEX(const filesystem::path &output_dir, const string &output_basename, const string &mexext, const vector<filesystem::path> &input_files, const filesystem::path &matlabroot, const filesystem::path &dynareroot, bool link = true) const; filesystem::path compileMEX(const filesystem::path &output_dir, const string &output_basename, const string &mexext, const vector<filesystem::path> &input_files, const filesystem::path &matlabroot, const filesystem::path &dynareroot, bool link = true) const;
public: public:
@ -552,8 +554,12 @@ public:
If no such equation can be found, throws an ExprNode::MatchFailureExpression */ If no such equation can be found, throws an ExprNode::MatchFailureExpression */
expr_t getRHSFromLHS(expr_t lhs) const; expr_t getRHSFromLHS(expr_t lhs) const;
// Calls join() on all MEX compilation threads // Initialize the MEX compilation workers
static void joinMEXCompilationThreads(); static void initializeMEXCompilationWorkers(int numworkers);
/* Terminates all MEX compilation workers (after they have emptied the
waiting queue) */
static void terminateMEXCompilationWorkers();
//! Returns all the equation tags associated to an equation //! Returns all the equation tags associated to an equation
map<string, string> map<string, string>