include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp

81.4% Lines (193/237) 100.0% Functions (20/20)
include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24
25 #include <boost/corosio/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/make_err.hpp>
28
29 #include <memory>
30 #include <mutex>
31 #include <unordered_map>
32 #include <utility>
33
34 #include <errno.h>
35 #include <netinet/in.h>
36 #include <sys/epoll.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39
40 namespace boost::corosio::detail {
41
/** State for epoll acceptor service.

    Bundles the per-service bookkeeping shared between the service and its
    acceptor implementations: the scheduler reference, the mutex guarding
    the two containers below, the intrusive list of live acceptors, and
    the shared_ptr map that keeps each acceptor alive until destroy().
*/
class epoll_acceptor_state
{
public:
    explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Scheduler this service posts ops and work accounting to.
    epoll_scheduler& sched_;
    // Guards acceptor_list_ and acceptor_ptrs_.
    std::mutex mutex_;
    // All live acceptor implementations (intrusive, non-owning links).
    intrusive_list<epoll_acceptor> acceptor_list_;
    // Owning handles; erased in epoll_acceptor_service::destroy().
    std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
        acceptor_ptrs_;
};
57
/** epoll acceptor service implementation.

    Inherits from acceptor_service to enable runtime polymorphism.
    Uses key_type = acceptor_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
{
public:
    explicit epoll_acceptor_service(capy::execution_context& ctx);
    ~epoll_acceptor_service() override;

    epoll_acceptor_service(epoll_acceptor_service const&) = delete;
    epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;

    /** Close all live acceptors at context shutdown (owning pointers are
        released later, when the state is destroyed). */
    void shutdown() override;

    /** Create a new acceptor implementation owned by this service. */
    io_object::implementation* construct() override;
    /** Close and release an implementation created by construct(). */
    void destroy(io_object::implementation*) override;
    /** Close the acceptor's socket without destroying the implementation. */
    void close(io_object::handle&) override;
    /** Create, bind and listen on a socket for `ep`; registers the fd with
        the epoll scheduler and caches the bound local endpoint. */
    std::error_code open_acceptor(
        tcp_acceptor::implementation& impl, endpoint ep, int backlog) override;

    /** Scheduler this service posts completions and work accounting to. */
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /** Queue a completed op on the scheduler's run queue. */
    void post(epoll_op* op);
    /** Forward outstanding-work accounting to the scheduler. */
    void work_started() noexcept;
    void work_finished() noexcept;

    /** Get the socket service for creating peer sockets during accept. */
    epoll_socket_service* socket_service() const noexcept;

private:
    capy::execution_context& ctx_;
    // Owned state; destroyed with the service, after shutdown() has run.
    std::unique_ptr<epoll_acceptor_state> state_;
};
95
96 //--------------------------------------------------------------------------
97 //
98 // Implementation
99 //
100 //--------------------------------------------------------------------------
101
102 inline void
103 6 epoll_accept_op::cancel() noexcept
104 {
105 6 if (acceptor_impl_)
106 6 acceptor_impl_->cancel_single_op(*this);
107 else
108 request_cancel();
109 6 }
110
/** Completion handler for an accept operation.

    Runs on the scheduler after the op was posted (either an immediate
    completion from accept(), a readiness-driven completion, or a
    cancellation). Translates the stored result (errn / cancelled /
    accepted_fd) into *ec_out and *impl_out, wraps the accepted fd in a
    new epoll_socket on success, then resumes the awaiting coroutine.
*/
inline void
epoll_accept_op::operator()()
{
    stop_cb.reset();

    // A completion resets the scheduler's inline-run budget.
    // NOTE(review): acceptor_impl_ is dereferenced unconditionally here,
    // yet cancel() and the (!success || !acceptor_impl_) branch below
    // both allow for a null acceptor_impl_ — confirm it can never be
    // null by the time the op is executed.
    static_cast<epoll_acceptor*>(acceptor_impl_)
        ->service()
        .scheduler()
        .reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cancellation takes precedence over any I/O error when reporting.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Set up the peer socket on success
    if (success && accepted_fd >= 0 && acceptor_impl_)
    {
        auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
                               ->service()
                               .socket_service();
        if (socket_svc)
        {
            // Create the peer implementation and hand it the accepted fd.
            auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
            impl.set_socket(accepted_fd);

            impl.desc_state_.fd = accepted_fd;
            {
                // Clear stale op pointers before the descriptor becomes
                // visible to the reactor.
                std::lock_guard lock(impl.desc_state_.mutex);
                impl.desc_state_.read_op = nullptr;
                impl.desc_state_.write_op = nullptr;
                impl.desc_state_.connect_op = nullptr;
            }
            socket_svc->scheduler().register_descriptor(
                accepted_fd, &impl.desc_state_);

            impl.set_endpoints(
                static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
                from_sockaddr_in(peer_addr));

            if (impl_out)
                *impl_out = &impl;
            // Ownership of the fd has passed to the socket impl.
            accepted_fd = -1;
        }
        else
        {
            // No socket service — treat as error
            *ec_out = make_err(ENOENT);
            success = false;
        }
    }

    // On failure, make sure the raw fd does not leak and the caller sees
    // a null implementation.
    if (!success || !acceptor_impl_)
    {
        if (accepted_fd >= 0)
        {
            ::close(accepted_fd);
            accepted_fd = -1;
        }
        if (impl_out)
            *impl_out = nullptr;
    }

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
184
/** Construct an acceptor implementation bound to its owning service. */
inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
    : svc_(svc)
{
}
189
/** Initiate an asynchronous accept on the listening socket.

    Attempts a non-blocking accept4() first. Possible outcomes:
      - immediate success with inline budget available: the peer socket is
        set up here and the caller is resumed directly;
      - immediate success without budget: the result is stashed on the op
        and posted to the scheduler;
      - EAGAIN/EWOULDBLOCK: the op is parked as the descriptor's read op
        until the reactor signals readiness;
      - any other error: the op is posted with the captured errno.

    @param h        Coroutine to resume on completion.
    @param ex       Executor used to resume `h`.
    @param token    Stop token wired to op cancellation.
    @param ec       Receives the completion error code.
    @param impl_out Receives the accepted socket implementation (or null).
    @return Handle to resume inline, or std::noop_coroutine() when the
            completion will be delivered via the scheduler.
*/
inline std::coroutine_handle<>
epoll_acceptor::accept(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    std::stop_token token,
    std::error_code* ec,
    io_object::implementation** impl_out)
{
    // The acceptor owns a single reusable accept op.
    auto& op = acc_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.impl_out = impl_out;
    op.fd = fd_;
    op.start(token, this);

    // Try to accept immediately; retry on EINTR.
    sockaddr_in addr{};
    socklen_t addrlen = sizeof(addr);
    int accepted;
    do
    {
        accepted = ::accept4(
            fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen,
            SOCK_NONBLOCK | SOCK_CLOEXEC);
    }
    while (accepted < 0 && errno == EINTR);

    if (accepted >= 0)
    {
        {
            // A connection was just consumed; clear the cached readiness
            // flag under the descriptor lock.
            std::lock_guard lock(desc_state_.mutex);
            desc_state_.read_ready = false;
        }

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Inline completion path: set up the peer socket here and
            // resume the caller directly. Mirrors the setup performed in
            // epoll_accept_op::operator()().
            auto* socket_svc = svc_.socket_service();
            if (socket_svc)
            {
                auto& impl =
                    static_cast<epoll_socket&>(*socket_svc->construct());
                impl.set_socket(accepted);

                impl.desc_state_.fd = accepted;
                {
                    std::lock_guard lock(impl.desc_state_.mutex);
                    impl.desc_state_.read_op = nullptr;
                    impl.desc_state_.write_op = nullptr;
                    impl.desc_state_.connect_op = nullptr;
                }
                socket_svc->scheduler().register_descriptor(
                    accepted, &impl.desc_state_);

                impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));

                *ec = {};
                if (impl_out)
                    *impl_out = &impl;
            }
            else
            {
                // No socket service installed — fail and drop the fd.
                ::close(accepted);
                *ec = make_err(ENOENT);
                if (impl_out)
                    *impl_out = nullptr;
            }
            return dispatch_coro(ex, h);
        }

        // Inline budget exhausted: stash the result on the op and let the
        // scheduler run the completion.
        op.accepted_fd = accepted;
        op.peer_addr = addr;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Would block: park the op until the reactor reports readiness.
        op.impl_ptr = shared_from_this();
        svc_.work_started();

        std::lock_guard lock(desc_state_.mutex);
        bool io_done = false;
        if (desc_state_.read_ready)
        {
            // A readiness edge arrived between accept4() and taking the
            // lock; retry the I/O now.
            desc_state_.read_ready = false;
            op.perform_io();
            io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
            if (!io_done)
                op.errn = 0;
        }

        if (io_done || op.cancelled.load(std::memory_order_acquire))
        {
            // Already done (or cancelled): post the completion and
            // balance the work_started() above.
            svc_.post(&op);
            svc_.work_finished();
        }
        else
        {
            desc_state_.read_op = &op;
        }
        return std::noop_coroutine();
    }

    // Hard error from accept4(): report errno via a posted completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
