AWS SDK for C++

AWS SDK for C++ Version 1.11.612

Loading...
Searching...
No Matches
Queue.h
1
5#pragma once
6
7#include <aws/core/client/ClientConfiguration.h>
8#include <aws/queues/Queues_EXPORTS.h>
9#include <chrono>
10#include <thread>
11#include <atomic>
12#include <functional>
13
14namespace Aws
15{
16 namespace Queues
17 {
18 static const char* MEM_TAG = "Aws::Queues::Queue";
19
24 template<typename MESSAGE_TYPE>
25 class Queue
26 {
27 typedef std::function<void(const Queue*, const MESSAGE_TYPE&, bool&)> MessageReceivedEventHandler;
28 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteFailedEventHandler;
29 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteSuccessEventHandler;
30 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendFailedEventHandler;
31 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendSuccessEventHandler;
32
33 public:
40 Queue(unsigned pollingFrequency) :
41 m_continue(true), m_pollingFrequencyMs(pollingFrequency), m_pollingThread(nullptr)
42 {
43 }
44
45 virtual ~Queue()
46 {
48 }
49
50 virtual MESSAGE_TYPE Top() const = 0;
51 virtual void Delete(const MESSAGE_TYPE&) = 0;
52 virtual void Push(const MESSAGE_TYPE&) = 0;
53
60 {
61 if(!m_pollingThread)
62 {
63 m_continue = true;
64 m_pollingThread = Aws::MakeUnique<std::thread>(MEM_TAG, &Queue::Main, this);
65 }
66 }
67
75 {
76 m_continue = false;
77 if(m_pollingThread)
78 {
79 m_pollingThread->join();
80 m_pollingThread = nullptr;
81 }
82 }
83
84 inline void SetMessageReceivedEventHandler(const MessageReceivedEventHandler& messageHandler) { m_messageReceivedHandler = messageHandler; }
85 inline void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
86 inline void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
87 inline void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler& messageHandler) { m_messageSendFailedHandler = messageHandler; }
88 inline void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
89
90 inline void SetMessageReceivedEventHandler(MessageReceivedEventHandler&& messageHandler) { m_messageReceivedHandler = messageHandler; }
91 inline void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler&& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
92 inline void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler&& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
93 inline void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler&& messageHandler) { m_messageSendFailedHandler = messageHandler; }
94 inline void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler&& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
95
96 inline const MessageReceivedEventHandler& GetMessageReceivedEventHandler() const { return m_messageReceivedHandler; }
97 inline const MessageDeleteFailedEventHandler& GetMessageDeleteFailedEventHandler() const { return m_messageDeleteFailedHandler; }
98 inline const MessageDeleteSuccessEventHandler& GetMessageDeleteSuccessEventHandler() const { return m_messageDeleteSuccessHandler; }
99 inline const MessageSendFailedEventHandler& GetMessageSendFailedEventHandler() const { return m_messageSendFailedHandler; }
100 inline const MessageSendSuccessEventHandler& GetMessageSendSuccessEventHandler() const { return m_messageSendSuccessHandler; }
101
102 protected:
103 std::atomic<bool> m_continue;
104
105 private:
106 void Main()
107 {
108 while(m_continue)
109 {
110 auto start = std::chrono::system_clock::now();
112 bool deleteMessage = false;
113
115 if (receivedHandler)
116 {
118 }
119
120 if (deleteMessage)
121 {
123 }
124
125 if(m_continue)
126 {
127 auto stop = std::chrono::system_clock::now();
128 auto timeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
129
130 if (m_pollingFrequencyMs >= timeTaken.count())
131 {
132 std::this_thread::sleep_for(std::chrono::milliseconds(m_pollingFrequencyMs - timeTaken.count()));
133 }
134 }
135 }
136 }
137
138 unsigned m_pollingFrequencyMs;
139 Aws::UniquePtr<std::thread> m_pollingThread;
140
141 // Handlers
142 MessageReceivedEventHandler m_messageReceivedHandler;
143 MessageDeleteFailedEventHandler m_messageDeleteFailedHandler;
144 MessageDeleteSuccessEventHandler m_messageDeleteSuccessHandler;
145 MessageSendFailedEventHandler m_messageSendFailedHandler;
146 MessageSendSuccessEventHandler m_messageSendSuccessHandler;
147 };
148 }
149}
const MessageDeleteFailedEventHandler & GetMessageDeleteFailedEventHandler() const
Definition Queue.h:97
void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler &messageHandler)
Definition Queue.h:86
const MessageSendFailedEventHandler & GetMessageSendFailedEventHandler() const
Definition Queue.h:99
void StopPolling()
Definition Queue.h:74
const MessageDeleteSuccessEventHandler & GetMessageDeleteSuccessEventHandler() const
Definition Queue.h:98
virtual MESSAGE_TYPE Top() const =0
void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler &messageHandler)
Definition Queue.h:87
const MessageSendSuccessEventHandler & GetMessageSendSuccessEventHandler() const
Definition Queue.h:100
virtual void Delete(const MESSAGE_TYPE &)=0
void StartPolling()
Definition Queue.h:59
void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler &messageHandler)
Definition Queue.h:88
Queue(unsigned pollingFrequency)
Definition Queue.h:40
void SetMessageReceivedEventHandler(MessageReceivedEventHandler &&messageHandler)
Definition Queue.h:90
void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler &messageHandler)
Definition Queue.h:85
void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler &&messageHandler)
Definition Queue.h:92
virtual void Push(const MESSAGE_TYPE &)=0
void SetMessageReceivedEventHandler(const MessageReceivedEventHandler &messageHandler)
Definition Queue.h:84
void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler &&messageHandler)
Definition Queue.h:94
virtual ~Queue()
Definition Queue.h:45
std::atomic< bool > m_continue
Definition Queue.h:103
void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler &&messageHandler)
Definition Queue.h:91
const MessageReceivedEventHandler & GetMessageReceivedEventHandler() const
Definition Queue.h:96
void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler &&messageHandler)
Definition Queue.h:93
static const char * MEM_TAG
Definition Queue.h:18
UniquePtr< T > MakeUnique(const char *allocationTag, ArgTypes &&... args)
std::unique_ptr< T, D > UniquePtr