1#ifndef SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
3#ifndef SIMDJSON_CONDITIONAL_INCLUDE
4#define SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
5#include "simdjson/generic/ondemand/base.h"
6#include "simdjson/generic/ondemand/document_stream.h"
7#include "simdjson/generic/ondemand/document-inl.h"
8#include "simdjson/generic/implementation_simdjson_result_base-inl.h"
15namespace SIMDJSON_IMPLEMENTATION {
18#ifdef SIMDJSON_THREADS_ENABLED
// Blocks the caller, holding locking_mutex, until has_work is false.
20inline void stage1_worker::finish() {
25 std::unique_lock<std::mutex> lock(locking_mutex);
// The predicate overload of wait() re-evaluates has_work on every wakeup.
26 cond_var.wait(lock, [
this]{
return has_work ==
false;});
29inline stage1_worker::~stage1_worker() {
// Creates the worker thread while holding locking_mutex.
// NOTE(review): what happens when the thread is already joinable is not
// visible in this view.
36inline void stage1_worker::start_thread() {
37 std::unique_lock<std::mutex> lock(locking_mutex);
38 if(thread.joinable()) {
// Thread body: it takes the same mutex as the callers of this class.
41 thread = std::thread([
this]{
43 std::unique_lock<std::mutex> thread_lock(locking_mutex);
// Sleep until there is work (has_work) or work is no longer allowed (!can_work).
45 cond_var.wait(thread_lock, [
this]{
return has_work || !can_work;});
// Run stage 1 on the owner's behalf, using the worker's parser and the
// recorded batch start; the result is stored on the owner.
52 this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
53 this->_next_batch_start);
// Clear the work flag, then wake one waiter on cond_var.
54 this->has_work =
false;
58 cond_var.notify_one();
// Takes locking_mutex, wakes every thread waiting on cond_var, then checks
// whether the thread is joinable.
// NOTE(review): the statements around these calls (presumably clearing the
// work flags and joining the thread) are not visible in this view — confirm.
66inline void stage1_worker::stop_thread() {
67 std::unique_lock<std::mutex> lock(locking_mutex);
71 cond_var.notify_all();
73 if(thread.joinable()) {
// Records a stage 1 job under locking_mutex and wakes one waiter on cond_var.
//   ds                - the requesting stream; its use is not visible in this view.
//   stage1            - parser stored in stage1_thread_parser.
//   next_batch_start  - value stored in _next_batch_start.
78inline void stage1_worker::run(document_stream * ds, parser * stage1,
size_t next_batch_start) {
79 std::unique_lock<std::mutex> lock(locking_mutex);
81 _next_batch_start = next_batch_start;
82 stage1_thread_parser = stage1;
87 cond_var.notify_one();
// Constructor fragment: the function header and part of the parameter list
// are not visible in this view.
94 ondemand::parser &_parser,
98 bool _allow_comma_separated
// A requested batch size at or below MINIMAL_BATCH_SIZE is raised to MINIMAL_BATCH_SIZE.
103 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
104 allow_comma_separated{_allow_comma_separated},
106 #ifdef SIMDJSON_THREADS_ENABLED
// Threading follows the parser's `threaded` flag.
107 , use_thread(_parser.threaded)
110#ifdef SIMDJSON_THREADS_ENABLED
// A null worker is detected here; how it is handled is not visible in this view.
111 if(worker.get() ==
nullptr) {
122 allow_comma_separated{
false},
124 #ifdef SIMDJSON_THREADS_ENABLED
130simdjson_inline document_stream::~document_stream() noexcept
132 #ifdef SIMDJSON_THREADS_ENABLED
// Fragment of a byte-count accessor (header not visible; presumably
// truncated_bytes() — confirm).
// With a CAPACITY error, everything from batch_start to the end of the input
// (len) is reported.
142 if(error ==
CAPACITY) {
return len - batch_start; }
// Otherwise the result is the difference between the two entries stored at
// positions n_structural_indexes and n_structural_indexes + 1.
// NOTE(review): presumably stage 1 stores extra values past the last
// structural index for this purpose — confirm.
143 return parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes] -
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes + 1];
// Initialiser list with no stream attached: the iterator starts out finished.
147 : stream{
nullptr}, finished{
true} {
// Initialiser list attaching _stream; finished takes the value of is_end.
151 : stream{_stream}, finished{is_end} {
// Any error on the stream marks the iterator as finished.
169 if (stream->error) { finished =
true; }
// EMPTY is checked again here and also marks the iterator as finished.
// NOTE(review): the statements between the two checks are not visible in this view.
178 if (stream->error ==
EMPTY) { finished =
true; }
// Both comparisons below look only at the finished flag of each side.
191 return finished != other.finished;
195 return finished == other.finished;
// Prepares the stream for iteration: allocates the parser for batch_size,
// runs stage 1 on the batch at batch_start, and points doc at that batch.
// Returns early, leaving the code in `error`, whenever a step fails.
208inline void document_stream::start() noexcept {
209 if (error) {
return; }
210 error = parser->allocate(batch_size);
211 if (error) {
return; }
214 error = run_stage1(*parser, batch_start);
// While a batch reports EMPTY, advance to the next batch; stop once
// batch_start reaches the end of the input.
215 while(error ==
EMPTY) {
217 batch_start = next_batch_start();
218 if (batch_start >= len) {
return; }
219 error = run_stage1(*parser, batch_start);
221 if (error) {
return; }
// The current document begins at the start of the batch.
222 doc_index = batch_start;
223 doc = document(json_iterator(&buf[batch_start], parser));
224 doc.
iter._streaming =
true;
226 #ifdef SIMDJSON_THREADS_ENABLED
// With threading on and another batch remaining, allocate the second parser,
// start the worker thread and hand it the next batch.
227 if (use_thread && next_batch_start() < len) {
229 error = stage1_thread_parser.allocate(batch_size);
230 if (error) {
return; }
231 worker->start_thread();
232 start_stage1_thread();
233 if (error) {
return; }
// Advances the stream to the next document, loading further batches when the
// current one is used up.
// NOTE(review): the statement between the two `error` checks below, which can
// set `error`, is not visible in this view.
238inline void document_stream::next() noexcept {
240 if (error) {
return; }
242 if (error) {
return; }
// Offset of the current root within the structural index array.
243 auto cur_struct_index = doc.
iter._root - parser->implementation->structural_indexes.get();
// Byte offset of the document: batch start plus the structural's offset in the batch.
244 doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];
// The root index has reached n_structural_indexes: the current batch is used up.
247 if(cur_struct_index >=
static_cast<int64_t
>(parser->implementation->n_structural_indexes)) {
// Keep moving to the following batch while batches report EMPTY, stopping at
// the end of the input.
250 while (error ==
EMPTY) {
251 batch_start = next_batch_start();
252 if (batch_start >= len) {
break; }
253 #ifdef SIMDJSON_THREADS_ENABLED
// The batch either comes from the worker thread or is produced here with
// run_stage1; the conditions selecting between them are not visible in this view.
255 load_from_stage1_thread();
257 error = run_stage1(*parser, batch_start);
260 error = run_stage1(*parser, batch_start);
// Re-point the document's iterator at the start of the new batch.
296 doc.
iter = json_iterator(&buf[batch_start], parser);
297 doc.
iter._streaming =
true;
302 if (error) {
continue; }
303 doc_index = batch_start;
// Moves the document's iterator past the current document and makes the
// position it ends on the new root.
308inline void document_stream::next_document() noexcept {
// Skip whatever remains of the current document; stop on failure.
310 error = doc.
iter.skip_child(0);
311 if (error) {
return; }
// In comma-separated mode a value named `ignored` is deliberately discarded;
// the statement that produces it is not visible in this view.
315 if (allow_comma_separated) {
317 static_cast<void>(ignored);
// Reset the string buffer location to the start of the parser's string_buf.
320 doc.
iter._string_buf_loc = parser->string_buf.get();
// The iterator's current position becomes the root.
321 doc.
iter._root = doc.
iter.position();
// Returns batch_start plus the value stored at
// structural_indexes[n_structural_indexes].
// NOTE(review): presumably that entry is the offset, within the batch, where
// the next batch should begin — confirm against stage 1.
324inline size_t document_stream::next_batch_start() const noexcept {
325 return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
// Runs stage 1 of parser `p` on the batch of buf that starts at _batch_start
// and returns the resulting error_code.
328inline error_code document_stream::run_stage1(ondemand::parser &p,
size_t _batch_start)
noexcept {
331 size_t remaining = len - _batch_start;
// Everything left fits in one batch: process it in streaming_final mode.
332 if (remaining <= batch_size) {
333 return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
// Otherwise process exactly batch_size bytes in streaming_partial mode.
335 return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
// Returns the doc_index of the stream this iterator is attached to.
339simdjson_inline
size_t document_stream::iterator::current_index() const noexcept {
340 return stream->doc_index;
// Returns a string_view over bytes of the stream's buffer; both return
// statements below start the view at current_index().
343simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
344 auto depth = stream->doc.iter.depth();
// Offset of the document's root within the structural index array.
345 auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();
// At the root, branch on the byte found at the current structural; the case
// labels are not visible in this view.
348 if (stream->doc.iter.at_root()) {
349 switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
// The length is measured up to the following structural.
// NOTE(review): next_index is read straight from structural_indexes, while
// current_index() returns doc_index, which next() computes as batch_start +
// offset. Confirm this subtraction is right when batch_start != 0.
359 auto next_index = stream->parser->implementation->structural_indexes[++cur_struct_index];
361 size_t svlen = next_index - current_index();
362 const char *start =
reinterpret_cast<const char*
>(stream->buf) + current_index();
// Loops while the last byte is whitespace or NUL and more than one byte
// remains; the loop body is not visible in this view.
// NOTE(review): std::isspace is called with a plain char; consider whether
// bytes >= 0x80 need a cast to unsigned char first.
363 while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] ==
'\0')) {
366 return std::string_view(start, svlen);
// Walk the structurals, stopping when depth reaches 0; the case labels that
// adjust depth are not visible in this view.
372 while (cur_struct_index <=
static_cast<int64_t
>(stream->parser->implementation->n_structural_indexes)) {
373 switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
381 if (depth == 0) {
break; }
// The view runs from current_index() through the structural at
// cur_struct_index inclusive (hence the + 1).
385 return std::string_view(
reinterpret_cast<const char*
>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);;
389 return stream->error;
392#ifdef SIMDJSON_THREADS_ENABLED
// Adopts the worker's results: swaps stage1_thread_parser with the main
// parser and copies the worker's error code into `error`.
// NOTE(review): any wait on the worker before the swap is not visible in this
// view — confirm.
394inline void document_stream::load_from_stage1_thread() noexcept {
398 std::swap(stage1_thread_parser,*
parser);
399 error = stage1_thread_error;
400 if (error) {
return; }
// More input remains after this batch: start stage 1 on the following batch.
403 if (next_batch_start() < len) {
404 start_stage1_thread();
// Hands the worker a job: this stream, the address of stage1_thread_parser,
// and the start offset of the next batch.
408inline void document_stream::start_stage1_thread() noexcept {
414 size_t _next_batch_start = this->next_batch_start();
416 worker->run(
this, & this->stage1_thread_parser, _next_batch_start);
// Constructor that passes `error` to the implementation_simdjson_result_base
// base class; the parameter list is not visible in this view.
427simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
430 implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
// Constructor that takes a document_stream rvalue and forwards it to the
// implementation_simdjson_result_base base class.
433simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
434 SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
436 implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
437 std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
iterator & operator++() noexcept
Advance to the next document (prefix).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
simdjson_inline iterator() noexcept
Default constructor.
error_code error() const noexcept
Returns error of the stream (if any).
simdjson_inline reference operator*() noexcept
Get the current document (or error).
bool at_end() const noexcept
Returns whether the iterator is at the end.
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
json_iterator iter
Current position in the document.
A JSON fragment iterator.
The top level simdjson namespace, containing everything the library provides.
error_code
All possible errors returned by simdjson.
@ CAPACITY
This parser can't support a document that big.
@ EMPTY
no structural element found
@ MEMALLOC
Error allocating memory, most likely out of memory.
@ UNINITIALIZED
unknown error, or uninitialized document
The result of a simdjson operation that could fail.