This commit is contained in:
Luís Murta 2025-08-10 17:30:51 +01:00
parent bd8e17b2ab
commit 640ffe9679
Signed by: satprog
GPG Key ID: 169EF1BBD7049F94
6 changed files with 223 additions and 19 deletions

5
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"C_Cpp.default.configurationProvider": "ms-vscode.cmake-tools",
"clangd.enable": false,
// "C_Cpp.intelliSenseEngine": "disabled"
}

View File

@ -5,6 +5,7 @@ project(
LANGUAGES CXX)
# dependencies
find_package(Boost REQUIRED COMPONENTS fiber headers)
find_package(fmt REQUIRED)
add_subdirectory(external/pipeline)
add_subdirectory(external/pipes)
@ -15,5 +16,13 @@ add_library(pipe INTERFACE)
target_compile_features(pipe INTERFACE cxx_std_23)
target_include_directories(pipe INTERFACE include)
add_executable(pipeline_asio src/pipeline/main.cpp)
target_compile_features(pipeline_asio PUBLIC cxx_std_23)
target_link_libraries(pipeline_asio Boost::fiber Boost::headers fmt::fmt
pipeline::pipeline joboccara::pipes)
# Specify the location of the header files
# target_include_directories(pipeline_asio PUBLIC include)
enable_testing()
add_subdirectory(tests)

View File

@ -4,7 +4,9 @@
#pragma once
#include <any>
#include <boost/asio.hpp>
#include <concepts>
// #include <expected>
#include <functional>
#include <optional>
#include <ostream>
@ -14,19 +16,21 @@
namespace freepipe {
template <typename... Args> struct Task {
explicit constexpr Task(Args &&...args)
explicit constexpr Task(std::shared_ptr<boost::asio::io_context> ioc, Args &&...args)
requires(sizeof...(Args) > 0)
: args_(std::forward<Args>(args)...) {}
: ioc_(std::move(ioc)), args_(std::forward<Args>(args)...) {}
explicit constexpr Task()
explicit constexpr Task(std::shared_ptr<boost::asio::io_context> ioc)
requires(sizeof...(Args) == 0)
{}
: ioc_(std::move(ioc)) {}
constexpr auto const &operator*() const & { return args_; }
constexpr auto &operator*() & { return args_; }
constexpr auto &&operator*() && { return std::move(args_); }
constexpr auto const &&operator*() const && { return std::move(args_); }
std::shared_ptr<boost::asio::io_context> ioc_;
private:
std::tuple<Args...> args_;
};
@ -37,9 +41,9 @@ constexpr auto operator|(Task<Args...> const &task, F &&func) {
if constexpr (std::is_void_v<R>) {
std::apply(std::forward<F>(func), *task);
return Task<>{};
return boost::asio::post(task.ioc_, Task<>{task.ioc_});
} else {
return Task<R>{std::apply(std::forward<F>(func), *task)};
return Task<R>{task.ioc_, std::apply(std::forward<F>(func), *task)};
}
}
@ -48,25 +52,100 @@ constexpr auto operator|(Task<Args...> &&task, F &&func) {
using R = std::invoke_result_t<F, Args...>;
if constexpr (std::is_void_v<R>) {
std::apply(std::forward<F>(func), *std::move(task));
return Task<>{};
boost::asio::post(*task.ioc_, [&] { std::apply(std::forward<F>(func), std::move(*task)); });
return Task<>{task.ioc_};
} else {
return Task<R>{std::apply(std::forward<F>(func), *std::move(task))};
return Task<R>{task.ioc_, std::apply(std::forward<F>(func), std::move(*task))};
}
}
template <typename... Args, std::regular_invocable<Args...>... F>
constexpr auto operator|(Task<Args...> const &task, std::tuple<F...> &&func) {
std::tuple<std::invoke_result_t<F, Args...>...> ret;
auto l = [&](auto &&f) {
using R = std::invoke_result_t<decltype(f), Args...>;
if constexpr (std::is_void_v<R>) {
std::apply(std::forward<decltype(f)>(f), *task);
return;
} else {
return std::apply(std::forward<decltype(f)>(f), *task);
}
};
std::apply(l, func);
return std::make_from_tuple<Task>(std::apply(
[&]<std::size_t... Is>(F &&...f) {
return Task<std::invoke_result_t<F, Args...>...>(
l(std::forward<F>(f), std::index_sequence<Is...>{})...);
},
std::move(func)));
// (
// [&] {
// using R = std::invoke_result_t<F, Args...>;
// if constexpr (std::is_void_v<R>) {
// std::apply(std::forward<F>(func), *task);
// } else {
// std::get<R>(ret) = std::apply(std::forward<F>(func), *task);
// }
// }(),
// ...);
// auto ans = std::apply(
// [](auto &&f) {
// return Task(
// if constexpr (std::is_void_v<decltype(f)>) { return Task<>{}; } else {
// return Task<decltype(f)>{f};
// },
// f...);
// },
// std::move(func));
// return std::make_from_tuple<Task>(std::move(ret));
}
template <typename... Args, std::regular_invocable<Args...>... F>
constexpr auto operator|(Task<Args...> &&task, std::tuple<F...> &&func) {
std::tuple<std::invoke_result_t<F, Args...>...> ret;
(
[&] {
using R = std::invoke_result_t<F, Args...>;
if constexpr (std::is_void_v<R>) {
std::apply(std::forward<F>(func), *std::move(task));
} else {
std::get<R>(ret) = std::apply(std::forward<F>(func), *std::move(task));
}
}(),
...);
return std::make_from_tuple<Task>(std::move(ret));
}
template <typename... Args, typename CharT, typename Traits = std::char_traits<CharT>>
constexpr auto operator|(Task<Args...> const &task, std::basic_ostream<CharT, Traits> &os) {
std::apply([&os](auto &&...args) { ((os << args << '\n'), ...); }, *task);
return Task<>{};
return Task<>{task.ioc_};
}
template <typename... Args, typename CharT, typename Traits = std::char_traits<CharT>>
constexpr auto operator|(Task<Args...> &&task, std::basic_ostream<CharT, Traits> &os) {
std::apply([&os](auto &&...args) { ((os << args << '\n'), ...); }, *std::move(task));
return Task<>{};
std::apply([&os](auto &&...args) { ((os << args << '\n'), ...); }, std::move(*task));
return Task<>{task.ioc_};
}
class Pipe : public Task<> {};
class Pipe : public Task<> {
// std::shared_ptr<boost::asio::io_context> io_context_;
public:
Pipe() : Task<>(std::make_shared<boost::asio::io_context>()) {}
explicit Pipe(std::shared_ptr<boost::asio::io_context> io_context) : Task<>{io_context} {}
};
} // namespace freepipe

