Xclox
C++11 header-only cross-platform date and time library with an asynchronous NTP client
query_single.h
1 /*
2  * Copyright (c) 2024 Abdullatif Kalla.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE.txt file in the root directory of this source tree.
6  */
7 
8 #include "xclox/ntp/query_single.hpp"
9 
10 #include "tools/server.hpp"
11 #include "tools/tracer.hpp"
12 
13 #include "tools/helper.hpp"
14 
15 using namespace xclox::ntp;
16 using namespace std::chrono;
17 
18 TEST_SUITE("QuerySingle")
19 {
20  struct Context {
21  Context()
22  : server(32101, serverTracer.callable())
23  {
24  }
25  Tracer<asio::ip::udp::endpoint, asio::error_code, const uint8_t*, size_t> serverTracer;
26  Tracer<asio::ip::udp::endpoint, asio::error_code, Packet, steady_clock::duration> queryTracer;
27  Server server;
28  asio::io_context io;
29  };
30 
31  TEST_CASE_FIXTURE(Context, "no callback")
32  {
33  QuerySingle::start(io, server.endpoint(), {});
34  CHECK(io.run() == 0);
35  }
36 
37  TEST_CASE_FIXTURE(Context, "non-exising server")
38  {
39  QuerySingle::start(io, broadcastEndpoint, queryTracer.callable());
40  io.run();
41  CHECK(queryTracer.counter() == 1);
42  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
43  return endpoint == broadcastEndpoint && error == asio::error::access_denied && isClientPacket(packet) && compare(rtt, milliseconds(1));
44  }) == 1);
45  }
46 
47  TEST_CASE_FIXTURE(Context, "bogus server")
48  {
49  std::array<uint8_t, 9> sendData {};
50  server.replay(sendData.data(), sendData.size());
51  QuerySingle::start(io, server.endpoint(), queryTracer.callable());
52  io.run();
53  CHECK(serverTracer.wait(2) == 2);
54  CHECK(serverTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const uint8_t* data, size_t size) {
55  Packet::DataType buffer;
56  std::memcpy(buffer.data(), data, size);
57  return endpoint.address() == server.endpoint().address() && !error && isClientPacket(Packet(buffer)) && size == std::tuple_size<Packet::DataType> {};
58  }) == 1);
59  CHECK(serverTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const uint8_t* data, size_t size) {
60  return endpoint.address() == server.endpoint().address() && !error && std::memcmp(data, sendData.data(), sendData.size()) == 0 && size == sendData.size();
61  }) == 1);
62  CHECK(queryTracer.counter() == 1);
63  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
64  return endpoint == server.endpoint() && error == asio::error::message_size && packet.isNull() && compare(rtt, milliseconds(1));
65  }) == 1);
66  }
67 
68  TEST_CASE_FIXTURE(Context, "valid server")
69  {
70  server.replay();
71  QuerySingle::start(io, server.endpoint(), queryTracer.callable());
72  io.run();
73  CHECK(serverTracer.wait(2) == 2);
74  const uint8_t* recvData = nullptr;
75  size_t recvSize = 0;
76  CHECK(serverTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const uint8_t* data, size_t size) {
77  recvData = data;
78  recvSize = size;
79  return endpoint.address() == server.endpoint().address() && !error && size == std::tuple_size<Packet::DataType> {};
80  }) == 2);
81  CHECK(queryTracer.counter() == 1);
82  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
83  return endpoint == server.endpoint() && !error && std::memcmp(packet.data().data(), recvData, recvSize) == 0 && isClientPacket(packet) && compare(rtt, milliseconds(1));
84  }) == 1);
85  }
86 
87  TEST_CASE_FIXTURE(Context, "round-trip time")
88  {
89  SUBCASE("no delay")
90  {
91  QuerySingle::start(io, broadcastEndpoint, queryTracer.callable());
92  io.run();
93  CHECK(queryTracer.counter() == 1);
94  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
95  return endpoint == broadcastEndpoint && error == asio::error::access_denied && isClientPacket(packet) && compare(rtt, milliseconds(1));
96  }) == 1);
97  }
98  SUBCASE("receive error")
99  {
100  server.receive();
101  QuerySingle::start(io, server.endpoint(), queryTracer.callable());
102  std::this_thread::sleep_for(milliseconds(100));
103  std::thread([&] {
104  io.run();
105  }).detach();
106  CHECK(serverTracer.wait() == 1);
107  asio::ip::udp::endpoint sender;
108  CHECK(serverTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const uint8_t*, size_t) {
109  sender = endpoint;
110  return !error;
111  }) == 1);
112  std::this_thread::sleep_for(milliseconds(100));
113  server.send(sender, nullptr, 0);
114  CHECK(queryTracer.wait() == 1);
115  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
116  return endpoint == server.endpoint() && error == asio::error::message_size && packet.isNull() && compare(rtt, milliseconds(200));
117  }) == 1);
118  }
119  SUBCASE("successful query")
120  {
121  server.receive();
122  QuerySingle::start(io, server.endpoint(), queryTracer.callable());
123  std::this_thread::sleep_for(milliseconds(200));
124  std::thread([&] {
125  io.run();
126  }).detach();
127  CHECK(serverTracer.wait() == 1);
128  asio::ip::udp::endpoint sender;
129  Packet::DataType buffer;
130  CHECK(serverTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const uint8_t* data, size_t size) {
131  sender = endpoint;
132  std::memcpy(buffer.data(), data, size);
133  return !error;
134  }) == 1);
135  std::this_thread::sleep_for(milliseconds(200));
136  server.send(sender, buffer.data(), buffer.size());
137  CHECK(queryTracer.wait() == 1);
138  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
139  return endpoint == server.endpoint() && !error && isClientPacket(packet) && compare(rtt, milliseconds(400));
140  }) == 1);
141  }
142  }
143 
144  TEST_CASE_FIXTURE(Context, "custom timeout")
145  {
146  for (int i = 0; i < 3; ++i) {
147  const auto& start = steady_clock::now();
148  server.receive();
149  QuerySingle::start(io, server.endpoint(), queryTracer.callable(), milliseconds(i * 100));
150  io.run();
151  CHECK(compare(start, milliseconds(i * 100)));
152  CHECK(queryTracer.counter() == 1);
153  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
154  return endpoint == server.endpoint() && error == asio::error::timed_out && packet.isNull() && compare(rtt, milliseconds(i * 100));
155  }) == 1);
156  CHECK(serverTracer.wait() == 1);
157  queryTracer.reset();
158  serverTracer.reset();
159  io.restart();
160  }
161  }
162 
163  TEST_CASE_FIXTURE(Context, "default timeout")
164  {
165  const auto& start = steady_clock::now();
166  server.receive();
167  QuerySingle::start(io, server.endpoint(), queryTracer.callable());
168  io.run();
169  CHECK(compare(start, milliseconds(QuerySingle::DefaultTimeout::ms)));
170  CHECK(queryTracer.counter() == 1);
171  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
172  return endpoint == server.endpoint() && error == asio::error::timed_out && packet.isNull() && compare(rtt, milliseconds(QuerySingle::DefaultTimeout::ms));
173  }) == 1);
174  CHECK(serverTracer.counter() == 1);
175  }
176 
177  TEST_CASE_FIXTURE(Context, "traceable")
178  {
179  SUBCASE("no query")
180  {
181  auto query = QuerySingle::start(io, broadcastEndpoint, {});
182  CHECK(query.expired());
183  }
184  SUBCASE("unsuccessful query")
185  {
186  auto query = QuerySingle::start(io, broadcastEndpoint, queryTracer.callable());
187  CHECK_FALSE(query.expired());
188  io.run();
189  CHECK(queryTracer.counter() == 1);
190  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet&, const steady_clock::duration&) {
191  return endpoint == broadcastEndpoint && error == asio::error::access_denied;
192  }) == 1);
193  CHECK(query.expired());
194  }
195  SUBCASE("successful query")
196  {
197  server.replay();
198  auto query = QuerySingle::start(io, server.endpoint(), queryTracer.callable());
199  CHECK_FALSE(query.expired());
200  io.run();
201  CHECK(queryTracer.counter() == 1);
202  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet&, const steady_clock::duration&) {
203  return endpoint == server.endpoint() && !error;
204  }) == 1);
205  CHECK(query.expired());
206  }
207  }
208 
209  TEST_CASE_FIXTURE(Context, "cancellable")
210  {
211  SUBCASE("on start")
212  {
213  auto query = QuerySingle::start(io, server.endpoint(), queryTracer.callable());
214  query.lock()->cancel();
215  io.run();
216  CHECK(query.expired());
217  CHECK(queryTracer.wait() == 1);
218  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
219  return endpoint == server.endpoint() && error == asio::error::operation_aborted && packet.isNull() && compare(rtt, milliseconds(1));
220  }) == 1);
221  CHECK(serverTracer.counter() == 0);
222  }
223  SUBCASE("on receive")
224  {
225  server.receive();
226  auto query = QuerySingle::start(io, server.endpoint(), queryTracer.callable());
227  std::thread([&] {
228  io.run();
229  }).detach();
230  CHECK(serverTracer.wait() == 1);
231  query.lock()->cancel();
232  CHECK(queryTracer.wait() == 1);
233  CHECK(queryTracer.find([&](const asio::ip::udp::endpoint& endpoint, const asio::error_code& error, const Packet& packet, const steady_clock::duration& rtt) {
234  return endpoint == server.endpoint() && error == asio::error::operation_aborted && packet.isNull() && compare(rtt, milliseconds(1));
235  }) == 1);
236  CHECK(query.expired());
237  }
238  SUBCASE("on finish")
239  {
240  server.receive();
241  auto query = QuerySingle::start(io, server.endpoint(), queryTracer.callable());
242  auto handle = query.lock();
243  std::thread([&] {
244  io.run();
245  }).detach();
246  CHECK(serverTracer.wait() == 1);
247  handle->cancel();
248  CHECK(queryTracer.wait() == 1);
249  handle->cancel();
250  io.restart();
251  CHECK(io.run() == 0);
252  }
253  } // TEST_CASE
254 } // TEST_SUITE
Packet is an immutable raw NTP packet.
Definition: packet.hpp:54
internal::DataType DataType
Type of packet's underlying data.
Definition: packet.hpp:56
static std::weak_ptr< QuerySingle > start(asio::io_context &io, const asio::ip::udp::endpoint &server, Callback callback, const std::chrono::milliseconds &timeout=std::chrono::milliseconds(DefaultTimeout::ms))
Starts querying the given endpoint.
Definition: query_single.hpp:68