Split bind API

Bind API is split into connect and startProcessingMessages calls.
This enables users to directly call connect and manage processing
messages on user threads.
This commit is contained in:
Puneetha Ramachandra 2020-12-01 09:19:41 -08:00 committed by Ben Clayton
parent ea6098df7f
commit e53575d272
3 changed files with 75 additions and 31 deletions

View File

@ -192,22 +192,28 @@ class Session {
template <typename T, typename = IsEvent<T>> template <typename T, typename = IsEvent<T>>
void send(const T& event); void send(const T& event);
// bind() connects this Session to an endpoint. // connect() connects this Session to an endpoint.
// bind() can only be called once. Repeated calls will raise an error, but // connect() can only be called once. Repeated calls will raise an error, but
// otherwise will do nothing. // otherwise will do nothing.
virtual void bind(const std::shared_ptr<Reader>&, virtual void connect(const std::shared_ptr<Reader>&,
const std::shared_ptr<Writer>&) = 0; const std::shared_ptr<Writer>&) = 0;
inline void connect(const std::shared_ptr<ReaderWriter>&);
// startProcessingMessages() starts a new thread to receive and dispatch
// incoming messages.
virtual void startProcessingMessages() = 0;
// bind() connects this Session to an endpoint using connect(), and then
// starts processing incoming messages with startProcessingMessages().
inline void bind(const std::shared_ptr<Reader>&,
const std::shared_ptr<Writer>&);
inline void bind(const std::shared_ptr<ReaderWriter>&); inline void bind(const std::shared_ptr<ReaderWriter>&);
// getPayload() blocks until the next incoming message is received, returning
// Alternative to bind() for control over which thread the request is processed. // the payload or an empty function if the connection was lost. The returned
// If bindNoThread is used, the user must call OnDataAvailable() whenever data is // payload is function that can be called on any thread to dispatch the
// ready in the reader pipe. The processing will be done on the calling thread // message to the Session handler.
// and a function to handle the request will be returned in some cases, which can virtual std::function<void()> getPayload() = 0;
// be executed on any thread of user choice.
virtual std::function<void()> OnDataAvailable() = 0;
virtual void bindNoThread(const std::shared_ptr<dap::Reader>& r,
const std::shared_ptr<dap::Writer>& w) = 0;
protected: protected:
using RequestSuccessCallback = using RequestSuccessCallback =
@ -309,6 +315,16 @@ void Session::send(const T& event) {
send(typeinfo, &event); send(typeinfo, &event);
} }
void Session::connect(const std::shared_ptr<ReaderWriter>& rw) {
connect(rw, rw);
}
void Session::bind(const std::shared_ptr<dap::Reader>& r,
const std::shared_ptr<dap::Writer>& w) {
connect(r, w);
startProcessingMessages();
}
void Session::bind(const std::shared_ptr<ReaderWriter>& rw) { void Session::bind(const std::shared_ptr<ReaderWriter>& rw) {
bind(rw, rw); bind(rw, rw);
} }

View File

@ -52,17 +52,7 @@ class Impl : public dap::Session {
handlers.put(typeinfo, handler); handlers.put(typeinfo, handler);
} }
void bindNoThread(const std::shared_ptr<dap::Reader>& r, std::function<void()> getPayload() override {
const std::shared_ptr<dap::Writer>& w) override {
if (isBound.exchange(true)) {
handlers.error("Session is already bound!");
return;
}
reader = dap::ContentReader(r);
writer = dap::ContentWriter(w);
}
std::function<void()> OnDataAvailable() override {
auto request = reader.read(); auto request = reader.read();
if (request.size() > 0) { if (request.size() > 0) {
if (auto payload = processMessage(request)) { if (auto payload = processMessage(request)) {
@ -72,8 +62,8 @@ class Impl : public dap::Session {
return {}; return {};
} }
void bind(const std::shared_ptr<dap::Reader>& r, void connect(const std::shared_ptr<dap::Reader>& r,
const std::shared_ptr<dap::Writer>& w) override { const std::shared_ptr<dap::Writer>& w) override {
if (isBound.exchange(true)) { if (isBound.exchange(true)) {
handlers.error("Session is already bound!"); handlers.error("Session is already bound!");
return; return;
@ -81,14 +71,13 @@ class Impl : public dap::Session {
reader = dap::ContentReader(r); reader = dap::ContentReader(r);
writer = dap::ContentWriter(w); writer = dap::ContentWriter(w);
}
void startProcessingMessages() override {
recvThread = std::thread([this] { recvThread = std::thread([this] {
while (reader.isOpen()) { while (reader.isOpen()) {
auto request = reader.read(); if (auto payload = getPayload()) {
if (request.size() > 0) { inbox.put(std::move(payload));
if (auto payload = processMessage(request)) {
inbox.put(std::move(payload));
}
} }
} }
}); });

View File

@ -370,6 +370,45 @@ TEST_F(SessionTest, SendEventNoBind) {
ASSERT_TRUE(errored); ASSERT_TRUE(errored);
} }
TEST_F(SessionTest, SingleThread) {
server->registerHandler(
[&](const dap::TestRequest&) { return createResponse(); });
// Explicitly connect and process request on this test thread instead of
// calling bind() which inturn starts processing messages immediately on a new
// thread.
auto client2server = dap::pipe();
auto server2client = dap::pipe();
client->connect(server2client, client2server);
server->connect(client2server, server2client);
auto request = createRequest();
auto response = client->send(request);
// Process request and response on this thread
if (auto payload = server->getPayload()) {
payload();
}
if (auto payload = client->getPayload()) {
payload();
}
auto got = response.get();
// Check response was received correctly.
ASSERT_EQ(got.error, false);
ASSERT_EQ(got.response.b, dap::boolean(true));
ASSERT_EQ(got.response.i, dap::integer(99));
ASSERT_EQ(got.response.n, dap::number(123.456));
ASSERT_EQ(got.response.a, dap::array<dap::integer>({5, 4, 3, 2, 1}));
ASSERT_EQ(got.response.o.size(), 3U);
ASSERT_EQ(got.response.o["one"].get<dap::integer>(), dap::integer(1));
ASSERT_EQ(got.response.o["two"].get<dap::number>(), dap::number(2));
ASSERT_EQ(got.response.o["three"].get<dap::string>(), dap::string("3"));
ASSERT_EQ(got.response.s, "ROGER");
ASSERT_EQ(got.response.o1, dap::optional<dap::integer>(50));
ASSERT_FALSE(got.response.o2.has_value());
}
TEST_F(SessionTest, Concurrency) { TEST_F(SessionTest, Concurrency) {
std::atomic<int> numEventsHandled = {0}; std::atomic<int> numEventsHandled = {0};
std::atomic<bool> done = {false}; std::atomic<bool> done = {false};