simdjson 4.2.1
Ridiculously Fast JSON
Loading...
Searching...
No Matches
document_stream-inl.h
1#ifndef SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
2
3#ifndef SIMDJSON_CONDITIONAL_INCLUDE
4#define SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
5#include "simdjson/generic/ondemand/base.h"
6#include "simdjson/generic/ondemand/document_stream.h"
7#include "simdjson/generic/ondemand/document-inl.h"
8#include "simdjson/generic/implementation_simdjson_result_base-inl.h"
9#endif // SIMDJSON_CONDITIONAL_INCLUDE
10
11#include <algorithm>
12#include <stdexcept>
13
14namespace simdjson {
15namespace SIMDJSON_IMPLEMENTATION {
16namespace ondemand {
17
18#ifdef SIMDJSON_THREADS_ENABLED
19
20inline void stage1_worker::finish() {
21 // After calling "run" someone would call finish() to wait
22 // for the end of the processing.
23 // This function will wait until either the thread has done
24 // the processing or, else, the destructor has been called.
25 std::unique_lock<std::mutex> lock(locking_mutex);
26 cond_var.wait(lock, [this]{return has_work == false;});
27}
28
29inline stage1_worker::~stage1_worker() {
30 // The thread may never outlive the stage1_worker instance
31 // and will always be stopped/joined before the stage1_worker
32 // instance is gone.
33 stop_thread();
34}
35
36inline void stage1_worker::start_thread() {
37 std::unique_lock<std::mutex> lock(locking_mutex);
38 if(thread.joinable()) {
39 return; // This should never happen but we never want to create more than one thread.
40 }
41 thread = std::thread([this]{
42 while(true) {
43 std::unique_lock<std::mutex> thread_lock(locking_mutex);
44 // We wait for either "run" or "stop_thread" to be called.
45 cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
46 // If, for some reason, the stop_thread() method was called (i.e., the
47 // destructor of stage1_worker is called, then we want to immediately destroy
48 // the thread (and not do any more processing).
49 if(!can_work) {
50 break;
51 }
52 this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
53 this->_next_batch_start);
54 this->has_work = false;
55 // The condition variable call should be moved after thread_lock.unlock() for performance
56 // reasons but thread sanitizers may report it as a data race if we do.
57 // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
58 cond_var.notify_one(); // will notify "finish"
59 thread_lock.unlock();
60 }
61 }
62 );
63}
64
65
66inline void stage1_worker::stop_thread() {
67 std::unique_lock<std::mutex> lock(locking_mutex);
68 // We have to make sure that all locks can be released.
69 can_work = false;
70 has_work = false;
71 cond_var.notify_all();
72 lock.unlock();
73 if(thread.joinable()) {
74 thread.join();
75 }
76}
77
78inline void stage1_worker::run(document_stream * ds, parser * stage1, size_t next_batch_start) {
79 std::unique_lock<std::mutex> lock(locking_mutex);
80 owner = ds;
81 _next_batch_start = next_batch_start;
82 stage1_thread_parser = stage1;
83 has_work = true;
84 // The condition variable call should be moved after thread_lock.unlock() for performance
85 // reasons but thread sanitizers may report it as a data race if we do.
86 // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
87 cond_var.notify_one(); // will notify the thread lock that we have work
88 lock.unlock();
89}
90
91#endif // SIMDJSON_THREADS_ENABLED
92
94 ondemand::parser &_parser,
95 const uint8_t *_buf,
96 size_t _len,
97 size_t _batch_size,
98 bool _allow_comma_separated
99) noexcept
100 : parser{&_parser},
101 buf{_buf},
102 len{_len},
103 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
104 allow_comma_separated{_allow_comma_separated},
105 error{SUCCESS}
106 #ifdef SIMDJSON_THREADS_ENABLED
107 , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
108 #endif
109{
110#ifdef SIMDJSON_THREADS_ENABLED
111 if(worker.get() == nullptr) {
112 error = MEMALLOC;
113 }
114#endif
115}
116
117simdjson_inline document_stream::document_stream() noexcept
118 : parser{nullptr},
119 buf{nullptr},
120 len{0},
121 batch_size{0},
122 allow_comma_separated{false},
123 error{UNINITIALIZED}
124 #ifdef SIMDJSON_THREADS_ENABLED
125 , use_thread(false)
126 #endif
127{
128}
129
130simdjson_inline document_stream::~document_stream() noexcept
131{
132 #ifdef SIMDJSON_THREADS_ENABLED
133 worker.reset();
134 #endif
135}
136
137inline size_t document_stream::size_in_bytes() const noexcept {
138 return len;
139}
140
141inline size_t document_stream::truncated_bytes() const noexcept {
142 if(error == CAPACITY) { return len - batch_start; }
143 return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
144}
145
146simdjson_inline document_stream::iterator::iterator() noexcept
147 : stream{nullptr}, finished{true} {
148}
149
150simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
151 : stream{_stream}, finished{is_end} {
152}
153
157
159 // If there is an error, then we want the iterator
160 // to be finished, no matter what. (E.g., we do not
161 // keep generating documents with errors, or go beyond
162 // a document with errors.)
163 //
164 // Users do not have to call "operator*()" when they use operator++,
165 // so we need to end the stream in the operator++ function.
166 //
167 // Note that setting finished = true is essential otherwise
168 // we would enter an infinite loop.
169 if (stream->error) { finished = true; }
170 // Note that stream->next() is guarded against error conditions
171 // (it will immediately return if stream->error casts to true).
172 // In effect, this next function does nothing when (stream->error)
173 // is true (hence the risk of an infinite loop).
174 stream->next();
175 // If that was the last document, we're finished.
176 // It is the only type of error we do not want to appear
177 // in operator*.
178 if (stream->error == EMPTY) { finished = true; }
179 // If we had any other kind of error (not EMPTY) then we want
180 // to pass it along to the operator* and we cannot mark the result
181 // as "finished" just yet.
182 return *this;
183}
184
185simdjson_inline bool document_stream::iterator::at_end() const noexcept {
186 return finished;
187}
188
189
190simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
191 return finished != other.finished;
192}
193
194simdjson_inline bool document_stream::iterator::operator==(const document_stream::iterator &other) const noexcept {
195 return finished == other.finished;
196}
197
199 start();
200 // If there are no documents, we're finished.
201 return iterator(this, error == EMPTY);
202}
203
205 return iterator(this, true);
206}
207
208inline void document_stream::start() noexcept {
209 if (error) { return; }
210 error = parser->allocate(batch_size);
211 if (error) { return; }
212 // Always run the first stage 1 parse immediately
213 batch_start = 0;
214 error = run_stage1(*parser, batch_start);
215 while(error == EMPTY) {
216 // In exceptional cases, we may start with an empty block
217 batch_start = next_batch_start();
218 if (batch_start >= len) { return; }
219 error = run_stage1(*parser, batch_start);
220 }
221 if (error) { return; }
222 doc_index = batch_start;
223 doc = document(json_iterator(&buf[batch_start], parser));
224 doc.iter._streaming = true;
225
226 #ifdef SIMDJSON_THREADS_ENABLED
227 if (use_thread && next_batch_start() < len) {
228 // Kick off the first thread on next batch if needed
229 error = stage1_thread_parser.allocate(batch_size);
230 if (error) { return; }
231 worker->start_thread();
232 start_stage1_thread();
233 if (error) { return; }
234 }
235 #endif // SIMDJSON_THREADS_ENABLED
236}
237
238inline void document_stream::next() noexcept {
239 // We always enter at once once in an error condition.
240 if (error) { return; }
241 next_document();
242 if (error) { return; }
243 auto cur_struct_index = doc.iter._root - parser->implementation->structural_indexes.get();
244 doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];
245
246 // Check if at end of structural indexes (i.e. at end of batch)
247 if(cur_struct_index >= static_cast<int64_t>(parser->implementation->n_structural_indexes)) {
248 error = EMPTY;
249 // Load another batch (if available)
250 while (error == EMPTY) {
251 batch_start = next_batch_start();
252 if (batch_start >= len) { break; }
253 #ifdef SIMDJSON_THREADS_ENABLED
254 if(use_thread) {
255 load_from_stage1_thread();
256 } else {
257 error = run_stage1(*parser, batch_start);
258 }
259 #else
260 error = run_stage1(*parser, batch_start);
261 #endif
296 doc.iter = json_iterator(&buf[batch_start], parser);
297 doc.iter._streaming = true;
302 if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
303 doc_index = batch_start;
304 }
305 }
306}
307
308inline void document_stream::next_document() noexcept {
309 // Go to next place where depth=0 (document depth)
310 error = doc.iter.skip_child(0);
311 if (error) { return; }
312 // Always set depth=1 at the start of document
313 doc.iter._depth = 1;
314 // consume comma if comma separated is allowed
315 if (allow_comma_separated) {
316 error_code ignored = doc.iter.consume_character(',');
317 static_cast<void>(ignored); // ignored on purpose
318 }
319 // Resets the string buffer at the beginning, thus invalidating the strings.
320 doc.iter._string_buf_loc = parser->string_buf.get();
321 doc.iter._root = doc.iter.position();
322}
323
324inline size_t document_stream::next_batch_start() const noexcept {
325 return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
326}
327
328inline error_code document_stream::run_stage1(ondemand::parser &p, size_t _batch_start) noexcept {
329 // This code only updates the structural index in the parser, it does not update any json_iterator
330 // instance.
331 size_t remaining = len - _batch_start;
332 if (remaining <= batch_size) {
333 return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
334 } else {
335 return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
336 }
337}
338
339simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
340 return stream->doc_index;
341}
342
343simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
344 auto depth = stream->doc.iter.depth();
345 auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();
346
347 // If at root, process the first token to determine if scalar value
348 if (stream->doc.iter.at_root()) {
349 switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
350 case '{': case '[': // Depth=1 already at start of document
351 break;
352 case '}': case ']':
353 depth--;
354 break;
355 default: // Scalar value document
356 // TODO: We could remove trailing whitespaces
357 // This returns a string spanning from start of value to the beginning of the next document (excluded)
358 {
359 auto next_index = stream->parser->implementation->structural_indexes[++cur_struct_index];
360 // normally the length would be next_index - current_index() - 1, except for the last document
361 size_t svlen = next_index - current_index();
362 const char *start = reinterpret_cast<const char*>(stream->buf) + current_index();
363 while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] == '\0')) {
364 svlen--;
365 }
366 return std::string_view(start, svlen);
367 }
368 }
369 cur_struct_index++;
370 }
371
372 while (cur_struct_index <= static_cast<int64_t>(stream->parser->implementation->n_structural_indexes)) {
373 switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
374 case '{': case '[':
375 depth++;
376 break;
377 case '}': case ']':
378 depth--;
379 break;
380 }
381 if (depth == 0) { break; }
382 cur_struct_index++;
383 }
384
385 return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);;
386}
387
389 return stream->error;
390}
391
392#ifdef SIMDJSON_THREADS_ENABLED
393
394inline void document_stream::load_from_stage1_thread() noexcept {
395 worker->finish();
396 // Swap to the parser that was loaded up in the thread. Make sure the parser has
397 // enough memory to swap to, as well.
398 std::swap(stage1_thread_parser,*parser);
399 error = stage1_thread_error;
400 if (error) { return; }
401
402 // If there's anything left, start the stage 1 thread!
403 if (next_batch_start() < len) {
404 start_stage1_thread();
405 }
406}
407
408inline void document_stream::start_stage1_thread() noexcept {
409 // we call the thread on a lambda that will update
410 // this->stage1_thread_error
411 // there is only one thread that may write to this value
412 // TODO this is NOT exception-safe.
413 this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
414 size_t _next_batch_start = this->next_batch_start();
415
416 worker->run(this, & this->stage1_thread_parser, _next_batch_start);
417}
418
419#endif // SIMDJSON_THREADS_ENABLED
420
421} // namespace ondemand
422} // namespace SIMDJSON_IMPLEMENTATION
423} // namespace simdjson
424
425namespace simdjson {
426
427simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
428 error_code error
429) noexcept :
430 implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
431{
432}
433simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
434 SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
435) noexcept :
436 implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
437 std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
438 )
439{
440}
441
442}
443
444#endif // SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
iterator & operator++() noexcept
Advance to the next document (prefix).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
error_code error() const noexcept
Returns error of the stream (if any).
simdjson_inline reference operator*() noexcept
Get the current document (or error).
bool at_end() const noexcept
Returns whether the iterator is at the end.
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
json_iterator iter
Current position in the document.
Definition document.h:793
The top level simdjson namespace, containing everything the library provides.
Definition base.h:8
error_code
All possible errors returned by simdjson.
Definition error.h:19
@ CAPACITY
This parser can't support a document that big.
Definition error.h:21
@ EMPTY
no structural element found
Definition error.h:33
@ MEMALLOC
Error allocating memory, most likely out of memory.
Definition error.h:22
@ SUCCESS
No error.
Definition error.h:20
@ UNINITIALIZED
unknown error, or uninitialized document
Definition error.h:32
The result of a simdjson operation that could fail.
Definition error.h:278