302
/** Cancel the pending accept operation, if any. */
inline void
epoll_acceptor::cancel() noexcept
{
    // acc_ is the acceptor's single reusable accept op.
    cancel_single_op(acc_);
}
308
/** Request cancellation of `op` and, if it is still parked as this
    descriptor's read op, claim it and post it for completion.

    No-op when the implementation is already being destroyed (the weak
    reference can no longer be locked).
*/
inline void
epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
{
    // Take a strong reference; bail out if destruction already started.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Claim the op under the descriptor lock so the reactor cannot
    // complete it concurrently.
    epoll_op* claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.read_op == &op)
            claimed = std::exchange(desc_state_.read_op, nullptr);
    }
    if (claimed)
    {
        // Keep the impl alive until the posted completion runs, and
        // balance the work_started() done when the op was parked.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
331
/** Close the listening socket and flush any parked accept op.

    Called from close(), destroy(), shutdown() and open_acceptor();
    safe when no socket is open (fd_ < 0).
*/
inline void
epoll_acceptor::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        acc_.request_cancel();

        // Detach the parked accept op (if any) and clear cached
        // readiness under the descriptor lock.
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            claimed = std::exchange(desc_state_.read_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
        }

        if (claimed)
        {
            // Post the cancelled op and balance its work_started().
            acc_.impl_ptr = self;
            svc_.post(&acc_);
            svc_.work_finished();
        }

        // If the descriptor state is still enqueued in the reactor, pin
        // the impl so the queued reference stays valid.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        // Deregister from epoll before closing the fd.
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
}
372
/** Construct the service, resolving the epoll scheduler from the
    execution context and allocating the shared service state. */
