1 #ifndef SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
3 #ifndef SIMDJSON_CONDITIONAL_INCLUDE
4 #define SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
5 #include "simdjson/generic/ondemand/base.h"
6 #include "simdjson/generic/ondemand/document_stream.h"
7 #include "simdjson/generic/ondemand/document-inl.h"
8 #include "simdjson/generic/implementation_simdjson_result_base-inl.h"
15 namespace SIMDJSON_IMPLEMENTATION {
18 #ifdef SIMDJSON_THREADS_ENABLED
// Blocks the calling thread until the worker reports that it has no pending
// work: takes locking_mutex and waits on cond_var until has_work is false.
20 inline void stage1_worker::finish() {
25 std::unique_lock<std::mutex> lock(locking_mutex);
// Predicate form of wait(): the condition is re-evaluated after every wake-up,
// so a spurious wake-up cannot end the wait while has_work is still true.
26 cond_var.wait(lock, [
this]{
return has_work ==
false;});
// Destructor of stage1_worker.
// NOTE(review): only the signature is visible in this listing; presumably the
// body shuts down the worker thread (see stop_thread()) — confirm.
29 inline stage1_worker::~stage1_worker() {
// Creates the background thread that performs stage 1 for the owning
// document_stream. The thread body waits on cond_var for work, calls
// owner->run_stage1() with the parser and batch offset recorded by run(),
// stores the result in owner->stage1_thread_error, clears has_work and
// notifies a waiter on cond_var.
36 inline void stage1_worker::start_thread() {
37 std::unique_lock<std::mutex> lock(locking_mutex);
// A joinable std::thread already represents a running/started thread.
// NOTE(review): the body of this branch is not visible here — presumably an
// early return so that a second thread is not created; confirm.
38 if(thread.joinable()) {
41 thread = std::thread([
this]{
// The worker synchronizes on the same mutex used by finish(), run() and
// stop_thread().
43 std::unique_lock<std::mutex> thread_lock(locking_mutex);
// Sleep until work has been posted (has_work) or can_work has been cleared.
// NOTE(review): can_work == false is presumably the shutdown signal; the
// handling of that case is not visible in this listing — confirm.
45 cond_var.wait(thread_lock, [
this]{
return has_work || !can_work;});
// Run stage 1 on the stream's behalf; the error code is published through
// the owner so the stream can read it later (see load_from_stage1_thread()).
52 this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
53 this->_next_batch_start);
// Mark the job as done; finish() waits for has_work to become false.
54 this->has_work =
false;
58 cond_var.notify_one();
// Wakes every thread waiting on cond_var and then checks whether the worker
// thread is joinable.
// NOTE(review): the statements between taking the lock and notify_all(), and
// the body of the joinable() branch, are not visible in this listing —
// presumably the work flags are cleared and the thread is joined; confirm.
66 inline void stage1_worker::stop_thread() {
67 std::unique_lock<std::mutex> lock(locking_mutex);
// notify_all (not notify_one): both the worker thread and a caller blocked in
// finish() may be waiting on the same condition variable.
71 cond_var.notify_all();
73 if(thread.joinable()) {
// Posts a stage 1 job to the worker thread.
//
// ds               - the document_stream requesting the work.
//                    NOTE(review): its use is not visible in this listing;
//                    presumably it is stored as `owner` — confirm.
// stage1           - parser the worker should run stage 1 with; stored in
//                    stage1_thread_parser.
// next_batch_start - offset of the batch to index; stored in
//                    _next_batch_start.
//
// Both stored values are later passed to owner->run_stage1() by the thread
// body created in start_thread().
78 inline void stage1_worker::run(document_stream * ds, parser * stage1,
size_t next_batch_start) {
// The job description is written under the mutex shared with the worker.
79 std::unique_lock<std::mutex> lock(locking_mutex);
81 _next_batch_start = next_batch_start;
82 stage1_thread_parser = stage1;
// Wake a thread waiting on cond_var so the job can be picked up.
87 cond_var.notify_one();
// NOTE(review): fragment of a document_stream constructor; the signature line
// and several parameters/initializers are not visible in this listing.
94 ondemand::parser &_parser,
98 bool _allow_comma_separated
// batch_size is never smaller than MINIMAL_BATCH_SIZE: any requested size at
// or below the minimum is replaced by the minimum.
103 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
104 allow_comma_separated{_allow_comma_separated},
// With threads enabled, whether to use the stage 1 worker thread is taken
// from the parser's `threaded` member.
106 #ifdef SIMDJSON_THREADS_ENABLED
107 , use_thread(_parser.threaded)
110 #ifdef SIMDJSON_THREADS_ENABLED
// NOTE(review): the handling of a null worker is not visible here —
// presumably an allocation-failure path (MEMALLOC); confirm.
111 if(worker.get() ==
nullptr) {
122 allow_comma_separated{
false},
124 #ifdef SIMDJSON_THREADS_ENABLED
// Destructor of document_stream; declared noexcept.
// NOTE(review): the body is not visible in this listing; the section guarded
// by SIMDJSON_THREADS_ENABLED presumably releases the stage 1 worker — confirm.
130 simdjson_inline document_stream::~document_stream() noexcept
132 #ifdef SIMDJSON_THREADS_ENABLED
// NOTE(review): the signature line is not visible in this listing; this
// appears to be the body of document_stream::truncated_bytes() — confirm.
//
// On a CAPACITY error, everything from the start of the current batch to the
// end of the input (len - batch_start) is reported.
142 if(error ==
CAPACITY) {
return len - batch_start; }
// Otherwise the result is the difference between the two structural_indexes
// entries at positions n_structural_indexes and n_structural_indexes + 1.
// NOTE(review): presumably stage 1 stores end-of-batch bookkeeping in these
// two trailing slots — confirm against the stage 1 implementation.
143 return parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes] -
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes + 1];
147 : stream{
nullptr}, finished{
true} {
151 : stream{_stream}, finished{is_end} {
169 if (stream->error) { finished =
true; }
178 if (stream->error ==
EMPTY) { finished =
true; }
186 return finished != other.finished;
// Prepares the stream for iteration: allocates the parser for one batch, runs
// stage 1 on the first batch that is not EMPTY, positions `doc` at the start
// of that batch and, when threading is in use and more input remains, starts
// the worker on the following batch. Any failure is left in `error` and the
// function returns early.
199 inline void document_stream::start() noexcept {
// Nothing to do if the stream is already in an error state.
200 if (error) {
return; }
201 error = parser->allocate(batch_size);
202 if (error) {
return; }
205 error = run_stage1(*parser, batch_start);
// EMPTY means stage 1 found no structural element in the batch: move on to
// the next batch until one has content or the input is exhausted.
206 while(error ==
EMPTY) {
208 batch_start = next_batch_start();
// Returning here leaves error == EMPTY when the whole input was empty.
209 if (batch_start >= len) {
return; }
210 error = run_stage1(*parser, batch_start);
212 if (error) {
return; }
// The first document starts at the beginning of the batch.
213 doc_index = batch_start;
214 doc = document(json_iterator(&buf[batch_start], parser));
215 doc.
iter._streaming =
true;
217 #ifdef SIMDJSON_THREADS_ENABLED
// Only start the worker when there is a further batch for it to index.
218 if (use_thread && next_batch_start() < len) {
// The worker indexes into its own parser, which needs its own allocation.
220 error = stage1_thread_parser.allocate(batch_size);
221 if (error) {
return; }
222 worker->start_thread();
223 start_stage1_thread();
224 if (error) {
return; }
// Advances the stream to the next document. When the current batch has no
// structural indexes left, loads (or computes) stage 1 for following batches
// until one is not EMPTY or the input is exhausted, and re-seats the document
// iterator at the start of the new batch.
// NOTE(review): several statements of this function are not visible in this
// listing (e.g. between the two `if (error)` checks and before the while
// loop); the comments below describe only the visible lines.
229 inline void document_stream::next() noexcept {
231 if (error) {
return; }
233 if (error) {
return; }
// Position of the iterator's root within the structural index array.
234 auto cur_struct_index = doc.
iter._root - parser->implementation->structural_indexes.get();
// Structural offsets are relative to the batch; add batch_start to obtain the
// document's offset in the whole input.
235 doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];
// All structural indexes of this batch have been consumed.
238 if(cur_struct_index >=
static_cast<int64_t
>(parser->implementation->n_structural_indexes)) {
// Keep moving to the next batch while batches come back EMPTY.
241 while (error ==
EMPTY) {
242 batch_start = next_batch_start();
243 if (batch_start >= len) {
break; }
244 #ifdef SIMDJSON_THREADS_ENABLED
// NOTE(review): the condition selecting between the threaded path
// (load_from_stage1_thread) and the direct run_stage1 calls is not visible
// here — presumably `use_thread`; confirm.
246 load_from_stage1_thread();
248 error = run_stage1(*parser, batch_start);
251 error = run_stage1(*parser, batch_start);
// Restart iteration at the beginning of the freshly indexed batch.
287 doc.
iter = json_iterator(&buf[batch_start], parser);
288 doc.
iter._streaming =
true;
// On error, go back to the loop condition: EMPTY tries the next batch, any
// other error ends the loop.
293 if (error) {
continue; }
294 doc_index = batch_start;
// Moves the document iterator past the current document and makes the
// following position the new root. Failures from skip_child() are stored in
// `error`.
299 inline void document_stream::next_document() noexcept {
// Skip whatever remains of the current document (down to depth 0).
301 error = doc.
iter.skip_child(0);
302 if (error) {
return; }
// In comma-separated mode, consume a ',' between documents.
// NOTE(review): the result of consume_character is ignored here, so a missing
// comma is presumably tolerated — confirm.
306 if (allow_comma_separated) { doc.
iter.consume_character(
','); }
// Rewind the string buffer cursor to the start of the parser's string buffer
// for the next document.
308 doc.
iter._string_buf_loc = parser->string_buf.get();
// The iterator's current position becomes the root of the next document.
309 doc.
iter._root = doc.
iter.position();
// Returns the input offset at which the next batch begins: the current
// batch_start plus the structural_indexes entry at position
// n_structural_indexes.
// NOTE(review): that entry is presumably the end of the last complete
// document stage 1 found in the current batch — confirm.
312 inline size_t document_stream::next_batch_start() const noexcept {
313 return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
// Runs stage 1 of parser `p` on the batch that starts at `_batch_start`.
//
// p            - parser whose implementation performs stage 1.
// _batch_start - offset into buf where the batch begins.
//
// Returns the error code produced by stage 1.
316 inline error_code document_stream::run_stage1(ondemand::parser &p,
size_t _batch_start) noexcept {
319 size_t remaining = len - _batch_start;
// If the rest of the input fits in one batch, this is the last batch: pass
// exactly the remaining bytes and use streaming_final mode.
320 if (remaining <= batch_size) {
321 return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
// Otherwise process a full batch in streaming_partial mode.
323 return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
// Returns the stream's doc_index, i.e. the offset recorded for the current
// document (start() and next() set it to batch_start plus a batch-relative
// structural offset).
327 simdjson_inline
size_t document_stream::iterator::current_index() const noexcept {
328 return stream->doc_index;
// Returns a string_view over the bytes of the current document inside the
// stream's buffer. A scalar document is returned with trailing whitespace and
// NUL bytes trimmed; otherwise structural characters are walked until the
// nesting depth reaches zero.
// NOTE(review): the case labels and depth adjustments of both switch
// statements are not visible in this listing; the comments below describe
// only the visible lines.
331 simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
332 auto depth = stream->doc.iter.depth();
// Position of the document root within the structural index array.
333 auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();
// At the root, the first structural character of the document is inspected.
336 if (stream->doc.iter.at_root()) {
337 switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
// Scalar path: the view runs from the document start up to the next
// structural index.
// NOTE(review): next_index is a batch-relative structural offset, while
// current_index() returns doc_index, which next() computes as
// batch_start + offset. The final return below adds batch_start, this
// subtraction does not — confirm it is correct for batches after the first.
347 auto next_index = stream->parser->implementation->structural_indexes[++cur_struct_index];
349 size_t svlen = next_index - current_index();
350 const char *start =
reinterpret_cast<const char*
>(stream->buf) + current_index();
// Trim trailing whitespace and NUL bytes, always keeping at least one byte.
// NOTE(review): std::isspace has undefined behavior for arguments not
// representable as unsigned char (other than EOF); start[...] is a plain
// char, so bytes >= 0x80 may be passed as negative values where char is
// signed — consider casting to unsigned char.
351 while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] ==
'\0')) {
354 return std::string_view(start, svlen);
// Walk the structural characters (index n_structural_indexes inclusive) until
// the nesting depth returns to zero.
360 while (cur_struct_index <=
static_cast<int64_t
>(stream->parser->implementation->n_structural_indexes)) {
361 switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
369 if (depth == 0) {
break; }
// The view starts at the document's offset in buf; its length is the absolute
// offset of the structural character where the walk stopped (batch-relative
// offset + batch_start) minus the document start, plus one to include that
// character.
373 return std::string_view(
reinterpret_cast<const char*
>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);;
377 return stream->error;
380 #ifdef SIMDJSON_THREADS_ENABLED
// Adopts the stage 1 result produced by the worker thread: swaps the worker's
// parser with the main parser, copies the worker's error code into `error`,
// and, if more input remains, starts the worker on the following batch.
// NOTE(review): the statement(s) before the swap are not visible in this
// listing — presumably the worker is waited on (worker->finish()) first;
// confirm.
382 inline void document_stream::load_from_stage1_thread() noexcept {
// After the swap, *parser holds the structural indexes computed by the worker
// and stage1_thread_parser can be reused for the next job.
386 std::swap(stage1_thread_parser,*
parser);
387 error = stage1_thread_error;
388 if (error) {
return; }
// next_batch_start() now reads the swapped-in parser, so it refers to the
// batch after the one just loaded.
391 if (next_batch_start() < len) {
392 start_stage1_thread();
// Asks the worker to run stage 1 on the batch following the current one,
// using stage1_thread_parser.
396 inline void document_stream::start_stage1_thread() noexcept {
// The offset is computed here, before the job is posted, and passed to the
// worker by value.
402 size_t _next_batch_start = this->next_batch_start();
404 worker->run(
this, & this->stage1_thread_parser, _next_batch_start);
// simdjson_result<document_stream> constructor that forwards `error` to the
// implementation_simdjson_result_base<document_stream> base class.
// NOTE(review): the parameter list is not visible in this listing —
// presumably a single error_code; confirm.
415 simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
418 implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
// simdjson_result<document_stream> constructor taking a document_stream by
// rvalue reference and forwarding it to the
// implementation_simdjson_result_base<document_stream> base class.
421 simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
422 SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
424 implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
// std::forward with an explicit non-reference type yields an rvalue, so the
// stream is passed on as an rvalue rather than copied.
425 std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
iterator & operator++() noexcept
Advance to the next document (prefix).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
simdjson_inline iterator() noexcept
Default constructor.
error_code error() const noexcept
Returns error of the stream (if any).
simdjson_inline reference operator*() noexcept
Get the current document (or error).
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
json_iterator iter
Current position in the document.
A JSON fragment iterator.
The top level simdjson namespace, containing everything the library provides.
error_code
All possible errors returned by simdjson.
@ CAPACITY
This parser can't support a document that big.
@ EMPTY
no structural element found
@ MEMALLOC
Error allocating memory, most likely out of memory.
@ UNINITIALIZED
unknown error, or uninitialized document
The result of a simdjson operation that could fail.