mirror of
				https://github.com/MariaDB/server.git
				synced 2025-10-30 18:36:12 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			849 lines
		
	
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			849 lines
		
	
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* -*- c-basic-offset: 2 -*- */
 | |
| /*
 | |
|   Copyright(C) 2017 Brazil
 | |
| 
 | |
|   This library is free software; you can redistribute it and/or
 | |
|   modify it under the terms of the GNU Lesser General Public
 | |
|   License version 2.1 as published by the Free Software Foundation.
 | |
| 
 | |
|   This library is distributed in the hope that it will be useful,
 | |
|   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | |
|   Lesser General Public License for more details.
 | |
| 
 | |
|   You should have received a copy of the GNU Lesser General Public
 | |
|   License along with this library; if not, write to the Free Software
 | |
|   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
 | |
| */
 | |
| 
 | |
| #include "grn.h"
 | |
| #include "grn_db.h"
 | |
| 
 | |
| #ifdef GRN_WITH_ARROW
 | |
| #include <groonga/arrow.hpp>
 | |
| 
 | |
| #include <arrow/api.h>
 | |
| #include <arrow/io/file.h>
 | |
| #include <arrow/ipc/api.h>
 | |
| 
 | |
| #include <sstream>
 | |
| 
 | |
| namespace grnarrow {
 | |
|   grn_rc status_to_rc(arrow::Status &status) {
 | |
|     switch (status.code()) {
 | |
|     case arrow::StatusCode::OK:
 | |
|       return GRN_SUCCESS;
 | |
|     case arrow::StatusCode::OutOfMemory:
 | |
|       return GRN_NO_MEMORY_AVAILABLE;
 | |
|     case arrow::StatusCode::KeyError:
 | |
|       return GRN_INVALID_ARGUMENT; // TODO
 | |
|     case arrow::StatusCode::TypeError:
 | |
|       return GRN_INVALID_ARGUMENT; // TODO
 | |
|     case arrow::StatusCode::Invalid:
 | |
|       return GRN_INVALID_ARGUMENT;
 | |
|     case arrow::StatusCode::IOError:
 | |
|       return GRN_INPUT_OUTPUT_ERROR;
 | |
|     case arrow::StatusCode::UnknownError:
 | |
|       return GRN_UNKNOWN_ERROR;
 | |
|     case arrow::StatusCode::NotImplemented:
 | |
|       return GRN_FUNCTION_NOT_IMPLEMENTED;
 | |
|     default:
 | |
|       return GRN_UNKNOWN_ERROR;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   grn_bool check_status(grn_ctx *ctx,
 | |
|                         arrow::Status &status,
 | |
|                         const char *context) {
 | |
|     if (status.ok()) {
 | |
|       return GRN_TRUE;
 | |
|     } else {
 | |
|       auto rc = status_to_rc(status);
 | |
|       auto message = status.ToString();
 | |
|       ERR(rc, "%s: %s", context, message.c_str());
 | |
|       return GRN_FALSE;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   grn_bool check_status(grn_ctx *ctx,
 | |
|                         arrow::Status &status,
 | |
|                         std::ostream &output) {
 | |
|     return check_status(ctx,
 | |
|                         status,
 | |
|                         static_cast<std::stringstream &>(output).str().c_str());
 | |
|   }
 | |
| 
 | |
|   class ColumnLoadVisitor : public arrow::ArrayVisitor {
 | |
|   public:
 | |
|     ColumnLoadVisitor(grn_ctx *ctx,
 | |
|                       grn_obj *grn_table,
 | |
|                       std::shared_ptr<arrow::Column> &arrow_column,
 | |
|                       const grn_id *ids)
 | |
|       : ctx_(ctx),
 | |
|         grn_table_(grn_table),
 | |
|         ids_(ids),
 | |
|         time_unit_(arrow::TimeUnit::SECOND) {
 | |
|       auto column_name = arrow_column->name();
 | |
|       grn_column_ = grn_obj_column(ctx_, grn_table_,
 | |
|                                    column_name.data(),
 | |
|                                    column_name.size());
 | |
| 
 | |
|       auto arrow_type = arrow_column->type();
 | |
|       grn_id type_id;
 | |
|       switch (arrow_type->id()) {
 | |
|       case arrow::Type::BOOL :
 | |
|         type_id = GRN_DB_BOOL;
 | |
|         break;
 | |
|       case arrow::Type::UINT8 :
 | |
|         type_id = GRN_DB_UINT8;
 | |
|         break;
 | |
|       case arrow::Type::INT8 :
 | |
|         type_id = GRN_DB_INT8;
 | |
|         break;
 | |
|       case arrow::Type::UINT16 :
 | |
|         type_id = GRN_DB_UINT16;
 | |
|         break;
 | |
|       case arrow::Type::INT16 :
 | |
|         type_id = GRN_DB_INT16;
 | |
|         break;
 | |
|       case arrow::Type::UINT32 :
 | |
|         type_id = GRN_DB_UINT32;
 | |
|         break;
 | |
|       case arrow::Type::INT32 :
 | |
|         type_id = GRN_DB_INT32;
 | |
|         break;
 | |
|       case arrow::Type::UINT64 :
 | |
|         type_id = GRN_DB_UINT64;
 | |
|         break;
 | |
|       case arrow::Type::INT64 :
 | |
|         type_id = GRN_DB_INT64;
 | |
|         break;
 | |
|       case arrow::Type::HALF_FLOAT :
 | |
|       case arrow::Type::FLOAT :
 | |
|       case arrow::Type::DOUBLE :
 | |
|         type_id = GRN_DB_FLOAT;
 | |
|         break;
 | |
|       case arrow::Type::STRING :
 | |
|         type_id = GRN_DB_TEXT;
 | |
|         break;
 | |
|       case arrow::Type::DATE64 :
 | |
|         type_id = GRN_DB_TIME;
 | |
|         break;
 | |
|       case arrow::Type::TIMESTAMP :
 | |
|         type_id = GRN_DB_TIME;
 | |
|         {
 | |
|           auto arrow_timestamp_type =
 | |
|             std::static_pointer_cast<arrow::TimestampType>(arrow_type);
 | |
|           time_unit_ = arrow_timestamp_type->unit();
 | |
|         }
 | |
|         break;
 | |
|       default :
 | |
|         type_id = GRN_DB_VOID;
 | |
|         break;
 | |
|       }
 | |
| 
 | |
|       if (type_id == GRN_DB_VOID) {
 | |
|         // TODO
 | |
|         return;
 | |
|       }
 | |
| 
 | |
|       if (!grn_column_) {
 | |
|         grn_column_ = grn_column_create(ctx_,
 | |
|                                         grn_table_,
 | |
|                                         column_name.data(),
 | |
|                                         column_name.size(),
 | |
|                                         NULL,
 | |
|                                         GRN_OBJ_COLUMN_SCALAR,
 | |
|                                         grn_ctx_at(ctx_, type_id));
 | |
|       }
 | |
|       if (type_id == GRN_DB_TEXT) {
 | |
|         GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY);
 | |
|       } else {
 | |
|         GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id);
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     ~ColumnLoadVisitor() {
 | |
|       if (grn_obj_is_accessor(ctx_, grn_column_)) {
 | |
|         grn_obj_unlink(ctx_, grn_column_);
 | |
|       }
 | |
|       GRN_OBJ_FIN(ctx_, &buffer_);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::BooleanArray &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::Int8Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::UInt8Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::Int16Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::UInt16Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::Int32Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::UInt32Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::Int64Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::UInt64Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::HalfFloatArray &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::FloatArray &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::DoubleArray &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::StringArray &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::Date64Array &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status Visit(const arrow::TimestampArray &array) {
 | |
|       return set_values(array);
 | |
|     }
 | |
| 
 | |
|   private:
 | |
|     grn_ctx *ctx_;
 | |
|     grn_obj *grn_table_;
 | |
|     const grn_id *ids_;
 | |
|     arrow::TimeUnit::type time_unit_;
 | |
|     grn_obj *grn_column_;
 | |
|     grn_obj buffer_;
 | |
| 
 | |
|     template <typename T>
 | |
|     arrow::Status set_values(const T &array) {
 | |
|       int64_t n_rows = array.length();
 | |
|       for (int i = 0; i < n_rows; ++i) {
 | |
|         auto id = ids_[i];
 | |
|         GRN_BULK_REWIND(&buffer_);
 | |
|         get_value(array, i);
 | |
|         grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET);
 | |
|       }
 | |
|       return arrow::Status::OK();
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::BooleanArray &array, int i) {
 | |
|       GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::UInt8Array &array, int i) {
 | |
|       GRN_UINT8_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::Int8Array &array, int i) {
 | |
|       GRN_INT8_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::UInt16Array &array, int i) {
 | |
|       GRN_UINT16_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::Int16Array &array, int i) {
 | |
|       GRN_INT16_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::UInt32Array &array, int i) {
 | |
|       GRN_UINT32_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::Int32Array &array, int i) {
 | |
|       GRN_INT32_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::UInt64Array &array, int i) {
 | |
|       GRN_UINT64_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::Int64Array &array, int i) {
 | |
|       GRN_INT64_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::HalfFloatArray &array, int i) {
 | |
|       GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::FloatArray &array, int i) {
 | |
|       GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::DoubleArray &array, int i) {
 | |
|       GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::StringArray &array, int i) {
 | |
|       int32_t size;
 | |
|       const auto data = array.GetValue(i, &size);
 | |
|       GRN_TEXT_SET(ctx_, &buffer_, data, size);
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::Date64Array &array, int i) {
 | |
|       GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
 | |
|     }
 | |
| 
 | |
|     void
 | |
|     get_value(const arrow::TimestampArray &array, int i) {
 | |
|       switch (time_unit_) {
 | |
|       case arrow::TimeUnit::SECOND :
 | |
|         GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0));
 | |
|         break;
 | |
|       case arrow::TimeUnit::MILLI :
 | |
|         GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000);
 | |
|         break;
 | |
|       case arrow::TimeUnit::MICRO :
 | |
|         GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
 | |
|         break;
 | |
|       case arrow::TimeUnit::NANO :
 | |
|         GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000);
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   class FileLoader {
 | |
|   public:
 | |
|     FileLoader(grn_ctx *ctx, grn_obj *grn_table)
 | |
|       : ctx_(ctx),
 | |
|         grn_table_(grn_table),
 | |
|         key_column_name_("") {
 | |
|     }
 | |
| 
 | |
|     ~FileLoader() {
 | |
|     }
 | |
| 
 | |
|     grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) {
 | |
|       int n_columns = arrow_table->num_columns();
 | |
| 
 | |
|       if (key_column_name_.empty()) {
 | |
|         grn_obj ids;
 | |
|         GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_));
 | |
|         auto n_records = arrow_table->num_rows();
 | |
|         for (int64_t i = 0; i < n_records; ++i) {
 | |
|           auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL);
 | |
|           GRN_RECORD_PUT(ctx_, &ids, id);
 | |
|         }
 | |
|         for (int i = 0; i < n_columns; ++i) {
 | |
|           int64_t offset = 0;
 | |
|           auto arrow_column = arrow_table->column(i);
 | |
|           auto arrow_chunked_data = arrow_column->data();
 | |
|           for (auto arrow_array : arrow_chunked_data->chunks()) {
 | |
|             grn_id *sub_ids =
 | |
|               reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset;
 | |
|             ColumnLoadVisitor visitor(ctx_,
 | |
|                                       grn_table_,
 | |
|                                       arrow_column,
 | |
|                                       sub_ids);
 | |
|             arrow_array->Accept(&visitor);
 | |
|             offset += arrow_array->length();
 | |
|           }
 | |
|         }
 | |
|         GRN_OBJ_FIN(ctx_, &ids);
 | |
|       } else {
 | |
|         auto status = arrow::Status::NotImplemented("_key isn't supported yet");
 | |
|         check_status(ctx_, status, "[arrow][load]");
 | |
|       }
 | |
|       return ctx_->rc;
 | |
|     };
 | |
| 
 | |
|     grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
 | |
|       std::shared_ptr<arrow::Table> arrow_table;
 | |
|       std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1);
 | |
