// NOTE(review): the system header names below and the template arguments used
// throughout this file (span element types, Task/StreamResult arguments, cast
// targets) are inferred from how the values are used - confirm them against
// the declarations in socket.hpp.

#include "./socket.hpp"

#include "../detect.hpp"

#include <cerrno>

#if MIJIN_TARGET_OS == MIJIN_OS_LINUX
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#endif

namespace mijin
{
namespace
{
/// Backlog value passed to listen() by TCPServerSocket::setup().
inline constexpr int LISTEN_BACKLOG = 3;

/// Maps the current value of errno to a StreamError.
/// Every value currently maps to StreamError::UNKNOWN_ERROR; the switch is the
/// place to add more specific translations.
/// Must be called before any other call that may modify errno.
StreamError translateErrno() noexcept
{
    switch (errno)
    {
        default:
            return StreamError::UNKNOWN_ERROR;
    }
}

/// Returns true if @p error reports "nothing to do right now" or an interrupted
/// call on a non-blocking socket, i.e. the operation should simply be retried.
bool isRetryableError(int error) noexcept
{
    if (error == EAGAIN || error == EINTR)
    {
        return true;
    }
#if EWOULDBLOCK != EAGAIN
    // POSIX allows either value to be reported; they are only distinct on some systems.
    if (error == EWOULDBLOCK)
    {
        return true;
    }
#endif
    return false;
}

/// Sets the given file status flags on @p handle, keeping all flags that are already set.
/// @return false if reading or writing the flags via fcntl() failed.
bool appendSocketFlags(int handle, int flags) noexcept
{
    const int currentFlags = fcntl(handle, F_GETFL);
    if (currentFlags < 0)
    {
        return false;
    }
    return fcntl(handle, F_SETFL, currentFlags | flags) >= 0;
}

/// Clears the given file status flags on @p handle, keeping all other flags.
/// @return false if reading or writing the flags via fcntl() failed.
bool removeSocketFlags(int handle, int flags) noexcept
{
    const int currentFlags = fcntl(handle, F_GETFL);
    if (currentFlags < 0)
    {
        return false;
    }
    return fcntl(handle, F_SETFL, currentFlags & ~flags) >= 0;
}

/// Builds the recv() flags for the given options: MSG_WAITALL unless a partial
/// read is allowed, plus MSG_PEEK if the data should stay in the receive queue.
int readFlags(const ReadOptions& options)
{
    return (options.partial ? 0 : MSG_WAITALL)
        | (options.peek ? MSG_PEEK : 0);
}
} // namespace

/// Blocking read into @p buffer.
/// @param outBytesRead Optional; receives the number of bytes read on success.
/// @return SUCCESS, or the translated errno if recv() failed.
StreamError TCPStream::readRaw(std::span<std::uint8_t> buffer, const ReadOptions& options, std::size_t* outBytesRead)
{
    MIJIN_ASSERT(isOpen(), "Socket is not open.");
    setAsync(false);

    const int flags = readFlags(options);
    ::ssize_t bytesRead = 0;
    do
    {
        bytesRead = recv(handle_, buffer.data(), buffer.size(), flags);
    } while (bytesRead < 0 && errno == EINTR); // interrupted before any data arrived, just try again

    if (bytesRead < 0)
    {
        return translateErrno();
    }
    // The out parameter is optional, exactly as in c_readRaw().
    if (outBytesRead != nullptr)
    {
        *outBytesRead = static_cast<std::size_t>(bytesRead);
    }
    return StreamError::SUCCESS;
}

/// Blocking write of the whole @p buffer.
/// @return SUCCESS once every byte has been handed to send(), or the translated errno on failure.
StreamError TCPStream::writeRaw(std::span<const std::uint8_t> buffer)
{
    MIJIN_ASSERT(isOpen(), "Socket is not open.");
    setAsync(false);

    // send() may accept fewer bytes than requested, so keep going until the
    // whole buffer has been written.
    // NOTE(review): consider MSG_NOSIGNAL so that a closed peer yields EPIPE
    // instead of raising SIGPIPE.
    std::size_t bytesSent = 0;
    while (true)
    {
        const ::ssize_t result = send(handle_, buffer.data() + bytesSent, buffer.size() - bytesSent, 0);
        if (result >= 0)
        {
            bytesSent += static_cast<std::size_t>(result);
            if (bytesSent >= buffer.size())
            {
                return StreamError::SUCCESS;
            }
        }
        else if (errno != EINTR)
        {
            return translateErrno();
        }
    }
}

/// Coroutine variant of readRaw(): switches the socket to non-blocking mode and
/// suspends while no data is available.
/// Unless options.partial or options.peek is set, reading continues until the
/// buffer is full or recv() returns 0. A peek is never accumulated, as the same
/// data would be returned again on every attempt.
/// @param outBytesRead Optional; receives the number of bytes read on success.
mijin::Task<StreamError> TCPStream::c_readRaw(std::span<std::uint8_t> buffer, const ReadOptions& options, std::size_t* outBytesRead)
{
    MIJIN_ASSERT(isOpen(), "Socket is not open.");
    setAsync(true);

    // Take everything needed from the options before the first suspension so
    // the reference is never touched after the coroutine has been resumed.
    const int flags = readFlags(options);
    const bool fillBuffer = !options.partial && !options.peek;

    std::size_t bytesRead = 0;
    while (true)
    {
        const ::ssize_t result = recv(handle_, buffer.data() + bytesRead, buffer.size() - bytesRead, flags);
        if (result >= 0)
        {
            bytesRead += static_cast<std::size_t>(result);
            // MSG_WAITALL cannot make a non-blocking socket wait, so a full
            // read has to be assembled here from the chunks that arrive.
            if (!fillBuffer || result == 0 || bytesRead >= buffer.size())
            {
                if (outBytesRead != nullptr)
                {
                    *outBytesRead = bytesRead;
                }
                co_return StreamError::SUCCESS;
            }
        }
        else if (!isRetryableError(errno))
        {
            co_return translateErrno();
        }
        co_await mijin::c_suspend();
    }
}

/// Coroutine variant of writeRaw(): switches the socket to non-blocking mode and
/// suspends whenever the socket cannot take more data.
/// @return SUCCESS once every byte has been handed to send(), or the translated errno on failure.
mijin::Task<StreamError> TCPStream::c_writeRaw(std::span<const std::uint8_t> buffer)
{
    MIJIN_ASSERT(isOpen(), "Socket is not open.");
    setAsync(true);

    // A non-blocking send() regularly accepts only part of the data; remember
    // how much is done and continue with the rest after resuming.
    std::size_t bytesSent = 0;
    while (true)
    {
        const ::ssize_t result = send(handle_, buffer.data() + bytesSent, buffer.size() - bytesSent, 0);
        if (result >= 0)
        {
            bytesSent += static_cast<std::size_t>(result);
            if (bytesSent >= buffer.size())
            {
                co_return StreamError::SUCCESS;
            }
        }
        else if (!isRetryableError(errno))
        {
            co_return translateErrno();
        }
        co_await mijin::c_suspend();
    }
}

/// Switches the socket between blocking (false) and non-blocking (true) mode.
/// Does nothing if the cached mode already matches.
void TCPStream::setAsync(bool async)
{
    if (async == async_)
    {
        return;
    }

    const bool changed = async
        ? appendSocketFlags(handle_, O_NONBLOCK)
        : removeSocketFlags(handle_, O_NONBLOCK);
    // Only remember the new mode if the descriptor really was switched,
    // otherwise the next call gets another chance to apply it.
    if (changed)
    {
        async_ = async;
    }
}

/// Always returns 0; getFeatures() reports tell as unsupported.
std::size_t TCPStream::tell()
{
    return 0;
}

/// Seeking is not supported; always returns NOT_SUPPORTED.
StreamError TCPStream::seek(std::intptr_t /* pos */, mijin::SeekMode /* seekMode */)
{
    return StreamError::NOT_SUPPORTED;
}

/// Does nothing.
void TCPStream::flush()
{
}

/// Returns true if the stream is not open.
bool TCPStream::isAtEnd()
{
    return !isOpen();
}

/// Reports the supported features: reading, writing, async operation and the
/// partial/peek read options, but neither tell nor seek.
StreamFeatures TCPStream::getFeatures()
{
    return {
        .read = true,
        .write = true,
        .tell = false,
        .seek = false,
        .async = true,
        .readOptions = {
            .partial = true,
            .peek = true
        }
    };
}

/// Creates an IPv4 TCP socket and connects it to @p address : @p port.
/// @param address Address string as understood by inet_addr().
/// @return SUCCESS, or the translated errno of the call that failed. On failure
///         the socket is closed again.
StreamError TCPStream::open(const char* address, std::uint16_t port) noexcept
{
    MIJIN_ASSERT(!isOpen(), "Socket is already open.");

    handle_ = socket(AF_INET, SOCK_STREAM, 0);
    if (handle_ < 0)
    {
        return translateErrno();
    }

    // NOTE(review): inet_addr() signals an unparsable address with INADDR_NONE, which is not checked here.
    sockaddr_in connectAddress =
    {
        .sin_family = AF_INET,
        .sin_port = htons(port),
        .sin_addr = {inet_addr(address)}
    };
    if (connect(handle_, reinterpret_cast<sockaddr*>(&connectAddress), sizeof(sockaddr_in)) < 0)
    {
        // Translate first: ::close() may overwrite errno.
        const StreamError error = translateErrno();
        ::close(handle_);
        handle_ = -1;
        return error;
    }
    return StreamError::SUCCESS;
}

/// Closes the socket and marks the stream as not open.
void TCPStream::close() noexcept
{
    MIJIN_ASSERT(isOpen(), "Socket is not open.");
    ::close(handle_);
    handle_ = -1;
    // A socket opened later starts out blocking again; without this reset
    // setAsync(true) would skip switching it to non-blocking mode.
    async_ = false;
}

/// Returns the stream used to communicate over this socket.
TCPStream& TCPSocket::getStream() noexcept
{
    return stream_;
}

/// Creates a non-blocking IPv4 TCP socket with SO_REUSEADDR set, binds it to
/// @p address : @p port and starts listening.
/// @param address Address string as understood by inet_addr().
/// @return SUCCESS, or the translated errno of the step that failed. On failure
///         the socket is closed again.
StreamError TCPServerSocket::setup(const char* address, std::uint16_t port) noexcept
{
    MIJIN_ASSERT(!isListening(), "Socket is already listening.");

    handle_ = socket(AF_INET, SOCK_STREAM, 0);
    if (handle_ < 0)
    {
        return translateErrno();
    }

    sockaddr_in bindAddress =
    {
        .sin_family = AF_INET,
        .sin_port = htons(port),
        .sin_addr = {inet_addr(address)}
    };
    static const int ONE = 1;
    if ((setsockopt(handle_, SOL_SOCKET, SO_REUSEADDR, &ONE, sizeof(int)) != 0)
        || (bind(handle_, reinterpret_cast<sockaddr*>(&bindAddress), sizeof(sockaddr_in)) < 0)
        || (listen(handle_, LISTEN_BACKLOG) < 0)
        || !appendSocketFlags(handle_, O_NONBLOCK))
    {
        // Translate first: closing the socket may overwrite errno.
        const StreamError error = translateErrno();
        close();
        return error;
    }
    return StreamError::SUCCESS;
}

/// Closes the listening socket and marks it as not listening.
void TCPServerSocket::close() noexcept
{
    MIJIN_ASSERT(isListening(), "Socket is not listening.");
    ::close(handle_);
    handle_ = -1;
}

/// Suspends until a client connects and returns a socket for that connection.
/// @return The new socket, the translated errno if accept() failed, or
///         CONNECTION_CLOSED if the server socket is no longer listening.
// NOTE(review): the element type of the returned unique_ptr is inferred from the local variable below - confirm.
Task<StreamResult<std::unique_ptr<TCPSocket>>> TCPServerSocket::c_waitForConnection() noexcept
{
    while (isListening())
    {
        sockaddr_in client = {};
        socklen_t length = sizeof(sockaddr_in);
        const int newSocket = accept(handle_, reinterpret_cast<sockaddr*>(&client), &length);
        if (newSocket < 0)
        {
            if (!isRetryableError(errno))
            {
                co_return translateErrno();
            }
            // Nobody is waiting to connect yet, try again after resuming.
            co_await c_suspend();
            continue;
        }

        std::unique_ptr<TCPSocket> socket = std::make_unique<TCPSocket>();
        socket->stream_.handle_ = newSocket;
        co_return socket;
    }
    co_return StreamError::CONNECTION_CLOSED;
}
} // namespace mijin