1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
27  
#include <boost/corosio/detail/except.hpp>
27  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/capy/buffers.hpp>
28  
#include <boost/capy/buffers.hpp>
29  

29  

30  
#include <coroutine>
30  
#include <coroutine>
31  
#include <mutex>
31  
#include <mutex>
32  
#include <unordered_map>
32  
#include <unordered_map>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <netinet/in.h>
36  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
37  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
39  
#include <sys/socket.h>
40  
#include <unistd.h>
40  
#include <unistd.h>
41  

41  

42  
/*
    epoll Socket Implementation
    ===========================

    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (the socket is non-blocking).
      2. If it succeeds, or fails with a real error, deliver the result:
         inline when the scheduler's inline budget allows, otherwise by
         posting the op to the completion queue.
      3. If it fails with EAGAIN/EWOULDBLOCK (EINPROGRESS for connect),
         register the op with the reactor and wait for readiness.

    This "try first" approach avoids an unnecessary epoll round-trip for
    operations that can complete immediately (common for small reads and
    writes on fast local connections).

    One-Shot Registration
    ---------------------
    We use one-shot epoll registration: each operation registers, waits for
    one event, then unregisters. This simplifies the state machine because
    we don't need to track whether an fd is currently registered or handle
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
    simplicity is worth it.

    NOTE(review): register_op() in this file consumes cached readiness
    flags (read_ready / write_ready) and parks ops in per-descriptor slots,
    which looks like persistent edge-triggered registration rather than
    one-shot registration. Confirm this section is still accurate.

    Cancellation
    ------------
    See op.hpp for how the completion/cancellation race is handled via the
    `registered` atomic. cancel() must complete pending operations (post
    them with the cancelled flag set) so that coroutines waiting on them
    can resume. close_socket() calls cancel() first to guarantee this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls
    through a shared_ptr map (socket_ptrs_) keyed by raw pointer, giving
    O(1) lookup and removal. When a user calls close(), we call cancel(),
    which posts pending ops to the scheduler.

    CRITICAL: Posted ops must keep the impl alive until they complete;
    otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting, and the I/O initiators in this file (connect, read_some, ...)
    do the same when they start an op. When the op completes, impl_ptr is
    cleared, allowing the impl to be destroyed once no other references
    exist.

    Service Ownership
    -----------------
    epoll_socket_service owns all socket impls. destroy() removes the
    shared_ptr from the map, but the impl may survive while ops still hold
    impl_ptr references. shutdown() closes all sockets and clears the map;
    any in-flight ops will complete and release their references.
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

93  
/** State for epoll socket service. */
93  
/** State for epoll socket service. */
94  
class epoll_socket_state
94  
class epoll_socket_state
95  
{
95  
{
96  
public:
96  
public:
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98  
    {
98  
    {
99  
    }
99  
    }
100  

100  

101  
    epoll_scheduler& sched_;
101  
    epoll_scheduler& sched_;
102  
    std::mutex mutex_;
102  
    std::mutex mutex_;
103  
    intrusive_list<epoll_socket> socket_list_;
103  
    intrusive_list<epoll_socket> socket_list_;
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105  
        socket_ptrs_;
105  
        socket_ptrs_;
106  
};
106  
};
107  

107  

108  
/** epoll socket service implementation.
108  
/** epoll socket service implementation.
109  

109  

110  
    Inherits from socket_service to enable runtime polymorphism.
110  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
111  
    Uses key_type = socket_service for service lookup.
112  
*/
112  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114  
{
114  
{
115  
public:
115  
public:
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
117  
    ~epoll_socket_service() override;
117  
    ~epoll_socket_service() override;
118  

118  

119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121  

121  

122  
    void shutdown() override;
122  
    void shutdown() override;
123  

123  

124  
    io_object::implementation* construct() override;
124  
    io_object::implementation* construct() override;
125  
    void destroy(io_object::implementation*) override;
125  
    void destroy(io_object::implementation*) override;
126  
    void close(io_object::handle&) override;
126  
    void close(io_object::handle&) override;
127  
    std::error_code open_socket(tcp_socket::implementation& impl) override;
127  
    std::error_code open_socket(tcp_socket::implementation& impl) override;
128  

128  

129  
    epoll_scheduler& scheduler() const noexcept
129  
    epoll_scheduler& scheduler() const noexcept
130  
    {
130  
    {
131  
        return state_->sched_;
131  
        return state_->sched_;
132  
    }
132  
    }
133  
    void post(epoll_op* op);
133  
    void post(epoll_op* op);
134  
    void work_started() noexcept;
134  
    void work_started() noexcept;
135  
    void work_finished() noexcept;
135  
    void work_finished() noexcept;
136  

136  

137  
private:
137  
private:
138  
    std::unique_ptr<epoll_socket_state> state_;
138  
    std::unique_ptr<epoll_socket_state> state_;
139  
};
139  
};
140  

140  

141  
//--------------------------------------------------------------------------
141  
//--------------------------------------------------------------------------
142  
//
142  
//
143  
// Implementation
143  
// Implementation
144  
//
144  
//
145  
//--------------------------------------------------------------------------
145  
//--------------------------------------------------------------------------
146  

146  

147  
// Register an op with the reactor, handling cached edge events.
147  
// Register an op with the reactor, handling cached edge events.
148  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
148  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
149  
inline void
149  
inline void
150  
epoll_socket::register_op(
150  
epoll_socket::register_op(
151  
    epoll_op& op,
151  
    epoll_op& op,
152  
    epoll_op*& desc_slot,
152  
    epoll_op*& desc_slot,
153  
    bool& ready_flag,
153  
    bool& ready_flag,
154  
    bool& cancel_flag) noexcept
154  
    bool& cancel_flag) noexcept
155  
{
155  
{
156  
    svc_.work_started();
156  
    svc_.work_started();
157  

157  

158  
    std::lock_guard lock(desc_state_.mutex);
158  
    std::lock_guard lock(desc_state_.mutex);
159  
    bool io_done = false;
159  
    bool io_done = false;
160  
    if (ready_flag)
160  
    if (ready_flag)
161  
    {
161  
    {
162  
        ready_flag = false;
162  
        ready_flag = false;
163  
        op.perform_io();
163  
        op.perform_io();
164  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
164  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
165  
        if (!io_done)
165  
        if (!io_done)
166  
            op.errn = 0;
166  
            op.errn = 0;
167  
    }
167  
    }
168  

168  

169  
    if (cancel_flag)
169  
    if (cancel_flag)
170  
    {
170  
    {
171  
        cancel_flag = false;
171  
        cancel_flag = false;
172  
        op.cancelled.store(true, std::memory_order_relaxed);
172  
        op.cancelled.store(true, std::memory_order_relaxed);
173  
    }
173  
    }
174  

174  

175  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
175  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
176  
    {
176  
    {
177  
        svc_.post(&op);
177  
        svc_.post(&op);
178  
        svc_.work_finished();
178  
        svc_.work_finished();
179  
    }
179  
    }
180  
    else
180  
    else
181  
    {
181  
    {
182  
        desc_slot = &op;
182  
        desc_slot = &op;
183  
    }
183  
    }
184  
}
184  
}
185  

