Fix reconnection of sockets (#70)

This was entirely broken:
* The `Server` `Impl` used a `do{}while(false)` block, which never attempted to accept another connection after the first connection closed (#69)
* The `Server` `Impl` could deadlock with the mutex being locked by both the thread calling `isRunning()` and `stopWithLock()` waiting on `thread.join()`.
* `Socket::accept()` didn't check that the returned socket was valid, and could return a `ReaderWriter` that would just error on IO.
* `Socket::accept()` could deadlock on macOS as `shutdown()` can seemingly fail to unblock an accept call. This has been worked around by calling `close()` outside of the mutex write-lock. This introduces a potential race, but I'm not sure of a better solution right now.
 
Fixes: #69
This commit is contained in:
Ben Clayton 2021-09-07 18:55:02 +01:00 committed by GitHub
parent 51cf2951d0
commit 5f3169421e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 24 deletions

View File

@ -16,6 +16,7 @@
#include "socket.h" #include "socket.h"
#include <atomic>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <thread> #include <thread>
@ -24,7 +25,7 @@ namespace {
class Impl : public dap::net::Server { class Impl : public dap::net::Server {
public: public:
Impl() {} Impl() : stopped{true} {}
~Impl() { stop(); } ~Impl() { stop(); }
@ -41,17 +42,18 @@ class Impl : public dap::net::Server {
return false; return false;
} }
running = true; stopped = false;
thread = std::thread([=] { thread = std::thread([=] {
do { while (true) {
if (auto rw = socket->accept()) { if (auto rw = socket->accept()) {
onConnect(rw); onConnect(rw);
continue; continue;
} }
if (!isRunning()) { if (!stopped) {
onError("Failed to accept connection"); onError("Failed to accept connection");
} }
} while (false); break;
};
}); });
return true; return true;
@ -63,23 +65,19 @@ class Impl : public dap::net::Server {
} }
private: private:
bool isRunning() { bool isRunning() { return !stopped; }
std::unique_lock<std::mutex> lock(mutex);
return running;
}
void stopWithLock() { void stopWithLock() {
if (running) { if (!stopped.exchange(true)) {
socket->close(); socket->close();
thread.join(); thread.join();
running = false;
} }
} }
std::mutex mutex; std::mutex mutex;
std::thread thread; std::thread thread;
std::unique_ptr<dap::Socket> socket; std::unique_ptr<dap::Socket> socket;
bool running = false; std::atomic<bool> stopped;
OnError errorHandler; OnError errorHandler;
}; };

View File

@ -25,6 +25,8 @@
namespace { namespace {
constexpr int port = 19021;
bool write(const std::shared_ptr<dap::Writer>& w, const std::string& s) { bool write(const std::shared_ptr<dap::Writer>& w, const std::string& s) {
return w->write(s.data(), s.size()) && w->write("\0", 1); return w->write(s.data(), s.size()) && w->write("\0", 1);
} }
@ -44,7 +46,6 @@ std::string read(const std::shared_ptr<dap::Reader>& r) {
} // anonymous namespace } // anonymous namespace
TEST(Network, ClientServer) { TEST(Network, ClientServer) {
const int port = 19021;
dap::Chan<bool> done; dap::Chan<bool> done;
auto server = dap::net::Server::create(); auto server = dap::net::Server::create();
if (!server->start( if (!server->start(
@ -59,15 +60,51 @@ TEST(Network, ClientServer) {
return; return;
} }
for (int i = 0; i < 10; i++) { for (int i = 0; i < 5; i++) {
if (auto client = dap::net::connect("localhost", port)) { auto client = dap::net::connect("localhost", port);
ASSERT_TRUE(write(client, "client to server")); ASSERT_NE(client, nullptr) << "Failed to connect client " << i;
ASSERT_EQ(read(client), "server to client"); ASSERT_TRUE(write(client, "client to server"));
break; ASSERT_EQ(read(client), "server to client");
} done.take();
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
done.take(); server.reset();
}
TEST(Network, ServerRepeatStopAndRestart) {
dap::Chan<bool> done;
auto onConnect = [&](const std::shared_ptr<dap::ReaderWriter>& rw) {
ASSERT_EQ(read(rw), "client to server");
ASSERT_TRUE(write(rw, "server to client"));
done.put(true);
};
auto onError = [&](const char* err) { FAIL() << "Server error: " << err; };
auto server = dap::net::Server::create();
if (!server->start(port, onConnect, onError)) {
FAIL() << "Couldn't start server";
return;
}
server->stop();
server->stop();
server->stop();
if (!server->start(port, onConnect, onError)) {
FAIL() << "Couldn't restart server";
return;
}
auto client = dap::net::connect("localhost", port);
ASSERT_NE(client, nullptr) << "Failed to connect";
ASSERT_TRUE(write(client, "client to server"));
ASSERT_EQ(read(client), "server to client");
done.take();
server->stop();
server->stop();
server->stop();
server.reset(); server.reset();
} }

View File

@ -174,7 +174,17 @@ class dap::Socket::Shared : public dap::ReaderWriter {
if (s != InvalidSocket) { if (s != InvalidSocket) {
#if defined(_WIN32) #if defined(_WIN32)
closesocket(s); closesocket(s);
#elif __APPLE__
// ::shutdown() *should* be sufficient to unblock ::accept(), but
// apparently on macos it can return ENOTCONN and ::accept() continues
// to block indefinitely.
// Note: There is a race here. Calling ::close() frees the socket ID,
// which may be reused before `s` is assigned InvalidSocket.
::shutdown(s, SHUT_RDWR);
::close(s);
#else #else
// ::shutdown() to unblock ::accept(). We'll actually close the socket
// under lock below.
::shutdown(s, SHUT_RDWR); ::shutdown(s, SHUT_RDWR);
#endif #endif
} }
@ -182,7 +192,7 @@ class dap::Socket::Shared : public dap::ReaderWriter {
WLock l(mutex); WLock l(mutex);
if (s != InvalidSocket) { if (s != InvalidSocket) {
#if !defined(_WIN32) #if !defined(_WIN32) && !defined(__APPLE__)
::close(s); ::close(s);
#endif #endif
s = InvalidSocket; s = InvalidSocket;
@ -240,10 +250,13 @@ std::shared_ptr<ReaderWriter> Socket::accept() const {
std::shared_ptr<Shared> out; std::shared_ptr<Shared> out;
if (shared) { if (shared) {
shared->lock([&](SOCKET socket, const addrinfo*) { shared->lock([&](SOCKET socket, const addrinfo*) {
if (socket != InvalidSocket) { if (socket != InvalidSocket && !errored(socket)) {
init(); init();
out = std::make_shared<Shared>(::accept(socket, 0, 0)); auto accepted = ::accept(socket, 0, 0);
out->setOptions(); if (accepted != InvalidSocket) {
out = std::make_shared<Shared>(accepted);
out->setOptions();
}
} }
}); });
} }