mirror of
				https://github.com/MariaDB/server.git
				synced 2025-11-04 12:56:14 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			849 lines
		
	
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			849 lines
		
	
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* -*- c-basic-offset: 2 -*- */
 | 
						|
/*
 | 
						|
  Copyright(C) 2017 Brazil
 | 
						|
 | 
						|
  This library is free software; you can redistribute it and/or
 | 
						|
  modify it under the terms of the GNU Lesser General Public
 | 
						|
  License version 2.1 as published by the Free Software Foundation.
 | 
						|
 | 
						|
  This library is distributed in the hope that it will be useful,
 | 
						|
  but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | 
						|
  Lesser General Public License for more details.
 | 
						|
 | 
						|
  You should have received a copy of the GNU Lesser General Public
 | 
						|
  License along with this library; if not, write to the Free Software
 | 
						|
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | 
						|
*/
 | 
						|
 | 
						|
#include "grn.h"
 | 
						|
#include "grn_db.h"
 | 
						|
 | 
						|
#ifdef GRN_WITH_ARROW
 | 
						|
#include <groonga/arrow.hpp>
 | 
						|
 | 
						|
#include <arrow/api.h>
 | 
						|
#include <arrow/io/file.h>
 | 
						|
#include <arrow/ipc/api.h>
 | 
						|
 | 
						|
#include <sstream>
 | 
						|
 | 
						|
