#include "./stream.hpp" #include #include #include namespace mijin { // // internal defines // // // internal constants // // // internal types // // // internal variables // // // internal functions // // // public functions // void Stream::flush() {} mijin::Task Stream::c_readRaw(std::span buffer, const ReadOptions& options, std::size_t* outBytesRead) { std::size_t bytesToRead = buffer.size(); if (bytesToRead == 0) { co_return StreamError::SUCCESS; } while(true) { bool done = false; std::size_t bytesRead = 0; const StreamError error = readRaw(buffer.data() + buffer.size() - bytesToRead, bytesToRead, {.partial = true, .noBlock = true}, &bytesToRead); switch (error) { case StreamError::SUCCESS: bytesToRead -= bytesRead; if (options.partial || bytesToRead == 0) { done = true; } break; case StreamError::WOULD_BLOCK: if (options.noBlock) { co_return StreamError::WOULD_BLOCK; } break; default: co_return error; } if (done) { break; } co_await mijin::c_suspend(); } if (outBytesRead != nullptr) { *outBytesRead = buffer.size() - bytesToRead; } co_return StreamError::SUCCESS; } mijin::Task Stream::c_writeRaw(std::span buffer) { co_return writeRaw(buffer); } StreamError Stream::readBinaryString(std::string& outString) { std::uint32_t length; // NOLINT(cppcoreguidelines-init-variables) StreamError error = read(length); if (error != StreamError::SUCCESS) { return error; } std::string result; result.resize(length); error = readSpan(result.begin(), result.end()); if (error != StreamError::SUCCESS) { return error; } outString = std::move(result); return StreamError::SUCCESS; } StreamError Stream::writeBinaryString(std::string_view str) { MIJIN_ASSERT(str.length() <= std::numeric_limits::max(), "Binary string is too long."); const std::uint32_t length = static_cast(str.length()); const StreamError error = write(length); if (error != StreamError::SUCCESS) { return error; } return writeSpan(str.begin(), str.end()); } StreamError Stream::readZString(std::string& outString) { char chr = '\0'; std::string result; while (true) { if (isAtEnd()) { return StreamError::IO_ERROR; } if (StreamError error = read(chr); error != StreamError::SUCCESS) { return error; } if (chr == '\0') { outString = std::move(result); return StreamError::SUCCESS; } result.push_back(chr); } } StreamError Stream::writeZString(std::string_view str) { static const char ZERO = '\0'; if (StreamError error = writeRaw(str.data(), str.size() * sizeof(char)); error != StreamError::SUCCESS) { return error; } return write(ZERO); } mijin::Task Stream::c_readBinaryString(std::string& outString) { std::uint32_t length; // NOLINT(cppcoreguidelines-init-variables) StreamError error = co_await c_read(length); if (error != StreamError::SUCCESS) { co_return error; } std::string result; result.resize(length); error = co_await c_readSpan(result.begin(), result.end()); if (error != StreamError::SUCCESS) { co_return error; } outString = std::move(result); co_return StreamError::SUCCESS; } mijin::Task Stream::c_writeBinaryString(std::string_view str) { MIJIN_ASSERT(str.length() <= std::numeric_limits::max(), "Binary string is too long."); const std::uint32_t length = static_cast(str.length()); StreamError error = co_await c_write(length); if (error != StreamError::SUCCESS) { co_return error; } co_return co_await c_writeSpan(str.begin(), str.end()); } StreamError Stream::getTotalLength(std::size_t& outLength) { const StreamFeatures features = getFeatures(); if (!features.tell || !features.seek) { return StreamError::NOT_SUPPORTED; } const std::size_t origPos = tell(); if (const StreamError error = seek(0, SeekMode::RELATIVE_TO_END); error != StreamError::SUCCESS) { return error; } outLength = tell(); if (const StreamError error = seek(static_cast(origPos)); error != StreamError::SUCCESS) { return error; } return StreamError::SUCCESS; } StreamError Stream::readLine(std::string& outString) { MIJIN_ASSERT(getFeatures().readOptions.peek, "Stream needs to support peeking."); static constexpr std::size_t BUFFER_SIZE = 4096; std::array buffer; outString.clear(); bool done = false; while (!done) { // read into the buffer std::size_t bytesRead = 0; if (const StreamError error = readRaw(buffer, {.partial = true, .peek = true}, &bytesRead); error != StreamError::SUCCESS) { return error; } // try to find a \n auto begin = buffer.begin(); // NOLINT(readability-qualified-auto) auto end = buffer.begin() + bytesRead; // NOLINT(readability-qualified-auto) auto newline = std::find(begin, end, '\n'); // NOLINT(readability-qualified-auto) if (newline != end) { // found the end outString.append(begin, newline); end = newline + 1; done = true; } else { outString.append(begin, end); } // read again, this time to skip if (const StreamError error = readSpan(begin, end); error != StreamError::SUCCESS) { return error; } if (isAtEnd()) { done = true; } } return StreamError::SUCCESS; } mijin::Task Stream::c_readLine(std::string& outString) { MIJIN_ASSERT(getFeatures().readOptions.peek, "Stream needs to support peeking."); static constexpr std::size_t BUFFER_SIZE = 4096; std::array buffer; outString.clear(); bool done = false; while(!done) { // read into the buffer std::size_t bytesRead = 0; if (StreamError error = co_await c_readRaw(buffer, {.partial = true, .peek = true}, &bytesRead); error != StreamError::SUCCESS) { co_return error; } // try to find a \n auto begin = buffer.begin(); // NOLINT(readability-qualified-auto) auto end = buffer.begin() + bytesRead; // NOLINT(readability-qualified-auto) auto newline = std::find(begin, end, '\n'); // NOLINT(readability-qualified-auto) if (newline != end) { // found the end outString.append(begin, newline); end = newline + 1; done = true; } else { outString.append(begin, end); } // read again, this time to skip if (StreamError error = co_await c_readSpan(begin, end); error != StreamError::SUCCESS) { co_return error; } } co_return StreamError::SUCCESS; } StreamError Stream::copyTo(Stream& other) { MIJIN_ASSERT(getFeatures().read, "Stream must support reading."); MIJIN_ASSERT(other.getFeatures().write, "Other stream must support writing."); static constexpr std::size_t CHUNK_SIZE = 4096; std::array chunk = {}; while (!isAtEnd()) { std::size_t bytesRead = 0; if (const StreamError error = readRaw(chunk, {.partial = true}, &bytesRead); error != StreamError::SUCCESS) { return error; } if (const StreamError error = other.writeRaw(chunk.data(), bytesRead); error != StreamError::SUCCESS) { return error; } } return StreamError::SUCCESS; } FileStream::FileStream(FileStream&& other) MIJIN_NOEXCEPT : handle_(std::exchange(other.handle_, nullptr)), mode_(other.mode_), length_(other.length_) { } FileStream::~FileStream() noexcept { if (handle_) { close(); } } FileStream& FileStream::operator=(FileStream&& other) MIJIN_NOEXCEPT { if (this != &other) { if (handle_) { close(); } handle_ = std::exchange(other.handle_, nullptr); mode_ = other.mode_; length_ = other.length_; } return *this; } StreamError FileStream::open(const char* path, FileOpenMode mode) { mode_ = mode; const char* modeStr; // NOLINT(cppcoreguidelines-init-variables) switch (mode_) { case FileOpenMode::READ: modeStr = "rb"; break; case FileOpenMode::WRITE: modeStr = "wb"; break; case FileOpenMode::APPEND: modeStr = "ab"; break; case FileOpenMode::READ_WRITE: modeStr = "r+b"; break; default: MIJIN_FATAL("Invalid value for mode."); return StreamError::UNKNOWN_ERROR; } handle_ = std::fopen(path, modeStr); // NOLINT(cppcoreguidelines-owning-memory) if (!handle_ && mode_ == FileOpenMode::READ_WRITE) { handle_ = std::fopen(path, "w+b"); // NOLINT(cppcoreguidelines-owning-memory) } if (!handle_) { return StreamError::IO_ERROR; } int result = std::fseek(handle_, 0, SEEK_END); MIJIN_ASSERT(result == 0, "fseek failed."); length_ = std::ftell(handle_); result = std::fseek(handle_, 0, SEEK_SET); MIJIN_ASSERT(result == 0, "fseek failed."); return StreamError::SUCCESS; } void FileStream::close() { MIJIN_ASSERT(handle_ != nullptr, "FileStream is not open."); [[maybe_unused]] const int result = std::fclose(handle_); // NOLINT(cppcoreguidelines-owning-memory) MIJIN_ASSERT(result == 0, "fclose failed."); handle_ = nullptr; } StreamError FileStream::readRaw(std::span buffer, const ReadOptions& options, std::size_t* outBytesRead) { MIJIN_ASSERT(handle_ != nullptr, "FileStream is not open."); MIJIN_ASSERT(mode_ == FileOpenMode::READ || mode_ == FileOpenMode::READ_WRITE, "Cannot read from this stream"); const std::size_t readBytes = std::fread(buffer.data(), 1, buffer.size(), handle_); if (std::ferror(handle_)) { return StreamError::IO_ERROR; } if (!options.partial && readBytes < buffer.size()) { return StreamError::IO_ERROR; } if (outBytesRead != nullptr) { *outBytesRead = readBytes; } if (options.peek) { if (const StreamError error = seek(-static_cast(readBytes), SeekMode::RELATIVE); error != StreamError::SUCCESS) { return error; } } return StreamError::SUCCESS; } StreamError FileStream::writeRaw(std::span buffer) { MIJIN_ASSERT(handle_ != nullptr, "FileStream is not open."); MIJIN_ASSERT(mode_ == FileOpenMode::WRITE || mode_ == FileOpenMode::READ_WRITE || mode_ == FileOpenMode::APPEND, "Cannot write to this stream"); const std::size_t written = std::fwrite(buffer.data(), 1, buffer.size(), handle_); if (written != buffer.size() || std::ferror(handle_)) { return StreamError::IO_ERROR; } length_ = std::max(length_, std::ftell(handle_)); return StreamError::SUCCESS; } std::size_t FileStream::tell() { MIJIN_ASSERT(handle_ != nullptr, "FileStream is not open."); return std::ftell(handle_); } StreamError FileStream::seek(std::intptr_t pos, SeekMode seekMode) { MIJIN_ASSERT(handle_ != nullptr, "FileStream is not open."); int origin; // NOLINT(cppcoreguidelines-init-variables) switch (seekMode) { case SeekMode::ABSOLUTE: origin = SEEK_SET; break; case SeekMode::RELATIVE: origin = SEEK_CUR; break; case SeekMode::RELATIVE_TO_END: origin = SEEK_END; break; default: MIJIN_ERROR("Invalid value passed as seekMode!"); return StreamError::UNKNOWN_ERROR; } const int result = std::fseek(handle_, static_cast(pos), origin); if (result != 0 || std::ferror(handle_)) { return StreamError::IO_ERROR; } return StreamError::SUCCESS; } void FileStream::flush() { [[maybe_unused]] const int result = std::fflush(handle_); MIJIN_ASSERT(result == 0, "fflush failed."); } bool FileStream::isAtEnd() { MIJIN_ASSERT(handle_ != nullptr, "FileStream is not open."); (void) std::fgetc(handle_); if (std::feof(handle_)) { return true; } [[maybe_unused]] const int result = std::fseek(handle_, -1, SEEK_CUR); MIJIN_ASSERT(result == 0, "fseek failed."); return false; } StreamFeatures FileStream::getFeatures() { if (handle_) { return { .read = (mode_ == FileOpenMode::READ || mode_ == FileOpenMode::READ_WRITE), .write = (mode_ == FileOpenMode::WRITE || mode_ == FileOpenMode::APPEND || mode_ == FileOpenMode::READ_WRITE), .tell = true, .seek = true, .readOptions = { .partial = true, .peek = true } }; } return {}; } void MemoryStream::openRW(std::span data) { MIJIN_ASSERT(!isOpen(), "MemoryStream is already open."); data_ = data; pos_ = 0; canWrite_ = true; } void MemoryStream::openROImpl(const void* data, std::size_t bytes) { MIJIN_ASSERT(!isOpen(), "MemoryStream is already open."); data_ = std::span(const_cast(static_cast(data)), bytes); // NOLINT(cppcoreguidelines-pro-type-const-cast) we'll be fine pos_ = 0; canWrite_ = false; } void MemoryStream::close() { MIJIN_ASSERT(isOpen(), "MemoryStream is not open."); data_ = {}; } StreamError MemoryStream::readRaw(std::span buffer, const ReadOptions& options, std::size_t* outBytesRead) { MIJIN_ASSERT(isOpen(), "MemoryStream is not open."); if (!options.partial && availableBytes() < buffer.size()) { return StreamError::IO_ERROR; // TODO: need more errors? } const std::size_t numBytes = std::min(buffer.size(), availableBytes()); std::copy_n(data_.begin() + static_cast(pos_), numBytes, buffer.begin()); if (outBytesRead) { *outBytesRead = numBytes; } if (!options.peek) { pos_ += numBytes; } return StreamError::SUCCESS; } StreamError MemoryStream::writeRaw(std::span buffer) { MIJIN_ASSERT(isOpen(), "MemoryStream is not open."); if (!canWrite_) { return StreamError::NOT_SUPPORTED; } if (availableBytes() < buffer.size()) { return StreamError::IO_ERROR; } std::copy(buffer.begin(), buffer.end(), data_.begin() + static_cast(pos_)); pos_ += buffer.size(); return StreamError::SUCCESS; } std::size_t MemoryStream::tell() { MIJIN_ASSERT(isOpen(), "MemoryStream is not open."); return pos_; } StreamError MemoryStream::seek(std::intptr_t pos, SeekMode seekMode) { MIJIN_ASSERT(isOpen(), "MemoryStream is not open."); std::intptr_t newPos = -1; switch (seekMode) { case SeekMode::ABSOLUTE: newPos = pos; break; case SeekMode::RELATIVE: newPos = static_cast(pos_) + pos; break; case SeekMode::RELATIVE_TO_END: newPos = static_cast(data_.size()) + pos; break; } if (newPos < 0 || newPos > static_cast(data_.size())) { return StreamError::IO_ERROR; } pos_ = newPos; return StreamError::SUCCESS; } bool MemoryStream::isAtEnd() { MIJIN_ASSERT(isOpen(), "MemoryStream is not open."); return pos_ == data_.size(); } StreamFeatures MemoryStream::getFeatures() { return { .read = true, .write = canWrite_, .tell = true, .seek = true, .readOptions = { .peek = true } }; } } // namespace mijin