1#ifndef SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H 
    3#ifndef SIMDJSON_CONDITIONAL_INCLUDE 
    4#define SIMDJSON_GENERIC_ONDEMAND_DOCUMENT_STREAM_INL_H 
    5#include "simdjson/generic/ondemand/base.h" 
    6#include "simdjson/generic/ondemand/document_stream.h" 
    7#include "simdjson/generic/ondemand/document-inl.h" 
    8#include "simdjson/generic/implementation_simdjson_result_base-inl.h" 
   15namespace SIMDJSON_IMPLEMENTATION {
 
   18#ifdef SIMDJSON_THREADS_ENABLED 
   20inline void stage1_worker::finish() {
 
   25  std::unique_lock<std::mutex> lock(locking_mutex);
 
   26  cond_var.wait(lock, [
this]{
return has_work == 
false;});
 
   29inline stage1_worker::~stage1_worker() {
 
   36inline void stage1_worker::start_thread() {
 
   37  std::unique_lock<std::mutex> lock(locking_mutex);
 
   38  if(thread.joinable()) {
 
   41  thread = std::thread([
this]{
 
   43        std::unique_lock<std::mutex> thread_lock(locking_mutex);
 
   45        cond_var.wait(thread_lock, [
this]{
return has_work || !can_work;});
 
   52        this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
 
   53              this->_next_batch_start);
 
   54        this->has_work = 
false;
 
   58        cond_var.notify_one(); 
 
   66inline void stage1_worker::stop_thread() {
 
   67  std::unique_lock<std::mutex> lock(locking_mutex);
 
   71  cond_var.notify_all();
 
   73  if(thread.joinable()) {
 
   78inline void stage1_worker::run(document_stream * ds, parser * stage1, 
size_t next_batch_start) {
 
   79  std::unique_lock<std::mutex> lock(locking_mutex);
 
   81  _next_batch_start = next_batch_start;
 
   82  stage1_thread_parser = stage1;
 
   87  cond_var.notify_one(); 
 
   94  ondemand::parser &_parser,
 
   98  bool _allow_comma_separated
 
  103    batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
 
  104    allow_comma_separated{_allow_comma_separated},
 
  106    #ifdef SIMDJSON_THREADS_ENABLED 
  107    , use_thread(_parser.threaded) 
 
  110#ifdef SIMDJSON_THREADS_ENABLED 
  111  if(worker.get() == 
nullptr) {
 
  122    allow_comma_separated{
false},
 
  124    #ifdef SIMDJSON_THREADS_ENABLED 
 
  130simdjson_inline document_stream::~document_stream() noexcept
 
  132  #ifdef SIMDJSON_THREADS_ENABLED 
  142  if(error == 
CAPACITY) { 
return len - batch_start; }
 
  143  return parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes] - 
parser->implementation->structural_indexes[
parser->implementation->n_structural_indexes + 1];
 
 
  147  : stream{
nullptr}, finished{
true} {
 
 
  151  : stream{_stream}, finished{is_end} {
 
  169  if (stream->error) { finished = 
true; }
 
  178  if (stream->error == 
EMPTY) { finished = 
true; }
 
 
  191  return finished != other.finished;
 
 
  195  return finished == other.finished;
 
  208inline void document_stream::start() noexcept {
 
  209  if (error) { 
return; }
 
  210  error = parser->allocate(batch_size);
 
  211  if (error) { 
return; }
 
  214  error = run_stage1(*parser, batch_start);
 
  215  while(error == 
EMPTY) {
 
  217    batch_start = next_batch_start();
 
  218    if (batch_start >= len) { 
return; }
 
  219    error = run_stage1(*parser, batch_start);
 
  221  if (error) { 
return; }
 
  222  doc_index = batch_start;
 
  223  doc = document(json_iterator(&buf[batch_start], parser));
 
  224  doc.
iter._streaming = 
true;
 
  226  #ifdef SIMDJSON_THREADS_ENABLED 
  227  if (use_thread && next_batch_start() < len) {
 
  229    error = stage1_thread_parser.allocate(batch_size);
 
  230    if (error) { 
return; }
 
  231    worker->start_thread();
 
  232    start_stage1_thread();
 
  233    if (error) { 
return; }
 
  238inline void document_stream::next() noexcept {
 
  240  if (error) { 
return; }
 
  242  if (error) { 
return; }
 
  243  auto cur_struct_index = doc.
iter._root - parser->implementation->structural_indexes.get();
 
  244  doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];
 
  247  if(cur_struct_index >= 
static_cast<int64_t
>(parser->implementation->n_structural_indexes)) {
 
  250    while (error == 
EMPTY) {
 
  251      batch_start = next_batch_start();
 
  252      if (batch_start >= len) { 
break; }
 
  253      #ifdef SIMDJSON_THREADS_ENABLED 
  255        load_from_stage1_thread();
 
  257        error = run_stage1(*parser, batch_start);
 
  260      error = run_stage1(*parser, batch_start);
 
  296      doc.
iter = json_iterator(&buf[batch_start], parser);
 
  297      doc.
iter._streaming = 
true;
 
  302      if (error) { 
continue; } 
 
  303      doc_index = batch_start;
 
  308inline void document_stream::next_document() noexcept {
 
  310  error = doc.
iter.skip_child(0);
 
  311  if (error) { 
return; }
 
  315  if (allow_comma_separated) {
 
  317    static_cast<void>(ignored); 
 
  320  doc.
iter._string_buf_loc = parser->string_buf.get();
 
  321  doc.
iter._root = doc.
iter.position();
 
  324inline size_t document_stream::next_batch_start() const noexcept {
 
  325  return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
 
  328inline error_code document_stream::run_stage1(ondemand::parser &p, 
size_t _batch_start) 
noexcept {
 
  331  size_t remaining = len - _batch_start;
 
  332  if (remaining <= batch_size) {
 
  333    return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
 
  335    return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
 
  339simdjson_inline 
size_t document_stream::iterator::current_index() const noexcept {
 
  340  return stream->doc_index;
 
  343simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
 
  344  auto depth = stream->doc.iter.depth();
 
  345  auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();
 
  348  if (stream->doc.iter.at_root()) {
 
  349    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
 
  359          auto next_index = stream->parser->implementation->structural_indexes[++cur_struct_index];
 
  361          size_t svlen = next_index - current_index();
 
  362          const char *start = 
reinterpret_cast<const char*
>(stream->buf) + current_index();
 
  363          while(svlen > 1 && (std::isspace(start[svlen-1]) || start[svlen-1] == 
'\0')) {
 
  366          return std::string_view(start, svlen);
 
  372  while (cur_struct_index <= 
static_cast<int64_t
>(stream->parser->implementation->n_structural_indexes)) {
 
  373    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
 
  381    if (depth == 0) { 
break; }
 
  385  return std::string_view(
reinterpret_cast<const char*
>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);;
 
  389  return stream->error;
 
 
  392#ifdef SIMDJSON_THREADS_ENABLED 
  394inline void document_stream::load_from_stage1_thread() noexcept {
 
  398  std::swap(stage1_thread_parser,*
parser);
 
  399  error = stage1_thread_error;
 
  400  if (error) { 
return; }
 
  403  if (next_batch_start() < len) {
 
  404    start_stage1_thread();
 
  408inline void document_stream::start_stage1_thread() noexcept {
 
  414  size_t _next_batch_start = this->next_batch_start();
 
  416  worker->run(
this, & this->stage1_thread_parser, _next_batch_start);
 
  427simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
 
  430    implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
 
  433simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
 
  434  SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
 
  436    implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
 
  437      std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
 
iterator & operator++() noexcept
Advance to the next document (prefix).
 
simdjson_inline bool operator!=(const iterator &other) const noexcept
Check if we're at the end yet.
 
simdjson_inline iterator() noexcept
Default constructor.
 
error_code error() const noexcept
Returns error of the stream (if any).
 
simdjson_inline reference operator*() noexcept
Get the current document (or error).
 
bool at_end() const noexcept
Returns whether the iterator is at the end.
 
simdjson_inline iterator end() noexcept
The end of the stream, for iterator comparison purposes.
 
size_t size_in_bytes() const noexcept
Returns the input size in bytes.
 
size_t truncated_bytes() const noexcept
After iterating through the stream, this method returns the number of bytes that were not parsed at the end of the stream.
 
simdjson_inline iterator begin() noexcept
Start iterating the documents in the stream.
 
simdjson_inline document_stream() noexcept
Construct an uninitialized document_stream.
 
json_iterator iter
Current position in the document.
 
A JSON fragment iterator.
 
The top level simdjson namespace, containing everything the library provides.
 
error_code
All possible errors returned by simdjson.
 
@ CAPACITY
This parser can't support a document that big.
 
@ EMPTY
No structural element found.
 
@ MEMALLOC
Error allocating memory, most likely out of memory.
 
@ UNINITIALIZED
Unknown error, or uninitialized document.
 
The result of a simdjson operation that could fail.