simdjson  3.11.0
Ridiculously Fast JSON
document_stream-inl.h
1 #ifndef SIMDJSON_DOCUMENT_STREAM_INL_H
2 #define SIMDJSON_DOCUMENT_STREAM_INL_H
3 
4 #include "simdjson/dom/base.h"
5 #include "simdjson/dom/document_stream.h"
6 #include "simdjson/dom/element-inl.h"
7 #include "simdjson/dom/parser-inl.h"
8 #include "simdjson/error-inl.h"
9 #include "simdjson/internal/dom_parser_implementation.h"
10 
11 namespace simdjson {
12 namespace dom {
13 
14 #ifdef SIMDJSON_THREADS_ENABLED
15 
16 inline void stage1_worker::finish() {
17  // After calling "run" someone would call finish() to wait
18  // for the end of the processing.
19  // This function will wait until either the thread has done
20  // the processing or, else, the destructor has been called.
21  std::unique_lock<std::mutex> lock(locking_mutex);
22  cond_var.wait(lock, [this]{return has_work == false;});
23 }
24 
25 inline stage1_worker::~stage1_worker() {
26  // The thread may never outlive the stage1_worker instance
27  // and will always be stopped/joined before the stage1_worker
28  // instance is gone.
29  stop_thread();
30 }
31 
32 inline void stage1_worker::start_thread() {
33  std::unique_lock<std::mutex> lock(locking_mutex);
34  if(thread.joinable()) {
35  return; // This should never happen but we never want to create more than one thread.
36  }
37  thread = std::thread([this]{
38  while(true) {
39  std::unique_lock<std::mutex> thread_lock(locking_mutex);
40  // We wait for either "run" or "stop_thread" to be called.
41  cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
42  // If, for some reason, the stop_thread() method was called (i.e., the
43  // destructor of stage1_worker is called, then we want to immediately destroy
44  // the thread (and not do any more processing).
45  if(!can_work) {
46  break;
47  }
48  this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
49  this->_next_batch_start);
50  this->has_work = false;
51  // The condition variable call should be moved after thread_lock.unlock() for performance
52  // reasons but thread sanitizers may report it as a data race if we do.
53  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
54  cond_var.notify_one(); // will notify "finish"
55  thread_lock.unlock();
56  }
57  }
58  );
59 }
60 
61 
62 inline void stage1_worker::stop_thread() {
63  std::unique_lock<std::mutex> lock(locking_mutex);
64  // We have to make sure that all locks can be released.
65  can_work = false;
66  has_work = false;
67  cond_var.notify_all();
68  lock.unlock();
69  if(thread.joinable()) {
70  thread.join();
71  }
72 }
73 
74 inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_t next_batch_start) {
75  std::unique_lock<std::mutex> lock(locking_mutex);
76  owner = ds;
77  _next_batch_start = next_batch_start;
78  stage1_thread_parser = stage1;
79  has_work = true;
80  // The condition variable call should be moved after thread_lock.unlock() for performance
81  // reasons but thread sanitizers may report it as a data race if we do.
82  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
83  cond_var.notify_one(); // will notify the thread lock that we have work
84  lock.unlock();
85 }
86 #endif
87 
88 simdjson_inline document_stream::document_stream(
89  dom::parser &_parser,
90  const uint8_t *_buf,
91  size_t _len,
92  size_t _batch_size
93 ) noexcept
94  : parser{&_parser},
95  buf{_buf},
96  len{_len},
97  batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
98  error{SUCCESS}
99 #ifdef SIMDJSON_THREADS_ENABLED
100  , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
101 #endif
102 {
103 #ifdef SIMDJSON_THREADS_ENABLED
104  if(worker.get() == nullptr) {
105  error = MEMALLOC;
106  }
107 #endif
108 }
109 
110 simdjson_inline document_stream::document_stream() noexcept
111  : parser{nullptr},
112  buf{nullptr},
113  len{0},
114  batch_size{0},
115  error{UNINITIALIZED}
116 #ifdef SIMDJSON_THREADS_ENABLED
117  , use_thread(false)
118 #endif
119 {
120 }
121 
122 simdjson_inline document_stream::~document_stream() noexcept {
123 #ifdef SIMDJSON_THREADS_ENABLED
124  worker.reset();
125 #endif
126 }
127 
128 simdjson_inline document_stream::iterator::iterator() noexcept
129  : stream{nullptr}, finished{true} {
130 }
131 
133  start();
134  // If there are no documents, we're finished.
135  return iterator(this, error == EMPTY);
136 }
137 
139  return iterator(this, true);
140 }
141 
142 simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
143  : stream{_stream}, finished{is_end} {
144 }
145 
147  // Note that in case of error, we do not yet mark
148  // the iterator as "finished": this detection is done
149  // in the operator++ function since it is possible
150  // to call operator++ repeatedly while omitting
151  // calls to operator*.
152  if (stream->error) { return stream->error; }
153  return stream->parser->doc.root();
154 }
155 
157  // If there is an error, then we want the iterator
158  // to be finished, no matter what. (E.g., we do not
159  // keep generating documents with errors, or go beyond
160  // a document with errors.)
161  //
162  // Users do not have to call "operator*()" when they use operator++,
163  // so we need to end the stream in the operator++ function.
164  //
165  // Note that setting finished = true is essential otherwise
166  // we would enter an infinite loop.
167  if (stream->error) { finished = true; }
168  // Note that stream->next() is guarded against error conditions
169  // (it will immediately return if stream->error casts to true).
170  // In effect, this next function does nothing when (stream->error)
171  // is true (hence the risk of an infinite loop).
172  stream->next();
173  // If that was the last document, we're finished.
174  // It is the only type of error we do not want to appear
175  // in operator*.
176  if (stream->error == EMPTY) { finished = true; }
177  // If we had any other kind of error (not EMPTY) then we want
178  // to pass it along to the operator* and we cannot mark the result
179  // as "finished" just yet.
180  return *this;
181 }
182 
183 simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
184  return finished != other.finished;
185 }
186 
187 inline void document_stream::start() noexcept {
188  if (error) { return; }
189  error = parser->ensure_capacity(batch_size);
190  if (error) { return; }
191  // Always run the first stage 1 parse immediately
192  batch_start = 0;
193  error = run_stage1(*parser, batch_start);
194  while(error == EMPTY) {
195  // In exceptional cases, we may start with an empty block
196  batch_start = next_batch_start();
197  if (batch_start >= len) { return; }
198  error = run_stage1(*parser, batch_start);
199  }
200  if (error) { return; }
201 #ifdef SIMDJSON_THREADS_ENABLED
202  if (use_thread && next_batch_start() < len) {
203  // Kick off the first thread if needed
204  error = stage1_thread_parser.ensure_capacity(batch_size);
205  if (error) { return; }
206  worker->start_thread();
207  start_stage1_thread();
208  if (error) { return; }
209  }
210 #endif // SIMDJSON_THREADS_ENABLED
211  next();
212 }
213 
214 simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
215  return stream->doc_index;
216 }
217 
218 simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
219  const char* start = reinterpret_cast<const char*>(stream->buf) + current_index();
220  bool object_or_array = ((*start == '[') || (*start == '{'));
221  if(object_or_array) {
222  size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index - 1];
223  return std::string_view(start, next_doc_index - current_index() + 1);
224  } else {
225  size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index];
226  size_t svlen = next_doc_index - current_index();
227  while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] == '\0')) {
228  svlen--;
229  }
230  return std::string_view(start, svlen);
231  }
232 }
233 
234 
235 inline void document_stream::next() noexcept {
236  // We always exit at once, once in an error condition.
237  if (error) { return; }
238 
239  // Load the next document from the batch
240  doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
241  error = parser->implementation->stage2_next(parser->doc);
242  // If that was the last document in the batch, load another batch (if available)
243  while (error == EMPTY) {
244  batch_start = next_batch_start();
245  if (batch_start >= len) { break; }
246 
247 #ifdef SIMDJSON_THREADS_ENABLED
248  if(use_thread) {
249  load_from_stage1_thread();
250  } else {
251  error = run_stage1(*parser, batch_start);
252  }
253 #else
254  error = run_stage1(*parser, batch_start);
255 #endif
256  if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
257  // Run stage 2 on the first document in the batch
258  doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
259  error = parser->implementation->stage2_next(parser->doc);
260  }
261 }
262 inline size_t document_stream::size_in_bytes() const noexcept {
263  return len;
264 }
265 
266 inline size_t document_stream::truncated_bytes() const noexcept {
267  if(error == CAPACITY) { return len - batch_start; }
268  return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
269 }
270 
271 inline size_t document_stream::next_batch_start() const noexcept {
272  return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
273 }
274 
275 inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
276  size_t remaining = len - _batch_start;
277  if (remaining <= batch_size) {
278  return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
279  } else {
280  return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
281  }
282 }
283 
284 #ifdef SIMDJSON_THREADS_ENABLED
285 
286 inline void document_stream::load_from_stage1_thread() noexcept {
287  worker->finish();
288  // Swap to the parser that was loaded up in the thread. Make sure the parser has
289  // enough memory to swap to, as well.
290  std::swap(*parser, stage1_thread_parser);
291  error = stage1_thread_error;
292  if (error) { return; }
293 
294  // If there's anything left, start the stage 1 thread!
295  if (next_batch_start() < len) {
296  start_stage1_thread();
297  }
298 }
299 
300 inline void document_stream::start_stage1_thread() noexcept {
301  // we call the thread on a lambda that will update
302  // this->stage1_thread_error
303  // there is only one thread that may write to this value
304  // TODO this is NOT exception-safe.
305  this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
306  size_t _next_batch_start = this->next_batch_start();
307 
308  worker->run(this, & this->stage1_thread_parser, _next_batch_start);
309 }
310 
311 #endif // SIMDJSON_THREADS_ENABLED
312 
313 } // namespace dom
314 
315 simdjson_inline simdjson_result<dom::document_stream>::simdjson_result() noexcept
316  : simdjson_result_base() {
317 }
318 simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(error_code error) noexcept
319  : simdjson_result_base(error) {
320 }
321 simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(dom::document_stream &&value) noexcept
322  : simdjson_result_base(std::forward<dom::document_stream>(value)) {
323 }
324 
325 #if SIMDJSON_EXCEPTIONS
326 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept(false) {
327  if (error()) { throw simdjson_error(error()); }
328  return first.begin();
329 }
330 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept(false) {
331  if (error()) { throw simdjson_error(error()); }
332  return first.end();
333 }
334 #else // SIMDJSON_EXCEPTIONS
335 #ifndef SIMDJSON_DISABLE_DEPRECATED_API
336 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept {
337  first.error = error();
338  return first.begin();
339 }
340 simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept {
341  first.error = error();
342  return first.end();
343 }
344 #endif // SIMDJSON_DISABLE_DEPRECATED_API
345 #endif // SIMDJSON_EXCEPTIONS
346 
347 } // namespace simdjson
348 #endif // SIMDJSON_DOCUMENT_STREAM_INL_H
An iterator through a forward-only stream of documents.
simdjson_inline reference operator*() noexcept
Get the current document (or error).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
simdjson_inline iterator() noexcept
Default constructor.
iterator & operator++() noexcept
Advance to the next document (prefix).
A forward-only stream of documents.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
A persistent document parser.
Definition: parser.h:30
The top level simdjson namespace, containing everything the library provides.
Definition: base.h:8
error_code
All possible errors returned by simdjson.
Definition: error.h:19
@ CAPACITY
This parser can't support a document that big.
Definition: error.h:21
@ EMPTY
no structural element found
Definition: error.h:33
@ MEMALLOC
Error allocating memory, most likely out of memory.
Definition: error.h:22
@ SUCCESS
No error.
Definition: error.h:20
@ UNINITIALIZED
unknown error, or uninitialized document
Definition: error.h:32
The result of a simdjson operation that could fail.
Definition: error.h:215
simdjson_inline error_code error() const noexcept
The error.
Definition: error-inl.h:131