|       arrow_record_batches[0] = arrow_record_batch;
 | |
|       auto status =
 | |
|         arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table);
 | |
|       if (!check_status(ctx_,
 | |
|                         status,
 | |
|                         "[arrow][load] "
 | |
|                         "failed to convert record batch to table")) {
 | |
|         return ctx_->rc;
 | |
|       }
 | |
|       return load_table(arrow_table);
 | |
|     };
 | |
| 
 | |
|   private:
 | |
|     grn_ctx *ctx_;
 | |
|     grn_obj *grn_table_;
 | |
|     std::string key_column_name_;
 | |
|   };
 | |
| 
 | |
|   class FileDumper {
 | |
|   public:
 | |
|     FileDumper(grn_ctx *ctx, grn_obj *grn_table, grn_obj *grn_columns)
 | |
|       : ctx_(ctx),
 | |
|         grn_table_(grn_table),
 | |
|         grn_columns_(grn_columns) {
 | |
|     }
 | |
| 
 | |
|     ~FileDumper() {
 | |
|     }
 | |
| 
 | |
|     grn_rc dump(arrow::io::OutputStream *output) {
 | |
|       std::vector<std::shared_ptr<arrow::Field>> fields;
 | |
|       auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *);
 | |
|       for (auto i = 0; i < n_columns; ++i) {
 | |
|         auto column = GRN_PTR_VALUE_AT(grn_columns_, i);
 | |
| 
 | |
|         char column_name[GRN_TABLE_MAX_KEY_SIZE];
 | |
|         int column_name_size;
 | |
|         column_name_size =
 | |
|           grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE);
 | |
|         std::string field_name(column_name, column_name_size);
 | |
|         std::shared_ptr<arrow::DataType> field_type;
 | |
|         switch (grn_obj_get_range(ctx_, column)) {
 | |
|         case GRN_DB_BOOL :
 | |
|           field_type = arrow::boolean();
 | |
|           break;
 | |
|         case GRN_DB_UINT8 :
 | |
|           field_type = arrow::uint8();
 | |
|           break;
 | |
|         case GRN_DB_INT8 :
 | |
|           field_type = arrow::int8();
 | |
|           break;
 | |
|         case GRN_DB_UINT16 :
 | |
|           field_type = arrow::uint16();
 | |
|           break;
 | |
|         case GRN_DB_INT16 :
 | |
|           field_type = arrow::int16();
 | |
|           break;
 | |
|         case GRN_DB_UINT32 :
 | |
|           field_type = arrow::uint32();
 | |
|           break;
 | |
|         case GRN_DB_INT32 :
 | |
|           field_type = arrow::int32();
 | |
|           break;
 | |
|         case GRN_DB_UINT64 :
 | |
|           field_type = arrow::uint64();
 | |
|           break;
 | |
|         case GRN_DB_INT64 :
 | |
|           field_type = arrow::int64();
 | |
|           break;
 | |
|         case GRN_DB_FLOAT :
 | |
|           field_type = arrow::float64();
 | |
|           break;
 | |
|         case GRN_DB_TIME :
 | |
|           field_type =
 | |
|             std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
 | |
|           break;
 | |
|         case GRN_DB_SHORT_TEXT :
 | |
|         case GRN_DB_TEXT :
 | |
|         case GRN_DB_LONG_TEXT :
 | |
|           field_type = arrow::utf8();
 | |
|           break;
 | |
|         default :
 | |
|           break;
 | |
|         }
 | |
|         if (!field_type) {
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
|         auto field = std::make_shared<arrow::Field>(field_name,
 | |
|                                                     field_type,
 | |
|                                                     false);
 | |
|         fields.push_back(field);
 | |
|       };
 | |
| 
 | |
|       auto schema = std::make_shared<arrow::Schema>(fields);
 | |
| 
 | |
|       std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer;
 | |
|       auto status =
 | |
|         arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer);
 | |
