1#ifndef SIMDJSON_DOCUMENT_STREAM_INL_H
2#define SIMDJSON_DOCUMENT_STREAM_INL_H
4#include "simdjson/dom/base.h"
5#include "simdjson/dom/document_stream.h"
6#include "simdjson/dom/element-inl.h"
7#include "simdjson/dom/parser-inl.h"
8#include "simdjson/error-inl.h"
9#include "simdjson/internal/dom_parser_implementation.h"
14#ifdef SIMDJSON_THREADS_ENABLED
16inline void stage1_worker::finish() {
25inline stage1_worker::~stage1_worker() {
32inline void stage1_worker::start_thread() {
37 thread = std::thread([
this]{
62inline void stage1_worker::stop_thread() {
74inline void stage1_worker::run(document_stream * ds, dom::parser * stage1,
size_t next_batch_start) {
97 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
99#ifdef SIMDJSON_THREADS_ENABLED
100 , use_thread(_parser.threaded)
103#ifdef SIMDJSON_THREADS_ENABLED
104 if(worker.get() ==
nullptr) {
116#ifdef SIMDJSON_THREADS_ENABLED
122simdjson_inline document_stream::~document_stream() noexcept {
123#ifdef SIMDJSON_THREADS_ENABLED
129 : stream{
nullptr}, finished{
true} {
143 : stream{
_stream}, finished{is_end} {
152 if (stream->error) {
return stream->error; }
153 return stream->parser->doc.root();
167 if (stream->error) { finished =
true; }
176 if (stream->error ==
EMPTY) { finished =
true; }
184 return finished !=
other.finished;
187inline void document_stream::start()
noexcept {
188 if (error) {
return; }
189 error =
parser->ensure_capacity(batch_size);
190 if (error) {
return; }
193 error = run_stage1(*parser, batch_start);
194 while(error ==
EMPTY) {
196 batch_start = next_batch_start();
197 if (batch_start >= len) {
return; }
198 error = run_stage1(*parser, batch_start);
200 if (error) {
return; }
201#ifdef SIMDJSON_THREADS_ENABLED
202 if (use_thread && next_batch_start() < len) {
204 error = stage1_thread_parser.ensure_capacity(batch_size);
205 if (error) {
return; }
206 worker->start_thread();
207 start_stage1_thread();
208 if (error) {
return; }
214simdjson_inline
size_t document_stream::iterator::current_index() const noexcept {
215 return stream->doc_index;
218simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
219 const char* start =
reinterpret_cast<const char*
>(stream->buf) + current_index();
220 bool object_or_array = ((*start ==
'[') || (*start ==
'{'));
221 if(object_or_array) {
222 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index - 1];
223 return std::string_view(start, next_doc_index - current_index() + 1);
225 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index];
226 size_t svlen = next_doc_index - current_index();
227 while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] ==
'\0')) {
230 return std::string_view(start, svlen);
235inline void document_stream::next() noexcept {
237 if (error) {
return; }
240 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
241 error = parser->implementation->stage2_next(parser->doc);
243 while (error ==
EMPTY) {
244 batch_start = next_batch_start();
245 if (batch_start >= len) {
break; }
247#ifdef SIMDJSON_THREADS_ENABLED
249 load_from_stage1_thread();
251 error = run_stage1(*parser, batch_start);
254 error = run_stage1(*parser, batch_start);
256 if (error) {
continue; }
258 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
259 error = parser->implementation->stage2_next(parser->doc);
267 if(error ==
CAPACITY) {
return len - batch_start; }
268 return parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes] -
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes + 1];
271inline size_t document_stream::next_batch_start()
const noexcept {
272 return batch_start +
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes];
276 size_t remaining = len - _batch_start;
277 if (remaining <= batch_size) {
278 return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
280 return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
284#ifdef SIMDJSON_THREADS_ENABLED
286inline void document_stream::load_from_stage1_thread() noexcept {
290 std::swap(*parser, stage1_thread_parser);
291 error = stage1_thread_error;
292 if (error) {
return; }
295 if (next_batch_start() < len) {
296 start_stage1_thread();
300inline void document_stream::start_stage1_thread() noexcept {
306 size_t _next_batch_start = this->next_batch_start();
308 worker->run(
this, & this->stage1_thread_parser, _next_batch_start);
315simdjson_inline simdjson_result<dom::document_stream>::simdjson_result() noexcept
316 : simdjson_result_base() {
318simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(
error_code error) noexcept
319 : simdjson_result_base(error) {
321simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(dom::document_stream &&value) noexcept
322 : simdjson_result_base(std::forward<dom::document_stream>(value)) {
325#if SIMDJSON_EXCEPTIONS
326simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept(false) {
327 if (error()) {
throw simdjson_error(error()); }
328 return first.begin();
330simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept(false) {
331 if (error()) {
throw simdjson_error(error()); }
335#ifndef SIMDJSON_DISABLE_DEPRECATED_API
336simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept {
337 first.error = error();
338 return first.begin();
340simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept {
341 first.error = error();
simdjson_inline array() noexcept
Create a new, invalid array.
An iterator through a forward-only stream of documents.
simdjson_inline reference operator*() noexcept
Get the current document (or error).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
simdjson_inline iterator() noexcept
Default constructor.
iterator & operator++() noexcept
Advance to the next document (prefix).
A forward-only stream of documents.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at t...
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
A persistent document parser.
The top level simdjson namespace, containing everything the library provides.
error_code
All possible errors returned by simdjson.
@ CAPACITY
This parser can't support a document that big.
@ EMPTY
no structural element found
@ MEMALLOC
Error allocating memory, most likely out of memory.
@ UNINITIALIZED
unknown error, or uninitialized document
The result of a simdjson operation that could fail.