
The Barbeque Abstract Execution Model (AEM) provides thread synchronization: during the execution of an application, each processing cycle corresponds to a distinct invocation of the onRun method, and a cycle ends only when all the threads have finished their work and are ready for the next one. This is a common paradigm in stream processing applications. However, you may need to start one or more threads when the application starts and let them run asynchronously from the “managed threads” until the end of the application execution. A typical case is a result collection thread running alongside a number of Barbeque-managed processing threads.
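The per-cycle synchronization described above can be illustrated with a minimal, standalone C++ sketch. It does not use the RTLib: RunCycles is a hypothetical helper, introduced here only for illustration, that mimics the behavior by joining every worker thread before the next cycle starts.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the RTLib): runs `cycles` processing
// cycles with `workers` threads each and returns the number of jobs done.
int RunCycles(int cycles, int workers) {
	std::atomic<int> jobs_done{0};

	for (int c = 0; c < cycles; ++c) {
		std::vector<std::thread> pool;

		// Start the threads of this cycle; each one performs a single job
		for (int w = 0; w < workers; ++w)
			pool.emplace_back([&jobs_done] { ++jobs_done; });

		// The cycle ends only when all its threads have finished
		for (auto &t : pool)
			t.join();
	}

	return jobs_done.load();
}
```

Calling RunCycles(4, 3) performs four cycles of three jobs each. No job of a given cycle can overlap with a job of the following one, which is the property the managed threads rely on.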

This tutorial presents an example in which a single thread, acting as a producer, runs asynchronously with respect to a thread pool of consumer threads. The thread-pool threads are synchronous: they follow the AEM and are therefore run, monitored and then configured each cycle by the RTLib. The asynchronous thread, conversely, is free to run unmanaged until the end of the application execution. This freedom comes at a cost, though. The runtime parameters of a thread cannot be changed without stopping the thread itself, so unless you provide the asynchronous thread with a few synchronization points where it can be stopped and then restarted with different parameters, the only runtime re-configuration support available is the one offered by the Barbeque framework (resource usage limitation, resource mapping, etc.).

This is exactly the case in this tutorial: the asynchronous thread has no synchronization points, because we don't need to change its parameters at runtime.
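For comparison, here is a rough sketch of what a synchronization point could look like if the parameters did need to change at runtime. Everything in it is an assumption made for illustration: TunableWorker, its step parameter and its methods are hypothetical names, and nothing here belongs to the RTLib or to the tutorial application.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker (not part of the RTLib): it re-reads its parameter
// at a synchronization point placed at the top of every iteration.
class TunableWorker {
	std::mutex mtx;
	std::condition_variable applied_cv;
	int pending_step = 1;  // value requested by the controller
	int active_step = 1;   // value currently used by the worker
	bool stop = false;
	long total = 0;        // stands in for the real work
	std::thread worker;

	void Loop() {
		for (;;) {
			int step;
			{
				// Synchronization point: pick up a new parameter or stop
				std::lock_guard<std::mutex> lock(mtx);
				if (stop)
					return;
				active_step = pending_step;
				step = active_step;
			}
			applied_cv.notify_all();
			total += step;  // one unit of "work" with the current parameter
			std::this_thread::yield();
		}
	}

public:
	void Start() { worker = std::thread(&TunableWorker::Loop, this); }

	// Requests a new parameter and waits until the worker has applied it.
	// Must be called between Start() and Stop().
	void Reconfigure(int step) {
		std::unique_lock<std::mutex> lock(mtx);
		pending_step = step;
		applied_cv.wait(lock, [this] { return active_step == pending_step; });
	}

	// Stops the worker and returns the last applied parameter
	int Stop() {
		{
			std::lock_guard<std::mutex> lock(mtx);
			stop = true;
		}
		worker.join();
		return active_step;
	}
};
```

The worker only looks at its parameter in one place, so a new value is never observed halfway through a unit of work. The price is the extra locking on every iteration, which the tutorial application avoids by not reconfiguring its producer at all.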

Let's begin by generating a template application with the bbque-layapp utility. The application will be named AsyncThread.

[BOSPShell BOSP] > bbque-layapp
 
=== Building a new BBQ application from template ===
Enter application name (use CamelCase, no spaces allowed) [MyApp]: AsyncThread
Select application type (0: CPP) [0]: 0

The application structure is quite simple. We will instantiate an asynchronous thread acting as a producer and a synchronous thread pool containing the consumers. While the consumer threads are started and stopped each cycle, the producer is started during the application setup (onSetup) and stopped in the onRelease method. Producer and consumers access a shared LIFO buffer of fixed maximum size: the producer continually checks whether the buffer needs additional elements, while the consumers wait for these elements to appear in the buffer.
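The producer/consumer interaction over a bounded LIFO can be sketched in isolation as follows. This is a blocking variant written for illustration, not the buffer used by the tutorial application: BoundedLifo and RunPipeline are hypothetical names, and here a producer that finds the buffer full sleeps on a condition variable instead of polling.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical blocking variant of a bounded LIFO buffer
class BoundedLifo {
	std::vector<int> items;
	std::size_t capacity;
	std::mutex mtx;
	std::condition_variable not_full_cv;
	std::condition_variable not_empty_cv;

public:
	explicit BoundedLifo(std::size_t capacity) : capacity(capacity) {}

	// Blocks while the buffer is full, then inserts the element
	void Push(int element) {
		std::unique_lock<std::mutex> lock(mtx);
		not_full_cv.wait(lock, [this] { return items.size() < capacity; });
		items.push_back(element);
		not_empty_cv.notify_one();
	}

	// Blocks while the buffer is empty, then removes the newest element
	int Pop() {
		std::unique_lock<std::mutex> lock(mtx);
		not_empty_cv.wait(lock, [this] { return !items.empty(); });
		int result = items.back();
		items.pop_back();
		not_full_cv.notify_one();
		return result;
	}
};

// Pushes 1..count from a producer thread, pops every element in the
// calling thread and returns the sum of the consumed values.
long RunPipeline(int count, std::size_t capacity) {
	BoundedLifo lifo(capacity);

	std::thread producer([&lifo, count] {
		for (int i = 1; i <= count; ++i)
			lifo.Push(i);
	});

	long sum = 0;
	for (int i = 0; i < count; ++i)
		sum += lifo.Pop();

	producer.join();
	return sum;
}
```

The order in which the elements are consumed depends on thread scheduling, but every produced element is consumed exactly once, so the returned sum is deterministic.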

We don't need to add arguments to the application, so this time we will not touch the pre-generated AsyncThread_main.cc file.

/**
 *       @file  AsyncThread_exc.h
 *      @brief  The AsyncThread BarbequeRTRM application
 *
 * Description: AsyncThread BOSP tutorial
 *
 *     @author  Simone Libutti (slibutti), simone.libutti@polimi.it
 *
 *     Company  Politecnico di Milano
 *   Copyright  Copyright (c) 2014, Simone Libutti
 *
 * This source code is released for free distribution under the terms of the
 * GNU General Public License as published by the Free Software Foundation.
 * =====================================================================================
 */
 
#ifndef ASYNCTHREAD_EXC_H_
#define ASYNCTHREAD_EXC_H_
 
#include <bbque/bbque_exc.h>
#include <bbque/utils/threadpool.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
 
using bbque::rtlib::BbqueEXC;
 
class Lifo {
 
	// My LIFO buffer
	std::vector<int> consumables;
	// Maximum number of items in the buffer
	std::size_t max_consumables_size = 2;
 
	// Synchronization primitives
	std::mutex lifo_mtx;
	std::condition_variable pop_cv;
 
public:
 
	/**
	 * @brief Inserts an element into the buffer
	 * @return the inserted element, or 0 if the buffer was full
	 */
	int Push(int element){
 
		// Acquire the lock
		std::unique_lock<std::mutex> lock(lifo_mtx);
 
		// If the buffer is full, exit
		if (consumables.size() >= max_consumables_size)
			return 0;
 
		// If the buffer is not full, add an element and wake up
		// one waiting consumer
		consumables.push_back(element);
		pop_cv.notify_one();
 
		// Return the element value for logging purposes
		return element;
 
	}
	/**
	 * @brief Pops and returns the last element from the buffer
	 */
	int Pop(){
 
		// Acquire the lock
		std::unique_lock<std::mutex> lock(lifo_mtx);
 
		// If the buffer is empty, wait. The predicate is re-checked on
		// every wake-up: this guards against spurious wake-ups and
		// against another consumer emptying the buffer first
		pop_cv.wait(lock, [this]{ return !consumables.empty(); });
 
		// Extract the element
		int result = consumables.back();
		consumables.pop_back();
 
		// Return the element
		return result;
 
	}
 
};
 
class AsyncThread : public BbqueEXC {
 
	// Number of performed jobs
	int performed_jobs = 0;
	// Set to true to ask the asynchronous thread to stop. Atomic, because
	// it is written by onRelease() and read by the producer thread
	std::atomic<bool> stop{false};
 
	// A buffer instance
	Lifo my_lifo;
 
	// The asynchronous thread
	std::thread async_thread;
 
	// The synchronous thread pool
	ThreadPool sync_threadpool;
	// Default number of threads activated in the onRun().
	int sync_threads_number = 1;
 
public:
 
	// Constructor
	AsyncThread(std::string const & name,
			std::string const & recipe,
			RTLIB_Services_t *rtlib);
 
private:
 
	// RTLib
	RTLIB_ExitCode_t onSetup();
	RTLIB_ExitCode_t onConfigure(uint8_t awm_id);
	RTLIB_ExitCode_t onRun();
	RTLIB_ExitCode_t onMonitor();
	RTLIB_ExitCode_t onRelease();
 
	// Consumer and producer functions
	void Produce();
	void Consume();
 
};
 
#endif // ASYNCTHREAD_EXC_H_

Looking at the AsyncThread class, you can see the declaration of a single thread, which will run asynchronously, and an instance of the ThreadPool class. Then there are two private methods that access the Lifo instance: Produce, which pushes an item onto the LIFO, and Consume, which pops an item from it.

/**
 *       @file  AsyncThread_exc.cc
 *      @brief  The AsyncThread BarbequeRTRM application
 *
 * Description: AsyncThread BOSP tutorial
 *
 *     @author  Simone Libutti (slibutti), simone.libutti@polimi.it
 *
 *     Company  Politecnico di Milano
 *   Copyright  Copyright (c) 2014, Simone Libutti
 *
 * This source code is released for free distribution under the terms of the
 * GNU General Public License as published by the Free Software Foundation.
 * =====================================================================================
 */
 
#include "AsyncThread_exc.h"
 
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <thread>
#include <bbque/utils/utility.h>
 
AsyncThread::AsyncThread(std::string const & name,
		std::string const & recipe,
		RTLIB_Services_t *rtlib) :
	BbqueEXC(name, recipe, rtlib) {}
 
RTLIB_ExitCode_t AsyncThread::onSetup() {
 
	// Starting the asynchronous thread
	logger->Notice("AsyncThread::onSetup(): starting the asynchronous producer thread.");
	async_thread = std::thread(&AsyncThread::Produce, this);
 
	// Setting up the synchronous threads. The threadpool size is set,
	// in this case, to the number of system CPUs
	logger->Notice("AsyncThread::onSetup(): setting up the consumers threadpool.");
	sync_threadpool.Setup(std::thread::hardware_concurrency(),
		std::bind(&AsyncThread::Consume, this));
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t AsyncThread::onConfigure(uint8_t awm_id) {
 
	// Configuring the application. In this case, for the sake of clarity,
	// I simply change the threads number according to the current awm_id
	int proposed = 3 + awm_id;
	int max = std::thread::hardware_concurrency();
 
	// sync_threads_number must not be greater than the threadpool size
	logger->Notice("AsyncThread::onConfigure(): Configuring the Consumer Threads");
	sync_threads_number = std::min(proposed, max);
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t AsyncThread::onRun() {
 
	// Starting the synchronous threads
	logger->Notice("AsyncThread::onRun(): Running [%d] Consumer Threads", sync_threads_number);
	sync_threadpool.Start(sync_threads_number);
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t AsyncThread::onMonitor() {
 
	logger->Notice("AsyncThread::onMonitor()  : cycle [%d] finished", Cycles());
 
	// Updating jobs number
	performed_jobs += sync_threads_number;
 
	// Stop once at least 20 jobs have been performed
	if (performed_jobs >= 20)
		return RTLIB_EXC_WORKLOAD_NONE;
 
	return RTLIB_OK;
}
 
RTLIB_ExitCode_t AsyncThread::onRelease() {
 
	logger->Notice("AsyncThread::onRelease()  : waiting for async thread to stop");
	// Signalling the asynchronous thread to stop
	stop = true;
	async_thread.join();
 
	logger->Notice("AsyncThread::onRelease()  : waiting for sync threads to stop");
	// Signalling the synchronous threads to stop
	sync_threadpool.Release();
 
	return RTLIB_OK;
}
 
void AsyncThread::Produce() {
 
	while (!stop) {
 
		// Pushing a number in [1..100] in the buffer
		int result = my_lifo.Push(random()%100 +1);
 
		// If result = 0, the buffer was already full: give the
		// consumers a chance to run instead of retrying immediately
		if ( result != 0 )
			logger->Info("AsyncThread::Produce(): Produced a query: [%d]", result);
		else
			std::this_thread::yield();
 
	}
 
	logger->Info("AsyncThread::Produce(): Stopping");
 
}
 
void AsyncThread::Consume() {
 
	// Extracting a number from the buffer
	int q = my_lifo.Pop();
	logger->Info("AsyncThread::Consume(): Consuming a query: [%d]", q);
 
}

The code is simple and well commented, so it should be easy to follow. As you can see, the best practice is to start the asynchronous thread in the setup phase. This thread then keeps running regardless of the AEM. Just remember to use the onRelease method at the end of the application execution to clean up, stopping all the running threads.
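The start-then-clean-up pattern can also be tried outside the framework. The following is a minimal sketch under stated assumptions: RunUntilStopped is a hypothetical helper, and the stop flag is an std::atomic<bool> so that the write from the controlling thread and the reads from the background thread do not race.

```cpp
#include <atomic>
#include <thread>

// Hypothetical helper: runs a background thread until at least
// `min_iterations` loop iterations have been observed, then asks it to
// stop and joins it. Returns the final iteration count.
long RunUntilStopped(long min_iterations) {
	std::atomic<bool> stop{false};
	std::atomic<long> iterations{0};

	// "Setup" phase: start the unmanaged thread
	std::thread background([&stop, &iterations] {
		while (!stop)
			++iterations;
	});

	// The rest of the application runs here; this sketch simply waits
	while (iterations < min_iterations)
		std::this_thread::yield();

	// "Release" phase: signal the thread, then wait for it to finish
	stop = true;
	background.join();

	return iterations.load();
}
```

Joining after setting the flag matters: destroying a std::thread that is still joinable terminates the program, so the release phase has to wait for the thread to actually exit.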

docs/tutorial/async.txt · Last modified: 2014/06/27 18:03 by slibutti
