simdjson  3.11.0
Ridiculously Fast JSON
document_stream-inl.h
1 #ifndef SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
2 
3 #ifndef SIMDJSON_CONDITIONAL_INCLUDE
4 #define SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
5 #include "simdjson/generic/ondemand/base.h"
6 #include "simdjson/generic/ondemand/document_stream.h"
7 #include "simdjson/generic/ondemand/document-inl.h"
8 #include "simdjson/generic/implementation_simdjson_result_base-inl.h"
9 #endif // SIMDJSON_CONDITIONAL_INCLUDE
10 
11 #include <algorithm>
12 #include <stdexcept>
13 
14 namespace simdjson {
15 namespace SIMDJSON_IMPLEMENTATION {
16 namespace ondemand {
17 
18 #ifdef SIMDJSON_THREADS_ENABLED
19 
20 inline void stage1_worker::finish() {
21  // After calling "run" someone would call finish() to wait
22  // for the end of the processing.
23  // This function will wait until either the thread has done
24  // the processing or, else, the destructor has been called.
25  std::unique_lock<std::mutex> lock(locking_mutex);
26  cond_var.wait(lock, [this]{return has_work == false;});
27 }
28 
29 inline stage1_worker::~stage1_worker() {
30  // The thread may never outlive the stage1_worker instance
31  // and will always be stopped/joined before the stage1_worker
32  // instance is gone.
33  stop_thread();
34 }
35 
36 inline void stage1_worker::start_thread() {
37  std::unique_lock<std::mutex> lock(locking_mutex);
38  if(thread.joinable()) {
39  return; // This should never happen but we never want to create more than one thread.
40  }
41  thread = std::thread([this]{
42  while(true) {
43  std::unique_lock<std::mutex> thread_lock(locking_mutex);
44  // We wait for either "run" or "stop_thread" to be called.
45  cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
46  // If, for some reason, the stop_thread() method was called (i.e., the
47  // destructor of stage1_worker is called, then we want to immediately destroy
48  // the thread (and not do any more processing).
49  if(!can_work) {
50  break;
51  }
52  this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
53  this->_next_batch_start);
54  this->has_work = false;
55  // The condition variable call should be moved after thread_lock.unlock() for performance
56  // reasons but thread sanitizers may report it as a data race if we do.
57  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
58  cond_var.notify_one(); // will notify "finish"
59  thread_lock.unlock();
60  }
61  }
62  );
63 }
64 
65 
66 inline void stage1_worker::stop_thread() {
67  std::unique_lock<std::mutex> lock(locking_mutex);
68  // We have to make sure that all locks can be released.
69  can_work = false;
70  has_work = false;
71  cond_var.notify_all();
72  lock.unlock();
73  if(thread.joinable()) {
74  thread.join();
75  }
76 }
77 
78 inline void stage1_worker::run(document_stream * ds, parser * stage1, size_t next_batch_start) {
79  std::unique_lock<std::mutex> lock(locking_mutex);
80  owner = ds;
81  _next_batch_start = next_batch_start;
82  stage1_thread_parser = stage1;
83  has_work = true;
84  // The condition variable call should be moved after thread_lock.unlock() for performance
85  // reasons but thread sanitizers may report it as a data race if we do.
86  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
87  cond_var.notify_one(); // will notify the thread lock that we have work
88  lock.unlock();
89 }
90 
91 #endif // SIMDJSON_THREADS_ENABLED
92 
93 simdjson_inline document_stream::document_stream(
94  ondemand::parser &_parser,
95  const uint8_t *_buf,
96  size_t _len,
97  size_t _batch_size,
98  bool _allow_comma_separated
99 ) noexcept
100  : parser{&_parser},
101  buf{_buf},
102  len{_len},
103  batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
104  allow_comma_separated{_allow_comma_separated},
105  error{SUCCESS}
106  #ifdef SIMDJSON_THREADS_ENABLED
107  , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
108  #endif
109 {
110 #ifdef SIMDJSON_THREADS_ENABLED
111  if(worker.get() == nullptr) {
112  error = MEMALLOC;
113  }
114 #endif
115 }
116 
117 simdjson_inline document_stream::document_stream() noexcept
118  : parser{nullptr},
119  buf{nullptr},
120  len{0},
121  batch_size{0},
122  allow_comma_separated{false},
123  error{UNINITIALIZED}
124  #ifdef SIMDJSON_THREADS_ENABLED
125  , use_thread(false)
126  #endif
127 {
128 }
129 
130 simdjson_inline document_stream::~document_stream() noexcept
131 {
132  #ifdef SIMDJSON_THREADS_ENABLED
133  worker.reset();
134  #endif
135 }
136 
137 inline size_t document_stream::size_in_bytes() const noexcept {
138  return len;
139 }
140 
141 inline size_t document_stream::truncated_bytes() const noexcept {
142  if(error == CAPACITY) { return len - batch_start; }
143  return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
144 }
145 
146 simdjson_inline document_stream::iterator::iterator() noexcept
147  : stream{nullptr}, finished{true} {
148 }
149 
150 simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
151  : stream{_stream}, finished{is_end} {
152 }
153 
155  return simdjson_result<ondemand::document_reference>(stream->doc, stream->error);
156 }
157 
159  // If there is an error, then we want the iterator
160  // to be finished, no matter what. (E.g., we do not
161  // keep generating documents with errors, or go beyond
162  // a document with errors.)
163  //
164  // Users do not have to call "operator*()" when they use operator++,
165  // so we need to end the stream in the operator++ function.
166  //
167  // Note that setting finished = true is essential otherwise
168  // we would enter an infinite loop.
169  if (stream->error) { finished = true; }
170  // Note that stream->next() is guarded against error conditions
171  // (it will immediately return if stream->error casts to true).
172  // In effect, this next function does nothing when (stream->error)
173  // is true (hence the risk of an infinite loop).
174  stream->next();
175  // If that was the last document, we're finished.
176  // It is the only type of error we do not want to appear
177  // in operator*.
178  if (stream->error == EMPTY) { finished = true; }
179  // If we had any other kind of error (not EMPTY) then we want
180  // to pass it along to the operator* and we cannot mark the result
181  // as "finished" just yet.
182  return *this;
183 }
184 
185 simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
186  return finished != other.finished;
187 }
188 
190  start();
191  // If there are no documents, we're finished.
192  return iterator(this, error == EMPTY);
193 }
194 
196  return iterator(this, true);
197 }
198 
199 inline void document_stream::start() noexcept {
200  if (error) { return; }
201  error = parser->allocate(batch_size);
202  if (error) { return; }
203  // Always run the first stage 1 parse immediately
204  batch_start = 0;
205  error = run_stage1(*parser, batch_start);
206  while(error == EMPTY) {
207  // In exceptional cases, we may start with an empty block
208  batch_start = next_batch_start();
209  if (batch_start >= len) { return; }
210  error = run_stage1(*parser, batch_start);
211  }
212  if (error) { return; }
213  doc_index = batch_start;
214  doc = document(json_iterator(&buf[batch_start], parser));
215  doc.iter._streaming = true;
216 
217  #ifdef SIMDJSON_THREADS_ENABLED
218  if (use_thread && next_batch_start() < len) {
219  // Kick off the first thread on next batch if needed
220  error = stage1_thread_parser.allocate(batch_size);
221  if (error) { return; }
222  worker->start_thread();
223  start_stage1_thread();
224  if (error) { return; }
225  }
226  #endif // SIMDJSON_THREADS_ENABLED
227 }
228 
229 inline void document_stream::next() noexcept {
230  // We always enter at once once in an error condition.
231  if (error) { return; }
232  next_document();
233  if (error) { return; }
234  auto cur_struct_index = doc.iter._root - parser->implementation->structural_indexes.get();
235  doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];
236 
237  // Check if at end of structural indexes (i.e. at end of batch)
238  if(cur_struct_index >= static_cast<int64_t>(parser->implementation->n_structural_indexes)) {
239  error = EMPTY;
240  // Load another batch (if available)
241  while (error == EMPTY) {
242  batch_start = next_batch_start();
243  if (batch_start >= len) { break; }
244  #ifdef SIMDJSON_THREADS_ENABLED
245  if(use_thread) {
246  load_from_stage1_thread();
247  } else {
248  error = run_stage1(*parser, batch_start);
249  }
250  #else
251  error = run_stage1(*parser, batch_start);
252  #endif
287  doc.iter = json_iterator(&buf[batch_start], parser);
288  doc.iter._streaming = true;
293  if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
294  doc_index = batch_start;
295  }
296  }
297 }
298 
299 inline void document_stream::next_document() noexcept {
300  // Go to next place where depth=0 (document depth)
301  error = doc.iter.skip_child(0);
302  if (error) { return; }
303  // Always set depth=1 at the start of document
304  doc.iter._depth = 1;
305  // consume comma if comma separated is allowed
306  if (allow_comma_separated) { doc.iter.consume_character(','); }
307  // Resets the string buffer at the beginning, thus invalidating the strings.
308  doc.iter._string_buf_loc = parser->string_buf.get();
309  doc.iter._root = doc.iter.position();
310 }
311 
312 inline size_t document_stream::next_batch_start() const noexcept {
313  return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
314 }
315 
316 inline error_code document_stream::run_stage1(ondemand::parser &p, size_t _batch_start) noexcept {
317  // This code only updates the structural index in the parser, it does not update any json_iterator
318  // instance.
319  size_t remaining = len - _batch_start;
320  if (remaining <= batch_size) {
321  return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
322  } else {
323  return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
324  }
325 }
326 
327 simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
328  return stream->doc_index;
329 }
330 
331 simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
332  auto depth = stream->doc.iter.depth();
333  auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();
334 
335  // If at root, process the first token to determine if scalar value
336  if (stream->doc.iter.at_root()) {
337  switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
338  case '{': case '[': // Depth=1 already at start of document
339  break;
340  case '}': case ']':
341  depth--;
342  break;
343  default: // Scalar value document
344  // TODO: We could remove trailing whitespaces
345  // This returns a string spanning from start of value to the beginning of the next document (excluded)
346  {
347  auto next_index = stream->parser->implementation->structural_indexes[++cur_struct_index];
348  // normally the length would be next_index - current_index() - 1, except for the last document
349  size_t svlen = next_index - current_index();
350  const char *start = reinterpret_cast<const char*>(stream->buf) + current_index();
351  while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] == '\0')) {
352  svlen--;
353  }
354  return std::string_view(start, svlen);
355  }
356  }
357  cur_struct_index++;
358  }
359 
360  while (cur_struct_index <= static_cast<int64_t>(stream->parser->implementation->n_structural_indexes)) {
361  switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
362  case '{': case '[':
363  depth++;
364  break;
365  case '}': case ']':
366  depth--;
367  break;
368  }
369  if (depth == 0) { break; }
370  cur_struct_index++;
371  }
372 
373  return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);;
374 }
375 
377  return stream->error;
378 }
379 
380 #ifdef SIMDJSON_THREADS_ENABLED
381 
382 inline void document_stream::load_from_stage1_thread() noexcept {
383  worker->finish();
384  // Swap to the parser that was loaded up in the thread. Make sure the parser has
385  // enough memory to swap to, as well.
386  std::swap(stage1_thread_parser,*parser);
387  error = stage1_thread_error;
388  if (error) { return; }
389 
390  // If there's anything left, start the stage 1 thread!
391  if (next_batch_start() < len) {
392  start_stage1_thread();
393  }
394 }
395 
396 inline void document_stream::start_stage1_thread() noexcept {
397  // we call the thread on a lambda that will update
398  // this->stage1_thread_error
399  // there is only one thread that may write to this value
400  // TODO this is NOT exception-safe.
401  this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
402  size_t _next_batch_start = this->next_batch_start();
403 
404  worker->run(this, & this->stage1_thread_parser, _next_batch_start);
405 }
406 
407 #endif // SIMDJSON_THREADS_ENABLED
408 
409 } // namespace ondemand
410 } // namespace SIMDJSON_IMPLEMENTATION
411 } // namespace simdjson
412 
413 namespace simdjson {
414 
415 simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
416  error_code error
417 ) noexcept :
418  implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
419 {
420 }
421 simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
422  SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
423 ) noexcept :
424  implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
425  std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
426  )
427 {
428 }
429 
430 }
431 
432 #endif // SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H
iterator & operator++() noexcept
Advance to the next document (prefix).
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
error_code error() const noexcept
Returns error of the stream (if any).
simdjson_inline reference operator*() noexcept
Get the current document (or error).
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at t...
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
json_iterator iter
Current position in the document.
Definition: document.h:722
The top level simdjson namespace, containing everything the library provides.
Definition: base.h:8
error_code
All possible errors returned by simdjson.
Definition: error.h:19
@ CAPACITY
This parser can't support a document that big.
Definition: error.h:21
@ EMPTY
no structural element found
Definition: error.h:33
@ MEMALLOC
Error allocating memory, most likely out of memory.
Definition: error.h:22
@ SUCCESS
No error.
Definition: error.h:20
@ UNINITIALIZED
unknown error, or uninitialized document
Definition: error.h:32
The result of a simdjson operation that could fail.
Definition: error.h:215