Advertisement
szaszm01

ProcessSession::write lvalue and rvalue ref overloads, create OutputStreamCallback from Callable

May 20th, 2021
1,192
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 3.93 KB | None | 0 0
  1. diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
  2. index 302c8b1c3..811097440 100644
  3. --- a/libminifi/include/core/ProcessSession.h
  4. +++ b/libminifi/include/core/ProcessSession.h
  5. @@ -97,6 +97,8 @@ class ProcessSession : public ReferenceContainer {
  6. int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
  7. // Execute the given write callback against the content
  8. void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
  9. +  void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback& cb) { write(flow, &cb); }  // convenience overload: forwards the address of cb to the pointer overload
  10. +  void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback&& cb) { write(flow, &cb); }  // accepts a temporary callback and forwards its address; NOTE(review): assumes the pointer overload does not retain the pointer after returning - confirm
  11. // Replace content with buffer
  12. void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer);
  13. // Execute the given write/append callback against the content
  14. diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
  15. index b3fafce4f..4247d01c9 100644
  16. --- a/libminifi/include/io/StreamPipe.h
  17. +++ b/libminifi/include/io/StreamPipe.h
  18. @@ -23,6 +23,7 @@
  19. #include <memory>
  20. #include <utility>
  21. #include "BaseStream.h"
  22. +#include "GeneralUtils.h"
  23.  
  24. namespace org {
  25. namespace apache {
  26. @@ -37,13 +38,39 @@ class InputStreamCallback {
  27.  
  28. virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
  29. };
  30. +
  31. class OutputStreamCallback {
  32. public:
  33. virtual ~OutputStreamCallback() = default;
  34. virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
  35. +
  36. };
  37.  
  38. namespace internal {
  39. +template<typename Void, typename R, typename F, typename... Args>
  40. +struct is_invocable_r_impl : std::false_type {};  // fallback: chosen when the specialization below is SFINAE'd out
  41. +// Chosen when utils::invoke(F, Args...) is well-formed and its result is convertible to R.
  42. +template<typename R, typename F, typename... Args>
  43. +struct is_invocable_r_impl<typename std::enable_if<std::is_convertible<decltype(utils::invoke(std::declval<F>(), std::declval<Args>()...)), R>::value>::type, R, F, Args...> : std::true_type {};
  44. +// Trait: true when F can be invoked with Args... and the result converts to R.
  45. +template<typename R, typename F, typename... Args>
  46. +using is_invocable_r = is_invocable_r_impl<void, R, F, Args...>;  // only the leading Void slot is fixed; R has to be passed through or every argument shifts by one
  47. +
  48. +template<typename F>
  49. +struct CallableOutputStreamCallback : OutputStreamCallback {
  50. +  static_assert(is_invocable_r<int64_t, F, const std::shared_ptr<io::BaseStream>&>::value, "CallableOutputStreamCallback expects a Callable of stream ptr to int64_t");
  51. +  // Overriding the virtual process() makes this type a non-aggregate, so it cannot be brace-initialized without a constructor.
  52. +  explicit CallableOutputStreamCallback(F callable) : f(std::forward<F>(callable)) {}
  53. +  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override { return f(stream); }  // delegates to the wrapped callable
  54. +
  55. +  F f;  // the wrapped callable
  56. +};
  57. +
  58. +// Wraps callable in an OutputStreamCallback; F is decayed so an lvalue argument is copied instead of being held by reference.
  59. +template<typename F>
  60. +CallableOutputStreamCallback<typename std::decay<F>::type> outputStreamCallbackFromCallable(F&& callable) {
  61. +  return CallableOutputStreamCallback<typename std::decay<F>::type>{std::forward<F>(callable)};
  62. +}
  63.  
  64. inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shared_ptr<io::OutputStream>& dst) {
  65. uint8_t buffer[4096U];
  66. diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
  67. index e3ce3681a..cfaca971a 100644
  68. --- a/libminifi/src/core/ProcessSession.cpp
  69. +++ b/libminifi/src/core/ProcessSession.cpp
  70. @@ -242,15 +242,9 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
  71. }
  72.  
  73. void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
  74. -  struct BufferOutputStreamCallback : OutputStreamCallback {
  75. -    explicit BufferOutputStreamCallback(gsl::span<const char> buffer) :buffer{buffer} {}
  76. -    int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
  77. -      return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size());
  78. -    }
  79. -    gsl::span<const char> buffer;
  80. -  };
  81. -  BufferOutputStreamCallback cb{ buffer };
  82. -  write(flow_file, &cb);
  83. +  write(flow_file, minifi::internal::outputStreamCallbackFromCallable([buffer](const std::shared_ptr<io::BaseStream>& stream) {
  84. +    return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size());  // writes the bytes of buffer to the stream
  85. +  }));  // the temporary callback binds to the write(flow, OutputStreamCallback&&) overload
  86. }
  87.  
  88. void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement