Skip to content
Commits on Source (6)
cmake_minimum_required(VERSION 3.2)
project(thread_pool)
include(GNUInstallDirs)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
......@@ -8,21 +10,31 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -march=native")
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
option(thread_pool_build_tests "Build thread_pool unit tests" OFF)
add_library(thread_pool STATIC
src/thread_pool.cpp)
target_link_libraries(thread_pool Threads::Threads)
target_include_directories(thread_pool PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
install(TARGETS thread_pool DESTINATION lib)
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/thread_pool DESTINATION include)
install(TARGETS thread_pool DESTINATION ${CMAKE_INSTALL_LIBDIR})
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/thread_pool DESTINATION ${CMAKE_INSTALL_INCLUDEDIR})
if (thread_pool_build_tests)
add_executable(thread_pool_test test/thread_pool_test.cpp)
if (NOT TARGET gtest_main)
add_subdirectory(vendor/googletest/googletest EXCLUDE_FROM_ALL)
endif()
target_link_libraries(thread_pool_test thread_pool gtest_main)
endif(thread_pool_build_tests)
endif()
# Thread pool
[![Latest GitHub release](https://img.shields.io/github/release/rvaser/thread_pool.svg)](https://github.com/rvaser/thread_pool/releases/latest)
![image](https://travis-ci.org/rvaser/thread_pool.svg?branch=master)
A c++ thread pool implementation inspired by https://github.com/progschj/ThreadPool.
......@@ -15,10 +16,10 @@ Application uses following software:
## Instalation
CmakeLists is provided in the project root folder. By running the following commands:
By running the following commands:
```bash
git clone https://github.com/rvaser/thread_pool thread_pool
git clone https://github.com/rvaser/thread_pool.git thread_pool
cd thread_pool
mkdir build
cd build
......@@ -26,9 +27,11 @@ cmake -DCMAKE_BUILD_TYPE=Release ..
make
```
a library named libthread_pool.a will appear in `build/lib` directory. To link the library with your code, add `-Iinclude/ -Lbuild/lib -lthread_pool -lpthread --std=c++11` while compiling and include `thread_pool/thread_pool.hpp` in your desired source files. Optionally, you can run `sudo make install` to install thread_pool library to your machine which lets you exclude `-Iinclude/ -Lbuild/lib` while compiling.
a library named `libthread_pool.a` will appear in `build/lib` directory. Optionally, you can run `sudo make install` to install thread_pool library to your machine.
Alternatively, add the project to your CMakeLists.txt file with `add_subdirectory(vendor/thread_pool EXCLUDE_FROM_ALL)` and `target_link_libraries(your_exe thread_pool pthread)` commands.
Alternatively, add the project to your `CMakeLists.txt` file with `add_subdirectory(vendor/thread_pool EXCLUDE_FROM_ALL)` and `target_link_libraries(your_exe thread_pool)` commands.
To build unit tests run `git submodule update --init` and add `-Dthread_pool_build_tests=ON` while running `cmake`. After installation, an executable named `thread_pool_test` will be created in `build/bin`.
## Usage
......@@ -53,11 +56,10 @@ std::shared_ptr<thread_pool::ThreadPool> thread_pool =
// create storage for return values of function1 and function2
std::vector<std::future<int>> thread_futures;
for (int i = 0; i < num_tasks; ++i) {
for (std::uint32_t i = 0; i < num_tasks; ++i) {
// be sure to use std::ref() when passing references!
thread_futures.emplace_back(thread_pool->submit_task(function1,
std::ref(data), index, ...));
thread_futures.emplace_back(thread_pool->submit_task(function2, a, b));
thread_futures.emplace_back(thread_pool->submit(function1, std::ref(data), index, ...));
thread_futures.emplace_back(thread_pool->submit(function2, a, b));
}
// wait for threads to finish
......@@ -68,8 +70,8 @@ for (auto& it: thread_futures) {
// new set of tasks running function3
std::vector<std::future<void>> thread_futures2;
for (int i = 0; i < num_tasks2; ++i) {
thread_futures2.emplace_back(thread_pool->submit_task(function3));
for (std::uint32_t i = 0; i < num_tasks2; ++i) {
thread_futures2.emplace_back(thread_pool->submit(function3));
}
for (auto& it2: thread_futures2) {
it.wait();
......
libthread-pool (2.0.1-1) unstable; urgency=medium
* New upstream version
-- Andreas Tille <tille@debian.org> Tue, 17 Sep 2019 22:18:26 +0200
libthread-pool (1.0.0-4) unstable; urgency=medium
[ Helmut Grohne ]
......
......@@ -5,7 +5,7 @@ Last-Update: Sun, 26 Aug 2018 14:53:02 +0200
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -5,7 +5,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PRO
@@ -7,7 +7,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PRO
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/bin)
......@@ -13,4 +13,4 @@ Last-Update: Sun, 26 Aug 2018 14:53:02 +0200
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
......@@ -4,7 +4,7 @@ Description: Build shared and static lib
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -11,14 +11,22 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
@@ -17,7 +17,10 @@ find_package(Threads REQUIRED)
option(thread_pool_build_tests "Build thread_pool unit tests" OFF)
......@@ -15,7 +15,8 @@ Description: Build shared and static lib
+add_library(thread_pool_static STATIC
src/thread_pool.cpp)
target_include_directories(thread_pool PUBLIC
target_link_libraries(thread_pool Threads::Threads)
@@ -26,7 +29,12 @@ target_include_directories(thread_pool P
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>)
......@@ -23,8 +24,8 @@ Description: Build shared and static lib
+ $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+ $<INSTALL_INTERFACE:include>)
+
install(TARGETS thread_pool DESTINATION lib)
install(TARGETS thread_pool DESTINATION ${CMAKE_INSTALL_LIBDIR})
+install(TARGETS thread_pool_static DESTINATION lib)
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/thread_pool DESTINATION include)
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/thread_pool DESTINATION ${CMAKE_INSTALL_INCLUDEDIR})
if (thread_pool_build_tests)
......@@ -9,11 +9,11 @@ Description: Add soversion
-project(thread_pool)
+project(thread_pool LANGUAGES CXX VERSION 1.0.0)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib)
@@ -17,6 +17,11 @@ add_library(thread_pool SHARED
add_library(thread_pool_static STATIC
src/thread_pool.cpp)
include(GNUInstallDirs)
@@ -25,6 +25,11 @@ add_library(thread_pool_static STATIC
target_link_libraries(thread_pool Threads::Threads)
+set_target_properties(thread_pool
+ PROPERTIES
......
......@@ -21,9 +21,10 @@ endif
override_dh_install:
dh_install
file-rename 's/_static\.a/.a/' `find debian -name "lib*_static.a"`
mv debian/tmp/usr/lib/*.a debian/tmp/usr/lib/$(DEB_BUILD_GNU_TYPE)
d-shlibmove --commit \
--multiarch \
--devunversioned \
--exclude-la \
--movedev debian/tmp/usr/include/* usr/include \
debian/tmp/usr/lib/*.so
debian/tmp/usr/lib/*/*.so
......@@ -6,9 +6,10 @@
#pragma once
#include <stdint.h>
#include <cstdint>
#include <memory>
#include <vector>
#include <string>
#include <queue>
#include <mutex>
#include <thread>
......@@ -19,40 +20,42 @@
namespace thread_pool {
static const std::string version = "v2.0.1";
class Semaphore;
std::unique_ptr<Semaphore> createSemaphore(uint32_t value);
std::unique_ptr<Semaphore> createSemaphore(std::uint32_t value);
class ThreadPool;
std::unique_ptr<ThreadPool> createThreadPool(uint32_t num_threads =
std::unique_ptr<ThreadPool> createThreadPool(std::uint32_t num_threads =
std::thread::hardware_concurrency() / 2);
class Semaphore {
public:
~Semaphore() = default;
uint32_t value() const {
std::uint32_t value() const {
return value_;
}
void wait();
void post();
friend std::unique_ptr<Semaphore> createSemaphore(uint32_t value);
friend std::unique_ptr<Semaphore> createSemaphore(std::uint32_t value);
private:
Semaphore(uint32_t value);
Semaphore(std::uint32_t value);
Semaphore(const Semaphore&) = delete;
const Semaphore& operator=(const Semaphore&) = delete;
std::mutex mutex_;
std::condition_variable condition_;
uint32_t value_;
std::uint32_t value_;
};
class ThreadPool {
public:
~ThreadPool();
uint32_t num_threads() const {
std::uint32_t num_threads() const {
return threads_.size();
}
......@@ -61,7 +64,7 @@ public:
}
template<typename T, typename... Ts>
auto submit_task(T&& routine, Ts&&... params)
auto submit(T&& routine, Ts&&... params)
-> std::future<typename std::result_of<T(Ts...)>::type> {
auto task = std::make_shared<std::packaged_task<typename std::result_of<T(Ts...)>::type()>>(
......@@ -82,9 +85,9 @@ public:
return task_result;
}
friend std::unique_ptr<ThreadPool> createThreadPool(uint32_t num_threads);
friend std::unique_ptr<ThreadPool> createThreadPool(std::uint32_t num_threads);
private:
ThreadPool(uint32_t num_threads);
ThreadPool(std::uint32_t num_threads);
ThreadPool(const ThreadPool&) = delete;
const ThreadPool& operator=(const ThreadPool&) = delete;
......
......@@ -4,26 +4,25 @@
* @brief ThreadPool class source file
*/
#include <stdlib.h>
#include <exception>
#include "thread_pool/thread_pool.hpp"
namespace thread_pool {
std::unique_ptr<Semaphore> createSemaphore(uint32_t value) {
std::unique_ptr<Semaphore> createSemaphore(std::uint32_t value) {
return std::unique_ptr<Semaphore>(new Semaphore(value));
}
std::unique_ptr<ThreadPool> createThreadPool(uint32_t num_threads) {
std::unique_ptr<ThreadPool> createThreadPool(std::uint32_t num_threads) {
if (num_threads == 0) {
fprintf(stderr, "[thread_pool::createThreadPool] error: "
throw std::invalid_argument("[thread_pool::createThreadPool] error: "
"invalid number of threads!");
exit(1);
}
return std::unique_ptr<ThreadPool>(new ThreadPool(num_threads));
}
Semaphore::Semaphore(uint32_t value)
Semaphore::Semaphore(std::uint32_t value)
: value_(value) {
}
......@@ -39,13 +38,13 @@ void Semaphore::wait() {
--value_;
}
ThreadPool::ThreadPool(uint32_t num_threads) {
ThreadPool::ThreadPool(std::uint32_t num_threads) {
queue_sem_ = createSemaphore(1);
active_sem_ = createSemaphore(0);
terminate_ = false;
for (uint32_t i = 0; i < num_threads; ++i) {
for (std::uint32_t i = 0; i < num_threads; ++i) {
threads_.emplace_back(ThreadPool::worker_thread, this);
thread_identifiers_.emplace_back(threads_.back().get_id());
}
......@@ -54,7 +53,7 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
ThreadPool::~ThreadPool() {
terminate_ = true;
for (uint32_t i = 0; i < threads_.size(); ++i) {
for (std::uint32_t i = 0; i < threads_.size(); ++i) {
active_sem_->post();
}
for (auto& it: threads_) {
......
......@@ -22,29 +22,33 @@ public:
};
TEST(ThreadPoolTest_, CreateThreadPoolError) {
EXPECT_DEATH((thread_pool::createThreadPool(0)),
".thread_pool::createThreadPool. error: invalid number of threads!");
try {
auto thread_pool = thread_pool::createThreadPool(0);
} catch (std::invalid_argument& exception) {
EXPECT_STREQ(exception.what(), "[thread_pool::createThreadPool] error: "
"invalid number of threads!");
}
}
TEST_F(ThreadPoolTest, ParallelCalculation) {
std::vector<std::vector<uint32_t>> data(10);
std::vector<std::vector<std::uint32_t>> data(10);
for (auto& it: data) {
it.reserve(100000);
for (uint32_t i = 0; i < 100000; ++i) {
for (std::uint32_t i = 0; i < 100000; ++i) {
it.push_back(i);
}
}
auto do_some_calculation = [](std::vector<uint32_t>& src) -> void {
for (uint32_t i = 0; i < src.size() - 1; ++i) {
auto do_some_calculation = [](std::vector<std::uint32_t>& src) -> void {
for (std::uint32_t i = 0; i < src.size() - 1; ++i) {
src[i] = (src[i] * src[i + 1]) / (src[i] - src[i + 1] * 3);
}
};
std::vector<std::future<void>> thread_futures;
for (uint32_t i = 0; i < data.size(); ++i) {
thread_futures.emplace_back(thread_pool->submit_task(do_some_calculation,
for (std::uint32_t i = 0; i < data.size(); ++i) {
thread_futures.emplace_back(thread_pool->submit(do_some_calculation,
std::ref(data[i])));
}
......@@ -56,8 +60,8 @@ TEST_F(ThreadPoolTest, ParallelCalculation) {
TEST_F(ThreadPoolTest, ThreadIdentifiers) {
const auto& identifiers = thread_pool->thread_identifiers();
std::unordered_map<std::thread::id, uint32_t> thread_map;
uint32_t thread_id = 0;
std::unordered_map<std::thread::id, std::uint32_t> thread_map;
std::uint32_t thread_id = 0;
for (const auto& it: identifiers) {
thread_map[it] = thread_id++;
}
......@@ -67,7 +71,7 @@ TEST_F(ThreadPoolTest, ThreadIdentifiers) {
auto barrier = thread_pool::createSemaphore(0);
auto checkpoint = thread_pool::createSemaphore(0);
auto check_thread_id = [&barrier, &checkpoint](
std::unordered_map<std::thread::id, uint32_t>& thread_map) -> int32_t {
std::unordered_map<std::thread::id, std::uint32_t>& thread_map) -> std::int32_t {
checkpoint->post();
barrier->wait();
......@@ -78,27 +82,27 @@ TEST_F(ThreadPoolTest, ThreadIdentifiers) {
return -1;
};
std::vector<std::future<int32_t>> thread_futures;
for (uint32_t i = 0; i < thread_id; ++i) {
thread_futures.emplace_back(thread_pool->submit_task(check_thread_id,
std::vector<std::future<std::int32_t>> thread_futures;
for (std::uint32_t i = 0; i < thread_id; ++i) {
thread_futures.emplace_back(thread_pool->submit(check_thread_id,
std::ref(thread_map)));
}
for (uint32_t i = 0; i < thread_id; ++i) {
for (std::uint32_t i = 0; i < thread_id; ++i) {
checkpoint->wait();
}
for (uint32_t i = 0; i < thread_id; ++i) {
for (std::uint32_t i = 0; i < thread_id; ++i) {
barrier->post();
}
std::unordered_set<int32_t> thread_identifiers;
std::unordered_set<std::int32_t> thread_identifiers;
for (auto& it: thread_futures) {
it.wait();
thread_identifiers.emplace(it.get());
}
EXPECT_EQ(thread_id, thread_identifiers.size());
for (uint32_t i = 0; i < thread_id; ++i) {
for (std::uint32_t i = 0; i < thread_id; ++i) {
EXPECT_EQ(1U, thread_identifiers.count(i));
}
}