1 #ifndef SIMDJSON_DOCUMENT_STREAM_INL_H
2 #define SIMDJSON_DOCUMENT_STREAM_INL_H
4 #include "simdjson/dom/base.h"
5 #include "simdjson/dom/document_stream.h"
6 #include "simdjson/dom/element-inl.h"
7 #include "simdjson/dom/parser-inl.h"
8 #include "simdjson/error-inl.h"
9 #include "simdjson/internal/dom_parser_implementation.h"
14 #ifdef SIMDJSON_THREADS_ENABLED
// Blocks the caller until the worker has no pending work.
// Takes locking_mutex and waits on cond_var until has_work is false.
16 inline void stage1_worker::finish() {
21 std::unique_lock<std::mutex> lock(locking_mutex);
// Predicate form of wait(): has_work is re-checked on every wake-up.
22 cond_var.wait(lock, [
this]{
return has_work ==
false;});
// Destructor of stage1_worker.
// NOTE(review): the body is not visible in this listing; presumably it shuts the worker thread down — confirm.
25 inline stage1_worker::~stage1_worker() {
// Creates the background thread that performs stage 1 on behalf of the owner.
// Holds locking_mutex while checking whether `thread` is already joinable
// (the action taken in that case is not visible in this listing) and while
// assigning the new std::thread.
32 inline void stage1_worker::start_thread() {
33 std::unique_lock<std::mutex> lock(locking_mutex);
34 if(thread.joinable()) {
37 thread = std::thread([
this]{
// The thread body takes the same mutex before touching shared state.
39 std::unique_lock<std::mutex> thread_lock(locking_mutex);
// Sleep until there is work to do or can_work has been cleared.
41 cond_var.wait(thread_lock, [
this]{
return has_work || !can_work;});
// Run stage 1 through the owner and store the resulting error code on the owner.
48 this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
49 this->_next_batch_start);
// Mark the job as done, then wake one waiter on cond_var.
50 this->has_work =
false;
54 cond_var.notify_one();
// Signals the worker thread to stop.
// Under locking_mutex, wakes every thread waiting on cond_var, then tests
// whether `thread` is joinable.
// NOTE(review): the statements between the visible lines are missing from
// this listing; presumably can_work is cleared and the thread is joined — confirm.
62 inline void stage1_worker::stop_thread() {
63 std::unique_lock<std::mutex> lock(locking_mutex);
67 cond_var.notify_all();
69 if(thread.joinable()) {
// Hands a stage 1 job to the worker thread.
//   ds               - the document_stream requesting the work (its use is not
//                      visible in this listing; presumably stored as `owner` — confirm).
//   stage1           - parser the worker should run stage 1 with.
//   next_batch_start - offset of the batch to process.
// The job parameters are written under locking_mutex, then one waiter on
// cond_var is notified.
74 inline void stage1_worker::run(document_stream * ds, dom::parser * stage1,
size_t next_batch_start) {
75 std::unique_lock<std::mutex> lock(locking_mutex);
77 _next_batch_start = next_batch_start;
78 stage1_thread_parser = stage1;
83 cond_var.notify_one();
// Member-initializer and body fragments; the enclosing header is not visible
// here (presumably the document_stream constructor — confirm).
// batch_size is raised to MINIMAL_BATCH_SIZE when the requested _batch_size
// does not exceed it.
97 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
99 #ifdef SIMDJSON_THREADS_ENABLED
// With threads compiled in, use_thread copies the parser's `threaded` flag.
100 , use_thread(_parser.threaded)
103 #ifdef SIMDJSON_THREADS_ENABLED
// Tests whether `worker` currently holds no object (what happens then is not shown here).
104 if(worker.get() ==
nullptr) {
116 #ifdef SIMDJSON_THREADS_ENABLED
// Destructor of document_stream; declared noexcept.
// NOTE(review): the thread-specific body guarded by SIMDJSON_THREADS_ENABLED
// is not visible in this listing.
122 simdjson_inline document_stream::~document_stream() noexcept {
123 #ifdef SIMDJSON_THREADS_ENABLED
// Initializer list whose header is not visible here (presumably the default
// iterator constructor — confirm): no stream attached, already finished.
129 : stream{
nullptr}, finished{
true} {
// Initializer list whose header is not visible here (presumably the iterator
// constructor taking a stream and an end flag — confirm): stores the stream
// and starts finished when is_end is true.
143 : stream{_stream}, finished{is_end} {
// Body fragment (header not visible; presumably iterator::operator* — confirm).
// Returns the stream's error if one is set; otherwise returns the root of the
// parser's current document.
152 if (stream->error) {
return stream->
error; }
153 return stream->parser->doc.root();
// Body fragment (header not visible; presumably iterator::operator++ — confirm).
// An error already present on the stream marks the iterator as finished.
167 if (stream->error) { finished =
true; }
// NOTE(review): the statement between these two checks is missing from this
// listing (presumably the call that advances the stream — confirm).
// EMPTY is treated as end-of-stream and also marks the iterator as finished.
176 if (stream->error ==
EMPTY) { finished =
true; }
// Body fragment (header not visible; presumably iterator::operator!= — confirm).
// Two iterators differ exactly when their `finished` flags differ.
184 return finished != other.finished;
// Prepares the stream for iteration.
// Does nothing if an error is already recorded. Otherwise ensures the parser
// has capacity for one batch, runs stage 1 on the first batch, and skips
// over batches that report EMPTY. With threads enabled and more input
// remaining, it also prepares the secondary parser and starts the worker.
// Failures are reported through the `error` member; nothing is returned.
187 inline void document_stream::start() noexcept {
188 if (error) {
return; }
189 error =
parser->ensure_capacity(batch_size);
190 if (error) {
return; }
193 error = run_stage1(*parser, batch_start);
// A batch that yields EMPTY is skipped: move to the next batch until one
// produces something else or the input is exhausted.
194 while(error ==
EMPTY) {
196 batch_start = next_batch_start();
197 if (batch_start >= len) {
return; }
198 error = run_stage1(*parser, batch_start);
200 if (error) {
return; }
201 #ifdef SIMDJSON_THREADS_ENABLED
// Only involve the worker thread when there is another batch after this one.
202 if (use_thread && next_batch_start() < len) {
204 error = stage1_thread_parser.ensure_capacity(batch_size);
205 if (error) {
return; }
206 worker->start_thread();
207 start_stage1_thread();
208 if (error) {
return; }
// Returns the stream's doc_index, i.e. the offset recorded for the current document.
214 simdjson_inline
size_t document_stream::iterator::current_index() const noexcept {
215 return stream->doc_index;
// Returns a view over the bytes of the current document inside the stream buffer.
// The view starts at buf + current_index(); its length depends on whether the
// document begins with '[' or '{'.
218 simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
219 const char* start =
reinterpret_cast<const char*
>(stream->buf) + current_index();
220 bool object_or_array = ((*start ==
'[') || (*start ==
'{'));
221 if(object_or_array) {
// Object/array: the view ends at the structural recorded just before
// next_structural_index, inclusive (hence the + 1).
222 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index - 1];
223 return std::string_view(start, next_doc_index - current_index() + 1);
// Other documents: take everything up to the structural at
// next_structural_index, then trim trailing whitespace and NUL bytes.
225 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index];
226 size_t svlen = next_doc_index - current_index();
// NOTE(review): the loop body is missing from this listing (presumably it
// decrements svlen — confirm).
// NOTE(review): std::isspace is given a plain char; values not representable
// as unsigned char are undefined behaviour — consider casting to unsigned char.
227 while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] ==
'\0')) {
230 return std::string_view(start, svlen);
// Advances the stream to the next document.
// Does nothing if an error is already recorded. Otherwise records where the
// next document starts and runs stage 2 on it. When stage 2 reports EMPTY,
// it moves to the following batch, obtains stage 1 results for it, and tries
// again until a document is produced, an error occurs, or input runs out.
// The outcome is left in the `error` member.
235 inline void document_stream::next() noexcept {
237 if (error) {
return; }
// doc_index = start of the current batch + offset of the next structural.
240 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
241 error = parser->implementation->stage2_next(parser->doc);
243 while (error ==
EMPTY) {
244 batch_start = next_batch_start();
// No more input: leave the loop with error still EMPTY.
245 if (batch_start >= len) {
break; }
// NOTE(review): the branching around the next three statements is missing
// from this listing; presumably the threaded build takes results from the
// worker when use_thread is set and otherwise runs stage 1 inline — confirm.
247 #ifdef SIMDJSON_THREADS_ENABLED
249 load_from_stage1_thread();
251 error = run_stage1(*parser, batch_start);
254 error = run_stage1(*parser, batch_start);
// `continue` re-tests the loop condition: EMPTY tries the next batch,
// any other error ends the loop.
256 if (error) {
continue; }
258 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
259 error = parser->implementation->stage2_next(parser->doc);
// Body fragment (header not visible; presumably document_stream::truncated_bytes — confirm).
// On a CAPACITY error, everything from batch_start to the end of the input
// counts as unparsed.
267 if(error ==
CAPACITY) {
return len - batch_start; }
// Otherwise the result is the difference between the structural_indexes
// entries at n_structural_indexes and n_structural_indexes + 1.
// NOTE(review): the meaning of those two trailing entries is set elsewhere
// (presumably by stage 1) — confirm.
268 return parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes] -
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes + 1];
// Returns the offset at which the next batch begins: the current batch_start
// plus the structural_indexes entry at position n_structural_indexes.
271 inline size_t document_stream::next_batch_start() const noexcept {
272 return batch_start +
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes];
// Body fragment (header not visible; presumably document_stream::run_stage1 — confirm).
// Runs stage 1 of parser `p` on the input starting at _batch_start and
// returns its result.
276 size_t remaining = len - _batch_start;
// If the rest of the input fits in one batch, process it all in
// streaming_final mode.
277 if (remaining <= batch_size) {
278 return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
// Otherwise process exactly batch_size bytes in streaming_partial mode.
280 return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
284 #ifdef SIMDJSON_THREADS_ENABLED
// Adopts the stage 1 results produced by the worker thread.
// Swaps the main parser with stage1_thread_parser, copies the worker's error
// code into `error`, and, if there was no error and more input remains,
// schedules stage 1 for the following batch.
// NOTE(review): any synchronization with the worker before the swap is not
// visible in this listing — confirm.
286 inline void document_stream::load_from_stage1_thread() noexcept {
290 std::swap(*parser, stage1_thread_parser);
291 error = stage1_thread_error;
292 if (error) {
return; }
295 if (next_batch_start() < len) {
296 start_stage1_thread();
// Asks the worker to run stage 1 on the next batch using stage1_thread_parser.
300 inline void document_stream::start_stage1_thread() noexcept {
// Compute the offset here and pass it by value to the worker.
306 size_t _next_batch_start = this->next_batch_start();
308 worker->run(
this, & this->stage1_thread_parser, _next_batch_start);
// Default constructor: default-constructs the simdjson_result_base.
315 simdjson_inline simdjson_result<dom::document_stream>::simdjson_result() noexcept
316 : simdjson_result_base() {
// Constructs a result from an error code by passing it to simdjson_result_base.
318 simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(
error_code error) noexcept
319 : simdjson_result_base(error) {
// Constructs a result from a document_stream, forwarding the rvalue to simdjson_result_base.
321 simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(dom::document_stream &&value) noexcept
322 : simdjson_result_base(std::forward<dom::document_stream>(value)) {
325 #if SIMDJSON_EXCEPTIONS
// Exception-enabled begin(): throws simdjson_error if the result holds an
// error, otherwise returns the begin iterator of the contained stream.
326 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept(false) {
327 if (error()) {
throw simdjson_error(error()); }
328 return first.begin();
// Exception-enabled end(): throws simdjson_error if the result holds an error.
// NOTE(review): the return statement is not visible in this listing
// (presumably the end iterator of the contained stream — confirm).
330 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept(false) {
331 if (error()) {
throw simdjson_error(error()); }
335 #ifndef SIMDJSON_DISABLE_DEPRECATED_API
// Non-throwing begin(), compiled only when the deprecated API is enabled.
// Copies the result's error code into the stream, then returns the stream's
// begin iterator.
336 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept {
337 first.error = error();
338 return first.begin();
// Non-throwing end(), compiled only when the deprecated API is enabled.
// Copies the result's error code into the stream.
// NOTE(review): the return statement is not visible in this listing
// (presumably the end iterator of the contained stream — confirm).
340 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept {
341 first.error = error();
An iterator through a forward-only stream of documents.
simdjson_inline reference operator*() noexcept
Get the current document (or error).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
simdjson_inline iterator() noexcept
Default constructor.
iterator & operator++() noexcept
Advance to the next document (prefix).
A forward-only stream of documents.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
A persistent document parser.
The top level simdjson namespace, containing everything the library provides.
error_code
All possible errors returned by simdjson.
@ CAPACITY
This parser can't support a document that big.
@ EMPTY
no structural element found
@ MEMALLOC
Error allocating memory, most likely out of memory.
@ UNINITIALIZED
unknown error, or uninitialized document
The result of a simdjson operation that could fail.
simdjson_inline error_code error() const noexcept
The error.