185  

186  
inline void
186  
inline void
187  
epoll_op::canceller::operator()() const noexcept
187  
epoll_op::canceller::operator()() const noexcept
188  
{
188  
{
189  
    op->cancel();
189  
    op->cancel();
190  
}
190  
}
191  

191  

192  
inline void
192  
inline void
193  
epoll_connect_op::cancel() noexcept
193  
epoll_connect_op::cancel() noexcept
194  
{
194  
{
195  
    if (socket_impl_)
195  
    if (socket_impl_)
196  
        socket_impl_->cancel_single_op(*this);
196  
        socket_impl_->cancel_single_op(*this);
197  
    else
197  
    else
198  
        request_cancel();
198  
        request_cancel();
199  
}
199  
}
200  

200  

201  
inline void
201  
inline void
202  
epoll_read_op::cancel() noexcept
202  
epoll_read_op::cancel() noexcept
203  
{
203  
{
204  
    if (socket_impl_)
204  
    if (socket_impl_)
205  
        socket_impl_->cancel_single_op(*this);
205  
        socket_impl_->cancel_single_op(*this);
206  
    else
206  
    else
207  
        request_cancel();
207  
        request_cancel();
208  
}
208  
}
209  

209  

210  
inline void
210  
inline void
211  
epoll_write_op::cancel() noexcept
211  
epoll_write_op::cancel() noexcept
212  
{
212  
{
213  
    if (socket_impl_)
213  
    if (socket_impl_)
214  
        socket_impl_->cancel_single_op(*this);
214  
        socket_impl_->cancel_single_op(*this);
215  
    else
215  
    else
216  
        request_cancel();
216  
        request_cancel();
217  
}
217  
}
218  

218  

219  
inline void
219  
inline void
220  
epoll_op::operator()()
220  
epoll_op::operator()()
221  
{
221  
{
222  
    stop_cb.reset();
222  
    stop_cb.reset();
223  

223  

224  
    socket_impl_->svc_.scheduler().reset_inline_budget();
224  
    socket_impl_->svc_.scheduler().reset_inline_budget();
225  

225  

226  
    if (cancelled.load(std::memory_order_acquire))
226  
    if (cancelled.load(std::memory_order_acquire))
227  
        *ec_out = capy::error::canceled;
227  
        *ec_out = capy::error::canceled;
228  
    else if (errn != 0)
228  
    else if (errn != 0)
229  
        *ec_out = make_err(errn);
229  
        *ec_out = make_err(errn);
230  
    else if (is_read_operation() && bytes_transferred == 0)
230  
    else if (is_read_operation() && bytes_transferred == 0)
231  
        *ec_out = capy::error::eof;
231  
        *ec_out = capy::error::eof;
232  
    else
232  
    else
233  
        *ec_out = {};
233  
        *ec_out = {};
234  

234  

235  
    *bytes_out = bytes_transferred;
235  
    *bytes_out = bytes_transferred;
236  

236  

237  
    // Move to stack before resuming coroutine. The coroutine might close
237  
    // Move to stack before resuming coroutine. The coroutine might close
238  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
238  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
239  
    // last ref and we destroyed it while still in operator(), we'd have
239  
    // last ref and we destroyed it while still in operator(), we'd have
240  
    // use-after-free. Moving to local ensures destruction happens at
240  
    // use-after-free. Moving to local ensures destruction happens at
241  
    // function exit, after all member accesses are complete.
241  
    // function exit, after all member accesses are complete.
242  
    capy::executor_ref saved_ex(ex);
242  
    capy::executor_ref saved_ex(ex);
243  
    std::coroutine_handle<> saved_h(h);
243  
    std::coroutine_handle<> saved_h(h);
244  
    auto prevent_premature_destruction = std::move(impl_ptr);
244  
    auto prevent_premature_destruction = std::move(impl_ptr);
245  
    dispatch_coro(saved_ex, saved_h).resume();
245  
    dispatch_coro(saved_ex, saved_h).resume();
246  
}
246  
}
247  

247  