33
src/pipeline/main.cpp Normal file
View File

@ -0,0 +1,33 @@
#include <fmt/base.h>
#include <fmt/ranges.h> // IWYU pragma: keep
#include <iostream>
#include <pipeline/pipeline.hpp>
#include <pipes/pipes.hpp>
#include <vector>
namespace {
auto do_function(int abc, int def, int /*c*/) -> int { return abc + def; }
} // namespace
auto main() -> int {
using namespace pipeline;
auto add = fn([](auto a, auto b) { return a + b; });
auto double_it = fn([](auto a) { return a * 2; });
auto square_it = fn([](auto a) { return a * a; });
auto pipeline = add | double_it | square_it;
std::cout << pipeline(3, 6) << "\n";
std::cout << pipeline(4.5, 9.3) << "\n";
using namespace pipes;
auto const source = std::vector<int>{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
std::vector<int> destination;
source >>= filter([](int i) { return i % 2 == 0; }) >>=
transform([](int i) { return i * 2; }) >>= push_back(destination);
fmt::print("Transformed vector: {}\n", destination);
}

View File

@ -1,11 +1,34 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>
#include <freepipe/freepipe.hpp>
#include <gtest/gtest.h>
#include <csignal>
#include <iostream>
#include <memory>
#include <tuple>
namespace freepipe {
TEST(PipeTest, Single) {
Pipe p;
class PipeTest : public testing::Test {
protected:
void SetUp() override {
boost::asio::signal_set signals{*io_context_, SIGINT, SIGTERM};
// signals.async_wait([this](auto, auto) { io_context_->stop(); });
}
void TearDown() override {
io_context_->run();
// Any cleanup code can go here if needed
}
protected:
std::shared_ptr<boost::asio::io_context> io_context_ =
std::make_shared<boost::asio::io_context>();
};
TEST_F(PipeTest, Single) {
Pipe const p{io_context_};
auto r = p | [] { return 42; } |
[](auto result) {
@ -22,4 +45,19 @@ TEST(PipeTest, Single) {
EXPECT_EQ(*r, std::tuple{0});
}
// TEST_F(PipeTest, Fork) {
// Pipe p;
// auto r = p |
// std::tuple{[] { return 42; } | [](auto exp) { EXPECT_EQ(exp, 42); }, [] { return 2; }}
// |
// [](auto life, auto mul) { return life * mul; } |
// std::tuple{std::cout, [](auto res) { return res; }} | [](auto res) {
// EXPECT_EQ(res, 84);
// return res;
// };
// EXPECT_EQ(*r, std::tuple{84});
// }
} // namespace freepipe

View File

@ -8,7 +8,7 @@
namespace {
void BM_Pipeline(benchmark::State& state) {
void BM_Pipeline(benchmark::State &state) {
using namespace pipeline;
for (auto _ : state) {
@ -23,15 +23,36 @@ void BM_Pipeline(benchmark::State& state) {
}
BENCHMARK(BM_Pipeline);
void BM_Pipes(benchmark::State& state) {
void BM_Pipeline2(benchmark::State &state) {
using namespace pipeline;
for (auto _ : state) {
std::vector<int> ans0, ans1, ans2;
auto add = fn([](auto a) { return std::tuple{std::get<0>(a) + 1, std::get<1>(a) + 2}; });
auto push_back_0 = fn([&ans0](auto a) { ans0.push_back(a); });
auto push_back_1 = fn([&ans1](auto a) { ans1.push_back(a); });
auto push_back_2 = fn([&ans2](auto a) { ans2.push_back(a); });
auto filter = fn([](auto a) { return a < 1; });
auto pipeline = add | unzip_into(push_back_0, fork_into(push_back_1, filter | push_back_2));
pipeline(std::tuple{1, 2});
benchmark::DoNotOptimize(ans0);
benchmark::DoNotOptimize(ans1);
benchmark::DoNotOptimize(ans2);
}
}
BENCHMARK(BM_Pipeline2);
void BM_Pipes(benchmark::State &state) {
using namespace pipes;
for (auto _ : state) {
std::vector<int> ans;
mux(std::vector<int>{0}, std::vector<int>{1}) >>=
transform([](int a, int b) { return a + b; }) >>=
transform([](int a) { return a * 2; }) >>=
transform([](int a, int b) { return a + b; }) >>= transform([](int a) { return a * 2; }) >>=
transform([](int a) { return a * a; }) >>= push_back(ans);
benchmark::DoNotOptimize(ans);
@ -39,6 +60,25 @@ void BM_Pipes(benchmark::State& state) {
}
BENCHMARK(BM_Pipes);
void BM_Pipes2(benchmark::State &state) {
using namespace pipes;
for (auto _ : state) {
std::vector<int> ans0, ans1, ans2;
std::vector<std::tuple<int, int>>{std::tuple{1, 2}} >>= pipes::transform([](auto a) {
return std::tuple{std::get<0>(a) + 1, std::get<1>(a) + 2};
}) >>= pipes::unzip(pipes::push_back(ans0),
pipes::fork(pipes::push_back(ans1), pipes::filter([](int a) {
return a < 1;
}) >>= pipes::push_back(ans2)));
benchmark::DoNotOptimize(ans0);
benchmark::DoNotOptimize(ans1);
benchmark::DoNotOptimize(ans2);
}
}
BENCHMARK(BM_Pipes2);
void BM_Pipe(benchmark::State &state) {
using namespace freepipe;