Xclox
C++11 header-only cross-platform date and time library with an asynchronous NTP client
query_series.h
1 /*
2  * Copyright (c) 2024 Abdullatif Kalla.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE.txt file in the root directory of this source tree.
6  */
7 
8 #include "xclox/ntp/query_series.hpp"
9 
10 #include "tools/server.hpp"
11 #include "tools/tracer.hpp"
12 
13 #include "tools/helper.hpp"
14 
15 using namespace xclox::ntp;
16 using namespace std::chrono;
17 
18 TEST_SUITE("QuerySeries")
19 {
20  struct Context {
21  Context()
22  : server1(32101, serverTracer1.callable())
23  , server2(32102, serverTracer2.callable())
24  , server3(32103, serverTracer3.callable())
25  {
26  }
27  Tracer<asio::ip::udp::endpoint, asio::error_code, const uint8_t*, size_t> serverTracer1, serverTracer2, serverTracer3;
28  Tracer<asio::ip::udp::endpoint, asio::error_code, Packet, steady_clock::duration> queryTracer;
29  Server server1, server2, server3;
30  asio::io_context io;
31  };
32 
33  TEST_CASE_FIXTURE(Context, "no callback or endpoints")
34  {
35  QuerySeries::start(io, asio::ip::udp::resolver::results_type(), {});
36  QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(server1.endpoint(), "", ""), {});
37  QuerySeries::start(io, asio::ip::udp::resolver::results_type(), queryTracer.callable());
38  CHECK(io.run() == 0);
39  }
40 
41  TEST_CASE_FIXTURE(Context, "single query fails")
42  {
43  QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(broadcastEndpoint, "", ""), queryTracer.callable());
44  io.run();
45  CHECK(queryTracer.counter() == 1);
46  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
47  return endpoint == broadcastEndpoint && error == asio::error::access_denied && isClientPacket(packet) && compare(rtt, milliseconds(1));
48  }) == 1);
49  }
50 
51  TEST_CASE_FIXTURE(Context, "single query succeeds")
52  {
53  server1.replay();
54  QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(server1.endpoint(), "", ""), queryTracer.callable());
55  io.run();
56  CHECK(queryTracer.counter() == 1);
57  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
58  return endpoint == server1.endpoint() && !error && isClientPacket(packet) && compare(rtt, milliseconds(1));
59  }) == 1);
60  }
61 
62  TEST_CASE_FIXTURE(Context, "two queries fail")
63  {
64  const auto& localEndpoint = asio::ip::udp::endpoint(asio::ip::make_address("0.0.0.0"), 1234);
65  std::vector<asio::ip::udp::endpoint> endpointList { localEndpoint, broadcastEndpoint };
66  QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(endpointList.begin(), endpointList.end(), "", ""), queryTracer.callable());
67  io.run();
68  CHECK(queryTracer.counter() == 1);
69  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
70  return endpoint == broadcastEndpoint && error == asio::error::access_denied && isClientPacket(packet) && compare(rtt, milliseconds(1));
71  }) == 1);
72  }
73 
74  TEST_CASE_FIXTURE(Context, "first query fails, second query times out, third query succeeds")
75  {
76  uint8_t data { 1 };
77  server1.replay(&data, 0);
78  server2.receive();
79  server3.replay();
80  std::vector<asio::ip::udp::endpoint> endpointList { server1.endpoint(), server2.endpoint(), server3.endpoint() };
81  QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(endpointList.begin(), endpointList.end(), "", ""), queryTracer.callable());
82  io.run();
83  CHECK(queryTracer.counter() == 1);
84  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
85  return endpoint == server3.endpoint() && !error && isClientPacket(packet) && compare(rtt, milliseconds(1));
86  }) == 1);
87  CHECK(serverTracer1.wait(2) == 2);
88  CHECK(serverTracer2.wait(1) == 1);
89  CHECK(serverTracer3.wait(2) == 2);
90  }
91 
92  TEST_CASE_FIXTURE(Context, "traceable")
93  {
94  SUBCASE("single-target")
95  {
96  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(broadcastEndpoint, "", ""), queryTracer.callable());
97  CHECK_FALSE(query.expired());
98  io.run();
99  CHECK(queryTracer.counter() == 1);
100  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
101  return endpoint == broadcastEndpoint && error == asio::error::access_denied && !packet.isNull() && compare(rtt, milliseconds(1));
102  }) == 1);
103  CHECK(query.expired());
104  }
105  SUBCASE("multi-target")
106  {
107  server1.replay();
108  std::vector<asio::ip::udp::endpoint> endpointList { server1.endpoint(), broadcastEndpoint };
109  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(endpointList.begin(), endpointList.end(), "", ""), queryTracer.callable());
110  CHECK_FALSE(query.expired());
111  io.run();
112  CHECK(serverTracer1.wait(2) == 2);
113  CHECK(queryTracer.counter() == 1);
114  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
115  return endpoint == server1.endpoint() && !error && !packet.isNull() && compare(rtt, milliseconds(1));
116  }) == 1);
117  CHECK(query.expired());
118  }
119  SUBCASE("3-target")
120  {
121  const auto& someEndpoint = asio::ip::udp::endpoint(asio::ip::make_address("254.254.254.254"), 1234);
122  server1.replay();
123  std::vector<asio::ip::udp::endpoint> endpointList { broadcastEndpoint, server1.endpoint(), someEndpoint };
124  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(endpointList.begin(), endpointList.end(), "", ""), queryTracer.callable());
125  CHECK_FALSE(query.expired());
126  io.run();
127  CHECK(serverTracer1.wait(2) == 2);
128  CHECK(queryTracer.counter() == 1);
129  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
130  return endpoint == server1.endpoint() && !error && !packet.isNull() && compare(rtt, milliseconds(1));
131  }) == 1);
132  CHECK(query.expired());
133  }
134  }
135 
136  TEST_CASE_FIXTURE(Context, "cancellable")
137  {
138  SUBCASE("before running a query")
139  {
140  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(broadcastEndpoint, "", ""), queryTracer.callable());
141  query.lock()->cancel();
142  io.run();
143  CHECK(queryTracer.wait() == 1);
144  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
145  return endpoint == broadcastEndpoint && error == asio::error::operation_aborted && !packet.isNull() && compare(rtt, milliseconds(1));
146  }) == 1);
147  CHECK(query.expired());
148  }
149  SUBCASE("during the first query")
150  {
151  server1.receive();
152  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(server1.endpoint(), "", ""), queryTracer.callable());
153  std::thread([&] {
154  io.run();
155  }).detach();
156  CHECK(serverTracer1.wait() == 1);
157  query.lock()->cancel();
158  CHECK(queryTracer.wait() == 1);
159  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
160  return endpoint == server1.endpoint() && error == asio::error::operation_aborted && packet.isNull() && compare(rtt, milliseconds(1));
161  }) == 1);
162  CHECK(query.expired());
163  }
164  SUBCASE("during the second query")
165  {
166  uint8_t data {};
167  server1.replay(&data, 1);
168  server2.receive();
169  server3.replay();
170  std::vector<asio::ip::udp::endpoint> endpointList { server1.endpoint(), server2.endpoint(), server3.endpoint() };
171  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(endpointList.begin(), endpointList.end(), "", ""), queryTracer.callable());
172  std::thread([&] {
173  io.run();
174  }).detach();
175  CHECK(serverTracer1.wait(2) == 2);
176  CHECK(serverTracer2.wait() == 1);
177  query.lock()->cancel();
178  CHECK(queryTracer.wait() == 1);
179  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
180  return endpoint == server2.endpoint() && error == asio::error::operation_aborted && packet.isNull() && compare(rtt, milliseconds(1));
181  }) == 1);
182  CHECK(serverTracer3.counter() == 0);
183  CHECK(query.expired());
184  }
185  SUBCASE("multiple cancellations")
186  {
187  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(broadcastEndpoint, "", ""), queryTracer.callable());
188  auto handle = query.lock();
189  handle->cancel();
190  io.run();
191  handle->cancel();
192  CHECK(io.run() == 0);
193  }
194  }
195 
196  TEST_CASE_FIXTURE(Context, "custom timeout")
197  {
198  for (int i = 0; i < 3; ++i) {
199  server1.receive();
200  const auto& start = steady_clock::now();
201  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(server1.endpoint(), "", ""), queryTracer.callable(), milliseconds(i * 100));
202  io.run();
203  io.restart();
204  CHECK(compare(start, milliseconds(i * 100)));
205  CHECK(query.expired());
206  CHECK(queryTracer.counter() == 1);
207  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
208  return endpoint == server1.endpoint() && error == asio::error::timed_out && packet.isNull() && compare(rtt, milliseconds(i * 100));
209  }) == 1);
210  queryTracer.reset();
211  }
212  }
213 
214  TEST_CASE_FIXTURE(Context, "default timeout")
215  {
216  server1.receive();
217  server2.receive(milliseconds(QuerySingle::DefaultTimeout::ms + 1000));
218  const auto& start = steady_clock::now();
219  std::vector<asio::ip::udp::endpoint> endpointList { server1.endpoint(), server2.endpoint() };
220  auto query = QuerySeries::start(io, asio::ip::udp::resolver::results_type::create(endpointList.begin(), endpointList.end(), "", ""), queryTracer.callable());
221  io.run();
222  CHECK(compare(start, milliseconds(QuerySeries::DefaultTimeout::ms)));
223  CHECK(query.expired());
224  CHECK(queryTracer.counter() == 1);
225  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
226  return endpoint == server2.endpoint() && error == asio::error::timed_out && packet.isNull() && compare(rtt, milliseconds(QuerySeries::DefaultTimeout::ms - QuerySingle::DefaultTimeout::ms));
227  }) == 1);
228  }
229 } // TEST_SUITE
Packet is an immutable raw NTP packet.
Definition: packet.hpp:54
static std::weak_ptr< QuerySeries > start(asio::io_context &io, const asio::ip::udp::resolver::results_type &endpoints, Callback callback, const std::chrono::milliseconds &timeout=std::chrono::milliseconds(DefaultTimeout::ms))
Starts querying the given endpoints one at a time until success or all endpoints are queried.
Definition: query_series.hpp:52