This tutorial presents the RTLib ThreadPool facility.

Why use this facility

The ThreadPool class exposes a small set of methods to manage a pool of concurrent threads that all execute the same code. At each computing cycle (here, each call to the onRun method), only a fraction of these threads needs to be activated.

The interface

The ThreadPool class follows the Barbeque execution model, i.e. setup, [configuration, run]*, release. Here is a list of the exposed methods.

ThreadPool:Setup

The Setup method sets the pool size and selects the function that each thread will execute. It should be called from the Bbque onSetup method.

/**
 * @brief ThreadPool setup
 *
 * @param max_p Maximum parallelism, i.e. the number of threads in the pool
 * @param fn Function to be run by each thread, already bound
 */
void Setup(int max_p, std::function<void ()> fn);
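"Already bound" means that the callable handed to Setup takes no arguments: the object, the member function and its arguments are packed together beforehand. The following is a minimal sketch using only the standard library; Worker, MakeBound and BindDemo are hypothetical names introduced for this illustration and are not part of RTLib.

```cpp
#include <cassert>
#include <functional>

// Hypothetical class standing in for a BbqueEXC-derived application.
struct Worker {
	int last_arg = -1;
	// Same shape as a per-thread function taking one int argument.
	void SingleThread(int arg) { last_arg = arg; }
};

// Packs object, member function and argument into a zero-argument
// callable, i.e. the std::function<void ()> type that Setup expects.
std::function<void()> MakeBound(Worker &w, int arg) {
	return std::bind(&Worker::SingleThread, &w, arg);
}

// Shows that nothing runs at bind time; the call happens later.
void BindDemo() {
	Worker w;
	std::function<void()> fn = MakeBound(w, 7);
	assert(w.last_arg == -1);
	fn();
	assert(w.last_arg == 7);
}
```

Note that the bound callable stores a pointer to the object, so the object must outlive every thread that may call it.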

ThreadPool:Configure

The Configure method changes, when needed, the arguments of the function executed by each thread. You can even replace the function itself. It should be called from the Bbque onConfigure method.

/**
 * @brief Updates the function called by each thread
 *
 * @param fn Replacement function, already bound
 */
void Configure(std::function<void ()> fn);
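Replacing the callable between cycles can be pictured with a plain std::function variable, which here plays the role of the function stored inside the pool. This is only an illustration of the idea, not RTLib code; Work, OtherWork, call_log and RunConfigureDemo are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical log of the values seen by the worker functions.
std::vector<int> call_log;

void Work(int awm_id) { call_log.push_back(awm_id); }
void OtherWork() { call_log.push_back(-1); }

void RunConfigureDemo() {
	// Stand-in for the callable stored inside the pool.
	std::function<void()> fn = std::bind(Work, 0);
	fn();
	// Same function, new argument (e.g. a new AWM id).
	fn = std::bind(Work, 3);
	fn();
	// A different function altogether.
	fn = OtherWork;
	fn();
	assert(call_log.size() == 3);
}
```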

ThreadPool:Start

The Start method runs the given number of threads. It should be called from the Bbque onRun method. Thread synchronization is completely transparent to the user.

/**
 * @brief Starts the specified number of threads
 *
 * @param threads_number is the number of threads to start
 */
void Start(int threads_number);

ThreadPool:Release

The Release method releases the threads and performs the clean-up. It should be called from the Bbque onRelease method. Thread synchronization is completely transparent to the user.

/**
 * @brief Releasing the threads
 */
void Release();
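To make the setup, [configuration, run]*, release sequence concrete, here is a small stand-in pool written with the standard library only. It is an assumption-laden sketch, not the RTLib implementation: ToyPool and RunLifecycle are hypothetical names, and the blocking behaviour of Start (it returns once the selected workers have finished) is a choice made for this example.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in, NOT the RTLib ThreadPool: it only mirrors the
// Setup / Configure / Start / Release interface described above.
class ToyPool {
public:
	~ToyPool() { Release(); }

	// Stores the callable and spawns max_p workers, all initially asleep.
	void Setup(int max_p, std::function<void()> fn) {
		assert(max_p > 0);
		{
			std::lock_guard<std::mutex> lk(m_);
			fn_ = std::move(fn);
		}
		for (int id = 0; id < max_p; ++id)
			workers_.emplace_back(&ToyPool::Loop, this, id);
	}

	// Replaces the callable used from the next cycle on.
	void Configure(std::function<void()> fn) {
		std::lock_guard<std::mutex> lk(m_);
		fn_ = std::move(fn);
	}

	// Wakes the first n workers and blocks until each has run fn once.
	void Start(int n) {
		std::unique_lock<std::mutex> lk(m_);
		n = std::max(0, std::min(n, static_cast<int>(workers_.size())));
		active_ = pending_ = n;
		++cycle_;
		cv_.notify_all();
		done_.wait(lk, [this] { return pending_ == 0; });
	}

	// Stops and joins every worker; calling it twice is harmless.
	void Release() {
		{
			std::lock_guard<std::mutex> lk(m_);
			stop_ = true;
		}
		cv_.notify_all();
		for (auto &t : workers_) t.join();
		workers_.clear();
	}

private:
	void Loop(int id) {
		unsigned seen = 0;
		std::unique_lock<std::mutex> lk(m_);
		for (;;) {
			cv_.wait(lk, [&] { return stop_ || cycle_ != seen; });
			if (stop_) return;
			seen = cycle_;
			// Workers beyond the requested count skip this cycle.
			if (id >= active_) continue;
			// Copy the callable under the lock, run it unlocked.
			std::function<void()> fn = fn_;
			lk.unlock();
			fn();
			lk.lock();
			if (--pending_ == 0) done_.notify_one();
		}
	}

	std::mutex m_;
	std::condition_variable cv_, done_;
	std::function<void()> fn_;
	std::vector<std::thread> workers_;
	int active_ = 0, pending_ = 0;
	unsigned cycle_ = 0;
	bool stop_ = false;
};

// Drives the pool in the order setup, [configure, run]*, release and
// returns the number of jobs executed.
int RunLifecycle() {
	std::atomic<int> jobs{0};
	ToyPool pool;
	pool.Setup(4, [&jobs] { ++jobs; });
	for (int cycle = 0; cycle < 3; ++cycle) {
		pool.Configure([&jobs] { ++jobs; });
		// Only 2 of the 4 workers run in each cycle.
		pool.Start(2);
	}
	pool.Release();
	return jobs.load();
}
```

With a pool of 4 workers, 3 cycles and 2 active workers per cycle, RunLifecycle counts 6 jobs; the other two workers stay asleep throughout.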

Usage example

Let's see a quick example. In this tutorial we use an application generated with bbque-layapp, which we called ThrPoolTutorial. If you followed the previous tutorials, you are already familiar with the development workflow. Here is the application header.

/**
 *       @file  ThrPoolTutorial_exc.h
 *      @brief  The ThrPoolTutorial BarbequeRTRM application
 *
 * Description: to be done...
 *
 *     @author  Simone Libutti (slibutti), simone.libutti@polimi.it
 *
 *     Company  Politecnico di Milano
 *   Copyright  Copyright (c) 2014, Simone Libutti
 *
 * This source code is released for free distribution under the terms of the
 * GNU General Public License as published by the Free Software Foundation.
 * =====================================================================================
 */
 
#ifndef ThrPoolTutorial_EXC_H_
#define ThrPoolTutorial_EXC_H_
 
#include <bbque/bbque_exc.h>
#include <bbque/utils/threadpool.h>
 
using bbque::rtlib::BbqueEXC;
 
class ThrPoolTutorial : public BbqueEXC {
 
	// Max number of threads
	int parallelism;
	// Number of threads provided by the user (0 if bbque managed)
	int threads;
	// Current number of threads
	int threshold = 0;
 
	// The thread pool
	ThreadPool my_thr_pool;
 
	// Total jobs to perform
	int total_jobs;
	// Jobs currently performed
	int performed_jobs = 0;
 
public:
 
	/**
	 * @brief Constructor
	 */
	ThrPoolTutorial(std::string const & name,
			int tn,
			int jobs,
			std::string const & recipe,
			RTLIB_Services_t *rtlib);
 
	/**
	 * @brief Function run by the single thread
	 */
	void SingleThread(int);
 
private:
 
	RTLIB_ExitCode_t onSetup();
	RTLIB_ExitCode_t onConfigure(uint8_t awm_id);
	RTLIB_ExitCode_t onRun();
	RTLIB_ExitCode_t onMonitor();
	RTLIB_ExitCode_t onRelease();
 
};
 
#endif // ThrPoolTutorial_EXC_H_

As you can see, the ThrPoolTutorial constructor takes two application arguments:

  1. A number of threads, set by the user only when the thread count must not be reconfigured at runtime (0 leaves the choice to Bbque)
  2. A number of jobs to perform (each thread activated during the onRun method performs a single job)

Note that the ThreadPool class is made available by including <bbque/utils/threadpool.h>.

The member methods are the usual ones plus the SingleThread method, which is the function executed by each thread. Finally, a small set of variables is declared:

  • parallelism is the maximum number of threads, i.e. the ThreadPool size
  • threads is an argument of the application. Its default value is zero. If this value is greater than zero, the application will always run that number of threads, regardless of the current AWM
  • threshold is the number of threads that should be run each cycle, according to the current AWM
  • my_thr_pool is an instance of the ThreadPool class
  • total_jobs and performed_jobs track the total number of jobs to execute and the number already performed
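The way these variables interact can be summarized as a small pure function. This is a sketch under stated assumptions rather than the tutorial's own code: ChooseThreads and its parameter names are hypothetical, and the cap on the remaining jobs reflects the intent stated in the onConfigure comments of this tutorial.

```cpp
#include <algorithm>
#include <cassert>

// fixed_threads: thread count given by the user (0 means "let the AWM decide")
// awm_threads:   thread count associated with the current AWM (assumed input)
// remaining:     jobs still to perform
int ChooseThreads(int fixed_threads, int awm_threads, int remaining) {
	assert(fixed_threads >= 0 && awm_threads >= 0 && remaining >= 0);
	// A user-defined value always wins over the AWM.
	int wanted = (fixed_threads > 0) ? fixed_threads : awm_threads;
	// Never start more threads than there are jobs left.
	return std::min(wanted, remaining);
}
```

For instance, with no fixed value, an AWM asking for 4 threads and 3 jobs left, the function returns 3.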

This setup keeps the application code simple. Let's see the implementation file.

/**
 *       @file  ThrPoolTutorial_exc.cc
 *      @brief  The ThrPoolTutorial BarbequeRTRM application
 *
 * Description: to be done...
 *
 *     @author  Simone Libutti (slibutti), simone.libutti@polimi.it
 *
 *     Company  Politecnico di Milano
 *   Copyright  Copyright (c) 20XX, Simone Libutti
 *
 * This source code is released for free distribution under the terms of the
 * GNU General Public License as published by the Free Software Foundation.
 * =====================================================================================
 */
 
 
#include "ThrPoolTutorial_exc.h"
 
#include <cstdio>
#include <bbque/utils/utility.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
 
#include <bbque/monitors/operating_point.h>
namespace ba = bbque::rtlib::as;
extern ba::OperatingPointsList opList;
 
// Setup logging
#undef  BBQUE_LOG_MODULE
#define BBQUE_LOG_MODULE "aem.ThrPoolTutorial"
#undef  BBQUE_LOG_UID
#define BBQUE_LOG_UID GetChUid()
 
ThrPoolTutorial::ThrPoolTutorial(std::string const & name,
		int tn,
		int jobs,
		std::string const & recipe,
		RTLIB_Services_t *rtlib) :
	BbqueEXC(name, recipe, rtlib),
	parallelism(std::thread::hardware_concurrency()),
	threads(tn),
	total_jobs(jobs) {
 
	// Using user-defined threads number, if specified
	if ( threads > 0 ) {
		logger->Warn("Forcing threads number to %d", threads);
		parallelism = threads;
	}
	else logger->Notice("Max threads number is %d", parallelism);
 
	logger->Notice("%d jobs requested", total_jobs);
}
 
RTLIB_ExitCode_t ThrPoolTutorial::onSetup() {
 
	/*
	 * ThreadPool setup: sets the maximum threads number, and a bound
	 * function to be executed by each thread
	 */
	my_thr_pool.Setup(parallelism,
		std::bind(&ThrPoolTutorial::SingleThread, this, 0));
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t ThrPoolTutorial::onConfigure(uint8_t awm_id) {
 
	// Number of remaining jobs to perform
	int remaining_jobs = total_jobs - performed_jobs;
 
	/*
	 * If threads > 0, the user chose a fixed threads number. Else, the one
	 * specified by the recipe will be used.
	 */
	if ( threads > 0 ) threshold = parallelism;
	else threshold = opList[awm_id].parameters["thread"];
 
	/*
	 * In any case, I'm not exploiting more threads than the number of
	 * remaining jobs
	 */
	if ( threshold > remaining_jobs ) threshold = remaining_jobs;
 
	logger->Warn("OnConfigure: Exploiting %d threads", threshold);
 
	/*
	 * You can change the threads function parameter, or even the function
	 * itself! Here I pass the awm_id value to the function, which will print it.
	 */
	my_thr_pool.Configure(std::bind(&ThrPoolTutorial::SingleThread, this, (int)awm_id));
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t ThrPoolTutorial::onRun() {
 
	// Just start threshold threads, and enjoy!
	my_thr_pool.Start(threshold);
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t ThrPoolTutorial::onMonitor() {
 
	logger->Notice("Cycle %d", Cycles());
 
	if ( performed_jobs == total_jobs ) {
		logger->Notice("Performed %d jobs in %d cycles", performed_jobs, Cycles());
		return RTLIB_EXC_WORKLOAD_NONE;
	}
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t ThrPoolTutorial::onRelease(){
 
	// Stopping the threads and cleaning up
	my_thr_pool.Release();
	return RTLIB_OK;
 
}
 
/**
 * @brief Single cycle workload
 *
 * Each cycle, each thread either sleeps or performs this workload.
 */
void ThrPoolTutorial::SingleThread(int cn) {
 
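	logger->Notice("Running with awm_id %d", cn);
	// NOTE: performed_jobs is a plain int updated by every active thread;
	// this increment is not synchronized by the application code.
	performed_jobs++;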
 
}

We won't comment on the additional code needed to choose the number of threads to run (it is quite easy to understand anyway). Let's focus on the ThreadPool usage:

  • onSetup(): call Setup() on the thread pool. This single line is all that is needed.
  • onConfigure(): call Configure() on the thread pool. Again, a single line suffices.
  • onRun(): the number of threads was already chosen in onConfigure (from the current AWM id or from the fixed user-defined value), so you only have to Start() the thread pool.
  • onRelease(): as above. A single call to Release() synchronizes and stops the threads transparently.
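One detail is worth checking in your own code: SingleThread increments performed_jobs, a plain int, from every active thread. This tutorial does not say whether the pool serializes those calls; if it does not, concurrent increments can be lost. The sketch below is independent of RTLib and shows one common remedy, a std::atomic counter. CountJobs and its parameters are names made up for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Counts jobs completed by several threads without losing updates.
int CountJobs(int n_threads, int jobs_per_thread) {
	assert(n_threads >= 0 && jobs_per_thread >= 0);
	std::atomic<int> performed_jobs{0};
	std::vector<std::thread> pool;
	for (int t = 0; t < n_threads; ++t) {
		pool.emplace_back([&performed_jobs, jobs_per_thread] {
			for (int j = 0; j < jobs_per_thread; ++j)
				// Atomic read-modify-write: no increment is lost.
				performed_jobs.fetch_add(1, std::memory_order_relaxed);
		});
	}
	for (auto &th : pool) th.join();
	return performed_jobs.load();
}
```

If you adopt an atomic counter in an application, pass performed_jobs.load() to printf-style logging calls rather than the atomic object itself.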

Try to code a thread-pool application, and enjoy! If you want the complete example, here is the ThrPoolTutorial_main source code. It is the code generated by bbque-layapp, plus the code needed to read the number of threads and jobs from the command line:

/**
 *       @file  ThrPoolTutorial_main.cc
 *      @brief  The ThrPoolTutorial BarbequeRTRM application
 *
 * Description: to be done...
 *
 *     @author  Simone Libutti (slibutti), simone.libutti@polimi.it
 *
 *     Company  Politecnico di Milano
 *   Copyright  Copyright (c) 20XX, Simone Libutti
 *
 * This source code is released for free distribution under the terms of the
 * GNU General Public License as published by the Free Software Foundation.
 * =====================================================================================
 */
 
#include <cassert>
#include <cstdio>
#include <iostream>
#include <random>
#include <cstring>
#include <memory>
 
#include <libgen.h>
 
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/parsers.hpp>
#include <boost/program_options/variables_map.hpp>
 
#include "version.h"
#include "ThrPoolTutorial_exc.h"
#include <bbque/utils/utility.h>
 
// Setup logging
#undef  BBQUE_LOG_MODULE
#define BBQUE_LOG_MODULE "ThrPoolTutorial"
 
namespace po = boost::program_options;
 
/**
 * @brief A pointer to an EXC
 */
typedef std::shared_ptr<BbqueEXC> pBbqueEXC_t;
 
/**
 * The description of each ThrPoolTutorial parameter
 */
po::options_description opts_desc("ThrPoolTutorial Configuration Options");
 
/**
 * The map of all ThrPoolTutorial parameter values
 */
po::variables_map opts_vm;
 
/**
 * The services exported by the RTLib
 */
RTLIB_Services_t *rtlib;
 
/**
 * @brief The recipe to use for all the EXCs
 */
std::string recipe;
 
/**
 * @brief The EXecution Context (EXC) registered
 */
pBbqueEXC_t pexc;
 
/**
 * @brief Number of threads. 0 if to be AWM dependent
 */
int threads;
 
/**
 * @brief Number of jobs to perform
 */
int jobs;
 
void ParseCommandLine(int argc, char *argv[]) {
	// Parse command line params
	try {
		po::store(po::parse_command_line(argc, argv, opts_desc), opts_vm);
	} catch(...) {
		std::cout << "Usage: " << argv[0] << " [options]\n";
		std::cout << opts_desc << std::endl;
		::exit(EXIT_FAILURE);
	}
	po::notify(opts_vm);
 
	// Check for help request
	if (opts_vm.count("help")) {
		std::cout << "Usage: " << argv[0] << " [options]\n";
		std::cout << opts_desc << std::endl;
		::exit(EXIT_SUCCESS);
	}
 
	// Check for version request
	if (opts_vm.count("version")) {
		std::cout << "ThrPoolTutorial (ver. " << g_git_version << ")\n";
		std::cout << "Copyright (C) 2011 Politecnico di Milano\n";
		std::cout << "\n";
		std::cout << "Built on " <<
			__DATE__ << " " <<
			__TIME__ << "\n";
		std::cout << "\n";
		std::cout << "This is free software; see the source for "
			"copying conditions.  There is NO\n";
		std::cout << "warranty; not even for MERCHANTABILITY or "
			"FITNESS FOR A PARTICULAR PURPOSE.";
		std::cout << "\n" << std::endl;
		::exit(EXIT_SUCCESS);
	}
}
 
int main(int argc, char *argv[]) {
 
	opts_desc.add_options()
		("help,h", "print this help message")
		("version,v", "print program version")
		("threads,t", po::value<int>(&threads)->
			default_value(0),
			"fixes the threads number, regardless of the AWM")
		("jobs,j", po::value<int>(&jobs)->
			default_value(100),
			"number of jobs to perform")
 
		("recipe,r", po::value<std::string>(&recipe)->
			default_value("ThrPoolTutorial"),
			"recipe name (for all EXCs)")
	;
 
	ParseCommandLine(argc, argv);
 
	// Welcome screen
	fprintf(stdout, FI(".:: ThrPoolTutorial (ver. %s) ::.\n"), g_git_version);
	fprintf(stdout, FI("Built: " __DATE__  " " __TIME__ "\n"));
 
 
	// Initializing the RTLib library and setup the communication channel
	// with the Barbeque RTRM
	fprintf(stderr, FI("STEP 0. Initializing RTLib, application [%s]...\n"),
			::basename(argv[0]));
	RTLIB_Init(::basename(argv[0]), &rtlib);
	assert(rtlib);
 
 
	fprintf(stderr, FI("STEP 1. Registering EXC using [%s] recipe...\n"),
			recipe.c_str());
	pexc = pBbqueEXC_t(new ThrPoolTutorial("ThrPoolTutorial", threads, jobs,
								recipe, rtlib));
	if (!pexc->isRegistered())
		return RTLIB_ERROR;
 
 
	fprintf(stderr, FI("STEP 2. Starting EXC control thread...\n"));
	pexc->Start();
 
 
	fprintf(stderr, FI("STEP 3. Waiting for EXC completion...\n"));
	pexc->WaitCompletion();
 
 
	fprintf(stderr, FI("STEP 4. Disabling EXC...\n"));
	pexc = NULL;
 
 
	fprintf(stderr, FI("===== ThrPoolTutorial DONE! =====\n"));
	return EXIT_SUCCESS;
 
}
docs/tutorial/tp.txt · Last modified: 2014/06/25 16:25 by slibutti
