From e53575d2727205c53a671bc63fc92dfe32ab03a7 Mon Sep 17 00:00:00 2001 From: Puneetha Ramachandra Date: Tue, 1 Dec 2020 09:19:41 -0800 Subject: [PATCH] Split bind API Bind API is split into connect and startProcessingMessages calls. This enables users to directly call connect and manage processing messages on user threads. --- include/dap/session.h | 42 +++++++++++++++++++++++++++++------------- src/session.cpp | 25 +++++++------------------ src/session_test.cpp | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 31 deletions(-) diff --git a/include/dap/session.h b/include/dap/session.h index e11c57e..e06957f 100644 --- a/include/dap/session.h +++ b/include/dap/session.h @@ -192,22 +192,28 @@ class Session { template > void send(const T& event); - // bind() connects this Session to an endpoint. - // bind() can only be called once. Repeated calls will raise an error, but + // connect() connects this Session to an endpoint. + // connect() can only be called once. Repeated calls will raise an error, but // otherwise will do nothing. - virtual void bind(const std::shared_ptr&, - const std::shared_ptr&) = 0; + virtual void connect(const std::shared_ptr&, + const std::shared_ptr&) = 0; + inline void connect(const std::shared_ptr&); + + // startProcessingMessages() starts a new thread to receive and dispatch + // incoming messages. + virtual void startProcessingMessages() = 0; + + // bind() connects this Session to an endpoint using connect(), and then + // starts processing incoming messages with startProcessingMessages(). + inline void bind(const std::shared_ptr&, + const std::shared_ptr&); inline void bind(const std::shared_ptr&); - - // Alternative to bind() for control over which thread the request is processed. - // If bindNoThread is used, the user must call OnDataAvailable() whenever data is - // ready in the reader pipe. The processing will be done on the calling thread - // and a function to handle the request will be returned in some cases, which can - // be executed on any thread of user choice. - virtual std::function OnDataAvailable() = 0; - virtual void bindNoThread(const std::shared_ptr& r, - const std::shared_ptr& w) = 0; + // getPayload() blocks until the next incoming message is received, returning + // the payload or an empty function if the connection was lost. The returned + // payload is function that can be called on any thread to dispatch the + // message to the Session handler. + virtual std::function getPayload() = 0; protected: using RequestSuccessCallback = @@ -309,6 +315,16 @@ void Session::send(const T& event) { send(typeinfo, &event); } +void Session::connect(const std::shared_ptr& rw) { + connect(rw, rw); +} + +void Session::bind(const std::shared_ptr& r, + const std::shared_ptr& w) { + connect(r, w); + startProcessingMessages(); +} + void Session::bind(const std::shared_ptr& rw) { bind(rw, rw); } diff --git a/src/session.cpp b/src/session.cpp index f95d095..ad6597c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -52,17 +52,7 @@ class Impl : public dap::Session { handlers.put(typeinfo, handler); } - void bindNoThread(const std::shared_ptr& r, - const std::shared_ptr& w) override { - if (isBound.exchange(true)) { - handlers.error("Session is already bound!"); - return; - } - reader = dap::ContentReader(r); - writer = dap::ContentWriter(w); - } - - std::function OnDataAvailable() override { + std::function getPayload() override { auto request = reader.read(); if (request.size() > 0) { if (auto payload = processMessage(request)) { @@ -72,8 +62,8 @@ class Impl : public dap::Session { return {}; } - void bind(const std::shared_ptr& r, - const std::shared_ptr& w) override { + void connect(const std::shared_ptr& r, + const std::shared_ptr& w) override { if (isBound.exchange(true)) { handlers.error("Session is already bound!"); return; @@ -81,14 +71,13 @@ class Impl : public dap::Session { reader = dap::ContentReader(r); writer = dap::ContentWriter(w); + } + void startProcessingMessages() override { recvThread = std::thread([this] { while (reader.isOpen()) { - auto request = reader.read(); - if (request.size() > 0) { - if (auto payload = processMessage(request)) { - inbox.put(std::move(payload)); - } + if (auto payload = getPayload()) { + inbox.put(std::move(payload)); } } }); diff --git a/src/session_test.cpp b/src/session_test.cpp index 6d181f7..21b73d6 100644 --- a/src/session_test.cpp +++ b/src/session_test.cpp @@ -370,6 +370,45 @@ TEST_F(SessionTest, SendEventNoBind) { ASSERT_TRUE(errored); } +TEST_F(SessionTest, SingleThread) { + server->registerHandler( + [&](const dap::TestRequest&) { return createResponse(); }); + + // Explicitly connect and process request on this test thread instead of + // calling bind() which inturn starts processing messages immediately on a new + // thread. + auto client2server = dap::pipe(); + auto server2client = dap::pipe(); + client->connect(server2client, client2server); + server->connect(client2server, server2client); + + auto request = createRequest(); + auto response = client->send(request); + + // Process request and response on this thread + if (auto payload = server->getPayload()) { + payload(); + } + if (auto payload = client->getPayload()) { + payload(); + } + + auto got = response.get(); + // Check response was received correctly. + ASSERT_EQ(got.error, false); + ASSERT_EQ(got.response.b, dap::boolean(true)); + ASSERT_EQ(got.response.i, dap::integer(99)); + ASSERT_EQ(got.response.n, dap::number(123.456)); + ASSERT_EQ(got.response.a, dap::array({5, 4, 3, 2, 1})); + ASSERT_EQ(got.response.o.size(), 3U); + ASSERT_EQ(got.response.o["one"].get(), dap::integer(1)); + ASSERT_EQ(got.response.o["two"].get(), dap::number(2)); + ASSERT_EQ(got.response.o["three"].get(), dap::string("3")); + ASSERT_EQ(got.response.s, "ROGER"); + ASSERT_EQ(got.response.o1, dap::optional(50)); + ASSERT_FALSE(got.response.o2.has_value()); +} + TEST_F(SessionTest, Concurrency) { std::atomic numEventsHandled = {0}; std::atomic done = {false};