Add Session::ClosedHandler (#99)

A callback function to signal that the endpoint has closed its connection.

Add this as an optional argument to Session::bind() and Session::startProcessingMessages().

Bug: #98
This commit is contained in:
Ben Clayton 2023-02-28 14:26:12 +00:00 committed by GitHub
parent 315ffff9e7
commit 59819690ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 19 deletions

View File

@ -137,6 +137,10 @@ class Session {
// errors. // errors.
using ErrorHandler = std::function<void(const char*)>; using ErrorHandler = std::function<void(const char*)>;
// ClosedHandler is the type of callback function used to signal that a
// connected endpoint has closed.
using ClosedHandler = std::function<void()>;
// create() constructs and returns a new Session. // create() constructs and returns a new Session.
static std::unique_ptr<Session> create(); static std::unique_ptr<Session> create();
@ -205,9 +209,13 @@ class Session {
// bind() connects this Session to an endpoint using connect(), and then // bind() connects this Session to an endpoint using connect(), and then
// starts processing incoming messages with startProcessingMessages(). // starts processing incoming messages with startProcessingMessages().
inline void bind(const std::shared_ptr<Reader>&, // onClose is the optional callback which will be called when the session
const std::shared_ptr<Writer>&); // endpoint has been closed.
inline void bind(const std::shared_ptr<ReaderWriter>&); inline void bind(const std::shared_ptr<Reader>& reader,
const std::shared_ptr<Writer>& writer,
const ClosedHandler& onClose);
inline void bind(const std::shared_ptr<ReaderWriter>& readerWriter,
const ClosedHandler& onClose);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Note: // Note:
@ -227,9 +235,11 @@ class Session {
// startProcessingMessages() starts a new thread to receive and dispatch // startProcessingMessages() starts a new thread to receive and dispatch
// incoming messages. // incoming messages.
// onClose is the optional callback which will be called when the session
// endpoint has been closed.
// Note: This method is used for explicit control over message handling. // Note: This method is used for explicit control over message handling.
// Most users will use bind() instead of calling this method directly. // Most users will use bind() instead of calling this method directly.
virtual void startProcessingMessages() = 0; virtual void startProcessingMessages(const ClosedHandler& onClose = {}) = 0;
// getPayload() blocks until the next incoming message is received, returning // getPayload() blocks until the next incoming message is received, returning
// the payload or an empty function if the connection was lost. The returned // the payload or an empty function if the connection was lost. The returned
@ -423,13 +433,15 @@ void Session::connect(const std::shared_ptr<ReaderWriter>& rw) {
} }
void Session::bind(const std::shared_ptr<dap::Reader>& r, void Session::bind(const std::shared_ptr<dap::Reader>& r,
const std::shared_ptr<dap::Writer>& w) { const std::shared_ptr<dap::Writer>& w,
const ClosedHandler& onClose = {}) {
connect(r, w); connect(r, w);
startProcessingMessages(); startProcessingMessages(onClose);
} }
void Session::bind(const std::shared_ptr<ReaderWriter>& rw) { void Session::bind(const std::shared_ptr<ReaderWriter>& rw,
bind(rw, rw); const ClosedHandler& onClose = {}) {
bind(rw, rw, onClose);
} }
} // namespace dap } // namespace dap

View File

@ -65,7 +65,7 @@ class Impl : public dap::Session {
void connect(const std::shared_ptr<dap::Reader>& r, void connect(const std::shared_ptr<dap::Reader>& r,
const std::shared_ptr<dap::Writer>& w) override { const std::shared_ptr<dap::Writer>& w) override {
if (isBound.exchange(true)) { if (isBound.exchange(true)) {
handlers.error("Session is already bound!"); handlers.error("Session::connect called twice");
return; return;
} }
@ -73,13 +73,21 @@ class Impl : public dap::Session {
writer = dap::ContentWriter(w); writer = dap::ContentWriter(w);
} }
void startProcessingMessages() override { void startProcessingMessages(
recvThread = std::thread([this] { const ClosedHandler& onClose /* = {} */) override {
if (isProcessingMessages.exchange(true)) {
handlers.error("Session::startProcessingMessages() called twice");
return;
}
recvThread = std::thread([this, onClose] {
while (reader.isOpen()) { while (reader.isOpen()) {
if (auto payload = getPayload()) { if (auto payload = getPayload()) {
inbox.put(std::move(payload)); inbox.put(std::move(payload));
} }
} }
if (onClose) {
onClose();
}
}); });
dispatchThread = std::thread([this] { dispatchThread = std::thread([this] {
@ -398,17 +406,17 @@ class Impl : public dap::Session {
// "body" is an optional field for some events, such as "Terminated Event". // "body" is an optional field for some events, such as "Terminated Event".
bool body_ok = true; bool body_ok = true;
d->field("body", [&](dap::Deserializer* d) { d->field("body", [&](dap::Deserializer* d) {
if (!typeinfo->deserialize(d, data)) { if (!typeinfo->deserialize(d, data)) {
body_ok = false; body_ok = false;
} }
return true; return true;
}); });
if (!body_ok) { if (!body_ok) {
handlers.error("Failed to deserialize event '%s' body", event.c_str()); handlers.error("Failed to deserialize event '%s' body", event.c_str());
typeinfo->destruct(data); typeinfo->destruct(data);
delete[] data; delete[] data;
return {}; return {};
} }
return [=] { return [=] {
@ -471,6 +479,7 @@ class Impl : public dap::Session {
} }
std::atomic<bool> isBound = {false}; std::atomic<bool> isBound = {false};
std::atomic<bool> isProcessingMessages = {false};
dap::ContentReader reader; dap::ContentReader reader;
dap::ContentWriter writer; dap::ContentWriter writer;

View File

@ -579,3 +579,47 @@ TEST_F(SessionTest, Concurrency) {
client.reset(); client.reset();
server.reset(); server.reset();
} }
TEST_F(SessionTest, OnClientClosed) {
std::mutex mutex;
std::condition_variable cv;
bool clientClosed = false;
auto client2server = dap::pipe();
auto server2client = dap::pipe();
client->bind(server2client, client2server);
server->bind(client2server, server2client, [&] {
std::unique_lock<std::mutex> lock(mutex);
clientClosed = true;
cv.notify_all();
});
client.reset();
// Wait for the client closed handler to be called.
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [&] { return static_cast<bool>(clientClosed); });
}
TEST_F(SessionTest, OnServerClosed) {
std::mutex mutex;
std::condition_variable cv;
bool serverClosed = false;
auto client2server = dap::pipe();
auto server2client = dap::pipe();
client->bind(server2client, client2server, [&] {
std::unique_lock<std::mutex> lock(mutex);
serverClosed = true;
cv.notify_all();
});
server->bind(client2server, server2client);
server.reset();
// Wait for the client closed handler to be called.
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [&] { return static_cast<bool>(serverClosed); });
}