namespace grnarrow {
 | 
						|
  grn_rc status_to_rc(arrow::Status &status) {
 | 
						|
    switch (status.code()) {
 | 
						|
    case arrow::StatusCode::OK:
 | 
						|
      return GRN_SUCCESS;
 | 
						|
    case arrow::StatusCode::OutOfMemory:
 | 
						|
      return GRN_NO_MEMORY_AVAILABLE;
 | 
						|
    case arrow::StatusCode::KeyError:
 | 
						|
      return GRN_INVALID_ARGUMENT; // TODO
 | 
						|
    case arrow::StatusCode::TypeError:
 | 
						|
      return GRN_INVALID_ARGUMENT; // TODO
 | 
						|
    case arrow::StatusCode::Invalid:
 | 
						|
      return GRN_INVALID_ARGUMENT;
 | 
						|
    case arrow::StatusCode::IOError:
 | 
						|
      return GRN_INPUT_OUTPUT_ERROR;
 | 
						|
    case arrow::StatusCode::UnknownError:
 | 
						|
      return GRN_UNKNOWN_ERROR;
 | 
						|
    case arrow::StatusCode::NotImplemented:
 | 
						|
      return GRN_FUNCTION_NOT_IMPLEMENTED;
 | 
						|
    default:
 | 
						|
      return GRN_UNKNOWN_ERROR;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  grn_bool check_status(grn_ctx *ctx,
 | 
						|
                        arrow::Status &status,
 | 
						|
                        const char *context) {
 | 
						|
    if (status.ok()) {
 | 
						|
      return GRN_TRUE;
 | 
						|
    } else {
 | 
						|
      auto rc = status_to_rc(status);
 | 
						|
      auto message = status.ToString();
 | 
						|
      ERR(rc, "%s: %s", context, message.c_str());
 | 
						|
      return GRN_FALSE;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  grn_bool check_status(grn_ctx *ctx,
 | 
						|
                        arrow::Status &status,
 | 
						|
                        std::ostream &output) {
 | 
						|
    return check_status(ctx,
 | 
						|
                        status,
 | 
						|
                        static_cast<std::stringstream &>(output).str().c_str());
 | 
						|
  }
 | 
						|
 | 
						|
  class ColumnLoadVisitor : public arrow::ArrayVisitor {
 | 
						|
  public:
 | 
						|
    ColumnLoadVisitor(grn_ctx *ctx,
 | 
						|
                      grn_obj *grn_table,
 | 
						|
                      std::shared_ptr<arrow::Column> &arrow_column,
 | 
						|
                      const grn_id *ids)
 | 
						|
      : ctx_(ctx),
 | 
						|
        grn_table_(grn_table),
 | 
						|
        ids_(ids),
 | 
						|
        time_unit_(arrow::TimeUnit::SECOND) {
 | 
						|
      auto column_name = arrow_column->name();
 | 
						|
      grn_column_ = grn_obj_column(ctx_, grn_table_,
 | 
						|
                                   column_name.data(),
 | 
						|
                                   column_name.size());
 | 
						|
 | 
						|
      auto arrow_type = arrow_column->type();
 | 
						|
      grn_id type_id;
 | 
						|
      switch (arrow_type->id()) {
 | 
						|
      case arrow::Type::BOOL :
 | 
						|
        type_id = GRN_DB_BOOL;
 | 
						|
        break;
 | 
						|
      case arrow::Type::UINT8 :
 | 
						|
        type_id = GRN_DB_UINT8;
 | 
						|
        break;
 | 
						|
      case arrow::Type::INT8 :
 | 
						|
        type_id = GRN_DB_INT8;
 | 
						|
        break;
 | 
						|
      case arrow::Type::UINT16 :
 | 
						|
        type_id = GRN_DB_UINT16;
 | 
						|
        break;
 | 
						|
      case arrow::Type::INT16 :
 | 
						|
        type_id = GRN_DB_INT16;
 | 
						|
        break;
 | 
						|
      case arrow::Type::UINT32 :
 | 
						|
        type_id = GRN_DB_UINT32;
 | 
						|
        break;
 | 
						|
      case arrow::Type::INT32 :
 | 
						|
        type_id = GRN_DB_INT32;
 | 
						|
        break;
 | 
						|
      case arrow::Type::UINT64 :
 | 
						|
        type_id = GRN_DB_UINT64;
 | 
						|
        break;
 | 
						|
      case arrow::Type::INT64 :
 | 
						|
        type_id = GRN_DB_INT64;
 | 
						|
        break;
 | 
						|
      case arrow::Type::HALF_FLOAT :
 | 
						|
      case arrow::Type::FLOAT :
 | 
						|
      case arrow::Type::DOUBLE :
 | 
						|
        type_id = GRN_DB_FLOAT;
 | 
						|
        break;
 | 
						|
      case arrow::Type::STRING :
 | 
						|
        type_id = GRN_DB_TEXT;
 | 
						|
        break;
 | 
						|
      case arrow::Type::DATE64 :
 | 
						|
        type_id = GRN_DB_TIME;
 | 
						|
        break;
 | 
						|
      case arrow::Type::TIMESTAMP :
 | 
						|
        type_id = GRN_DB_TIME;
 | 
						|
        {
 | 
						|
          auto arrow_timestamp_type =
 | 
						|
            std::static_pointer_cast<arrow::TimestampType>(arrow_type);
 | 
						|
          time_unit_ = arrow_timestamp_type->unit();
 | 
						|
        }
 | 
						|
        break;
 | 
						|
      default :
 | 
						|
        type_id = GRN_DB_VOID;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      if (type_id == GRN_DB_VOID) {
 | 
						|
        // TODO
 | 
						|
        return;
 | 
						|
      }
 | 
						|
 | 
						|
      if (!grn_column_) {
 | 
						|
        grn_column_ = grn_column_create(ctx_,
 | 
						|
                                        grn_table_,
 | 
						|
                                        column_name.data(),
 | 
						|
                                        column_name.size(),
 | 
						|
                                        NULL,
 | 
						|
                                        GRN_OBJ_COLUMN_SCALAR,
 | 
						|
                                        grn_ctx_at(ctx_, type_id));
 | 
						|
      }
 | 
						|
      if (type_id == GRN_DB_TEXT) {
 | 
						|
        GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY);
 | 
						|
      } else {
 | 
						|
        GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    ~ColumnLoadVisitor() {
 | 
						|
      if (grn_obj_is_accessor(ctx_, grn_column_)) {
 | 
						|
        grn_obj_unlink(ctx_, grn_column_);
 | 
						|
      }
 | 
						|
      GRN_OBJ_FIN(ctx_, &buffer_);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::BooleanArray &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::Int8Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::UInt8Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::Int16Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::UInt16Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::Int32Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::UInt32Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::Int64Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::UInt64Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::HalfFloatArray &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::FloatArray &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::DoubleArray &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::StringArray &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::Date64Array &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status Visit(const arrow::TimestampArray &array) {
 | 
						|
      return set_values(array);
 | 
						|
    }
 | 
						|
 | 
						|
  private:
 | 
						|
    grn_ctx *ctx_;
 | 
						|
    grn_obj *grn_table_;
 | 
						|
    const grn_id *ids_;
 | 
						|
    arrow::TimeUnit::type time_unit_;
 | 
						|
    grn_obj *grn_column_;
 | 
						|
    grn_obj buffer_;
 | 
						|
 | 
						|
    template <typename T>
 | 
						|
    arrow::Status set_values(const T &array) {
 | 
						|
      int64_t n_rows = array.length();
 | 
						|
      for (int i = 0; i < n_rows; ++i) {
 | 
						|
        auto id = ids_[i];
 | 
						|
        GRN_BULK_REWIND(&buffer_);
 | 
						|
        get_value(array, i);
 | 
						|
        grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET);
 | 
						|
      }
 | 
						|
      return arrow::Status::OK();
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::BooleanArray &array, int i) {
 | 
						|
      GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::UInt8Array &array, int i) {
 | 
						|
      GRN_UINT8_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::Int8Array &array, int i) {
 | 
						|
      GRN_INT8_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::UInt16Array &array, int i) {
 | 
						|
      GRN_UINT16_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::Int16Array &array, int i) {
 | 
						|
      GRN_INT16_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::UInt32Array &array, int i) {
 | 
						|
      GRN_UINT32_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::Int32Array &array, int i) {
 | 
						|
      GRN_INT32_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::UInt64Array &array, int i) {
 | 
						|
      GRN_UINT64_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::Int64Array &array, int i) {
 | 
						|
      GRN_INT64_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::HalfFloatArray &array, int i) {
 | 
						|
      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::FloatArray &array, int i) {
 | 
						|
      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::DoubleArray &array, int i) {
 | 
						|
      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::StringArray &array, int i) {
 | 
						|
      int32_t size;
 | 
						|
      const auto data = array.GetValue(i, &size);
 | 
						|
      GRN_TEXT_SET(ctx_, &buffer_, data, size);
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::Date64Array &array, int i) {
 | 
						|
      GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
    }
 | 
						|
 | 
						|
    void
 | 
						|
    get_value(const arrow::TimestampArray &array, int i) {
 | 
						|
      switch (time_unit_) {
 | 
						|
      case arrow::TimeUnit::SECOND :
 | 
						|
        GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0));
 | 
						|
        break;
 | 
						|
      case arrow::TimeUnit::MILLI :
 | 
						|
        GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000);
 | 
						|
        break;
 | 
						|
      case arrow::TimeUnit::MICRO :
 | 
						|
        GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
 | 
						|
        break;
 | 
						|
      case arrow::TimeUnit::NANO :
 | 
						|
        GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  };
 | 
						|
 | 
						|
  class FileLoader {
 | 
						|
  public:
 | 
						|
    FileLoader(grn_ctx *ctx, grn_obj *grn_table)
 | 
						|
      : ctx_(ctx),
 | 
						|
        grn_table_(grn_table),
 | 
						|
        key_column_name_("") {
 | 
						|
    }
 | 
						|
 | 
						|
    ~FileLoader() {
 | 
						|
    }
 | 
						|
 | 
						|
    grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) {
 | 
						|
      int n_columns = arrow_table->num_columns();
 | 
						|
 | 
						|
      if (key_column_name_.empty()) {
 | 
						|
        grn_obj ids;
 | 
						|
        GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_));
 | 
						|
        auto n_records = arrow_table->num_rows();
 | 
						|
        for (int64_t i = 0; i < n_records; ++i) {
 | 
						|
          auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL);
 | 
						|
          GRN_RECORD_PUT(ctx_, &ids, id);
 | 
						|
        }
 | 
						|
        for (int i = 0; i < n_columns; ++i) {
 | 
						|
          int64_t offset = 0;
 | 
						|
          auto arrow_column = arrow_table->column(i);
 | 
						|
          auto arrow_chunked_data = arrow_column->data();
 | 
						|
          for (auto arrow_array : arrow_chunked_data->chunks()) {
 | 
						|
            grn_id *sub_ids =
 | 
						|
              reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset;
 | 
						|
            ColumnLoadVisitor visitor(ctx_,
 | 
						|
                                      grn_table_,
 | 
						|
                                      arrow_column,
 | 
						|
                                      sub_ids);
 | 
						|
            arrow_array->Accept(&visitor);
 | 
						|
            offset += arrow_array->length();
 | 
						|
          }
 | 
						|
        }
 | 
						|
        GRN_OBJ_FIN(ctx_, &ids);
 | 
						|
      } else {
 | 
						|
        auto status = arrow::Status::NotImplemented("_key isn't supported yet");
 | 
						|
        check_status(ctx_, status, "[arrow][load]");
 | 
						|
      }
 | 
						|
      return ctx_->rc;
 | 
						|
    };
 | 
						|
 | 
						|
    grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
 | 
						|
      std::shared_ptr<arrow::Table> arrow_table;
 | 
						|
      std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1);
 | 
						|
      arrow_record_batches[0] = arrow_record_batch;
 | 
						|
      auto status =
 | 
						|
        arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table);
 | 
						|
      if (!check_status(ctx_,
 | 
						|
                        status,
 | 
						|
                        "[arrow][load] "
 | 
						|
                        "failed to convert record batch to table")) {
 | 
						|
        return ctx_->rc;
 | 
						|
      }
 | 
						|
      return load_table(arrow_table);
 | 
						|
    };
 | 
						|
 | 
						|
  private:
 | 
						|
    grn_ctx *ctx_;
 | 
						|
    grn_obj *grn_table_;
 | 
						|
    std::string key_column_name_;
 | 
						|
  };
 | 
						|
 | 
						|
  class FileDumper {
 | 
						|
  public:
 | 
						|
    FileDumper(grn_ctx *ctx, grn_obj *grn_table, grn_obj *grn_columns)
 | 
						|
      : ctx_(ctx),
 | 
						|
        grn_table_(grn_table),
 | 
						|
        grn_columns_(grn_columns) {
 | 
						|
    }
 | 
						|
 | 
						|
    ~FileDumper() {
 | 
						|
    }
 | 
						|
 | 
						|
    grn_rc dump(arrow::io::OutputStream *output) {
 | 
						|
      std::vector<std::shared_ptr<arrow::Field>> fields;
 | 
						|
      auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *);
 | 
						|
      for (auto i = 0; i < n_columns; ++i) {
 | 
						|
        auto column = GRN_PTR_VALUE_AT(grn_columns_, i);
 | 
						|
 | 
						|
        char column_name[GRN_TABLE_MAX_KEY_SIZE];
 | 
						|
        int column_name_size;
 | 
						|
        column_name_size =
 | 
						|
          grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE);
 | 
						|
        std::string field_name(column_name, column_name_size);
 | 
						|
        std::shared_ptr<arrow::DataType> field_type;
 | 
						|
        switch (grn_obj_get_range(ctx_, column)) {
 | 
						|
        case GRN_DB_BOOL :
 | 
						|
          field_type = arrow::boolean();
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT8 :
 | 
						|
          field_type = arrow::uint8();
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT8 :
 | 
						|
          field_type = arrow::int8();
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT16 :
 | 
						|
          field_type = arrow::uint16();
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT16 :
 | 
						|
          field_type = arrow::int16();
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT32 :
 | 
						|
          field_type = arrow::uint32();
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT32 :
 | 
						|
          field_type = arrow::int32();
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT64 :
 | 
						|
          field_type = arrow::uint64();
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT64 :
 | 
						|
          field_type = arrow::int64();
 | 
						|
          break;
 | 
						|
        case GRN_DB_FLOAT :
 | 
						|
          field_type = arrow::float64();
 | 
						|
          break;
 | 
						|
        case GRN_DB_TIME :
 | 
						|
          field_type =
 | 
						|
            std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
 | 
						|
          break;
 | 
						|
        case GRN_DB_SHORT_TEXT :
 | 
						|
        case GRN_DB_TEXT :
 | 
						|
        case GRN_DB_LONG_TEXT :
 | 
						|
          field_type = arrow::utf8();
 | 
						|
          break;
 | 
						|
        default :
 | 
						|
          break;
 | 
						|
        }
 | 
						|
        if (!field_type) {
 | 
						|
          continue;
 | 
						|
        }
 | 
						|
 | 
						|
        auto field = std::make_shared<arrow::Field>(field_name,
 | 
						|
                                                    field_type,
 | 
						|
                                                    false);
 | 
						|
        fields.push_back(field);
 | 
						|
      };
 | 
						|
 | 
						|
      auto schema = std::make_shared<arrow::Schema>(fields);
 | 
						|
 | 
						|
      std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer;
 | 
						|
      auto status =
 | 
						|
        arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer);
 | 
						|
      if (!check_status(ctx_,
 | 
						|
                        status,
 | 
						|
                        "[arrow][dump] failed to create file format writer")) {
 | 
						|
        return ctx_->rc;
 | 
						|
      }
 | 
						|
 | 
						|
      std::vector<grn_id> ids;
 | 
						|
      int n_records_per_batch = 1000;
 | 
						|
      GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) {
 | 
						|
        ids.push_back(record_id);
 | 
						|
        if (ids.size() == n_records_per_batch) {
 | 
						|
          write_record_batch(ids, schema, writer);
 | 
						|
          ids.clear();
 | 
						|
        }
 | 
						|
      } GRN_TABLE_EACH_END(ctx_, table_cursor);
 | 
						|
      if (!ids.empty()) {
 | 
						|
        write_record_batch(ids, schema, writer);
 | 
						|
      }
 | 
						|
      writer->Close();
 | 
						|
 | 
						|
      return ctx_->rc;
 | 
						|
    }
 | 
						|
 | 
						|
  private:
 | 
						|
    grn_ctx *ctx_;
 | 
						|
    grn_obj *grn_table_;
 | 
						|
    grn_obj *grn_columns_;
 | 
						|
 | 
						|
    void write_record_batch(std::vector<grn_id> &ids,
 | 
						|
                            std::shared_ptr<arrow::Schema> &schema,
 | 
						|
                            std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) {
 | 
						|
      std::vector<std::shared_ptr<arrow::Array>> columns;
 | 
						|
      auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *);
 | 
						|
      for (auto i = 0; i < n_columns; ++i) {
 | 
						|
        auto grn_column = GRN_PTR_VALUE_AT(grn_columns_, i);
 | 
						|
 | 
						|
        arrow::Status status;
 | 
						|
        std::shared_ptr<arrow::Array> column;
 | 
						|
 | 
						|
        switch (grn_obj_get_range(ctx_, grn_column)) {
 | 
						|
        case GRN_DB_BOOL :
 | 
						|
          status = build_boolean_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT8 :
 | 
						|
          status = build_uint8_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT8 :
 | 
						|
          status = build_int8_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT16 :
 | 
						|
          status = build_uint16_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT16 :
 | 
						|
          status = build_int16_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT32 :
 | 
						|
          status = build_uint32_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT32 :
 | 
						|
          status = build_int32_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_UINT64 :
 | 
						|
          status = build_uint64_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_INT64 :
 | 
						|
          status = build_int64_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_FLOAT :
 | 
						|
          status = build_double_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_TIME :
 | 
						|
          status = build_timestamp_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        case GRN_DB_SHORT_TEXT :
 | 
						|
        case GRN_DB_TEXT :
 | 
						|
        case GRN_DB_LONG_TEXT :
 | 
						|
          status = build_utf8_array(ids, grn_column, &column);
 | 
						|
          break;
 | 
						|
        default :
 | 
						|
          status =
 | 
						|
            arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO");
 | 
						|
          break;
 | 
						|
        }
 | 
						|
        if (!status.ok()) {
 | 
						|
          continue;
 | 
						|
        }
 | 
						|
        columns.push_back(column);
 | 
						|
      }
 | 
						|
 | 
						|
      arrow::RecordBatch record_batch(schema, ids.size(), columns);
 | 
						|
      writer->WriteRecordBatch(record_batch);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_boolean_array(std::vector<grn_id> &ids,
 | 
						|
                                      grn_obj *grn_column,
 | 
						|
                                      std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::BooleanBuilder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const grn_bool *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_uint8_array(std::vector<grn_id> &ids,
 | 
						|
                                    grn_obj *grn_column,
 | 
						|
                                    std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::UInt8Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const uint8_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_int8_array(std::vector<grn_id> &ids,
 | 
						|
                                   grn_obj *grn_column,
 | 
						|
                                   std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::Int8Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const int8_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_uint16_array(std::vector<grn_id> &ids,
 | 
						|
                                     grn_obj *grn_column,
 | 
						|
                                     std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::UInt16Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const uint16_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_int16_array(std::vector<grn_id> &ids,
 | 
						|
                                    grn_obj *grn_column,
 | 
						|
                                    std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::Int16Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const int16_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_uint32_array(std::vector<grn_id> &ids,
 | 
						|
                                     grn_obj *grn_column,
 | 
						|
                                     std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::UInt32Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const uint32_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_int32_array(std::vector<grn_id> &ids,
 | 
						|
                                    grn_obj *grn_column,
 | 
						|
                                    std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::Int32Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const int32_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
    arrow::Status build_uint64_array(std::vector<grn_id> &ids,
 | 
						|
                                     grn_obj *grn_column,
 | 
						|
                                     std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::UInt64Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const uint64_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_int64_array(std::vector<grn_id> &ids,
 | 
						|
                                    grn_obj *grn_column,
 | 
						|
                                    std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::Int64Builder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const int64_t *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_double_array(std::vector<grn_id> &ids,
 | 
						|
                                     grn_obj *grn_column,
 | 
						|
                                     std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::DoubleBuilder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(*(reinterpret_cast<const double *>(data)));
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_timestamp_array(std::vector<grn_id> &ids,
 | 
						|
                                        grn_obj *grn_column,
 | 
						|
                                        std::shared_ptr<arrow::Array> *array) {
 | 
						|
      auto timestamp_ns_data_type =
 | 
						|
        std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
 | 
						|
      arrow::TimestampBuilder builder(arrow::default_memory_pool(),
 | 
						|
                                      timestamp_ns_data_type);
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data));
 | 
						|
        builder.Append(timestamp_ns);
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
 | 
						|
    arrow::Status build_utf8_array(std::vector<grn_id> &ids,
 | 
						|
                                   grn_obj *grn_column,
 | 
						|
                                   std::shared_ptr<arrow::Array> *array) {
 | 
						|
      arrow::StringBuilder builder(arrow::default_memory_pool());
 | 
						|
      for (auto id : ids) {
 | 
						|
        uint32_t size;
 | 
						|
        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | 
						|
        builder.Append(data, size);
 | 
						|
      }
 | 
						|
      return builder.Finish(array);
 | 
						|
    }
 | 
						|
  };
 | 
						|
}
 | 
						|
#endif /* GRN_WITH_ARROW */
 | 
						|
 | 
						|
extern "C" {
 | 
						|
grn_rc
 | 
						|
grn_arrow_load(grn_ctx *ctx,
 | 
						|
               grn_obj *table,
 | 
						|
               const char *path)
 | 
						|
{
 | 
						|
  GRN_API_ENTER;
 | 
						|
#ifdef GRN_WITH_ARROW
 | 
						|
  std::shared_ptr<arrow::io::MemoryMappedFile> input;
 | 
						|
  auto status =
 | 
						|
    arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input);
 | 
						|
  if (!grnarrow::check_status(ctx,
 | 
						|
                              status,
 | 
						|
                              std::ostringstream() <<
 | 
						|
                              "[arrow][load] failed to open path: " <<
 | 
						|
                              "<" << path << ">")) {
 | 
						|
    GRN_API_RETURN(ctx->rc);
 | 
						|
  }
 | 
						|
  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
 | 
						|
  status = arrow::ipc::RecordBatchFileReader::Open(input, &reader);
 | 
						|
  if (!grnarrow::check_status(ctx,
 | 
						|
                              status,
 | 
						|
                              "[arrow][load] "
 | 
						|
                              "failed to create file format reader")) {
 | 
						|
    GRN_API_RETURN(ctx->rc);
 | 
						|
  }
 | 
						|
 | 
						|
  grnarrow::FileLoader loader(ctx, table);
 | 
						|
  int n_record_batches = reader->num_record_batches();
 | 
						|
  for (int i = 0; i < n_record_batches; ++i) {
 | 
						|
    std::shared_ptr<arrow::RecordBatch> record_batch;
 | 
						|
    status = reader->ReadRecordBatch(i, &record_batch);
 | 
						|
    if (!grnarrow::check_status(ctx,
 | 
						|
                                status,
 | 
						|
                                std::ostringstream("") <<
 | 
						|
                                "[arrow][load] failed to get " <<
 | 
						|
                                "the " << i << "-th " << "record")) {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    loader.load_record_batch(record_batch);
 | 
						|
    if (ctx->rc != GRN_SUCCESS) {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
  }
 | 
						|
#else /* GRN_WITH_ARROW */
 | 
						|
  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
 | 
						|
      "[arrow][load] Apache Arrow support isn't enabled");
 | 
						|
#endif /* GRN_WITH_ARROW */
 | 
						|
  GRN_API_RETURN(ctx->rc);
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_arrow_dump(grn_ctx *ctx,
 | 
						|
               grn_obj *table,
 | 
						|
               const char *path)
 | 
						|
{
 | 
						|
  GRN_API_ENTER;
 | 
						|
#ifdef GRN_WITH_ARROW
 | 
						|
  auto all_columns =
 | 
						|
    grn_hash_create(ctx,
 | 
						|
                    NULL,
 | 
						|
                    sizeof(grn_id),
 | 
						|
                    0,
 | 
						|
                    GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY);
 | 
						|
  grn_table_columns(ctx,
 | 
						|
                    table,
 | 
						|
                    "", 0,
 | 
						|
                    reinterpret_cast<grn_obj *>(all_columns));
 | 
						|
 | 
						|
  grn_obj columns;
 | 
						|
  GRN_PTR_INIT(&columns, GRN_OBJ_VECTOR, GRN_ID_NIL);
 | 
						|
  GRN_HASH_EACH_BEGIN(ctx, all_columns, cursor, id) {
 | 
						|
    void *key;
 | 
						|
    grn_hash_cursor_get_key(ctx, cursor, &key);
 | 
						|
    auto column_id = static_cast<grn_id *>(key);
 | 
						|
    auto column = grn_ctx_at(ctx, *column_id);
 | 
						|
    GRN_PTR_PUT(ctx, &columns, column);
 | 
						|
  } GRN_HASH_EACH_END(ctx, cursor);
 | 
						|
  grn_hash_close(ctx, all_columns);
 | 
						|
 | 
						|
  grn_arrow_dump_columns(ctx, table, &columns, path);
 | 
						|
  GRN_OBJ_FIN(ctx, &columns);
 | 
						|
#else /* GRN_WITH_ARROW */
 | 
						|
  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
 | 
						|
      "[arrow][dump] Apache Arrow support isn't enabled");
 | 
						|
#endif /* GRN_WITH_ARROW */
 | 
						|
  GRN_API_RETURN(ctx->rc);
 | 
						|
}
 | 
						|
 | 
						|
grn_rc
 | 
						|
grn_arrow_dump_columns(grn_ctx *ctx,
 | 
						|
                       grn_obj *table,
 | 
						|
                       grn_obj *columns,
 | 
						|
                       const char *path)
 | 
						|
{
 | 
						|
  GRN_API_ENTER;
 | 
						|
#ifdef GRN_WITH_ARROW
 | 
						|
  std::shared_ptr<arrow::io::FileOutputStream> output;
 | 
						|
  auto status = arrow::io::FileOutputStream::Open(path, &output);
 | 
						|
  if (!grnarrow::check_status(ctx,
 | 
						|
                              status,
 | 
						|
                              std::stringstream() <<
 | 
						|
                              "[arrow][dump] failed to open path: " <<
 | 
						|
                              "<" << path << ">")) {
 | 
						|
    GRN_API_RETURN(ctx->rc);
 | 
						|
  }
 | 
						|
 | 
						|
  grnarrow::FileDumper dumper(ctx, table, columns);
 | 
						|
  dumper.dump(output.get());
 | 
						|
#else /* GRN_WITH_ARROW */
 | 
						|
  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
 | 
						|
      "[arrow][dump] Apache Arrow support isn't enabled");
 | 
						|
#endif /* GRN_WITH_ARROW */
 | 
						|
  GRN_API_RETURN(ctx->rc);
 | 
						|
}
 | 
						|
}
 |