inline epoll_acceptor_service::epoll_acceptor_service(
    capy::execution_context& ctx)
    : ctx_(ctx)
    , state_(
          std::make_unique<epoll_acceptor_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
381
382 410 inline epoll_acceptor_service::~epoll_acceptor_service() {}
383
384 inline void
385 205 epoll_acceptor_service::shutdown()
386 {
387 205 std::lock_guard lock(state_->mutex_);
388
389 205 while (auto* impl = state_->acceptor_list_.pop_front())
390 impl->close_socket();
391
392 // Don't clear acceptor_ptrs_ here — same rationale as
393 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
394 // after scheduler shutdown has drained all queued ops.
395 205 }
396
397 inline io_object::implementation*
398 67 epoll_acceptor_service::construct()
399 {
400 67 auto impl = std::make_shared<epoll_acceptor>(*this);
401 67 auto* raw = impl.get();
402
403 67 std::lock_guard lock(state_->mutex_);
404 67 state_->acceptor_list_.push_back(raw);
405 67 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
406
407 67 return raw;
408 67 }
409
410 inline void
411 67 epoll_acceptor_service::destroy(io_object::implementation* impl)
412 {
413 67 auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
414 67 epoll_impl->close_socket();
415 67 std::lock_guard lock(state_->mutex_);
416 67 state_->acceptor_list_.remove(epoll_impl);
417 67 state_->acceptor_ptrs_.erase(epoll_impl);
418 67 }
419
420 inline void
421 131 epoll_acceptor_service::close(io_object::handle& h)
422 {
423 131 static_cast<epoll_acceptor*>(h.get())->close_socket();
424 131 }
425
426 inline std::error_code
427 66 epoll_acceptor_service::open_acceptor(
428 tcp_acceptor::implementation& impl, endpoint ep, int backlog)
429 {
430 66 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
431 66 epoll_impl->close_socket();
432
433 66 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
434 66 if (fd < 0)
435 return make_err(errno);
436
437 66 int reuse = 1;
438 66 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
439
440 66 sockaddr_in addr = detail::to_sockaddr_in(ep);
441 66 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
442 {
443 2 int errn = errno;
444 2 ::close(fd);
445 2 return make_err(errn);
446 }
447
448 64 if (::listen(fd, backlog) < 0)
449 {
450 int errn = errno;
451 ::close(fd);
452 return make_err(errn);
453 }
454
455 64 epoll_impl->fd_ = fd;
456
457 // Register fd with epoll (edge-triggered mode)
458 64 epoll_impl->desc_state_.fd = fd;
459 {
460 64 std::lock_guard lock(epoll_impl->desc_state_.mutex);
461 64 epoll_impl->desc_state_.read_op = nullptr;
462 64 }
463 64 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
464
465 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
466 64 sockaddr_in local_addr{};
467 64 socklen_t local_len = sizeof(local_addr);
468 64 if (::getsockname(
469 64 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
470 64 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
471
472 64 return {};
473 }
474
/** Queue a completed op on the scheduler's run queue. */
inline void
epoll_acceptor_service::post(epoll_op* op)
{
    state_->sched_.post(op);
}
480
/** Forward outstanding-work accounting to the scheduler (paired with
    work_finished()). */
inline void
epoll_acceptor_service::work_started() noexcept
{
    state_->sched_.work_started();
}
486
/** Balance a prior work_started() call on the scheduler. */
inline void
epoll_acceptor_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
492
493 inline epoll_socket_service*
494 4658 epoll_acceptor_service::socket_service() const noexcept
495 {
496 4658 auto* svc = ctx_.find_service<detail::socket_service>();
497 4658 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
498 }
499
500 } // namespace boost::corosio::detail
501
502 #endif // BOOST_COROSIO_HAS_EPOLL
503
504 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
505