Sese Framework  2.3.0
A cross-platform framework
Loading...
Searching...
No Matches
UserBalanceLoader.h
Go to the documentation of this file.
1// Copyright 2024 libsese
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
20
21#pragma once
22
23#include "sese/event/Event.h"
24#include "sese/net/Socket.h"
25#include "sese/thread/Thread.h"
26
27#include <atomic>
28#include <queue>
29#include <mutex>
30
31namespace sese::service {
32
33class MasterEventLoop;
34
37public:
/// Destructor; declared noexcept. Its definition is not part of this listing.
/// NOTE(review): presumably releases the socket, event loops and queue/mutex
/// arrays that init() allocates with new/new[] - confirm in the implementation.
38 ~UserBalanceLoader() noexcept;
39
/// Sets the number of worker threads.
/// NOTE(review): presumably stores `th` into `threads`, which init() uses as the
/// number of SERVICE event loops and queue/mutex slots to create - confirm in
/// the definition, which is not part of this listing.
/// \param th requested thread count
42 void setThreads(size_t th) noexcept;
43
46 void setAddress(const net::IPAddress::Ptr &addr) noexcept { UserBalanceLoader::address = addr; } // GCOVR_EXCL_LINE
47
50 void setAcceptTimeout(uint32_t to) noexcept { UserBalanceLoader::acceptTimeout = to; }
51
54 void setDispatchTimeout(uint32_t to) noexcept { UserBalanceLoader::dispatchTimeout = to; }
55
58 [[nodiscard]] bool isStarted() const { return _isStart; } // GCOVR_EXCL_LINE
59
/// Initializes the loader using default-constructed SERVICE instances as
/// worker event loops; the definition forwards to the creator-based overload
/// with a factory that returns `new SERVICE`.
/// \tparam SERVICE worker event-loop type; must be default-constructible
/// \return true on success, false otherwise
63 template<class SERVICE>
64 bool init() noexcept;
65
/// Initializes the loader: creates, binds and listens on the socket for the
/// configured address, sets up the master event loop and obtains one SERVICE
/// event loop per configured thread from `creator`.
/// \tparam SERVICE worker event-loop type; instances must provide init()
/// \param creator factory called once per thread; a nullptr result aborts initialization
/// \return false when no address is set or any step fails, true otherwise
70 template<class SERVICE>
71 bool init(std::function<SERVICE *()> creator) noexcept;
72
/// Starts the loader. The definition is not part of this listing.
/// NOTE(review): presumably launches the master()/slave() threads and sets
/// `_isStart` - confirm in the implementation.
74 void start() noexcept;
75
/// Stops the loader. The definition is not part of this listing.
/// NOTE(review): presumably signals `_isStop` and joins the threads in
/// `threadVector` - confirm in the implementation.
77 void stop() noexcept;
78
/// Hands a socket together with an opaque user pointer to the loader.
/// NOTE(review): presumably the pair is queued for one of the worker event
/// loops and later reported through the on-dispatched callback - confirm in
/// the implementation, which is not part of this listing.
/// \param sock socket handle to dispatch
/// \param data opaque pointer carried along with the socket
79 void dispatchSocket(socket_t sock, void *data);
80
81 void setOnDispatchedCallbackFunction(const std::function<void(int, sese::event::EventLoop *, void *)> &cb) { UserBalanceLoader::onDispatchedCallbackFunction = cb; }
82
83protected:
/// Element type of the master/slave socket exchange queues.
/// NOTE(review): listing line 86 is absent from this dump; a socket handle
/// member presumably sits there - confirm against the upstream header.
85 struct SocketStatus {
/// Opaque user pointer; value-initialized to null.
87 void *data{};
88 };
89
/// Master routine. The definition is not part of this listing.
/// NOTE(review): presumably the body of the accepting thread that drives the
/// master event loop and distributes sockets to the workers - confirm.
90 void master() noexcept;
91
/// Worker routine. The definition is not part of this listing.
/// NOTE(review): presumably the body of one worker thread, which exchanges
/// sockets with the master through the two queues under `mutex` and drives
/// `event_loop` - confirm in the implementation.
/// \param event_loop event loop served by this worker
/// \param master_queue queue of SocketStatus entries associated with the master side
/// \param slave_queue queue of SocketStatus entries associated with the worker side
/// \param mutex mutex passed alongside the two queues
92 void slave(
93 sese::event::EventLoop *event_loop,
94 std::queue<SocketStatus> *master_queue,
95 std::queue<SocketStatus> *slave_queue,
96 std::mutex *mutex
97 ) noexcept;
98
99protected:
/// Atomic flag reported by isStarted(); initially false.
100 std::atomic_bool _isStart{false};
/// Atomic flag, initially false. NOTE(review): presumably the stop request seen by the threads - confirm.
101 std::atomic_bool _isStop{false};

/// Value written by setAcceptTimeout(); defaults to 100 (unit not stated here).
103 uint32_t acceptTimeout = 100;
/// Value written by setDispatchTimeout(); defaults to 100 (unit not stated here).
104 uint32_t dispatchTimeout = 100;
/// Number of worker event loops and queue/mutex slots init() creates; defaults to 2.
105 size_t threads{2};
/// Worker event loops produced by the creator in init(); init() deletes them on its failure path.
106 std::vector<sese::event::EventLoop *> eventLoopVector;
/// Thread handles. NOTE(review): presumably filled by start() - confirm.
107 std::vector<sese::Thread::Ptr> threadVector;
109

/// Array of `threads` queues allocated with new[] at the end of a successful init().
114 std::queue<SocketStatus> *masterSocketQueueArray{nullptr};
/// Array of `threads` queues allocated with new[] at the end of a successful init().
115 std::queue<SocketStatus> *slaveSocketQueueArray{nullptr};
/// Array of `threads` mutexes allocated with new[] at the end of a successful init().
116 std::mutex *mutexArray{nullptr};

/// Callback stored by setOnDispatchedCallbackFunction().
118 std::function<void(int, sese::event::EventLoop *event_loop, void *)> onDispatchedCallbackFunction;
119};
120
/// Event loop type that UserBalanceLoader::init() instantiates for the
/// listening socket (it is given the listen fd via setListenFd()).
122class MasterEventLoop final : public sese::event::EventLoop {
123public:
/// Overrides EventLoop::onAccept. The definition is not part of this listing.
/// NOTE(review): presumably pushes the accepted descriptor into `socketQueue` - confirm.
124 void onAccept(int fd) override;
125
/// Queue of socket handles owned by this loop.
126 std::queue<socket_t> socketQueue;
127};
128} // namespace sese::service
129
130// The code below is impractical to exercise in tests, so it is excluded from coverage
131// GCOVR_EXCL_START
132
133template<class SERVICE>
135 return sese::service::UserBalanceLoader::init<SERVICE>([]() -> SERVICE * { return new SERVICE; });
136}
137
138template<class SERVICE>
139bool sese::service::UserBalanceLoader::init(std::function<SERVICE *()> creator) noexcept {
140 if (address == nullptr) return false;
141
142 socket = new net::Socket(
143 address->getRawAddress()->sa_family == AF_INET ? net::Socket::Family::IPv4 : net::Socket::Family::IPv6,
145 );
146 if (-1 == socket->getRawSocket()) {
147 return false;
148 }
149
150 if (!socket->setNonblocking()) {
151 goto freeSocket;
152 }
153
154 if (0 != socket->bind(address)) {
155 goto freeSocket;
156 }
157
158 if (0 != socket->listen(32)) {
159 goto freeSocket;
160 }
161
162 masterEventLoop = new MasterEventLoop;
163 masterEventLoop->setListenFd(static_cast<int>(socket->getRawSocket()));
164 if (!masterEventLoop->init()) {
165 goto freeMaster;
166 }
167
168 for (size_t i = 0; i < threads; ++i) {
169 auto event = creator();
170 if (event == nullptr) {
171 goto freeEvent;
172 }
173 if (!event->init()) {
174 delete event;
175 goto freeEvent;
176 } else {
177 eventLoopVector.emplace_back(event);
178 }
179 }
180
181 // Initialize exchange queues
182 masterSocketQueueArray = new std::queue<SocketStatus>[threads];
183 slaveSocketQueueArray = new std::queue<SocketStatus>[threads];
184 mutexArray = new std::mutex[threads];
185
186 return true;
187
188freeEvent:
189 for (decltype(auto) event_loop: eventLoopVector) {
190 delete event_loop;
191 }
192 eventLoopVector.clear();
193
194freeMaster:
195 delete masterEventLoop;
196 masterEventLoop = nullptr;
197
198freeSocket:
199 socket->close();
200 delete socket;
201 socket = nullptr;
202 return false;
203}
204
205
206// GCOVR_EXCL_STOP