Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
- index 302c8b1c3..811097440 100644
- --- a/libminifi/include/core/ProcessSession.h
- +++ b/libminifi/include/core/ProcessSession.h
- @@ -97,6 +97,8 @@ class ProcessSession : public ReferenceContainer {
- int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
- // Execute the given write callback against the content
- void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
- + void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback& cb) { write(flow, &cb); }
- + void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback&& cb) { write(flow, &cb); }
- // Replace content with buffer
- void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer);
- // Execute the given write/append callback against the content
- diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
- index b3fafce4f..4247d01c9 100644
- --- a/libminifi/include/io/StreamPipe.h
- +++ b/libminifi/include/io/StreamPipe.h
- @@ -23,6 +23,7 @@
- #include <memory>
- #include <utility>
- #include "BaseStream.h"
- +#include "GeneralUtils.h"
- namespace org {
- namespace apache {
- @@ -37,13 +38,39 @@ class InputStreamCallback {
- virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
- };
- +
- class OutputStreamCallback {
- public:
- virtual ~OutputStreamCallback() = default;
- virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
- +
- };
- namespace internal {
- +template<typename Void, typename R, typename F, typename... Args>
- +struct is_invocable_r_impl : std::false_type {};
- +
- +template<typename R, typename F, typename... Args>
- +struct is_invocable_r_impl<typename std::enable_if<std::is_convertible<decltype(utils::invoke(std::declval<F>(), std::declval<Args>()...)), R>::value>::type, R, F, Args...> : std::true_type {};
- +
- +template<typename R, typename F, typename... Args>
- +using is_invocable_r = is_invocable_r_impl<void, F, Args...>;
- +
- +template<typename F>
- +struct CallableOutputStreamCallback : OutputStreamCallback {
- + static_assert(is_invocable_r<int64_t, F, const std::shared_ptr<io::BaseStream>&>::value, "CallableOutputStreamCallback expects a Callable of stream ptr to int64_t");
- +
- + int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
- + return f(stream);
- + }
- +
- + F f;
- +};
- +
- +template<typename F>
- +CallableOutputStreamCallback<F> outputStreamCallbackFromCallable(F&& callable) {
- + return {std::forward<F>(callable)};
- +}
- inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shared_ptr<io::OutputStream>& dst) {
- uint8_t buffer[4096U];
- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
- index e3ce3681a..cfaca971a 100644
- --- a/libminifi/src/core/ProcessSession.cpp
- +++ b/libminifi/src/core/ProcessSession.cpp
- @@ -242,15 +242,9 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
- }
- void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
- - struct BufferOutputStreamCallback : OutputStreamCallback {
- - explicit BufferOutputStreamCallback(gsl::span<const char> buffer) :buffer{buffer} {}
- - int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
- - return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size());
- - }
- - gsl::span<const char> buffer;
- - };
- - BufferOutputStreamCallback cb{ buffer };
- - write(flow_file, &cb);
- + write(flow_file, minifi::internal::outputStreamCallbackFromCallable{[buffer](const std::shared_ptr<io::BaseStream>& stream) {
- + return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size());
- + }});
- }
- void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement