include/boost/corosio/native/detail/posix/posix_resolver_service.hpp

81.7% Lines (236/289) 93.8% Functions (30/32)
include/boost/corosio/native/detail/posix/posix_resolver_service.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RESOLVER_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RESOLVER_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_POSIX
16
17 #include <boost/corosio/native/detail/posix/posix_resolver.hpp>
18
19 namespace boost::corosio::detail {
20
21 /** Resolver service for POSIX backends.
22
23 Owns all posix_resolver instances and tracks active worker
24 threads for safe shutdown synchronization.
25 */
26 class BOOST_COROSIO_DECL posix_resolver_service final
27 : public capy::execution_context::service
28 , public io_object::io_service
29 {
30 public:
31 using key_type = posix_resolver_service;
32
// Remembers the scheduler by pointer. The execution_context argument is
// accepted but not used by this constructor.
33 340 posix_resolver_service(capy::execution_context&, scheduler& sched)
34 340 : sched_(&sched)
35 {
36 340 }
37
38 680 ~posix_resolver_service() override = default;
39
// The service is not copyable.
40 posix_resolver_service(posix_resolver_service const&) = delete;
41 posix_resolver_service& operator=(posix_resolver_service const&) = delete;
42
// Allocates a new posix_resolver, registers it with this service and
// returns it as an io_object::implementation pointer.
43 io_object::implementation* construct() override;
44
// Flags both of the resolver's operations as cancelled, then removes the
// resolver from the service's bookkeeping via destroy_impl().
// The static_cast assumes p was produced by construct().
45 29 void destroy(io_object::implementation* p) override
46 {
47 29 auto& impl = static_cast<posix_resolver&>(*p);
48 29 impl.cancel();
49 29 destroy_impl(impl);
50 29 }
51
// shutdown(): cancels every registered resolver, drops the owning
// pointers held by the service and blocks until active_threads_ is zero.
// destroy_impl(): unlinks impl from resolver_list_ and erases its entry
// from resolver_ptrs_.
52 void shutdown() override;
53 void destroy_impl(posix_resolver& impl);
54
// Thin forwards to the scheduler supplied at construction.
55 void post(scheduler_op* op);
56 void work_started() noexcept;
57 void work_finished() noexcept;
58
// thread_started()/thread_finished() increment/decrement active_threads_
// under mutex_; thread_finished() also notifies cv_.
// is_shutting_down() reports the flag that shutdown() sets.
59 void thread_started() noexcept;
60 void thread_finished() noexcept;
61 bool is_shutting_down() const noexcept;
62
63 private:
// Scheduler that post(), work_started() and work_finished() forward to.
64 scheduler* sched_;
// Locked around every access to active_threads_, resolver_list_ and
// resolver_ptrs_ in this header.
65 std::mutex mutex_;
// Notified by thread_finished(); shutdown() waits on it.
66 std::condition_variable cv_;
// Set to true by shutdown(); read by is_shutting_down().
67 std::atomic<bool> shutting_down_{false};
// Number of thread_started() calls not yet matched by thread_finished().
68 std::size_t active_threads_ = 0;
// Resolvers created by construct() and not yet destroyed or shut down.
69 intrusive_list<posix_resolver> resolver_list_;
// Owning shared_ptr for each registered resolver, keyed by its raw pointer.
70 std::unordered_map<posix_resolver*, std::shared_ptr<posix_resolver>>
71 resolver_ptrs_;
72 };
73
74 /** Get or create the resolver service for the given context.
75
76 This function is called by the concrete scheduler during initialization
77 to create the resolver service with a reference to itself.
78
    The inline definition later in this header forwards both arguments to
    ctx.make_service<posix_resolver_service>(sched).

79 @param ctx Reference to the owning execution_context.
80 @param sched Reference to the scheduler for posting completions.
81 @return Reference to the resolver service.
82 */
83 posix_resolver_service&
84 get_resolver_service(capy::execution_context& ctx, scheduler& sched);
85
86 // ---------------------------------------------------------------------------
87 // Inline implementation
88 // ---------------------------------------------------------------------------
89
90 // posix_resolver_detail helpers
91
/** Translate a resolve_flags bitmask into getaddrinfo() ai_flags bits.

    Each flag that is set contributes one constant:
    passive -> AI_PASSIVE, numeric_host -> AI_NUMERICHOST,
    numeric_service -> AI_NUMERICSERV, address_configured -> AI_ADDRCONFIG,
    v4_mapped -> AI_V4MAPPED, all_matching -> AI_ALL.

    @param flags Flags to translate.
    @return The OR of the matching AI_* constants; 0 when none are set.
*/
92 inline int
93 16 posix_resolver_detail::flags_to_hints(resolve_flags flags)
94 {
95 16 int hints = 0;
96
97 16 if ((flags & resolve_flags::passive) != resolve_flags::none)
98 hints |= AI_PASSIVE;
99 16 if ((flags & resolve_flags::numeric_host) != resolve_flags::none)
100 11 hints |= AI_NUMERICHOST;
101 16 if ((flags & resolve_flags::numeric_service) != resolve_flags::none)
102 8 hints |= AI_NUMERICSERV;
103 16 if ((flags & resolve_flags::address_configured) != resolve_flags::none)
104 hints |= AI_ADDRCONFIG;
105 16 if ((flags & resolve_flags::v4_mapped) != resolve_flags::none)
106 hints |= AI_V4MAPPED;
107 16 if ((flags & resolve_flags::all_matching) != resolve_flags::none)
108 hints |= AI_ALL;
109
110 16 return hints;
111 }
112
/** Translate a reverse_flags bitmask into getnameinfo() flag bits.

    Each flag that is set contributes one constant:
    numeric_host -> NI_NUMERICHOST, numeric_service -> NI_NUMERICSERV,
    name_required -> NI_NAMEREQD, datagram_service -> NI_DGRAM.

    @param flags Flags to translate.
    @return The OR of the matching NI_* constants; 0 when none are set.
*/
113 inline int
114 10 posix_resolver_detail::flags_to_ni_flags(reverse_flags flags)
115 {
116 10 int ni_flags = 0;
117
118 10 if ((flags & reverse_flags::numeric_host) != reverse_flags::none)
119 5 ni_flags |= NI_NUMERICHOST;
120 10 if ((flags & reverse_flags::numeric_service) != reverse_flags::none)
121 5 ni_flags |= NI_NUMERICSERV;
122 10 if ((flags & reverse_flags::name_required) != reverse_flags::none)
123 1 ni_flags |= NI_NAMEREQD;
124 10 if ((flags & reverse_flags::datagram_service) != reverse_flags::none)
125 ni_flags |= NI_DGRAM;
126
127 10 return ni_flags;
128 }
129
/** Build a resolver_results from a getaddrinfo() result list.

    Walks the ai_next chain starting at ai. Nodes whose ai_family is
    AF_INET or AF_INET6 become one resolver_entry each, carrying the
    converted endpoint together with host and service. Nodes of any other
    family are skipped. The list itself is not freed here.

    @param ai Head of the addrinfo list; may be nullptr, which yields an
        empty result.
    @param host Host string recorded in every entry.
    @param service Service string recorded in every entry.
    @return The collected entries wrapped in a resolver_results.
*/
130 inline resolver_results
131 13 posix_resolver_detail::convert_results(
132 struct addrinfo* ai, std::string_view host, std::string_view service)
133 {
134 13 std::vector<resolver_entry> entries;
135 13 entries.reserve(4); // Most lookups return 1-4 addresses
136
137 26 for (auto* p = ai; p != nullptr; p = p->ai_next)
138 {
139 13 if (p->ai_family == AF_INET)
140 {
// ai_addr is reinterpreted according to the family checked above.
141 11 auto* addr = reinterpret_cast<sockaddr_in*>(p->ai_addr);
142 11 auto ep = from_sockaddr_in(*addr);
143 11 entries.emplace_back(ep, host, service);
144 }
145 2 else if (p->ai_family == AF_INET6)
146 {
147 2 auto* addr = reinterpret_cast<sockaddr_in6*>(p->ai_addr);
148 2 auto ep = from_sockaddr_in6(*addr);
149 2 entries.emplace_back(ep, host, service);
150 }
151 }
152
153 26 return resolver_results(std::move(entries));
154 13 }
155
/** Map a getaddrinfo()/getnameinfo() EAI_* code to a std::error_code.

    Every returned code uses std::generic_category():
    EAI_AGAIN -> resource_unavailable_try_again,
    EAI_BADFLAGS and EAI_SERVICE -> invalid_argument,
    EAI_FAIL and any unrecognised value -> io_error,
    EAI_FAMILY -> address_family_not_supported,
    EAI_MEMORY -> not_enough_memory,
    EAI_NONAME -> no_such_device_or_address,
    EAI_SOCKTYPE -> not_supported,
    EAI_SYSTEM -> the current value of errno.

    @param gai_err The EAI_* value to translate.
    @return The mapped error code.
*/
156 inline std::error_code
157 4 posix_resolver_detail::make_gai_error(int gai_err)
158 {
159 // Map GAI errors to appropriate generic error codes
160 4 switch (gai_err)
161 {
162 case EAI_AGAIN:
163 // Temporary failure - try again later
164 return std::error_code(
165 static_cast<int>(std::errc::resource_unavailable_try_again),
166 std::generic_category());
167
168 case EAI_BADFLAGS:
169 // Invalid flags
170 return std::error_code(
171 static_cast<int>(std::errc::invalid_argument),
172 std::generic_category());
173
174 case EAI_FAIL:
175 // Non-recoverable failure
176 return std::error_code(
177 static_cast<int>(std::errc::io_error), std::generic_category());
178
179 case EAI_FAMILY:
180 // Address family not supported
181 return std::error_code(
182 static_cast<int>(std::errc::address_family_not_supported),
183 std::generic_category());
184
185 case EAI_MEMORY:
186 // Memory allocation failure
187 return std::error_code(
188 static_cast<int>(std::errc::not_enough_memory),
189 std::generic_category());
190
191 4 case EAI_NONAME:
192 // Host or service not found
193 4 return std::error_code(
194 static_cast<int>(std::errc::no_such_device_or_address),
195 4 std::generic_category());
196
197 case EAI_SERVICE:
198 // Service not supported for socket type
199 return std::error_code(
200 static_cast<int>(std::errc::invalid_argument),
201 std::generic_category());
202
203 case EAI_SOCKTYPE:
204 // Socket type not supported
205 return std::error_code(
206 static_cast<int>(std::errc::not_supported),
207 std::generic_category());
208
209 case EAI_SYSTEM:
210 // System error - use errno
// NOTE(review): errno is read when this function runs. In this header
// it is called from the operations' operator()(), while the failing
// ::getaddrinfo/::getnameinfo call happens earlier inside the worker
// thread lambda, so the value seen here may not be the one that call
// set - confirm, or capture errno next to the call.
211 return std::error_code(errno, std::generic_category());
212
213 default:
214 // Unknown error
215 return std::error_code(
216 static_cast<int>(std::errc::io_error), std::generic_category());
217 }
218 }
219
220 // posix_resolver
221
// Binds the resolver to the service that owns it; nothing else is
// initialised here.
222 29 inline posix_resolver::posix_resolver(posix_resolver_service& svc) noexcept
223 29 : svc_(svc)
224 {
225 29 }
226
227 // posix_resolver::resolve_op implementation
228
/** Return the forward-resolve operation to its idle state.

    Clears the request strings and flags, discards stored results and the
    recorded getaddrinfo error, clears the cancelled flag, releases any
    stop callback and nulls both output pointers. The coroutine handle,
    executor and impl pointer are left untouched.
*/
229 inline void
230 16 posix_resolver::resolve_op::reset() noexcept
231 {
232 16 host.clear();
233 16 service.clear();
234 16 flags = resolve_flags::none;
235 16 stored_results = resolver_results{};
236 16 gai_error = 0;
237 16 cancelled.store(false, std::memory_order_relaxed);
238 16 stop_cb.reset();
239 16 ec_out = nullptr;
240 16 out = nullptr;
241 16 }
242
/** Completion handler for a forward resolve.

    Publishes the outcome through the pointers recorded on the op, calls
    work_finished() on the owning service and resumes the handle obtained
    from dispatch_coro(ex, h).

    - ec_out, when non-null, receives capy::error::canceled if the
      cancelled flag is set, otherwise the mapped getaddrinfo error if
      gai_error is non-zero, otherwise a default-constructed error_code.
    - out, when non-null, receives the stored results only if the
      operation was neither cancelled nor failed.
*/
243 inline void
244 16 posix_resolver::resolve_op::operator()()
245 {
246 16 stop_cb.reset(); // Disconnect stop callback
247
// Sample the flag once so ec_out and out are decided consistently.
248 16 bool const was_cancelled = cancelled.load(std::memory_order_acquire);
249
250 16 if (ec_out)
251 {
252 16 if (was_cancelled)
253 *ec_out = capy::error::canceled;
254 16 else if (gai_error != 0)
255 3 *ec_out = posix_resolver_detail::make_gai_error(gai_error);
256 else
257 13 *ec_out = {}; // Clear on success
258 }
259
260 16 if (out && !was_cancelled && gai_error == 0)
261 13 *out = std::move(stored_results);
262
// NOTE(review): presumably balances the on_work_started() issued on the
// executor when the operation was launched - confirm both reach the
// same work counter.
263 16 impl->svc_.work_finished();
264 16 dispatch_coro(ex, h).resume();
265 16 }
266
// Discards the operation without completing it: only the stop callback is
// released. The coroutine is not resumed and work_finished() is not called
// here. Comments at the post sites indicate this is the path used when the
// scheduler drains a posted op during shutdown.
267 inline void
268 posix_resolver::resolve_op::destroy()
269 {
270 stop_cb.reset();
271 }
272
// Marks the operation as cancelled (release store). This only sets a flag;
// the worker lambda and operator()() read it later with acquire loads.
273 inline void
274 33 posix_resolver::resolve_op::request_cancel() noexcept
275 {
276 33 cancelled.store(true, std::memory_order_release);
277 33 }
278
/** Arm the operation for a new request.

    Clears the cancelled flag and any previous stop callback, then, if the
    token can ever be stopped, registers a stop callback built from
    canceller{this}.

    @param token Stop token associated with the request.
*/
279 inline void
280 // NOLINTNEXTLINE(performance-unnecessary-value-param)
281 16 posix_resolver::resolve_op::start(std::stop_token token)
282 {
283 16 cancelled.store(false, std::memory_order_release);
284 16 stop_cb.reset();
285
// NOTE(review): canceller is declared elsewhere; presumably it calls
// request_cancel() on this op - confirm.
286 16 if (token.stop_possible())
287 stop_cb.emplace(token, canceller{this});
288 16 }
289
290 // posix_resolver::reverse_resolve_op implementation
291
/** Return the reverse-resolve operation to its idle state.

    Resets the endpoint and flags, clears the stored host/service strings
    and the recorded getnameinfo error, clears the cancelled flag, releases
    any stop callback and nulls both output pointers. The coroutine handle,
    executor and impl pointer are left untouched.
*/
292 inline void
293 10 posix_resolver::reverse_resolve_op::reset() noexcept
294 {
295 10 ep = endpoint{};
296 10 flags = reverse_flags::none;
297 10 stored_host.clear();
298 10 stored_service.clear();
299 10 gai_error = 0;
300 10 cancelled.store(false, std::memory_order_relaxed);
301 10 stop_cb.reset();
302 10 ec_out = nullptr;
303 10 result_out = nullptr;
304 10 }
305
/** Completion handler for a reverse resolve.

    Publishes the outcome through the pointers recorded on the op, calls
    work_finished() on the owning service and resumes the handle obtained
    from dispatch_coro(ex, h).

    - ec_out, when non-null, receives capy::error::canceled if the
      cancelled flag is set, otherwise the mapped getnameinfo error if
      gai_error is non-zero, otherwise a default-constructed error_code.
    - result_out, when non-null, receives a reverse_resolver_result built
      from the endpoint and the stored host/service strings, but only if
      the operation was neither cancelled nor failed.
*/
306 inline void
307 10 posix_resolver::reverse_resolve_op::operator()()
308 {
309 10 stop_cb.reset(); // Disconnect stop callback
310
// Sample the flag once so ec_out and result_out are decided consistently.
311 10 bool const was_cancelled = cancelled.load(std::memory_order_acquire);
312
313 10 if (ec_out)
314 {
315 10 if (was_cancelled)
316 *ec_out = capy::error::canceled;
317 10 else if (gai_error != 0)
318 1 *ec_out = posix_resolver_detail::make_gai_error(gai_error);
319 else
320 9 *ec_out = {}; // Clear on success
321 }
322
323 10 if (result_out && !was_cancelled && gai_error == 0)
324 {
// The stored strings are moved out; they are empty-or-unspecified
// afterwards until the next reset().
325 27 *result_out = reverse_resolver_result(
326 27 ep, std::move(stored_host), std::move(stored_service));
327 }
328
// NOTE(review): presumably balances the on_work_started() issued on the
// executor when the operation was launched - confirm both reach the
// same work counter.
329 10 impl->svc_.work_finished();
330 10 dispatch_coro(ex, h).resume();
331 10 }
332
// Discards the operation without completing it: only the stop callback is
// released. The coroutine is not resumed and work_finished() is not called
// here. Comments at the post sites indicate this is the path used when the
// scheduler drains a posted op during shutdown.
333 inline void
334 posix_resolver::reverse_resolve_op::destroy()
335 {
336 stop_cb.reset();
337 }
338
// Marks the operation as cancelled (release store). This only sets a flag;
// the worker lambda and operator()() read it later with acquire loads.
339 inline void
340 33 posix_resolver::reverse_resolve_op::request_cancel() noexcept
341 {
342 33 cancelled.store(true, std::memory_order_release);
343 33 }
344
/** Arm the operation for a new request.

    Clears the cancelled flag and any previous stop callback, then, if the
    token can ever be stopped, registers a stop callback built from
    canceller{this}.

    @param token Stop token associated with the request.
*/
345 inline void
346 // NOLINTNEXTLINE(performance-unnecessary-value-param)
347 10 posix_resolver::reverse_resolve_op::start(std::stop_token token)
348 {
349 10 cancelled.store(false, std::memory_order_release);
350 10 stop_cb.reset();
351
// NOTE(review): canceller is declared elsewhere; presumably it calls
// request_cancel() on this op - confirm.
352 10 if (token.stop_possible())
353 stop_cb.emplace(token, canceller{this});
354 10 }
355
356 // posix_resolver implementation
357
/** Start an asynchronous forward lookup of host/service.

    Fills in the resolver's resolve_op, then runs ::getaddrinfo on a newly
    created, detached std::thread. When the call returns, the thread stores
    the outcome in the op (unless the op was cancelled meanwhile), posts
    the op to the service and reports thread_finished(). The caller's
    output pointers are written when the posted op is invoked.

    If the thread cannot be created (std::system_error), the op is posted
    immediately with gai_error set to EAI_MEMORY.

    @param h Coroutine handle stored in the op for later resumption.
    @param ex Executor stored in the op; on_work_started() is called on it.
    @param host Host to look up; empty is passed to getaddrinfo as nullptr.
    @param service Service to look up; empty is passed as nullptr.
    @param flags Translated to ai_flags via flags_to_hints().
    @param token Stop token; forwarded to the op's start().
    @param ec Receives the resulting error code on completion; may be null.
    @param out Receives the results on success; may be null.
    @return Always std::noop_coroutine().
*/
358 inline std::coroutine_handle<>
359 16 posix_resolver::resolve(
360 std::coroutine_handle<> h,
361 capy::executor_ref ex,
362 std::string_view host,
363 std::string_view service,
364 resolve_flags flags,
365 std::stop_token token,
366 std::error_code* ec,
367 resolver_results* out)
368 {
// NOTE(review): op_ is a single member that is reset and refilled without
// checking whether an earlier resolve is still running on a worker
// thread - presumably callers keep at most one forward resolve in flight
// per resolver; confirm.
369 16 auto& op = op_;
370 16 op.reset();
371 16 op.h = h;
372 16 op.ex = ex;
373 16 op.impl = this;
374 16 op.ec_out = ec;
375 16 op.out = out;
// host and service are copied into the op, so the worker does not depend
// on the lifetime of the caller's string_views.
376 16 op.host = host;
377 16 op.service = service;
378 16 op.flags = flags;
379 16 op.start(token);
380
381 // Keep io_context alive while resolution is pending
382 16 op.ex.on_work_started();
383
384 // Track thread for safe shutdown
385 16 svc_.thread_started();
386
387 try
388 {
389 // Prevent impl destruction while worker thread is running
390 16 auto self = this->shared_from_this();
391 32 std::thread worker([this, self = std::move(self)]() {
// Lookup is for any address family, stream sockets only.
392 16 struct addrinfo hints{};
393 16 hints.ai_family = AF_UNSPEC;
394 16 hints.ai_socktype = SOCK_STREAM;
395 16 hints.ai_flags = posix_resolver_detail::flags_to_hints(op_.flags);
396
397 16 struct addrinfo* ai = nullptr;
398 48 int result = ::getaddrinfo(
399 32 op_.host.empty() ? nullptr : op_.host.c_str(),
400 32 op_.service.empty() ? nullptr : op_.service.c_str(), &hints,
401 &ai);
402
// Results are recorded only if no cancellation was requested while the
// blocking call was running; a cancelled op keeps its reset() values.
403 16 if (!op_.cancelled.load(std::memory_order_acquire))
404 {
405 16 if (result == 0 && ai)
406 {
407 26 op_.stored_results = posix_resolver_detail::convert_results(
408 26 ai, op_.host, op_.service);
409 13 op_.gai_error = 0;
410 }
411 else
412 {
413 3 op_.gai_error = result;
414 }
415 }
416
// The list is released on every path, including the cancelled one.
417 16 if (ai)
418 13 ::freeaddrinfo(ai);
419
420 // Always post so the scheduler can properly drain the op
421 // during shutdown via destroy().
422 16 svc_.post(&op_);
423
424 // Signal thread completion for shutdown synchronization
425 16 svc_.thread_finished();
426 32 });
427 16 worker.detach();
428 16 }
// NOTE(review): only std::system_error is handled. Any other exception
// thrown inside the try block would escape with the thread_started() and
// on_work_started() calls above left unbalanced - confirm that cannot
// happen.
429 catch (std::system_error const&)
430 {
431 // Thread creation failed - no thread was started
432 svc_.thread_finished();
433
434 // Set error and post completion to avoid hanging the coroutine
435 op_.gai_error = EAI_MEMORY; // Map to "not enough memory"
436 svc_.post(&op_);
437 }
438 16 return std::noop_coroutine();
439 }
440
/** Start an asynchronous reverse lookup of an endpoint.

    Fills in the resolver's reverse_resolve_op, then runs ::getnameinfo on
    a newly created, detached std::thread. When the call returns, the
    thread stores the host/service strings or the error in the op (unless
    the op was cancelled meanwhile), posts the op to the service and
    reports thread_finished(). The caller's output pointers are written
    when the posted op is invoked.

    If the thread cannot be created (std::system_error), the op is posted
    immediately with gai_error set to EAI_MEMORY.

    @param h Coroutine handle stored in the op for later resumption.
    @param ex Executor stored in the op; on_work_started() is called on it.
    @param ep Endpoint to look up; copied into the op.
    @param flags Translated to getnameinfo flags via flags_to_ni_flags().
    @param token Stop token; forwarded to the op's start().
    @param ec Receives the resulting error code on completion; may be null.
    @param result_out Receives the result on success; may be null.
    @return Always std::noop_coroutine().
*/
441 inline std::coroutine_handle<>
442 10 posix_resolver::reverse_resolve(
443 std::coroutine_handle<> h,
444 capy::executor_ref ex,
445 endpoint const& ep,
446 reverse_flags flags,
447 std::stop_token token,
448 std::error_code* ec,
449 reverse_resolver_result* result_out)
450 {
// NOTE(review): reverse_op_ is a single member that is reset and refilled
// without checking whether an earlier reverse resolve is still running on
// a worker thread - presumably callers keep at most one in flight per
// resolver; confirm.
451 10 auto& op = reverse_op_;
452 10 op.reset();
453 10 op.h = h;
454 10 op.ex = ex;
455 10 op.impl = this;
456 10 op.ec_out = ec;
457 10 op.result_out = result_out;
458 10 op.ep = ep;
459 10 op.flags = flags;
460 10 op.start(token);
461
462 // Keep io_context alive while resolution is pending
463 10 op.ex.on_work_started();
464
465 // Track thread for safe shutdown
466 10 svc_.thread_started();
467
468 try
469 {
470 // Prevent impl destruction while worker thread is running
471 10 auto self = this->shared_from_this();
472 20 std::thread worker([this, self = std::move(self)]() {
473 // Build sockaddr from endpoint
474 10 sockaddr_storage ss{};
// Declared without an initialiser; both branches below assign it.
475 socklen_t ss_len;
476
477 10 if (reverse_op_.ep.is_v4())
478 {
479 8 auto sa = to_sockaddr_in(reverse_op_.ep);
480 8 std::memcpy(&ss, &sa, sizeof(sa));
481 8 ss_len = sizeof(sockaddr_in);
482 }
483 else
484 {
// Any endpoint that is not v4 is treated as v6.
485 2 auto sa = to_sockaddr_in6(reverse_op_.ep);
486 2 std::memcpy(&ss, &sa, sizeof(sa));
487 2 ss_len = sizeof(sockaddr_in6);
488 }
489
// Output buffers sized with the NI_MAXHOST / NI_MAXSERV limits.
490 char host[NI_MAXHOST];
491 char service[NI_MAXSERV];
492
493 10 int result = ::getnameinfo(
494 reinterpret_cast<sockaddr*>(&ss), ss_len, host, sizeof(host),
495 service, sizeof(service),
496 posix_resolver_detail::flags_to_ni_flags(reverse_op_.flags));
497
// Results are recorded only if no cancellation was requested while the
// blocking call was running; a cancelled op keeps its reset() values.
498 10 if (!reverse_op_.cancelled.load(std::memory_order_acquire))
499 {
500 10 if (result == 0)
501 {
502 9 reverse_op_.stored_host = host;
503 9 reverse_op_.stored_service = service;
504 9 reverse_op_.gai_error = 0;
505 }
506 else
507 {
508 1 reverse_op_.gai_error = result;
509 }
510 }
511
512 // Always post so the scheduler can properly drain the op
513 // during shutdown via destroy().
514 10 svc_.post(&reverse_op_);
515
516 // Signal thread completion for shutdown synchronization
517 10 svc_.thread_finished();
518 20 });
519 10 worker.detach();
520 10 }
// NOTE(review): only std::system_error is handled. Any other exception
// thrown inside the try block would escape with the thread_started() and
// on_work_started() calls above left unbalanced - confirm that cannot
// happen.
521 catch (std::system_error const&)
522 {
523 // Thread creation failed - no thread was started
524 svc_.thread_finished();
525
526 // Set error and post completion to avoid hanging the coroutine
527 reverse_op_.gai_error = EAI_MEMORY;
528 svc_.post(&reverse_op_);
529 }
530 10 return std::noop_coroutine();
531 }
532
// Requests cancellation of both the forward and the reverse operation,
// whether or not either is currently in flight. Only the cancelled flags
// are set; nothing is interrupted or posted here.
533 inline void
534 33 posix_resolver::cancel() noexcept
535 {
536 33 op_.request_cancel();
537 33 reverse_op_.request_cancel();
538 33 }
539
540 // posix_resolver_service implementation
541
/** Stop the service and wait for outstanding worker threads.

    Under the mutex: sets the shutting-down flag, pops every resolver off
    resolver_list_ and calls cancel() on it, then clears resolver_ptrs_.
    Afterwards it blocks on the condition variable until active_threads_
    is zero. There is no timeout on the wait.
*/
542 inline void
543 340 posix_resolver_service::shutdown()
544 {
545 {
546 340 std::lock_guard<std::mutex> lock(mutex_);
547
548 // Signal threads to not access service after getaddrinfo returns
// NOTE(review): the worker lambdas in this header do not consult
// is_shutting_down(); they call post() and thread_finished() on the
// service unconditionally. Safety appears to rest on the wait below -
// confirm the comment above still describes the intent.
549 340 shutting_down_.store(true, std::memory_order_release);
550
551 // Cancel all resolvers (sets cancelled flag checked by threads)
552 340 for (auto* impl = resolver_list_.pop_front(); impl != nullptr;
553 impl = resolver_list_.pop_front())
554 {
555 impl->cancel();
556 }
557
558 // Clear the map which releases shared_ptrs
// A resolver whose worker is still running stays alive through the
// shared_ptr captured by that worker's lambda.
559 340 resolver_ptrs_.clear();
560 340 }
561
562 // Wait for all worker threads to finish before service is destroyed
563 {
// The mutex was released above and is re-acquired here; the predicate
// form of wait() re-checks the counter after every wake-up.
564 340 std::unique_lock<std::mutex> lock(mutex_);
565 680 cv_.wait(lock, [this] { return active_threads_ == 0; });
566 340 }
567 340 }
568
/** Create a resolver owned by this service.

    Allocates a posix_resolver with std::make_shared, then, under the
    mutex, appends it to resolver_list_ and stores the owning shared_ptr in
    resolver_ptrs_ keyed by the raw pointer.

    @return The new resolver as an io_object::implementation pointer.
*/
569 inline io_object::implementation*
570 29 posix_resolver_service::construct()
571 {
572 29 auto ptr = std::make_shared<posix_resolver>(*this);
// Taken before ptr is moved into the map below.
573 29 auto* impl = ptr.get();
574
575 {
576 29 std::lock_guard<std::mutex> lock(mutex_);
577 29 resolver_list_.push_back(impl);
578 29 resolver_ptrs_[impl] = std::move(ptr);
579 29 }
580
581 29 return impl;
582 29 }
583
/** Remove a resolver from the service's bookkeeping.

    Under the mutex, unlinks impl from resolver_list_ and erases its entry
    from resolver_ptrs_, dropping the service's owning shared_ptr. The
    object itself goes away once no other shared_ptr (for example one held
    by a running worker lambda) refers to it.

    @param impl Resolver previously returned by construct().
*/
584 inline void
585 29 posix_resolver_service::destroy_impl(posix_resolver& impl)
586 {
587 29 std::lock_guard<std::mutex> lock(mutex_);
588 29 resolver_list_.remove(&impl);
589 29 resolver_ptrs_.erase(&impl);
590 29 }
591
// Hands op to the scheduler given at construction. Called from the worker
// thread lambdas and from the thread-creation failure paths.
592 inline void
593 26 posix_resolver_service::post(scheduler_op* op)
594 {
595 26 sched_->post(op);
596 26 }
597
// Forwards to the scheduler's work_started(). Not called anywhere in this
// header.
598 inline void
599 posix_resolver_service::work_started() noexcept
600 {
601 sched_->work_started();
602 }
603
// Forwards to the scheduler's work_finished(). Called by both operation
// completion handlers just before the coroutine is resumed.
604 inline void
605 26 posix_resolver_service::work_finished() noexcept
606 {
607 26 sched_->work_finished();
608 26 }
609
// Records that a worker thread is about to be launched by incrementing
// active_threads_ under the mutex. Each call must be matched by one
// thread_finished() call, otherwise shutdown() never returns.
610 inline void
611 26 posix_resolver_service::thread_started() noexcept
612 {
613 26 std::lock_guard<std::mutex> lock(mutex_);
614 26 ++active_threads_;
615 26 }
616
// Records that a worker thread is done (or was never created) by
// decrementing active_threads_ under the mutex and waking one waiter on
// cv_. The only wait on cv_ in this header is the one in shutdown().
617 inline void
618 26 posix_resolver_service::thread_finished() noexcept
619 {
620 26 std::lock_guard<std::mutex> lock(mutex_);
621 26 --active_threads_;
622 26 cv_.notify_one();
623 26 }
624
// Returns the flag that shutdown() sets (acquire load). Nothing in this
// header calls it.
625 inline bool
626 posix_resolver_service::is_shutting_down() const noexcept
627 {
628 return shutting_down_.load(std::memory_order_acquire);
629 }
630
631 // Free function to get/create the resolver service
632
// Forwards to ctx.make_service<posix_resolver_service>(sched) and returns
// the resulting service reference.
// NOTE(review): behaviour when the service already exists in ctx depends
// on make_service, which is defined elsewhere - confirm.
633 inline posix_resolver_service&
634 340 get_resolver_service(capy::execution_context& ctx, scheduler& sched)
635 {
636 340 return ctx.make_service<posix_resolver_service>(sched);
637 }
638
639 } // namespace boost::corosio::detail
640
641 #endif // BOOST_COROSIO_POSIX
642
643 #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_RESOLVER_SERVICE_HPP
644