simdjson 4.0.7
Ridiculously Fast JSON
Loading...
Searching...
No Matches
document_stream-inl.h
1#ifndef SIMDJSON_DOCUMENT_STREAM_INL_H
2#define SIMDJSON_DOCUMENT_STREAM_INL_H
3
4#include "simdjson/dom/base.h"
5#include "simdjson/dom/document_stream.h"
6#include "simdjson/dom/element-inl.h"
7#include "simdjson/dom/parser-inl.h"
8#include "simdjson/error-inl.h"
9#include "simdjson/internal/dom_parser_implementation.h"
10
11namespace simdjson {
12namespace dom {
13
14#ifdef SIMDJSON_THREADS_ENABLED
15
16inline void stage1_worker::finish() {
17 // After calling "run" someone would call finish() to wait
18 // for the end of the processing.
19 // This function will wait until either the thread has done
20 // the processing or, else, the destructor has been called.
21 std::unique_lock<std::mutex> lock(locking_mutex);
22 cond_var.wait(lock, [this]{return has_work == false;});
23}
24
25inline stage1_worker::~stage1_worker() {
26 // The thread may never outlive the stage1_worker instance
27 // and will always be stopped/joined before the stage1_worker
28 // instance is gone.
30}
31
32inline void stage1_worker::start_thread() {
33 std::unique_lock<std::mutex> lock(locking_mutex);
34 if(thread.joinable()) {
35 return; // This should never happen but we never want to create more than one thread.
36 }
37 thread = std::thread([this]{
38 while(true) {
39 std::unique_lock<std::mutex> thread_lock(locking_mutex);
40 // We wait for either "run" or "stop_thread" to be called.
41 cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
42 // If, for some reason, the stop_thread() method was called (i.e., the
43 // destructor of stage1_worker is called, then we want to immediately destroy
44 // the thread (and not do any more processing).
45 if(!can_work) {
46 break;
47 }
48 this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
49 this->_next_batch_start);
50 this->has_work = false;
51 // The condition variable call should be moved after thread_lock.unlock() for performance
52 // reasons but thread sanitizers may report it as a data race if we do.
53 // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
54 cond_var.notify_one(); // will notify "finish"
55 thread_lock.unlock();
56 }
57 }
58 );
59}
60
61
62inline void stage1_worker::stop_thread() {
63 std::unique_lock<std::mutex> lock(locking_mutex);
64 // We have to make sure that all locks can be released.
65 can_work = false;
66 has_work = false;
67 cond_var.notify_all();
68 lock.unlock();
69 if(thread.joinable()) {
70 thread.join();
71 }
72}
73
74inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_t next_batch_start) {
75 std::unique_lock<std::mutex> lock(locking_mutex);
76 owner = ds;
77 _next_batch_start = next_batch_start;
78 stage1_thread_parser = stage1;
79 has_work = true;
80 // The condition variable call should be moved after thread_lock.unlock() for performance
81 // reasons but thread sanitizers may report it as a data race if we do.
82 // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
83 cond_var.notify_one(); // will notify the thread lock that we have work
84 lock.unlock();
85}
86#endif
87
89 dom::parser &_parser,
90 const uint8_t *_buf,
91 size_t _len,
92 size_t _batch_size
93) noexcept
94 : parser{&_parser},
95 buf{_buf},
96 len{_len},
97 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
98 error{SUCCESS}
99#ifdef SIMDJSON_THREADS_ENABLED
100 , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
101#endif
102{
103#ifdef SIMDJSON_THREADS_ENABLED
104 if(worker.get() == nullptr) {
105 error = MEMALLOC;
106 }
107#endif
108}
109
111 : parser{nullptr},
112 buf{nullptr},
113 len{0},
114 batch_size{0},
115 error{UNINITIALIZED}
116#ifdef SIMDJSON_THREADS_ENABLED
117 , use_thread(false)
118#endif
119{
120}
121
122simdjson_inline document_stream::~document_stream() noexcept {
123#ifdef SIMDJSON_THREADS_ENABLED
124 worker.reset();
125#endif
126}
127
129 : stream{nullptr}, finished{true} {
130}
131
133 start();
134 // If there are no documents, we're finished.
135 return iterator(this, error == EMPTY);
136}
137
139 return iterator(this, true);
140}
141
143 : stream{_stream}, finished{is_end} {
144}
145
147 // Note that in case of error, we do not yet mark
148 // the iterator as "finished": this detection is done
149 // in the operator++ function since it is possible
150 // to call operator++ repeatedly while omitting
151 // calls to operator*.
152 if (stream->error) { return stream->error; }
153 return stream->parser->doc.root();
154}
155
157 // If there is an error, then we want the iterator
158 // to be finished, no matter what. (E.g., we do not
159 // keep generating documents with errors, or go beyond
160 // a document with errors.)
161 //
162 // Users do not have to call "operator*()" when they use operator++,
163 // so we need to end the stream in the operator++ function.
164 //
165 // Note that setting finished = true is essential otherwise
166 // we would enter an infinite loop.
167 if (stream->error) { finished = true; }
168 // Note that stream->next() is guarded against error conditions
169 // (it will immediately return if stream->error casts to true).
170 // In effect, this next function does nothing when (stream->error)
171 // is true (hence the risk of an infinite loop).
172 stream->next();
173 // If that was the last document, we're finished.
174 // It is the only type of error we do not want to appear
175 // in operator*.
176 if (stream->error == EMPTY) { finished = true; }
177 // If we had any other kind of error (not EMPTY) then we want
178 // to pass it along to the operator* and we cannot mark the result
179 // as "finished" just yet.
180 return *this;
181}
182
183simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
184 return finished != other.finished;
185}
186
187inline void document_stream::start() noexcept {
188 if (error) { return; }
189 error = parser->ensure_capacity(batch_size);
190 if (error) { return; }
191 // Always run the first stage 1 parse immediately
192 batch_start = 0;
193 error = run_stage1(*parser, batch_start);
194 while(error == EMPTY) {
195 // In exceptional cases, we may start with an empty block
196 batch_start = next_batch_start();
197 if (batch_start >= len) { return; }
198 error = run_stage1(*parser, batch_start);
199 }
200 if (error) { return; }
201#ifdef SIMDJSON_THREADS_ENABLED
202 if (use_thread && next_batch_start() < len) {
203 // Kick off the first thread if needed
204 error = stage1_thread_parser.ensure_capacity(batch_size);
205 if (error) { return; }
206 worker->start_thread();
207 start_stage1_thread();
208 if (error) { return; }
209 }
210#endif // SIMDJSON_THREADS_ENABLED
211 next();
212}
213
214simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
215 return stream->doc_index;
216}
217
218simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
219 const char* start = reinterpret_cast<const char*>(stream->buf) + current_index();
220 bool object_or_array = ((*start == '[') || (*start == '{'));
221 if(object_or_array) {
222 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index - 1];
223 return std::string_view(start, next_doc_index - current_index() + 1);
224 } else {
225 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index];
226 size_t svlen = next_doc_index - current_index();
227 while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] == '\0')) {
228 svlen--;
229 }
230 return std::string_view(start, svlen);
231 }
232}
233
234
235inline void document_stream::next() noexcept {
236 // We always exit at once, once in an error condition.
237 if (error) { return; }
238
239 // Load the next document from the batch
240 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
241 error = parser->implementation->stage2_next(parser->doc);
242 // If that was the last document in the batch, load another batch (if available)
243 while (error == EMPTY) {
244 batch_start = next_batch_start();
245 if (batch_start >= len) { break; }
246
247#ifdef SIMDJSON_THREADS_ENABLED
248 if(use_thread) {
249 load_from_stage1_thread();
250 } else {
251 error = run_stage1(*parser, batch_start);
252 }
253#else
254 error = run_stage1(*parser, batch_start);
255#endif
256 if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
257 // Run stage 2 on the first document in the batch
258 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
259 error = parser->implementation->stage2_next(parser->doc);
260 }
261}
263 return len;
264}
265
267 if(error == CAPACITY) { return len - batch_start; }
268 return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
269}
270
271inline size_t document_stream::next_batch_start() const noexcept {
272 return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
273}
274
275inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
276 size_t remaining = len - _batch_start;
277 if (remaining <= batch_size) {
278 return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
279 } else {
280 return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
281 }
282}
283
284#ifdef SIMDJSON_THREADS_ENABLED
285
286inline void document_stream::load_from_stage1_thread() noexcept {
287 worker->finish();
288 // Swap to the parser that was loaded up in the thread. Make sure the parser has
289 // enough memory to swap to, as well.
290 std::swap(*parser, stage1_thread_parser);
291 error = stage1_thread_error;
292 if (error) { return; }
293
294 // If there's anything left, start the stage 1 thread!
295 if (next_batch_start() < len) {
296 start_stage1_thread();
297 }
298}
299
300inline void document_stream::start_stage1_thread() noexcept {
301 // we call the thread on a lambda that will update
302 // this->stage1_thread_error
303 // there is only one thread that may write to this value
304 // TODO this is NOT exception-safe.
305 this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
306 size_t _next_batch_start = this->next_batch_start();
307
308 worker->run(this, & this->stage1_thread_parser, _next_batch_start);
309}
310
311#endif // SIMDJSON_THREADS_ENABLED
312
313} // namespace dom
314
315simdjson_inline simdjson_result<dom::document_stream>::simdjson_result() noexcept
316 : simdjson_result_base() {
317}
318simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(error_code error) noexcept
319 : simdjson_result_base(error) {
320}
321simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(dom::document_stream &&value) noexcept
322 : simdjson_result_base(std::forward<dom::document_stream>(value)) {
323}
324
325#if SIMDJSON_EXCEPTIONS
326simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept(false) {
327 if (error()) { throw simdjson_error(error()); }
328 return first.begin();
329}
330simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept(false) {
331 if (error()) { throw simdjson_error(error()); }
332 return first.end();
333}
334#else // SIMDJSON_EXCEPTIONS
335#ifndef SIMDJSON_DISABLE_DEPRECATED_API
336simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept {
337 first.error = error();
338 return first.begin();
339}
340simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept {
341 first.error = error();
342 return first.end();
343}
344#endif // SIMDJSON_DISABLE_DEPRECATED_API
345#endif // SIMDJSON_EXCEPTIONS
346
347} // namespace simdjson
348#endif // SIMDJSON_DOCUMENT_STREAM_INL_H
JSON array.
Definition array.h:15
simdjson_inline array() noexcept
Create a new, invalid array.
Definition array-inl.h:76
An iterator through a forward-only stream of documents.
simdjson_inline reference operator*() noexcept
Get the current document (or error).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
simdjson_inline iterator() noexcept
Default constructor.
iterator & operator++() noexcept
Advance to the next document (prefix).
A forward-only stream of documents.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
A persistent document parser.
Definition parser.h:30
The top level simdjson namespace, containing everything the library provides.
Definition base.h:8
error_code
All possible errors returned by simdjson.
Definition error.h:19
@ CAPACITY
This parser can't support a document that big.
Definition error.h:21
@ EMPTY
no structural element found
Definition error.h:33
@ MEMALLOC
Error allocating memory, most likely out of memory.
Definition error.h:22
@ SUCCESS
No error.
Definition error.h:20
@ UNINITIALIZED
unknown error, or uninitialized document
Definition error.h:32
The result of a simdjson operation that could fail.
Definition error.h:278