#include "./stream.hpp"

// NOTE(review): the standard header names and the template arguments in this file were restored
// from how the code uses them - confirm against stream.hpp and the original include list.
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <limits>
#include <span>
#include <string>
#include <string_view>
#include <utility>

namespace mijin
{

//
// internal defines
//

//
// internal constants
//

//
// internal types
//

//
// internal variables
//

//
// internal functions
//

//
// public functions
//

/// Default implementation: does nothing.
void Stream::flush() {}

/// Coroutine variant of readRaw(): polls the stream with non-blocking partial reads and suspends
/// between attempts until the request is satisfied.
///
/// @param buffer       Destination for the bytes read.
/// @param options      partial: finish after the first successful attempt.
///                     noBlock: return WOULD_BLOCK instead of suspending.
///                     peek:    forwarded to readRaw().
/// @param outBytesRead Optional; receives the number of bytes placed in buffer on SUCCESS.
/// @return SUCCESS, WOULD_BLOCK (only with options.noBlock) or the error reported by readRaw().
mijin::Task<StreamError> Stream::c_readRaw(std::span<std::uint8_t> buffer, const ReadOptions& options, std::size_t* outBytesRead)
{
    if (buffer.empty())
    {
        // nothing to do, but still report a well-defined count
        if (outBytesRead != nullptr)
        {
            *outBytesRead = 0;
        }
        co_return StreamError::SUCCESS;
    }

    // Every single attempt is partial and non-blocking; all other options (e.g. peek) are passed on.
    ReadOptions attemptOptions = options;
    attemptOptions.partial = true;
    attemptOptions.noBlock = true;

    std::size_t totalRead = 0;
    bool done = false;
    while (!done)
    {
        // A peeking read does not request consumption, so each attempt starts over at the
        // beginning of the buffer instead of appending to what an earlier attempt returned.
        const std::size_t offset = options.peek ? 0 : totalRead;
        std::size_t bytesRead = 0;
        const StreamError error = readRaw(buffer.data() + offset, buffer.size() - offset, attemptOptions, &bytesRead);
        switch (error)
        {
        case StreamError::SUCCESS:
            totalRead = offset + bytesRead;
            done = options.partial || totalRead == buffer.size();
            break;
        case StreamError::WOULD_BLOCK:
            if (options.noBlock)
            {
                co_return StreamError::WOULD_BLOCK;
            }
            break;
        default:
            co_return error;
        }
        if (!done)
        {
            co_await mijin::c_suspend();
        }
    }

    if (outBytesRead != nullptr)
    {
        *outBytesRead = totalRead;
    }
    co_return StreamError::SUCCESS;
}

/// Coroutine variant of writeRaw(); forwards to the synchronous writeRaw() without suspending.
mijin::Task<StreamError> Stream::c_writeRaw(std::span<const std::uint8_t> buffer)
{
    co_return writeRaw(buffer);
}

/// Reads a string stored as a 32 bit length followed by that many characters.
/// outString is only modified on SUCCESS.
StreamError Stream::readBinaryString(std::string& outString)
{
    std::uint32_t length; // NOLINT(cppcoreguidelines-init-variables)
    StreamError error = read(length);
    if (error != StreamError::SUCCESS)
    {
        return error;
    }

    std::string result;
    result.resize(length);
    error = readSpan(result.begin(), result.end());
    if (error != StreamError::SUCCESS)
    {
        return error;
    }
    outString = std::move(result);
    return StreamError::SUCCESS;
}

/// Writes a string as a 32 bit length followed by its characters (no terminator).
StreamError Stream::writeBinaryString(std::string_view str)
{
    MIJIN_ASSERT(str.length() <= std::numeric_limits<std::uint32_t>::max(), "Binary string is too long.");
    const std::uint32_t length = static_cast<std::uint32_t>(str.length());
    const StreamError error = write(length);
    if (error != StreamError::SUCCESS)
    {
        return error;
    }
    return writeSpan(str.begin(), str.end());
}

/// Reads characters up to (and consuming) a terminating '\0'.
/// Returns IO_ERROR if the end of the stream is reached before the terminator.
/// outString is only modified on SUCCESS.
StreamError Stream::readZString(std::string& outString)
{
    char chr = '\0';
    std::string result;
    while (true)
    {
        if (isAtEnd())
        {
            return StreamError::IO_ERROR;
        }
        if (StreamError error = read(chr); error != StreamError::SUCCESS)
        {
            return error;
        }
        if (chr == '\0')
        {
            outString = std::move(result);
            return StreamError::SUCCESS;
        }
        result.push_back(chr);
    }
}

/// Writes the characters of str followed by a single '\0'.
StreamError Stream::writeZString(std::string_view str)
{
    static const char ZERO = '\0';

    if (StreamError error = writeRaw(str.data(), str.size() * sizeof(char)); error != StreamError::SUCCESS)
    {
        return error;
    }
    return write(ZERO);
}

/// Coroutine variant of readBinaryString().
mijin::Task<StreamError> Stream::c_readBinaryString(std::string& outString)
{
    std::uint32_t length; // NOLINT(cppcoreguidelines-init-variables)
    StreamError error = co_await c_read(length);
    if (error != StreamError::SUCCESS)
    {
        co_return error;
    }

    std::string result;
    result.resize(length);
    error = co_await c_readSpan(result.begin(), result.end());
    if (error != StreamError::SUCCESS)
    {
        co_return error;
    }
    outString = std::move(result);
    co_return StreamError::SUCCESS;
}

/// Coroutine variant of writeBinaryString().
mijin::Task<StreamError> Stream::c_writeBinaryString(std::string_view str)
{
    MIJIN_ASSERT(str.length() <= std::numeric_limits<std::uint32_t>::max(), "Binary string is too long.");
    const std::uint32_t length = static_cast<std::uint32_t>(str.length());
    StreamError error = co_await c_write(length);
    if (error != StreamError::SUCCESS)
    {
        co_return error;
    }
    co_return co_await c_writeSpan(str.begin(), str.end());
}

/// Determines the length of the stream by seeking to its end and back to the original position.
/// Returns NOT_SUPPORTED unless the stream reports both the tell and seek features.
StreamError Stream::getTotalLength(std::size_t& outLength)
{
    const StreamFeatures features = getFeatures();
    if (!features.tell || !features.seek)
    {
        return StreamError::NOT_SUPPORTED;
    }

    const std::size_t origPos = tell();
    if (const StreamError error = seek(0, SeekMode::RELATIVE_TO_END); error != StreamError::SUCCESS)
    {
        return error;
    }
    outLength = tell();
    if (const StreamError error = seek(static_cast<std::intptr_t>(origPos)); error != StreamError::SUCCESS)
    {
        return error;
    }
    return StreamError::SUCCESS;
}

/// Reads up to (and consuming) the next '\n', or up to the end of the stream.
/// The newline itself is not stored in outString. Requires a stream that supports peeking.
StreamError Stream::readLine(std::string& outString)
{
    MIJIN_ASSERT(getFeatures().readOptions.peek, "Stream needs to support peeking.");

    static constexpr std::size_t BUFFER_SIZE = 4096;
    std::array<char, BUFFER_SIZE> buffer;

    outString.clear();
    bool done = false;
    while (!done)
    {
        // read into the buffer
        std::size_t bytesRead = 0;
        if (const StreamError error = readRaw(buffer, {.partial = true, .peek = true}, &bytesRead); error != StreamError::SUCCESS)
        {
            return error;
        }

        // try to find a \n
        auto begin = buffer.begin(); // NOLINT(readability-qualified-auto)
        auto end = buffer.begin() + bytesRead; // NOLINT(readability-qualified-auto)
        auto newline = std::find(begin, end, '\n'); // NOLINT(readability-qualified-auto)

        if (newline != end)
        {
            // found the end
            outString.append(begin, newline);
            end = newline + 1;
            done = true;
        }
        else
        {
            outString.append(begin, end);
        }

        // read again, this time to skip
        if (const StreamError error = readSpan(begin, end); error != StreamError::SUCCESS)
        {
            return error;
        }

        if (isAtEnd())
        {
            done = true;
        }
    }

    return StreamError::SUCCESS;
}

/// Coroutine variant of readLine().
mijin::Task<StreamError> Stream::c_readLine(std::string& outString)
{
    MIJIN_ASSERT(getFeatures().readOptions.peek, "Stream needs to support peeking.");

    static constexpr std::size_t BUFFER_SIZE = 4096;
    std::array<char, BUFFER_SIZE> buffer;

    outString.clear();
    bool done = false;
    while (!done)
    {
        // read into the buffer
        std::size_t bytesRead = 0;
        if (StreamError error = co_await c_readRaw(buffer, {.partial = true, .peek = true}, &bytesRead); error != StreamError::SUCCESS)
        {
            co_return error;
        }

        // try to find a \n
        auto begin = buffer.begin(); // NOLINT(readability-qualified-auto)
        auto end = buffer.begin() + bytesRead; // NOLINT(readability-qualified-auto)
        auto newline = std::find(begin, end, '\n'); // NOLINT(readability-qualified-auto)

        if (newline != end)
        {
            // found the end
            outString.append(begin, newline);
            end = newline + 1;
            done = true;
        }
        else
        {
            outString.append(begin, end);
        }

        // read again, this time to skip
        if (StreamError error = co_await c_readSpan(begin, end); error != StreamError::SUCCESS)
        {
            co_return error;
        }

        // same termination rule as readLine(): a last line without '\n' ends at the end of the stream
        if (isAtEnd())
        {
            done = true;
        }
    }

    co_return StreamError::SUCCESS;
}

/// Closes the file if it is still open.
FileStream::~FileStream()
{
    if (handle)
    {
        close();
    }
}

/// Opens the file at path in binary mode and records its current length.
/// For READ_WRITE the file is created ("w+b") if opening the existing file ("r+b") fails.
/// Returns IO_ERROR if the file cannot be opened.
StreamError FileStream::open(const char* path, FileOpenMode mode_)
{
    mode = mode_;

    const char* modeStr; // NOLINT(cppcoreguidelines-init-variables)
    switch (mode_)
    {
    case FileOpenMode::READ:
        modeStr = "rb";
        break;
    case FileOpenMode::WRITE:
        modeStr = "wb";
        break;
    case FileOpenMode::APPEND:
        modeStr = "ab";
        break;
    case FileOpenMode::READ_WRITE:
        modeStr = "r+b";
        break;
    default:
        MIJIN_FATAL("Invalid value for mode.");
        return StreamError::UNKNOWN_ERROR;
    }

    handle = std::fopen(path, modeStr); // NOLINT(cppcoreguidelines-owning-memory)
    if (!handle && mode_ == FileOpenMode::READ_WRITE)
    {
        handle = std::fopen(path, "w+b"); // NOLINT(cppcoreguidelines-owning-memory)
    }
    if (!handle)
    {
        return StreamError::IO_ERROR;
    }

    // determine the length by seeking to the end, then rewind to the start
    [[maybe_unused]] int result = std::fseek(handle, 0, SEEK_END);
    MIJIN_ASSERT(result == 0, "fseek failed.");
    length = std::ftell(handle);
    result = std::fseek(handle, 0, SEEK_SET);
    MIJIN_ASSERT(result == 0, "fseek failed.");

    return StreamError::SUCCESS;
}

/// Closes the file. The stream must be open.
void FileStream::close()
{
    MIJIN_ASSERT(handle != nullptr, "FileStream is not open.");

    [[maybe_unused]] const int result = std::fclose(handle); // NOLINT(cppcoreguidelines-owning-memory)
    MIJIN_ASSERT(result == 0, "fclose failed.");
    handle = nullptr;
}

/// Reads up to buffer.size() bytes from the file.
/// Without options.partial a short read is reported as IO_ERROR.
/// With options.peek the file position is moved back by the number of bytes read.
StreamError FileStream::readRaw(std::span<std::uint8_t> buffer, const ReadOptions& options, std::size_t* outBytesRead)
{
    MIJIN_ASSERT(handle != nullptr, "FileStream is not open.");
    MIJIN_ASSERT(mode == FileOpenMode::READ || mode == FileOpenMode::READ_WRITE, "Cannot read from this stream");

    const std::size_t readBytes = std::fread(buffer.data(), 1, buffer.size(), handle);
    if (std::ferror(handle))
    {
        return StreamError::IO_ERROR;
    }
    if (!options.partial && readBytes < buffer.size())
    {
        return StreamError::IO_ERROR;
    }
    if (outBytesRead != nullptr)
    {
        *outBytesRead = readBytes;
    }
    if (options.peek)
    {
        // undo the read so the same bytes are returned again by the next read
        if (const StreamError error = seek(-static_cast<std::intptr_t>(readBytes), SeekMode::RELATIVE); error != StreamError::SUCCESS)
        {
            return error;
        }
    }
    return StreamError::SUCCESS;
}

/// Writes the whole buffer to the file and updates the tracked length.
/// Returns IO_ERROR if not all bytes could be written.
StreamError FileStream::writeRaw(std::span<const std::uint8_t> buffer)
{
    MIJIN_ASSERT(handle != nullptr, "FileStream is not open.");
    MIJIN_ASSERT(mode == FileOpenMode::WRITE || mode == FileOpenMode::READ_WRITE || mode == FileOpenMode::APPEND,
                 "Cannot write to this stream");

    const std::size_t written = std::fwrite(buffer.data(), 1, buffer.size(), handle);
    if (written != buffer.size() || std::ferror(handle))
    {
        return StreamError::IO_ERROR;
    }

    length = std::max<std::size_t>(length, std::ftell(handle));
    return StreamError::SUCCESS;
}

/// Returns the current file position as reported by std::ftell().
std::size_t FileStream::tell()
{
    MIJIN_ASSERT(handle != nullptr, "FileStream is not open.");

    return std::ftell(handle);
}

/// Moves the file position; seekMode selects the origin (start, current position or end).
StreamError FileStream::seek(std::intptr_t pos, SeekMode seekMode)
{
    MIJIN_ASSERT(handle != nullptr, "FileStream is not open.");

    int origin; // NOLINT(cppcoreguidelines-init-variables)
    switch (seekMode)
    {
    case SeekMode::ABSOLUTE:
        origin = SEEK_SET;
        break;
    case SeekMode::RELATIVE:
        origin = SEEK_CUR;
        break;
    case SeekMode::RELATIVE_TO_END:
        origin = SEEK_END;
        break;
    default:
        MIJIN_ERROR("Invalid value passed as seekMode!");
        return StreamError::UNKNOWN_ERROR;
    }

    const int result = std::fseek(handle, static_cast<long>(pos), origin);
    if (result != 0 || std::ferror(handle))
    {
        return StreamError::IO_ERROR;
    }
    return StreamError::SUCCESS;
}

/// Flushes buffered output via std::fflush().
void FileStream::flush()
{
    [[maybe_unused]] const int result = std::fflush(handle);
    MIJIN_ASSERT(result == 0, "fflush failed.");
}

/// Checks whether another byte can be read by probing one character.
/// Returns true if the probe yields nothing (end of file or a failed read).
bool FileStream::isAtEnd()
{
    MIJIN_ASSERT(handle != nullptr, "FileStream is not open.");

    const int probe = std::fgetc(handle);
    if (probe == EOF)
    {
        // Nothing was consumed, so there is nothing to rewind. Seeking back here would move the
        // position one byte in front of where the caller left it.
        return true;
    }

    // step back over the byte that was just probed
    [[maybe_unused]] const int result = std::fseek(handle, -1, SEEK_CUR);
    MIJIN_ASSERT(result == 0, "fseek failed.");
    return false;
}

/// Reports the capabilities of the open file; a closed stream reports none.
StreamFeatures FileStream::getFeatures()
{
    if (handle)
    {
        return {
            // READ_WRITE is readable as well, matching the modes accepted by readRaw()
            .read = (mode == FileOpenMode::READ || mode == FileOpenMode::READ_WRITE),
            .write = (mode == FileOpenMode::WRITE || mode == FileOpenMode::APPEND || mode == FileOpenMode::READ_WRITE),
            .tell = true,
            .seek = true,
            .readOptions = {
                .partial = true,
                .peek = true
            }
        };
    }
    return {};
}

/// Opens the stream on a writable memory region; the position starts at 0.
void MemoryStream::openRW(std::span<std::uint8_t> data)
{
    MIJIN_ASSERT(!isOpen(), "MemoryStream is already open.");
    data_ = data;
    pos_ = 0;
    canWrite_ = true;
}

/// Opens the stream on a read-only memory region; writeRaw() is rejected while opened this way.
void MemoryStream::openROImpl(const void* data, std::size_t bytes)
{
    MIJIN_ASSERT(!isOpen(), "MemoryStream is already open.");
    data_ = std::span<std::uint8_t>(const_cast<std::uint8_t*>(static_cast<const std::uint8_t*>(data)), bytes); // NOLINT(cppcoreguidelines-pro-type-const-cast) we'll be fine
    pos_ = 0;
    canWrite_ = false;
}

/// Detaches the stream from its memory region. The stream must be open.
void MemoryStream::close()
{
    MIJIN_ASSERT(isOpen(), "MemoryStream is not open.");
    data_ = {};
}

/// Copies up to buffer.size() bytes from the current position into buffer.
/// Without options.partial, requesting more than availableBytes() is an IO_ERROR.
/// With options.peek the position is left unchanged.
StreamError MemoryStream::readRaw(std::span<std::uint8_t> buffer, const ReadOptions& options, std::size_t* outBytesRead)
{
    MIJIN_ASSERT(isOpen(), "MemoryStream is not open.");
    if (!options.partial && availableBytes() < buffer.size())
    {
        return StreamError::IO_ERROR; // TODO: need more errors?
    }

    const std::size_t numBytes = std::min(buffer.size(), availableBytes());
    std::copy_n(data_.begin() + static_cast<std::ptrdiff_t>(pos_), numBytes, buffer.begin());
    if (outBytesRead)
    {
        *outBytesRead = numBytes;
    }
    if (!options.peek)
    {
        pos_ += numBytes;
    }
    return StreamError::SUCCESS;
}

/// Copies buffer to the current position and advances it.
/// Returns NOT_SUPPORTED for read-only streams and IO_ERROR if buffer does not fit.
StreamError MemoryStream::writeRaw(std::span<const std::uint8_t> buffer)
{
    MIJIN_ASSERT(isOpen(), "MemoryStream is not open.");

    if (!canWrite_)
    {
        return StreamError::NOT_SUPPORTED;
    }
    if (availableBytes() < buffer.size())
    {
        return StreamError::IO_ERROR;
    }

    std::copy(buffer.begin(), buffer.end(), data_.begin() + static_cast<std::ptrdiff_t>(pos_));
    pos_ += buffer.size();
    return StreamError::SUCCESS;
}

/// Returns the current position within the memory region.
std::size_t MemoryStream::tell()
{
    MIJIN_ASSERT(isOpen(), "MemoryStream is not open.");
    return pos_;
}

/// Moves the position; targets outside [0, size] are rejected with IO_ERROR.
StreamError MemoryStream::seek(std::intptr_t pos, SeekMode seekMode)
{
    MIJIN_ASSERT(isOpen(), "MemoryStream is not open.");

    // stays negative (and is therefore rejected below) if seekMode matches none of the cases
    std::intptr_t newPos = -1;
    switch (seekMode)
    {
    case SeekMode::ABSOLUTE:
        newPos = pos;
        break;
    case SeekMode::RELATIVE:
        newPos = static_cast<std::intptr_t>(pos_) + pos;
        break;
    case SeekMode::RELATIVE_TO_END:
        newPos = static_cast<std::intptr_t>(data_.size()) + pos;
        break;
    }
    if (newPos < 0 || newPos > static_cast<std::intptr_t>(data_.size()))
    {
        return StreamError::IO_ERROR;
    }
    pos_ = static_cast<std::size_t>(newPos);
    return StreamError::SUCCESS;
}

/// Returns true once the position has reached the end of the memory region.
bool MemoryStream::isAtEnd()
{
    MIJIN_ASSERT(isOpen(), "MemoryStream is not open.");
    return pos_ == data_.size();
}

/// Reports the capabilities of the memory stream; writing depends on how it was opened.
StreamFeatures MemoryStream::getFeatures()
{
    return {
        .read = true,
        .write = canWrite_,
        .tell = true,
        .seek = true,
        .readOptions = {
            // readRaw() honors both options
            .partial = true,
            .peek = true
        }
    };
}

} // namespace mijin