|       if (!check_status(ctx_,
 | |
|                         status,
 | |
|                         "[arrow][dump] failed to create file format writer")) {
 | |
|         return ctx_->rc;
 | |
|       }
 | |
| 
 | |
|       std::vector<grn_id> ids;
 | |
|       int n_records_per_batch = 1000;
 | |
|       GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) {
 | |
|         ids.push_back(record_id);
 | |
|         if (ids.size() == n_records_per_batch) {
 | |
|           write_record_batch(ids, schema, writer);
 | |
|           ids.clear();
 | |
|         }
 | |
|       } GRN_TABLE_EACH_END(ctx_, table_cursor);
 | |
|       if (!ids.empty()) {
 | |
|         write_record_batch(ids, schema, writer);
 | |
|       }
 | |
|       writer->Close();
 | |
| 
 | |
|       return ctx_->rc;
 | |
|     }
 | |
| 
 | |
|   private:
 | |
|     grn_ctx *ctx_;
 | |
|     grn_obj *grn_table_;
 | |
|     grn_obj *grn_columns_;
 | |
| 
 | |
|     void write_record_batch(std::vector<grn_id> &ids,
 | |
|                             std::shared_ptr<arrow::Schema> &schema,
 | |
|                             std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) {
 | |
|       std::vector<std::shared_ptr<arrow::Array>> columns;
 | |
|       auto n_columns = GRN_BULK_VSIZE(grn_columns_) / sizeof(grn_obj *);
 | |
|       for (auto i = 0; i < n_columns; ++i) {
 | |
|         auto grn_column = GRN_PTR_VALUE_AT(grn_columns_, i);
 | |
| 
 | |
|         arrow::Status status;
 | |
|         std::shared_ptr<arrow::Array> column;
 | |
| 
 | |
|         switch (grn_obj_get_range(ctx_, grn_column)) {
 | |
|         case GRN_DB_BOOL :
 | |
|           status = build_boolean_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_UINT8 :
 | |
|           status = build_uint8_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_INT8 :
 | |
|           status = build_int8_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_UINT16 :
 | |
|           status = build_uint16_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_INT16 :
 | |
|           status = build_int16_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_UINT32 :
 | |
|           status = build_uint32_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_INT32 :
 | |
|           status = build_int32_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_UINT64 :
 | |
|           status = build_uint64_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_INT64 :
 | |
|           status = build_int64_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_FLOAT :
 | |
|           status = build_double_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_TIME :
 | |
|           status = build_timestamp_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         case GRN_DB_SHORT_TEXT :
 | |
|         case GRN_DB_TEXT :
 | |
|         case GRN_DB_LONG_TEXT :
 | |
|           status = build_utf8_array(ids, grn_column, &column);
 | |
|           break;
 | |
|         default :
 | |
|           status =
 | |
|             arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO");
 | |
|           break;
 | |
|         }
 | |
|         if (!status.ok()) {
 | |
|           continue;
 | |
|         }
 | |
|         columns.push_back(column);
 | |
|       }
 | |
| 
 | |
|       arrow::RecordBatch record_batch(schema, ids.size(), columns);
 | |
|       writer->WriteRecordBatch(record_batch);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_boolean_array(std::vector<grn_id> &ids,
 | |
|                                       grn_obj *grn_column,
 | |
|                                       std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::BooleanBuilder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const grn_bool *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_uint8_array(std::vector<grn_id> &ids,
 | |
|                                     grn_obj *grn_column,
 | |
|                                     std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::UInt8Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const uint8_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_int8_array(std::vector<grn_id> &ids,
 | |
|                                    grn_obj *grn_column,
 | |
|                                    std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::Int8Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const int8_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_uint16_array(std::vector<grn_id> &ids,
 | |
|                                      grn_obj *grn_column,
 | |
|                                      std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::UInt16Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const uint16_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_int16_array(std::vector<grn_id> &ids,
 | |
|                                     grn_obj *grn_column,
 | |
|                                     std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::Int16Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const int16_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_uint32_array(std::vector<grn_id> &ids,
 | |
|                                      grn_obj *grn_column,
 | |
|                                      std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::UInt32Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const uint32_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_int32_array(std::vector<grn_id> &ids,
 | |
|                                     grn_obj *grn_column,
 | |
|                                     std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::Int32Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const int32_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
|     arrow::Status build_uint64_array(std::vector<grn_id> &ids,
 | |
|                                      grn_obj *grn_column,
 | |
|                                      std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::UInt64Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const uint64_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_int64_array(std::vector<grn_id> &ids,
 | |
|                                     grn_obj *grn_column,
 | |
|                                     std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::Int64Builder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const int64_t *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_double_array(std::vector<grn_id> &ids,
 | |
|                                      grn_obj *grn_column,
 | |
|                                      std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::DoubleBuilder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(*(reinterpret_cast<const double *>(data)));
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_timestamp_array(std::vector<grn_id> &ids,
 | |
|                                         grn_obj *grn_column,
 | |
|                                         std::shared_ptr<arrow::Array> *array) {
 | |
|       auto timestamp_ns_data_type =
 | |
|         std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
 | |
|       arrow::TimestampBuilder builder(arrow::default_memory_pool(),
 | |
|                                       timestamp_ns_data_type);
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data));
 | |
|         builder.Append(timestamp_ns);
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
| 
 | |
|     arrow::Status build_utf8_array(std::vector<grn_id> &ids,
 | |
|                                    grn_obj *grn_column,
 | |
|                                    std::shared_ptr<arrow::Array> *array) {
 | |
|       arrow::StringBuilder builder(arrow::default_memory_pool());
 | |
|       for (auto id : ids) {
 | |
|         uint32_t size;
 | |
|         auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
 | |
|         builder.Append(data, size);
 | |
|       }
 | |
|       return builder.Finish(array);
 | |
|     }
 | |
|   };
 | |
| }
 | |
| #endif /* GRN_WITH_ARROW */
 | |
| 
 | |
| extern "C" {
 | |
| grn_rc
 | |
| grn_arrow_load(grn_ctx *ctx,
 | |
|                grn_obj *table,
 | |
|                const char *path)
 | |
| {
 | |
|   GRN_API_ENTER;
 | |
| #ifdef GRN_WITH_ARROW
 | |
|   std::shared_ptr<arrow::io::MemoryMappedFile> input;
 | |
|   auto status =
 | |
|     arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input);
 | |
|   if (!grnarrow::check_status(ctx,
 | |
|                               status,
 | |
|                               std::ostringstream() <<
 | |
|                               "[arrow][load] failed to open path: " <<
 | |
|                               "<" << path << ">")) {
 | |
|     GRN_API_RETURN(ctx->rc);
 | |
|   }
 | |
|   std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
 | |
|   status = arrow::ipc::RecordBatchFileReader::Open(input, &reader);
 | |
|   if (!grnarrow::check_status(ctx,
 | |
|                               status,
 | |
|                               "[arrow][load] "
 | |
|                               "failed to create file format reader")) {
 | |
|     GRN_API_RETURN(ctx->rc);
 | |
|   }
 | |
| 
 | |
|   grnarrow::FileLoader loader(ctx, table);
 | |
|   int n_record_batches = reader->num_record_batches();
 | |
|   for (int i = 0; i < n_record_batches; ++i) {
 | |
|     std::shared_ptr<arrow::RecordBatch> record_batch;
 | |
|     status = reader->ReadRecordBatch(i, &record_batch);
 | |
|     if (!grnarrow::check_status(ctx,
 | |
|                                 status,
 | |
|                                 std::ostringstream("") <<
 | |
|                                 "[arrow][load] failed to get " <<
 | |
|                                 "the " << i << "-th " << "record")) {
 | |
|       break;
 | |
|     }
 | |
|     loader.load_record_batch(record_batch);
 | |
|     if (ctx->rc != GRN_SUCCESS) {
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| #else /* GRN_WITH_ARROW */
 | |
|   ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
 | |
|       "[arrow][load] Apache Arrow support isn't enabled");
 | |
| #endif /* GRN_WITH_ARROW */
 | |
|   GRN_API_RETURN(ctx->rc);
 | |
| }
 | |
| 
 | |
| grn_rc
 | |
| grn_arrow_dump(grn_ctx *ctx,
 | |
|                grn_obj *table,
 | |
|                const char *path)
 | |
| {
 | |
|   GRN_API_ENTER;
 | |
| #ifdef GRN_WITH_ARROW
 | |
|   auto all_columns =
 | |
|     grn_hash_create(ctx,
 | |
|                     NULL,
 | |
|                     sizeof(grn_id),
 | |
|                     0,
 | |
|                     GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY);
 | |
|   grn_table_columns(ctx,
 | |
|                     table,
 | |
|                     "", 0,
 | |
|                     reinterpret_cast<grn_obj *>(all_columns));
 | |
| 
 | |
|   grn_obj columns;
 | |
|   GRN_PTR_INIT(&columns, GRN_OBJ_VECTOR, GRN_ID_NIL);
 | |
|   GRN_HASH_EACH_BEGIN(ctx, all_columns, cursor, id) {
 | |
|     void *key;
 | |
|     grn_hash_cursor_get_key(ctx, cursor, &key);
 | |
|     auto column_id = static_cast<grn_id *>(key);
 | |
|     auto column = grn_ctx_at(ctx, *column_id);
 | |
|     GRN_PTR_PUT(ctx, &columns, column);
 | |
|   } GRN_HASH_EACH_END(ctx, cursor);
 | |
|   grn_hash_close(ctx, all_columns);
 | |
| 
 | |
|   grn_arrow_dump_columns(ctx, table, &columns, path);
 | |
|   GRN_OBJ_FIN(ctx, &columns);
 | |
| #else /* GRN_WITH_ARROW */
 | |
|   ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
 | |
|       "[arrow][dump] Apache Arrow support isn't enabled");
 | |
| #endif /* GRN_WITH_ARROW */
 | |
|   GRN_API_RETURN(ctx->rc);
 | |
| }
 | |
| 
 | |
| grn_rc
 | |
| grn_arrow_dump_columns(grn_ctx *ctx,
 | |
|                        grn_obj *table,
 | |
|                        grn_obj *columns,
 | |
|                        const char *path)
 | |
| {
 | |
|   GRN_API_ENTER;
 | |
| #ifdef GRN_WITH_ARROW
 | |
|   std::shared_ptr<arrow::io::FileOutputStream> output;
 | |
|   auto status = arrow::io::FileOutputStream::Open(path, &output);
 | |
|   if (!grnarrow::check_status(ctx,
 | |
|                               status,
 | |
|                               std::stringstream() <<
 | |
|                               "[arrow][dump] failed to open path: " <<
 | |
|                               "<" << path << ">")) {
 | |
|     GRN_API_RETURN(ctx->rc);
 | |
|   }
 | |
| 
 | |
|   grnarrow::FileDumper dumper(ctx, table, columns);
 | |
|   dumper.dump(output.get());
 | |
| #else /* GRN_WITH_ARROW */
 | |
|   ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
 | |
|       "[arrow][dump] Apache Arrow support isn't enabled");
 | |
| #endif /* GRN_WITH_ARROW */
 | |
|   GRN_API_RETURN(ctx->rc);
 | |
| }
 | |
| }
 | 
