diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..6f4f495 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "C_Cpp.default.configurationProvider": "ms-vscode.cmake-tools", + "clangd.enable": false, + // "C_Cpp.intelliSenseEngine": "disabled" +} \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index fae5e92..8972d07 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,7 @@ project( LANGUAGES CXX) # dependencies +find_package(Boost REQUIRED COMPONENTS fiber headers) find_package(fmt REQUIRED) add_subdirectory(external/pipeline) add_subdirectory(external/pipes) @@ -15,5 +16,13 @@ add_library(pipe INTERFACE) target_compile_features(pipe INTERFACE cxx_std_23) target_include_directories(pipe INTERFACE include) +add_executable(pipeline_asio src/pipeline/main.cpp) +target_compile_features(pipeline_asio PUBLIC cxx_std_23) +target_link_libraries(pipeline_asio Boost::fiber Boost::headers fmt::fmt + pipeline::pipeline joboccara::pipes) + +# Specify the location of the header files +# target_include_directories(pipeline_asio PUBLIC include) + enable_testing() add_subdirectory(tests) diff --git a/include/freepipe/freepipe.hpp b/include/freepipe/freepipe.hpp index 2e99cc5..5621e34 100644 --- a/include/freepipe/freepipe.hpp +++ b/include/freepipe/freepipe.hpp @@ -4,7 +4,9 @@ #pragma once #include +#include #include +// #include #include #include #include @@ -14,19 +16,21 @@ namespace freepipe { template struct Task { - explicit constexpr Task(Args &&...args) + explicit constexpr Task(std::shared_ptr ioc, Args &&...args) requires(sizeof...(Args) > 0) - : args_(std::forward(args)...) {} + : ioc_(std::move(ioc)), args_(std::forward(args)...) {} - explicit constexpr Task() + explicit constexpr Task(std::shared_ptr ioc) requires(sizeof...(Args) == 0) - {} + : ioc_(std::move(ioc)) {} constexpr auto const &operator*() const & { return args_; } constexpr auto &operator*() & { return args_; } constexpr auto &&operator*() && { return std::move(args_); } constexpr auto const &&operator*() const && { return std::move(args_); } + std::shared_ptr ioc_; + private: std::tuple args_; }; @@ -37,9 +41,9 @@ constexpr auto operator|(Task const &task, F &&func) { if constexpr (std::is_void_v) { std::apply(std::forward(func), *task); - return Task<>{}; + return boost::asio::post(task.ioc_, Task<>{task.ioc_}); } else { - return Task{std::apply(std::forward(func), *task)}; + return Task{task.ioc_, std::apply(std::forward(func), *task)}; } } @@ -48,25 +52,100 @@ constexpr auto operator|(Task &&task, F &&func) { using R = std::invoke_result_t; if constexpr (std::is_void_v) { - std::apply(std::forward(func), *std::move(task)); - return Task<>{}; + boost::asio::post(*task.ioc_, [&] { std::apply(std::forward(func), std::move(*task)); }); + return Task<>{task.ioc_}; } else { - return Task{std::apply(std::forward(func), *std::move(task))}; + return Task{task.ioc_, std::apply(std::forward(func), std::move(*task))}; } } +template ... F> +constexpr auto operator|(Task const &task, std::tuple &&func) { + std::tuple...> ret; + + auto l = [&](auto &&f) { + using R = std::invoke_result_t; + + if constexpr (std::is_void_v) { + std::apply(std::forward(f), *task); + return; + } else { + return std::apply(std::forward(f), *task); + } + }; + + std::apply(l, func); + + return std::make_from_tuple(std::apply( + [&](F &&...f) { + return Task...>( + l(std::forward(f), std::index_sequence{})...); + }, + std::move(func))); + + // ( + // [&] { + // using R = std::invoke_result_t; + + // if constexpr (std::is_void_v) { + // std::apply(std::forward(func), *task); + // } else { + // std::get(ret) = std::apply(std::forward(func), *task); + // } + // }(), + // ...); + + // auto ans = std::apply( + // [](auto &&f) { + // return Task( + // if constexpr (std::is_void_v) { return Task<>{}; } else { + // return Task{f}; + // }, + // f...); + // }, + // std::move(func)); + + // return std::make_from_tuple(std::move(ret)); +} + +template ... F> +constexpr auto operator|(Task &&task, std::tuple &&func) { + std::tuple...> ret; + + ( + [&] { + using R = std::invoke_result_t; + + if constexpr (std::is_void_v) { + std::apply(std::forward(func), *std::move(task)); + } else { + std::get(ret) = std::apply(std::forward(func), *std::move(task)); + } + }(), + ...); + + return std::make_from_tuple(std::move(ret)); +} + template > constexpr auto operator|(Task const &task, std::basic_ostream &os) { std::apply([&os](auto &&...args) { ((os << args << '\n'), ...); }, *task); - return Task<>{}; + return Task<>{task.ioc_}; } template > constexpr auto operator|(Task &&task, std::basic_ostream &os) { - std::apply([&os](auto &&...args) { ((os << args << '\n'), ...); }, *std::move(task)); - return Task<>{}; + std::apply([&os](auto &&...args) { ((os << args << '\n'), ...); }, std::move(*task)); + return Task<>{task.ioc_}; } -class Pipe : public Task<> {}; +class Pipe : public Task<> { + // std::shared_ptr io_context_; + +public: + Pipe() : Task<>(std::make_shared()) {} + + explicit Pipe(std::shared_ptr io_context) : Task<>{io_context} {} +}; } // namespace freepipe diff --git a/src/pipeline/main.cpp b/src/pipeline/main.cpp new file mode 100644 index 0000000..5c27786 --- /dev/null +++ b/src/pipeline/main.cpp @@ -0,0 +1,33 @@ +#include +#include // IWYU pragma: keep + +#include +#include +#include +#include + +namespace { +auto do_function(int abc, int def, int /*c*/) -> int { return abc + def; } +} // namespace + +auto main() -> int { + using namespace pipeline; + + auto add = fn([](auto a, auto b) { return a + b; }); + auto double_it = fn([](auto a) { return a * 2; }); + auto square_it = fn([](auto a) { return a * a; }); + auto pipeline = add | double_it | square_it; + + std::cout << pipeline(3, 6) << "\n"; + std::cout << pipeline(4.5, 9.3) << "\n"; + + using namespace pipes; + + auto const source = std::vector{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + std::vector destination; + + source >>= filter([](int i) { return i % 2 == 0; }) >>= + transform([](int i) { return i * 2; }) >>= push_back(destination); + + fmt::print("Transformed vector: {}\n", destination); +} diff --git a/tests/pipe.test.cpp b/tests/pipe.test.cpp index 2a9ddac..67f87b5 100644 --- a/tests/pipe.test.cpp +++ b/tests/pipe.test.cpp @@ -1,11 +1,34 @@ +#include +#include #include #include + +#include #include +#include +#include namespace freepipe { -TEST(PipeTest, Single) { - Pipe p; +class PipeTest : public testing::Test { +protected: + void SetUp() override { + boost::asio::signal_set signals{*io_context_, SIGINT, SIGTERM}; + // signals.async_wait([this](auto, auto) { io_context_->stop(); }); + } + + void TearDown() override { + io_context_->run(); + // Any cleanup code can go here if needed + } + +protected: + std::shared_ptr io_context_ = + std::make_shared(); +}; + +TEST_F(PipeTest, Single) { + Pipe const p{io_context_}; auto r = p | [] { return 42; } | [](auto result) { @@ -22,4 +45,19 @@ TEST(PipeTest, Single) { EXPECT_EQ(*r, std::tuple{0}); } +// TEST_F(PipeTest, Fork) { +// Pipe p; + +// auto r = p | +// std::tuple{[] { return 42; } | [](auto exp) { EXPECT_EQ(exp, 42); }, [] { return 2; }} +// | +// [](auto life, auto mul) { return life * mul; } | +// std::tuple{std::cout, [](auto res) { return res; }} | [](auto res) { +// EXPECT_EQ(res, 84); +// return res; +// }; + +// EXPECT_EQ(*r, std::tuple{84}); +// } + } // namespace freepipe diff --git a/tests/pipeline.test.cpp b/tests/pipeline.test.cpp index 28ba026..191b219 100644 --- a/tests/pipeline.test.cpp +++ b/tests/pipeline.test.cpp @@ -8,7 +8,7 @@ namespace { -void BM_Pipeline(benchmark::State& state) { +void BM_Pipeline(benchmark::State &state) { using namespace pipeline; for (auto _ : state) { @@ -23,15 +23,36 @@ void BM_Pipeline(benchmark::State& state) { } BENCHMARK(BM_Pipeline); -void BM_Pipes(benchmark::State& state) { +void BM_Pipeline2(benchmark::State &state) { + using namespace pipeline; + + for (auto _ : state) { + std::vector ans0, ans1, ans2; + + auto add = fn([](auto a) { return std::tuple{std::get<0>(a) + 1, std::get<1>(a) + 2}; }); + auto push_back_0 = fn([&ans0](auto a) { ans0.push_back(a); }); + auto push_back_1 = fn([&ans1](auto a) { ans1.push_back(a); }); + auto push_back_2 = fn([&ans2](auto a) { ans2.push_back(a); }); + auto filter = fn([](auto a) { return a < 1; }); + + auto pipeline = add | unzip_into(push_back_0, fork_into(push_back_1, filter | push_back_2)); + + pipeline(std::tuple{1, 2}); + benchmark::DoNotOptimize(ans0); + benchmark::DoNotOptimize(ans1); + benchmark::DoNotOptimize(ans2); + } +} +BENCHMARK(BM_Pipeline2); + +void BM_Pipes(benchmark::State &state) { using namespace pipes; for (auto _ : state) { std::vector ans; mux(std::vector{0}, std::vector{1}) >>= - transform([](int a, int b) { return a + b; }) >>= - transform([](int a) { return a * 2; }) >>= + transform([](int a, int b) { return a + b; }) >>= transform([](int a) { return a * 2; }) >>= transform([](int a) { return a * a; }) >>= push_back(ans); benchmark::DoNotOptimize(ans); @@ -39,6 +60,25 @@ void BM_Pipes(benchmark::State& state) { } BENCHMARK(BM_Pipes); +void BM_Pipes2(benchmark::State &state) { + using namespace pipes; + + for (auto _ : state) { + std::vector ans0, ans1, ans2; + + std::vector>{std::tuple{1, 2}} >>= pipes::transform([](auto a) { + return std::tuple{std::get<0>(a) + 1, std::get<1>(a) + 2}; + }) >>= pipes::unzip(pipes::push_back(ans0), + pipes::fork(pipes::push_back(ans1), pipes::filter([](int a) { + return a < 1; + }) >>= pipes::push_back(ans2))); + benchmark::DoNotOptimize(ans0); + benchmark::DoNotOptimize(ans1); + benchmark::DoNotOptimize(ans2); + } +} +BENCHMARK(BM_Pipes2); + void BM_Pipe(benchmark::State &state) { using namespace freepipe;