248  
inline void
248  
inline void
249  
epoll_connect_op::operator()()
249  
epoll_connect_op::operator()()
250  
{
250  
{
251  
    stop_cb.reset();
251  
    stop_cb.reset();
252  

252  

253  
    socket_impl_->svc_.scheduler().reset_inline_budget();
253  
    socket_impl_->svc_.scheduler().reset_inline_budget();
254  

254  

255  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
255  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
256  

256  

257  
    // Cache endpoints on successful connect
257  
    // Cache endpoints on successful connect
258  
    if (success && socket_impl_)
258  
    if (success && socket_impl_)
259  
    {
259  
    {
260  
        // Query local endpoint via getsockname (may fail, but remote is always known)
260  
        // Query local endpoint via getsockname (may fail, but remote is always known)
261  
        endpoint local_ep;
261  
        endpoint local_ep;
262  
        sockaddr_in local_addr{};
262  
        sockaddr_in local_addr{};
263  
        socklen_t local_len = sizeof(local_addr);
263  
        socklen_t local_len = sizeof(local_addr);
264  
        if (::getsockname(
264  
        if (::getsockname(
265  
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
265  
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
266  
            local_ep = from_sockaddr_in(local_addr);
266  
            local_ep = from_sockaddr_in(local_addr);
267  
        // Always cache remote endpoint; local may be default if getsockname failed
267  
        // Always cache remote endpoint; local may be default if getsockname failed
268  
        static_cast<epoll_socket*>(socket_impl_)
268  
        static_cast<epoll_socket*>(socket_impl_)
269  
            ->set_endpoints(local_ep, target_endpoint);
269  
            ->set_endpoints(local_ep, target_endpoint);
270  
    }
270  
    }
271  

271  

272  
    if (cancelled.load(std::memory_order_acquire))
272  
    if (cancelled.load(std::memory_order_acquire))
273  
        *ec_out = capy::error::canceled;
273  
        *ec_out = capy::error::canceled;
274  
    else if (errn != 0)
274  
    else if (errn != 0)
275  
        *ec_out = make_err(errn);
275  
        *ec_out = make_err(errn);
276  
    else
276  
    else
277  
        *ec_out = {};
277  
        *ec_out = {};
278  

278  

279  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
279  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
280  
    capy::executor_ref saved_ex(ex);
280  
    capy::executor_ref saved_ex(ex);
281  
    std::coroutine_handle<> saved_h(h);
281  
    std::coroutine_handle<> saved_h(h);
282  
    auto prevent_premature_destruction = std::move(impl_ptr);
282  
    auto prevent_premature_destruction = std::move(impl_ptr);
283  
    dispatch_coro(saved_ex, saved_h).resume();
283  
    dispatch_coro(saved_ex, saved_h).resume();
284  
}
284  
}
285  

285  

286  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
286  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
287  
    : svc_(svc)
287  
    : svc_(svc)
288  
{
288  
{
289  
}
289  
}
290  

290  

291  
inline epoll_socket::~epoll_socket() = default;
291  
inline epoll_socket::~epoll_socket() = default;
292  

292  

293  
inline std::coroutine_handle<>
293  
inline std::coroutine_handle<>
294  
epoll_socket::connect(
294  
epoll_socket::connect(
295  
    std::coroutine_handle<> h,
295  
    std::coroutine_handle<> h,
296  
    capy::executor_ref ex,
296  
    capy::executor_ref ex,
297  
    endpoint ep,
297  
    endpoint ep,
298  
    std::stop_token token,
298  
    std::stop_token token,
299  
    std::error_code* ec)
299  
    std::error_code* ec)
300  
{
300  
{
301  
    auto& op = conn_;
301  
    auto& op = conn_;
302  

302  

303  
    sockaddr_in addr = detail::to_sockaddr_in(ep);
303  
    sockaddr_in addr = detail::to_sockaddr_in(ep);
304  
    int result =
304  
    int result =
305  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
305  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
306  

306  

307  
    if (result == 0)
307  
    if (result == 0)
308  
    {
308  
    {
309  
        sockaddr_in local_addr{};
309  
        sockaddr_in local_addr{};
310  
        socklen_t local_len = sizeof(local_addr);
310  
        socklen_t local_len = sizeof(local_addr);
311  
        if (::getsockname(
311  
        if (::getsockname(
312  
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
312  
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
313  
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
313  
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
314  
        remote_endpoint_ = ep;
314  
        remote_endpoint_ = ep;
315  
    }
315  
    }
316  

316  

317  
    if (result == 0 || errno != EINPROGRESS)
317  
    if (result == 0 || errno != EINPROGRESS)
318  
    {
318  
    {
319  
        int err = (result < 0) ? errno : 0;
319  
        int err = (result < 0) ? errno : 0;
320  
        if (svc_.scheduler().try_consume_inline_budget())
320  
        if (svc_.scheduler().try_consume_inline_budget())
321  
        {
321  
        {
322  
            *ec = err ? make_err(err) : std::error_code{};
322  
            *ec = err ? make_err(err) : std::error_code{};
323  
            return dispatch_coro(ex, h);
323  
            return dispatch_coro(ex, h);
324  
        }
324  
        }
325  
        op.reset();
325  
        op.reset();
326  
        op.h               = h;
326  
        op.h               = h;
327  
        op.ex              = ex;
327  
        op.ex              = ex;
328  
        op.ec_out          = ec;
328  
        op.ec_out          = ec;
329  
        op.fd              = fd_;
329  
        op.fd              = fd_;
330  
        op.target_endpoint = ep;
330  
        op.target_endpoint = ep;
331  
        op.start(token, this);
331  
        op.start(token, this);
332  
        op.impl_ptr = shared_from_this();
332  
        op.impl_ptr = shared_from_this();
333  
        op.complete(err, 0);
333  
        op.complete(err, 0);
334  
        svc_.post(&op);
334  
        svc_.post(&op);
335  
        return std::noop_coroutine();
335  
        return std::noop_coroutine();
336  
    }
336  
    }
337  

337  

338  
    // EINPROGRESS — register with reactor
338  
    // EINPROGRESS — register with reactor
339  
    op.reset();
339  
    op.reset();
340  
    op.h               = h;
340  
    op.h               = h;
341  
    op.ex              = ex;
341  
    op.ex              = ex;
342  
    op.ec_out          = ec;
342  
    op.ec_out          = ec;
343  
    op.fd              = fd_;
343  
    op.fd              = fd_;
344  
    op.target_endpoint = ep;
344  
    op.target_endpoint = ep;
345  
    op.start(token, this);
345  
    op.start(token, this);
346  
    op.impl_ptr = shared_from_this();
346  
    op.impl_ptr = shared_from_this();
347  

347  

348  
    register_op(
348  
    register_op(
349  
        op, desc_state_.connect_op, desc_state_.write_ready,
349  
        op, desc_state_.connect_op, desc_state_.write_ready,
350  
        desc_state_.connect_cancel_pending);
350  
        desc_state_.connect_cancel_pending);
351  
    return std::noop_coroutine();
351  
    return std::noop_coroutine();
352  
}
352  
}
353  

353  

354  
inline std::coroutine_handle<>
354  
inline std::coroutine_handle<>
355  
epoll_socket::read_some(
355  
epoll_socket::read_some(
356  
    std::coroutine_handle<> h,
356  
    std::coroutine_handle<> h,
357  
    capy::executor_ref ex,
357  
    capy::executor_ref ex,
358  
    io_buffer_param param,
358  
    io_buffer_param param,
359  
    std::stop_token token,
359  
    std::stop_token token,
360  
    std::error_code* ec,
360  
    std::error_code* ec,
361  
    std::size_t* bytes_out)
361  
    std::size_t* bytes_out)
362  
{
362  
{
363  
    auto& op = rd_;
363  
    auto& op = rd_;
364  
    op.reset();
364  
    op.reset();
365  

365  

366  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
366  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
367  
    op.iovec_count =
367  
    op.iovec_count =
368  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
368  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
369  

369  

370  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
370  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
371  
    {
371  
    {
372  
        op.empty_buffer_read = true;
372  
        op.empty_buffer_read = true;
373  
        op.h                 = h;
373  
        op.h                 = h;
374  
        op.ex                = ex;
374  
        op.ex                = ex;
375  
        op.ec_out            = ec;
375  
        op.ec_out            = ec;
376  
        op.bytes_out         = bytes_out;
376  
        op.bytes_out         = bytes_out;
377  
        op.start(token, this);
377  
        op.start(token, this);
378  
        op.impl_ptr = shared_from_this();
378  
        op.impl_ptr = shared_from_this();
379  
        op.complete(0, 0);
379  
        op.complete(0, 0);
380  
        svc_.post(&op);
380  
        svc_.post(&op);
381  
        return std::noop_coroutine();
381  
        return std::noop_coroutine();
382  
    }
382  
    }
383  

383  

384  
    for (int i = 0; i < op.iovec_count; ++i)
384  
    for (int i = 0; i < op.iovec_count; ++i)
385  
    {
385  
    {
386  
        op.iovecs[i].iov_base = bufs[i].data();
386  
        op.iovecs[i].iov_base = bufs[i].data();
387  
        op.iovecs[i].iov_len  = bufs[i].size();
387  
        op.iovecs[i].iov_len  = bufs[i].size();
388  
    }
388  
    }
389  

389  

390  
    // Speculative read
390  
    // Speculative read
391  
    ssize_t n;
391  
    ssize_t n;
392  
    do
392  
    do
393  
    {
393  
    {
394  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
394  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
395  
    }
395  
    }
396  
    while (n < 0 && errno == EINTR);
396  
    while (n < 0 && errno == EINTR);
397  

397  

398  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
398  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
399  
    {
399  
    {
400  
        int err    = (n < 0) ? errno : 0;
400  
        int err    = (n < 0) ? errno : 0;
401  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
401  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
402  

402  

403  
        if (svc_.scheduler().try_consume_inline_budget())
403  
        if (svc_.scheduler().try_consume_inline_budget())
404  
        {
404  
        {
405  
            if (err)
405  
            if (err)
406  
                *ec = make_err(err);
406  
                *ec = make_err(err);
407  
            else if (n == 0)
407  
            else if (n == 0)
408  
                *ec = capy::error::eof;
408  
                *ec = capy::error::eof;
409  
            else
409  
            else
410  
                *ec = {};
410  
                *ec = {};
411  
            *bytes_out = bytes;
411  
            *bytes_out = bytes;
412  
            return dispatch_coro(ex, h);
412  
            return dispatch_coro(ex, h);
413  
        }
413  
        }
414  
        op.h         = h;
414  
        op.h         = h;
415  
        op.ex        = ex;
415  
        op.ex        = ex;
416  
        op.ec_out    = ec;
416  
        op.ec_out    = ec;
417  
        op.bytes_out = bytes_out;
417  
        op.bytes_out = bytes_out;
418  
        op.start(token, this);
418  
        op.start(token, this);
419  
        op.impl_ptr = shared_from_this();
419  
        op.impl_ptr = shared_from_this();
420  
        op.complete(err, bytes);
420  
        op.complete(err, bytes);
421  
        svc_.post(&op);
421  
        svc_.post(&op);
422  
        return std::noop_coroutine();
422  
        return std::noop_coroutine();
423  
    }
423  
    }
424  

424  

425  
    // EAGAIN — register with reactor
425  
    // EAGAIN — register with reactor
426  
    op.h         = h;
426  
    op.h         = h;
427  
    op.ex        = ex;
427  
    op.ex        = ex;
428  
    op.ec_out    = ec;
428  
    op.ec_out    = ec;
429  
    op.bytes_out = bytes_out;
429  
    op.bytes_out = bytes_out;
430  
    op.fd        = fd_;
430  
    op.fd        = fd_;
431  
    op.start(token, this);
431  
    op.start(token, this);
432  
    op.impl_ptr = shared_from_this();
432  
    op.impl_ptr = shared_from_this();
433  

433  

434  
    register_op(
434  
    register_op(
435  
        op, desc_state_.read_op, desc_state_.read_ready,
435  
        op, desc_state_.read_op, desc_state_.read_ready,
436  
        desc_state_.read_cancel_pending);
436  
        desc_state_.read_cancel_pending);
437  
    return std::noop_coroutine();
437  
    return std::noop_coroutine();
438  
}
438  
}
439  

439  

440  
inline std::coroutine_handle<>
440  
inline std::coroutine_handle<>
441  
epoll_socket::write_some(
441  
epoll_socket::write_some(
442  
    std::coroutine_handle<> h,
442  
    std::coroutine_handle<> h,
443  
    capy::executor_ref ex,
443  
    capy::executor_ref ex,
444  
    io_buffer_param param,
444  
    io_buffer_param param,
445  
    std::stop_token token,
445  
    std::stop_token token,
446  
    std::error_code* ec,
446  
    std::error_code* ec,
447  
    std::size_t* bytes_out)
447  
    std::size_t* bytes_out)
448  
{
448  
{
449  
    auto& op = wr_;
449  
    auto& op = wr_;
450  
    op.reset();
450  
    op.reset();
451  

451  

452  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
452  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
453  
    op.iovec_count =
453  
    op.iovec_count =
454  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
454  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
455  

455  

456  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
456  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
457  
    {
457  
    {
458  
        op.h         = h;
458  
        op.h         = h;
459  
        op.ex        = ex;
459  
        op.ex        = ex;
460  
        op.ec_out    = ec;
460  
        op.ec_out    = ec;
461  
        op.bytes_out = bytes_out;
461  
        op.bytes_out = bytes_out;
462  
        op.start(token, this);
462  
        op.start(token, this);
463  
        op.impl_ptr = shared_from_this();
463  
        op.impl_ptr = shared_from_this();
464  
        op.complete(0, 0);
464  
        op.complete(0, 0);
465  
        svc_.post(&op);
465  
        svc_.post(&op);
466  
        return std::noop_coroutine();
466  
        return std::noop_coroutine();
467  
    }
467  
    }
468  

468  

469  
    for (int i = 0; i < op.iovec_count; ++i)
469  
    for (int i = 0; i < op.iovec_count; ++i)
470  
    {
470  
    {
471  
        op.iovecs[i].iov_base = bufs[i].data();
471  
        op.iovecs[i].iov_base = bufs[i].data();
472  
        op.iovecs[i].iov_len  = bufs[i].size();
472  
        op.iovecs[i].iov_len  = bufs[i].size();
473  
    }
473  
    }
474  

474  

475  
    // Speculative write
475  
    // Speculative write
476  
    msghdr msg{};
476  
    msghdr msg{};
477  
    msg.msg_iov    = op.iovecs;
477  
    msg.msg_iov    = op.iovecs;
478  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
478  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
479  

479  

480  
    ssize_t n;
480  
    ssize_t n;
481  
    do
481  
    do
482  
    {
482  
    {
483  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
483  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
484  
    }
484  
    }
485  
    while (n < 0 && errno == EINTR);
485  
    while (n < 0 && errno == EINTR);
486  

486  

487  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
487  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
488  
    {
488  
    {
489  
        int err    = (n < 0) ? errno : 0;
489  
        int err    = (n < 0) ? errno : 0;
490  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
490  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
491  

491  

492  
        if (svc_.scheduler().try_consume_inline_budget())
492  
        if (svc_.scheduler().try_consume_inline_budget())
493  
        {
493  
        {
494  
            *ec        = err ? make_err(err) : std::error_code{};
494  
            *ec        = err ? make_err(err) : std::error_code{};
495  
            *bytes_out = bytes;
495  
            *bytes_out = bytes;
496  
            return dispatch_coro(ex, h);
496  
            return dispatch_coro(ex, h);
497  
        }
497  
        }
498  
        op.h         = h;
498  
        op.h         = h;
499  
        op.ex        = ex;
499  
        op.ex        = ex;
500  
        op.ec_out    = ec;
500  
        op.ec_out    = ec;
501  
        op.bytes_out = bytes_out;
501  
        op.bytes_out = bytes_out;
502  
        op.start(token, this);
502  
        op.start(token, this);
503  
        op.impl_ptr = shared_from_this();
503  
        op.impl_ptr = shared_from_this();
504  
        op.complete(err, bytes);
504  
        op.complete(err, bytes);
505  
        svc_.post(&op);
505  
        svc_.post(&op);
506  
        return std::noop_coroutine();
506  
        return std::noop_coroutine();
507  
    }
507  
    }
508  

508  

509  
    // EAGAIN — register with reactor
509  
    // EAGAIN — register with reactor
510  
    op.h         = h;
510  
    op.h         = h;
511  
    op.ex        = ex;
511  
    op.ex        = ex;
512  
    op.ec_out    = ec;
512  
    op.ec_out    = ec;
513  
    op.bytes_out = bytes_out;
513  
    op.bytes_out = bytes_out;
514  
    op.fd        = fd_;
514  
    op.fd        = fd_;
515  
    op.start(token, this);
515  
    op.start(token, this);
516  
    op.impl_ptr = shared_from_this();
516  
    op.impl_ptr = shared_from_this();
517  

517  

518  
    register_op(
518  
    register_op(
519  
        op, desc_state_.write_op, desc_state_.write_ready,
519  
        op, desc_state_.write_op, desc_state_.write_ready,
520  
        desc_state_.write_cancel_pending);
520  
        desc_state_.write_cancel_pending);
521  
    return std::noop_coroutine();
521  
    return std::noop_coroutine();
522  
}
522  
}
523  

523  

524  
inline std::error_code
524  
inline std::error_code
525  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
525  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
526  
{
526  
{
527  
    int how;
527  
    int how;
528  
    switch (what)
528  
    switch (what)
529  
    {
529  
    {
530  
    case tcp_socket::shutdown_receive:
530  
    case tcp_socket::shutdown_receive:
531  
        how = SHUT_RD;
531  
        how = SHUT_RD;
532  
        break;
532  
        break;
533  
    case tcp_socket::shutdown_send:
533  
    case tcp_socket::shutdown_send:
534  
        how = SHUT_WR;
534  
        how = SHUT_WR;
535  
        break;
535  
        break;
536  
    case tcp_socket::shutdown_both:
536  
    case tcp_socket::shutdown_both:
537  
        how = SHUT_RDWR;
537  
        how = SHUT_RDWR;
538  
        break;
538  
        break;
539  
    default:
539  
    default:
540  
        return make_err(EINVAL);
540  
        return make_err(EINVAL);
541  
    }
541  
    }
542  
    if (::shutdown(fd_, how) != 0)
542  
    if (::shutdown(fd_, how) != 0)
543  
        return make_err(errno);
543  
        return make_err(errno);
544  
    return {};
544  
    return {};
545  
}
545  
}
546  

546  

547  
inline std::error_code
547  
inline std::error_code
548  
epoll_socket::set_no_delay(bool value) noexcept
548  
epoll_socket::set_no_delay(bool value) noexcept
549  
{
549  
{
550  
    int flag = value ? 1 : 0;
550  
    int flag = value ? 1 : 0;
551  
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
551  
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
552  
        return make_err(errno);
552  
        return make_err(errno);
553  
    return {};
553  
    return {};
554  
}
554  
}
555  

555  

556  
inline bool
556  
inline bool
557  
epoll_socket::no_delay(std::error_code& ec) const noexcept
557  
epoll_socket::no_delay(std::error_code& ec) const noexcept
558  
{
558  
{
559  
    int flag      = 0;
559  
    int flag      = 0;
560  
    socklen_t len = sizeof(flag);
560  
    socklen_t len = sizeof(flag);
561  
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
561  
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
562  
    {
562  
    {
563  
        ec = make_err(errno);
563  
        ec = make_err(errno);
564  
        return false;
564  
        return false;
565  
    }
565  
    }
566  
    ec = {};
566  
    ec = {};
567  
    return flag != 0;
567  
    return flag != 0;
568  
}
568  
}
569  

569  

570  
inline std::error_code
570  
inline std::error_code
571  
epoll_socket::set_keep_alive(bool value) noexcept
571  
epoll_socket::set_keep_alive(bool value) noexcept
572  
{
572  
{
573  
    int flag = value ? 1 : 0;
573  
    int flag = value ? 1 : 0;
574  
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
574  
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
575  
        return make_err(errno);
575  
        return make_err(errno);
576  
    return {};
576  
    return {};
577  
}
577  
}
578  

578  

579  
inline bool
579  
inline bool
580  
epoll_socket::keep_alive(std::error_code& ec) const noexcept
580  
epoll_socket::keep_alive(std::error_code& ec) const noexcept
581  
{
581  
{
582  
    int flag      = 0;
582  
    int flag      = 0;
583  
    socklen_t len = sizeof(flag);
583  
    socklen_t len = sizeof(flag);
584  
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
584  
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
585  
    {
585  
    {
586  
        ec = make_err(errno);
586  
        ec = make_err(errno);
587  
        return false;
587  
        return false;
588  
    }
588  
    }
589  
    ec = {};
589  
    ec = {};
590  
    return flag != 0;
590  
    return flag != 0;
591  
}
591  
}
592  

592  

593  
inline std::error_code
593  
inline std::error_code
594  
epoll_socket::set_receive_buffer_size(int size) noexcept
594  
epoll_socket::set_receive_buffer_size(int size) noexcept
595  
{
595  
{
596  
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
596  
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
597  
        return make_err(errno);
597  
        return make_err(errno);
598  
    return {};
598  
    return {};
599  
}
599  
}
600  

600  

601  
inline int
601  
inline int
602  
epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
602  
epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
603  
{
603  
{
604  
    int size      = 0;
604  
    int size      = 0;
605  
    socklen_t len = sizeof(size);
605  
    socklen_t len = sizeof(size);
606  
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
606  
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
607  
    {
607  
    {
608  
        ec = make_err(errno);
608  
        ec = make_err(errno);
609  
        return 0;
609  
        return 0;
610  
    }
610  
    }
611  
    ec = {};
611  
    ec = {};
612  
    return size;
612  
    return size;
613  
}
613  
}
614  

614  

615  
inline std::error_code
615  
inline std::error_code
616  
epoll_socket::set_send_buffer_size(int size) noexcept
616  
epoll_socket::set_send_buffer_size(int size) noexcept
617  
{
617  
{
618  
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
618  
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
619  
        return make_err(errno);
619  
        return make_err(errno);
620  
    return {};
620  
    return {};
621  
}
621  
}
622  

622  

623  
inline int
623  
inline int
624  
epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
624  
epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
625  
{
625  
{
626  
    int size      = 0;
626  
    int size      = 0;
627  
    socklen_t len = sizeof(size);
627  
    socklen_t len = sizeof(size);
628  
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
628  
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
629  
    {
629  
    {
630  
        ec = make_err(errno);
630  
        ec = make_err(errno);
631  
        return 0;
631  
        return 0;
632  
    }
632  
    }
633  
    ec = {};
633  
    ec = {};
634  
    return size;
634  
    return size;
635  
}
635  
}
636  

636  

637  
inline std::error_code
637  
inline std::error_code
638  
epoll_socket::set_linger(bool enabled, int timeout) noexcept
638  
epoll_socket::set_linger(bool enabled, int timeout) noexcept
639  
{
639  
{
640  
    if (timeout < 0)
640  
    if (timeout < 0)
641  
        return make_err(EINVAL);
641  
        return make_err(EINVAL);
642  
    struct ::linger lg;
642  
    struct ::linger lg;
643  
    lg.l_onoff  = enabled ? 1 : 0;
643  
    lg.l_onoff  = enabled ? 1 : 0;
644  
    lg.l_linger = timeout;
644  
    lg.l_linger = timeout;
645  
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
645  
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
646  
        return make_err(errno);
646  
        return make_err(errno);
647  
    return {};
647  
    return {};
648  
}
648  
}
649  

649  

650  
// Query the current SO_LINGER configuration.
//
// @param ec  Cleared on success; set from errno if getsockopt() fails.
// @return {enabled = l_onoff != 0, timeout = l_linger} on success, or a
//         value-initialised linger_options if the query failed.
inline tcp_socket::linger_options
epoll_socket::linger(std::error_code& ec) const noexcept
{
    struct ::linger opt{};
    socklen_t optlen = sizeof opt;
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &opt, &optlen) == 0)
    {
        ec = {};
        return {.enabled = opt.l_onoff != 0, .timeout = opt.l_linger};
    }
    ec = make_err(errno);
    return {};
}
663  

663  

664  
inline void
664  
inline void
665  
epoll_socket::cancel() noexcept
665  
epoll_socket::cancel() noexcept
666  
{
666  
{
667  
    auto self = weak_from_this().lock();
667  
    auto self = weak_from_this().lock();
668  
    if (!self)
668  
    if (!self)
669  
        return;
669  
        return;
670  

670  

671  
    conn_.request_cancel();
671  
    conn_.request_cancel();
672  
    rd_.request_cancel();
672  
    rd_.request_cancel();
673  
    wr_.request_cancel();
673  
    wr_.request_cancel();
674  

674  

675  
    epoll_op* conn_claimed = nullptr;
675  
    epoll_op* conn_claimed = nullptr;
676  
    epoll_op* rd_claimed   = nullptr;
676  
    epoll_op* rd_claimed   = nullptr;
677  
    epoll_op* wr_claimed   = nullptr;
677  
    epoll_op* wr_claimed   = nullptr;
678  
    {
678  
    {
679  
        std::lock_guard lock(desc_state_.mutex);
679  
        std::lock_guard lock(desc_state_.mutex);
680  
        if (desc_state_.connect_op == &conn_)
680  
        if (desc_state_.connect_op == &conn_)
681  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
681  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
682  
        else
682  
        else
683  
            desc_state_.connect_cancel_pending = true;
683  
            desc_state_.connect_cancel_pending = true;
684  
        if (desc_state_.read_op == &rd_)
684  
        if (desc_state_.read_op == &rd_)
685  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
685  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
686  
        else
686  
        else
687  
            desc_state_.read_cancel_pending = true;
687  
            desc_state_.read_cancel_pending = true;
688  
        if (desc_state_.write_op == &wr_)
688  
        if (desc_state_.write_op == &wr_)
689  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
689  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
690  
        else
690  
        else
691  
            desc_state_.write_cancel_pending = true;
691  
            desc_state_.write_cancel_pending = true;
692  
    }
692  
    }
693  

693  

694  
    if (conn_claimed)
694  
    if (conn_claimed)
695  
    {
695  
    {
696  
        conn_.impl_ptr = self;
696  
        conn_.impl_ptr = self;
697  
        svc_.post(&conn_);
697  
        svc_.post(&conn_);
698  
        svc_.work_finished();
698  
        svc_.work_finished();
699  
    }
699  
    }
700  
    if (rd_claimed)
700  
    if (rd_claimed)
701  
    {
701  
    {
702  
        rd_.impl_ptr = self;
702  
        rd_.impl_ptr = self;
703  
        svc_.post(&rd_);
703  
        svc_.post(&rd_);
704  
        svc_.work_finished();
704  
        svc_.work_finished();
705  
    }
705  
    }
706  
    if (wr_claimed)
706  
    if (wr_claimed)
707  
    {
707  
    {
708  
        wr_.impl_ptr = self;
708  
        wr_.impl_ptr = self;
709  
        svc_.post(&wr_);
709  
        svc_.post(&wr_);
710  
        svc_.work_finished();
710  
        svc_.work_finished();
711  
    }
711  
    }
712  
}
712  
}
713  

713  

714  
inline void
714  
inline void
715  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
715  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
716  
{
716  
{
717  
    auto self = weak_from_this().lock();
717  
    auto self = weak_from_this().lock();
718  
    if (!self)
718  
    if (!self)
719  
        return;
719  
        return;
720  

720  

721  
    op.request_cancel();
721  
    op.request_cancel();
722  

722  

723  
    epoll_op** desc_op_ptr = nullptr;
723  
    epoll_op** desc_op_ptr = nullptr;
724  
    if (&op == &conn_)
724  
    if (&op == &conn_)
725  
        desc_op_ptr = &desc_state_.connect_op;
725  
        desc_op_ptr = &desc_state_.connect_op;
726  
    else if (&op == &rd_)
726  
    else if (&op == &rd_)
727  
        desc_op_ptr = &desc_state_.read_op;
727  
        desc_op_ptr = &desc_state_.read_op;
728  
    else if (&op == &wr_)
728  
    else if (&op == &wr_)
729  
        desc_op_ptr = &desc_state_.write_op;
729  
        desc_op_ptr = &desc_state_.write_op;
730  

730  

731  
    if (desc_op_ptr)
731  
    if (desc_op_ptr)
732  
    {
732  
    {
733  
        epoll_op* claimed = nullptr;
733  
        epoll_op* claimed = nullptr;
734  
        {
734  
        {
735  
            std::lock_guard lock(desc_state_.mutex);
735  
            std::lock_guard lock(desc_state_.mutex);
736  
            if (*desc_op_ptr == &op)
736  
            if (*desc_op_ptr == &op)
737  
                claimed = std::exchange(*desc_op_ptr, nullptr);
737  
                claimed = std::exchange(*desc_op_ptr, nullptr);
738  
            else if (&op == &conn_)
738  
            else if (&op == &conn_)
739  
                desc_state_.connect_cancel_pending = true;
739  
                desc_state_.connect_cancel_pending = true;
740  
            else if (&op == &rd_)
740  
            else if (&op == &rd_)
741  
                desc_state_.read_cancel_pending = true;
741  
                desc_state_.read_cancel_pending = true;
742  
            else if (&op == &wr_)
742  
            else if (&op == &wr_)
743  
                desc_state_.write_cancel_pending = true;
743  
                desc_state_.write_cancel_pending = true;
744  
        }
744  
        }
745  
        if (claimed)
745  
        if (claimed)
746  
        {
746  
        {
747  
            op.impl_ptr = self;
747  
            op.impl_ptr = self;
748  
            svc_.post(&op);
748  
            svc_.post(&op);
749  
            svc_.work_finished();
749  
            svc_.work_finished();
750  
        }
750  
        }
751  
    }
751  
    }
752  
}
752  
}
753  

753  

754  
inline void
754  
inline void
755  
epoll_socket::close_socket() noexcept
755  
epoll_socket::close_socket() noexcept
756  
{
756  
{
757  
    auto self = weak_from_this().lock();
757  
    auto self = weak_from_this().lock();
758  
    if (self)
758  
    if (self)
759  
    {
759  
    {
760  
        conn_.request_cancel();
760  
        conn_.request_cancel();
761  
        rd_.request_cancel();
761  
        rd_.request_cancel();
762  
        wr_.request_cancel();
762  
        wr_.request_cancel();
763  

763  

764  
        epoll_op* conn_claimed = nullptr;
764  
        epoll_op* conn_claimed = nullptr;
765  
        epoll_op* rd_claimed   = nullptr;
765  
        epoll_op* rd_claimed   = nullptr;
766  
        epoll_op* wr_claimed   = nullptr;
766  
        epoll_op* wr_claimed   = nullptr;
767  
        {
767  
        {
768  
            std::lock_guard lock(desc_state_.mutex);
768  
            std::lock_guard lock(desc_state_.mutex);
769  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
769  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
770  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
770  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
771  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
771  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
772  
            desc_state_.read_ready             = false;
772  
            desc_state_.read_ready             = false;
773  
            desc_state_.write_ready            = false;
773  
            desc_state_.write_ready            = false;
774  
            desc_state_.read_cancel_pending    = false;
774  
            desc_state_.read_cancel_pending    = false;
775  
            desc_state_.write_cancel_pending   = false;
775  
            desc_state_.write_cancel_pending   = false;
776  
            desc_state_.connect_cancel_pending = false;
776  
            desc_state_.connect_cancel_pending = false;
777  
        }
777  
        }
778  

778  

779  
        if (conn_claimed)
779  
        if (conn_claimed)
780  
        {
780  
        {
781  
            conn_.impl_ptr = self;
781  
            conn_.impl_ptr = self;
782  
            svc_.post(&conn_);
782  
            svc_.post(&conn_);
783  
            svc_.work_finished();
783  
            svc_.work_finished();
784  
        }
784  
        }
785  
        if (rd_claimed)
785  
        if (rd_claimed)
786  
        {
786  
        {
787  
            rd_.impl_ptr = self;
787  
            rd_.impl_ptr = self;
788  
            svc_.post(&rd_);
788  
            svc_.post(&rd_);
789  
            svc_.work_finished();
789  
            svc_.work_finished();
790  
        }
790  
        }
791  
        if (wr_claimed)
791  
        if (wr_claimed)
792  
        {
792  
        {
793  
            wr_.impl_ptr = self;
793  
            wr_.impl_ptr = self;
794  
            svc_.post(&wr_);
794  
            svc_.post(&wr_);
795  
            svc_.work_finished();
795  
            svc_.work_finished();
796  
        }
796  
        }
797  

797  

798  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
798  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
799  
            desc_state_.impl_ref_ = self;
799  
            desc_state_.impl_ref_ = self;
800  
    }
800  
    }
801  

801  

802  
    if (fd_ >= 0)
802  
    if (fd_ >= 0)
803  
    {
803  
    {
804  
        if (desc_state_.registered_events != 0)
804  
        if (desc_state_.registered_events != 0)
805  
            svc_.scheduler().deregister_descriptor(fd_);
805  
            svc_.scheduler().deregister_descriptor(fd_);
806  
        ::close(fd_);
806  
        ::close(fd_);
807  
        fd_ = -1;
807  
        fd_ = -1;
808  
    }
808  
    }
809  

809  

810  
    desc_state_.fd                = -1;
810  
    desc_state_.fd                = -1;
811  
    desc_state_.registered_events = 0;
811  
    desc_state_.registered_events = 0;
812  

812  

813  
    local_endpoint_  = endpoint{};
813  
    local_endpoint_  = endpoint{};
814  
    remote_endpoint_ = endpoint{};
814  
    remote_endpoint_ = endpoint{};
815  
}
815  
}
816  

816  

817  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
817  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
818  
    : state_(
818  
    : state_(
819  
          std::make_unique<epoll_socket_state>(
819  
          std::make_unique<epoll_socket_state>(
820  
              ctx.use_service<epoll_scheduler>()))
820  
              ctx.use_service<epoll_scheduler>()))
821  
{
821  
{
822  
}
822  
}
823  

823  

824  
// Destroy the service. Members, including state_ and the socket references
// it holds, are released by their own destructors.
inline epoll_socket_service::~epoll_socket_service() = default;
825  

825  

826  
inline void
826  
inline void
827  
epoll_socket_service::shutdown()
827  
epoll_socket_service::shutdown()
828  
{
828  
{
829  
    std::lock_guard lock(state_->mutex_);
829  
    std::lock_guard lock(state_->mutex_);
830  

830  

831  
    while (auto* impl = state_->socket_list_.pop_front())
831  
    while (auto* impl = state_->socket_list_.pop_front())
832  
        impl->close_socket();
832  
        impl->close_socket();
833  

833  

834  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
834  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
835  
    // drains completed_ops_, calling destroy() on each queued op. If we
835  
    // drains completed_ops_, calling destroy() on each queued op. If we
836  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
836  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
837  
    // last ref to an impl whose embedded descriptor_state is still linked
837  
    // last ref to an impl whose embedded descriptor_state is still linked
838  
    // in the queue — use-after-free on the next pop(). Letting ~state_
838  
    // in the queue — use-after-free on the next pop(). Letting ~state_
839  
    // release the ptrs (during service destruction, after scheduler
839  
    // release the ptrs (during service destruction, after scheduler
840  
    // shutdown) keeps every impl alive until all ops have been drained.
840  
    // shutdown) keeps every impl alive until all ops have been drained.
841  
}
841  
}
842  

842  

843  
// Create a new epoll_socket owned by this service.
//
// The socket is added to socket_list_, and socket_ptrs_ takes the owning
// shared_ptr, keyed by the raw pointer. Both updates happen under
// state_->mutex_.
//
// @return Non-owning pointer to the new implementation.
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto owner                 = std::make_shared<epoll_socket>(*this);
    epoll_socket* const handle = owner.get();

    {
        std::lock_guard guard(state_->mutex_);
        state_->socket_list_.push_back(handle);
        state_->socket_ptrs_.emplace(handle, std::move(owner));
    }

    return handle;
}
857  

857  

858  
inline void
858  
inline void
859  
epoll_socket_service::destroy(io_object::implementation* impl)
859  
epoll_socket_service::destroy(io_object::implementation* impl)
860  
{
860  
{
861  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
861  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
862  
    epoll_impl->close_socket();
862  
    epoll_impl->close_socket();
863  
    std::lock_guard lock(state_->mutex_);
863  
    std::lock_guard lock(state_->mutex_);
864  
    state_->socket_list_.remove(epoll_impl);
864  
    state_->socket_list_.remove(epoll_impl);
865  
    state_->socket_ptrs_.erase(epoll_impl);
865  
    state_->socket_ptrs_.erase(epoll_impl);
866  
}
866  
}
867  

867  

868  
inline std::error_code
868  
inline std::error_code
869  
epoll_socket_service::open_socket(tcp_socket::implementation& impl)
869  
epoll_socket_service::open_socket(tcp_socket::implementation& impl)
870  
{
870  
{
871  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
871  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
872  
    epoll_impl->close_socket();
872  
    epoll_impl->close_socket();
873  

873  

874  
    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
874  
    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
875  
    if (fd < 0)
875  
    if (fd < 0)
876  
        return make_err(errno);
876  
        return make_err(errno);
877  

877  

878  
    epoll_impl->fd_ = fd;
878  
    epoll_impl->fd_ = fd;
879  

879  

880  
    // Register fd with epoll (edge-triggered mode)
880  
    // Register fd with epoll (edge-triggered mode)
881  
    epoll_impl->desc_state_.fd = fd;
881  
    epoll_impl->desc_state_.fd = fd;
882  
    {
882  
    {
883  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
883  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
884  
        epoll_impl->desc_state_.read_op    = nullptr;
884  
        epoll_impl->desc_state_.read_op    = nullptr;
885  
        epoll_impl->desc_state_.write_op   = nullptr;
885  
        epoll_impl->desc_state_.write_op   = nullptr;
886  
        epoll_impl->desc_state_.connect_op = nullptr;
886  
        epoll_impl->desc_state_.connect_op = nullptr;
887  
    }
887  
    }
888  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
888  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
889  

889  

890  
    return {};
890  
    return {};
891  
}
891  
}
892  

892  

893  
inline void
893  
inline void
894  
epoll_socket_service::close(io_object::handle& h)
894  
epoll_socket_service::close(io_object::handle& h)
895  
{
895  
{
896  
    static_cast<epoll_socket*>(h.get())->close_socket();
896  
    static_cast<epoll_socket*>(h.get())->close_socket();
897  
}
897  
}
898  

898  

899  
inline void
899  
inline void
900  
epoll_socket_service::post(epoll_op* op)
900  
epoll_socket_service::post(epoll_op* op)
901  
{
901  
{
902  
    state_->sched_.post(op);
902  
    state_->sched_.post(op);
903  
}
903  
}
904  

904  

905  
inline void
905  
inline void
906  
epoll_socket_service::work_started() noexcept
906  
epoll_socket_service::work_started() noexcept
907  
{
907  
{
908  
    state_->sched_.work_started();
908  
    state_->sched_.work_started();
909  
}
909  
}
910  

910  

911  
inline void
911  
inline void
912  
epoll_socket_service::work_finished() noexcept
912  
epoll_socket_service::work_finished() noexcept
913  
{
913  
{
914  
    state_->sched_.work_finished();
914  
    state_->sched_.work_finished();
915  
}
915  
}
916  

916  

917  
} // namespace boost::corosio::detail
917  
} // namespace boost::corosio::detail
918  

918  

919  
#endif // BOOST_COROSIO_HAS_EPOLL
919  
#endif // BOOST_COROSIO_HAS_EPOLL
920  

920  

921  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
921  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP