C++ Columnar In-Memory Data Format: Apache Arrow

Apache Arrow advantages:
    High-performance data processing: Arrow uses a columnar memory layout, which suits analytical and query workloads particularly well; it enables efficient batch processing and reduces CPU cache misses, improving throughput.
    Zero-copy data sharing: Arrow lets different systems and processes share in-memory data directly, without copying. This is critical for data-intensive applications, cutting both memory usage and CPU overhead.
    Cross-platform compatibility: Arrow is a cross-language development platform with implementations for C++, Java, Python, and more, improving interoperability between software components.
    Standardized data format: Arrow defines a unified format specification, so data can move between systems without conversion, lowering the cost and complexity of data exchange.
    Optimized big-data processing: when integrated with frameworks such as Spark or Pandas, Arrow can significantly accelerate data loading, processing, and analysis; for example, PySpark integration has been reported to speed up some workloads by as much as 53x.
    Broad adoption: Arrow is used by many data-processing tools and libraries (Pandas, Parquet, Drill, Spark, and others), forming a strong ecosystem.
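The zero-copy point above boils down to consumers referencing the same immutable buffer instead of each deserializing its own copy. As a minimal, Arrow-free sketch of the idea (this is not Arrow's actual IPC machinery; `Buffer` and `consumer_sum` are illustrative names):

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// An immutable, reference-counted buffer: the essence of zero-copy sharing.
// Every consumer shares ownership of the same bytes; nothing is memcpy'd.
using Buffer = std::shared_ptr<const std::vector<int64_t>>;

// A "consumer" that reads the shared buffer in place.
int64_t consumer_sum(const Buffer& buf) {
    int64_t total = 0;
    for (int64_t v : *buf) total += v;
    return total;
}
```

Two consumers holding copies of the `Buffer` handle see the same `data()` pointer; only the reference count changes, not the payload.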
Apache Arrow disadvantages:
    Memory consumption: columnar storage can require more memory than row storage, especially for sparse data or wide tables, because each column needs its own contiguous allocation.
    Not a fit for every workload: for frequent random record access or in-place updates, Arrow's columnar layout may be less efficient than a traditional row layout.
    Learning curve: new users need time to absorb Arrow's data structures and APIs, especially if they are used to other data-processing models.
    Ecosystem maturity: although the Arrow ecosystem is growing quickly, support and tooling can still be thin in some niche domains or technology stacks.
    Implementation complexity: using Arrow efficiently can involve nontrivial memory-management and optimization strategies, which can raise development effort.
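The columnar-layout advantage (efficient batch scans, fewer cache misses) can be illustrated without Arrow at all. A minimal C++ sketch contrasting the two layouts (`RowRecord` and `ColumnarBatch` are illustrative stand-ins, not Arrow types):

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Row layout: one struct per record. Scanning a single field strides over the
// other fields too, pulling unrelated bytes into cache.
struct RowRecord {
    int64_t id;
    double price;
    std::string name;
};

// Columnar layout (what Arrow generalizes): each column is one contiguous
// buffer, so scanning a column touches only the bytes it needs.
struct ColumnarBatch {
    std::vector<int64_t> id;
    std::vector<double> price;
    std::vector<std::string> name;
};

double sum_price_rows(const std::vector<RowRecord>& rows) {
    double total = 0.0;
    for (const auto& r : rows) total += r.price;  // also loads id/name cache lines
    return total;
}

double sum_price_columns(const ColumnarBatch& batch) {
    double total = 0.0;
    for (double p : batch.price) total += p;  // sequential reads of one buffer
    return total;
}
```

Both functions compute the same aggregate; the columnar version simply reads one dense `std::vector<double>`, which is the access pattern the "fewer cache misses" claim refers to.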


#define ARROW_COMPUTE
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/csv/api.h>
#include <arrow/json/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <arrow/table.h>
#include <arrow/pretty_print.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <memory>
#include <iostream>

template <typename T>
using numbuildT = arrow::NumericBuilder<T>;

struct ArrowUtil {
    static arrow::Status read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status write_ipc(arrow::Table const& tb, char const* file_name);
    static arrow::Status write_parquet(arrow::Table const& tb, char const* file_name);

    // Flatten a (possibly multi-chunk) numeric column into one contiguous Array.
    template <typename T, typename buildT, typename arrayT>
    inline static std::shared_ptr<arrow::Array> chunked_array_to_array(
        std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        buildT builder;
        std::vector<T> values;
        values.reserve(array_a->length());
        for (int i = 0; i < array_a->num_chunks(); ++i) {
            auto typed_arr = std::static_pointer_cast<arrayT>(array_a->chunk(i));
            for (int64_t j = 0; j < typed_arr->length(); ++j) {
                values.push_back(typed_arr->Value(j));
            }
        }
        // Status returns are discarded here for brevity; check them in production code.
        (void)builder.Resize(values.size());
        (void)builder.AppendValues(values);
        std::shared_ptr<arrow::Array> result;
        (void)builder.Finish(&result);
        return result;
    }

    // Flatten a numeric column into a std::vector<T>.
    template <typename T, typename arrayT>
    inline static std::vector<T> chunked_array_to_vector(
        std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        std::vector<T> values;
        values.reserve(array_a->length());
        for (int i = 0; i < array_a->num_chunks(); ++i) {
            auto typed_arr = std::static_pointer_cast<arrayT>(array_a->chunk(i));
            for (int64_t j = 0; j < typed_arr->length(); ++j) {
                values.push_back(typed_arr->Value(j));
            }
        }
        return values;
    }

    // Flatten a string column into a std::vector<std::string>.
    inline static std::vector<std::string> chunked_array_to_str_vector(
        std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        std::vector<std::string> values;
        values.reserve(array_a->length());
        for (int i = 0; i < array_a->num_chunks(); ++i) {
            auto str_arr = std::static_pointer_cast<arrow::StringArray>(array_a->chunk(i));
            for (int64_t j = 0; j < str_arr->length(); ++j) {
                // GetString() copies the value. Value() returns a view that is not
                // null-terminated, so calling .data() on it (as the original did)
                // would read past the end of the string.
                values.push_back(str_arr->GetString(j));
            }
        }
        return values;
    }

    // Flatten a string column into one contiguous StringArray.
    inline static std::shared_ptr<arrow::Array> chunked_array_to_str_array(
        std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        arrow::StringBuilder builder;
        std::vector<std::string> values;
        values.reserve(array_a->length());
        for (int i = 0; i < array_a->num_chunks(); ++i) {
            auto str_arr = std::static_pointer_cast<arrow::StringArray>(array_a->chunk(i));
            for (int64_t j = 0; j < str_arr->length(); ++j) {
                values.push_back(str_arr->GetString(j));
            }
        }
        (void)builder.Resize(values.size());
        (void)builder.AppendValues(values);
        std::shared_ptr<arrow::Array> result;
        (void)builder.Finish(&result);
        return result;
    }
};

arrow::Status ArrowUtil::read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(file_name));
    ARROW_ASSIGN_OR_RAISE(auto csv_reader,
                          arrow::csv::TableReader::Make(
                              arrow::io::default_io_context(), input_file,
                              arrow::csv::ReadOptions::Defaults(),
                              arrow::csv::ParseOptions::Defaults(),
                              arrow::csv::ConvertOptions::Defaults()));
    ARROW_ASSIGN_OR_RAISE(tb, csv_reader->Read());
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(file_name));
    ARROW_ASSIGN_OR_RAISE(auto ipc_reader, arrow::ipc::RecordBatchFileReader::Open(input_file));
    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
    batches.reserve(ipc_reader->num_record_batches());
    for (int i = 0; i < ipc_reader->num_record_batches(); ++i) {
        ARROW_ASSIGN_OR_RAISE(auto a_record, ipc_reader->ReadRecordBatch(i));
        batches.emplace_back(std::move(a_record));
    }
    // The original discarded this Status; propagate it instead.
    ARROW_ASSIGN_OR_RAISE(
        tb, arrow::Table::FromRecordBatches(ipc_reader->schema(), std::move(batches)));
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    std::shared_ptr<arrow::io::ReadableFile> infile;
    PARQUET_ASSIGN_OR_THROW(
        infile, arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool()));
    std::unique_ptr<parquet::arrow::FileReader> reader;
    PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    PARQUET_THROW_NOT_OK(reader->ReadTable(&tb));
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto infile,
                          arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool()));
    ARROW_ASSIGN_OR_RAISE(auto reader,
                          arrow::json::TableReader::Make(arrow::default_memory_pool(), infile,
                                                         arrow::json::ReadOptions::Defaults(),
                                                         arrow::json::ParseOptions::Defaults()));
    ARROW_ASSIGN_OR_RAISE(tb, reader->Read());
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::write_ipc(arrow::Table const& tb, char const* file_name) {
    ARROW_ASSIGN_OR_RAISE(auto output_file, arrow::io::FileOutputStream::Open(file_name));
    ARROW_ASSIGN_OR_RAISE(auto batch_writer, arrow::ipc::MakeFileWriter(output_file, tb.schema()));
    ARROW_RETURN_NOT_OK(batch_writer->WriteTable(tb));
    ARROW_RETURN_NOT_OK(batch_writer->Close());
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::write_parquet(arrow::Table const& tb, char const* file_name) {
    std::shared_ptr<arrow::io::FileOutputStream> outfile;
    PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(file_name));
    // The last argument is the RowGroup size in the Parquet file. Normally this
    // would be rather large; a small value is used here so the example file
    // contains multiple RowGroups.
    PARQUET_THROW_NOT_OK(
        parquet::arrow::WriteTable(tb, arrow::default_memory_pool(), outfile, 3));
    return arrow::Status::OK();
}

void testReadCSV() {
    // Read a CSV file.
    char const* csv_path = "./test.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testWriteIpc() {
    // Read a CSV file and write it back out as an Arrow IPC file.
    char const* csv_path = "./test.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    char const* write_ipc_path = "./test_dst.arrow";
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto write_res = ArrowUtil::write_ipc(tb_, write_ipc_path);
    assert(write_res.ok());
}

void testReadIPC() {
    // Read an Arrow IPC file.
    char const* ipc_path = "./test_dst.arrow";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_ipc(ipc_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testWriteParquet() {
    // Read a CSV file and write it back out as a Parquet file.
    char const* csv_path = "./test.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    char const* write_parquet_path = "./test_dst.parquet";
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto write_res = ArrowUtil::write_parquet(tb_, write_parquet_path);
    assert(write_res.ok());
}

void testReadParquet() {
    // Read a Parquet file.
    char const* parquet_path = "./test_dst.parquet";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_parquet(parquet_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testReadJson() {
    // Read a JSON file.
    char const* json_path = "./test.json";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_json(json_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testComputeGreater() {
    // Compare the int1 and int2 columns element-wise with the "greater" compute function.
    char const* csv_path = "./comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_b = tb_.GetColumnByName("int2");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);
    auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_b);
    auto compared_datum = arrow::compute::CallFunction("greater", {array_a_res, array_b_res});
    auto array_a_gt_b_compute = compared_datum->make_array();
    (void)arrow::PrettyPrint(*array_a_gt_b_compute, {}, &std::cerr);
    auto schema = arrow::schema({arrow::field("int1", arrow::int64()),
                                 arrow::field("int2", arrow::int64()),
                                 arrow::field("a>b? (arrow)", arrow::boolean())});
    std::shared_ptr<arrow::Table> my_table = arrow::Table::Make(
        schema, {array_a_res, array_b_res, array_a_gt_b_compute}, tb_.num_rows());
    (void)arrow::PrettyPrint(*my_table, {}, &std::cerr);
}

void testComputeMinMax() {
    // Compute the minimum and maximum of the int1 column.
    char const* csv_path = "./comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);
    arrow::compute::ScalarAggregateOptions scalar_aggregate_options;
    scalar_aggregate_options.skip_nulls = false;
    auto min_max = arrow::compute::CallFunction("min_max", {array_a_res}, &scalar_aggregate_options);
    // Unpack the struct scalar result (a two-field {"min", "max"} scalar).
    auto min_value = min_max->scalar_as<arrow::StructScalar>().value[0];
    auto max_value = min_max->scalar_as<arrow::StructScalar>().value[1];
    assert(min_value->ToString() == "1");
    assert(max_value->ToString() == "8");
}

// Minimal stand-ins for the GoogleTest macros, so the examples below compile without gtest.
#define GTEST_TEST(a, b) void a##_##b()
#define ASSERT_EQ(a, b) assert((a) == (b))

GTEST_TEST(RWTests, ComputeMean) {
    // Compute the mean of the int1 column.
    char const* csv_path = "../data/comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);
    arrow::compute::ScalarAggregateOptions scalar_aggregate_options;
    scalar_aggregate_options.skip_nulls = false;
    auto mean = arrow::compute::CallFunction("mean", {array_a_res}, &scalar_aggregate_options);
    auto const& mean_value = mean->scalar_as<arrow::Scalar>();
    ASSERT_EQ(mean_value.ToString(), "4.5");
}

GTEST_TEST(RWTests, ComputeAdd) {
    // Add 3 to every value in the int1 column.
    char const* csv_path = "../data/comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);
    std::shared_ptr<arrow::Scalar> increment = std::make_shared<arrow::Int64Scalar>(3);
    // Note: "add" takes no ScalarAggregateOptions (the original passed some by mistake).
    auto add = arrow::compute::CallFunction("add", {array_a_res, increment});
    std::shared_ptr<arrow::Array> incremented_array = add->array_as<arrow::Array>();
    (void)arrow::PrettyPrint(*incremented_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeAddArray) {
    // Add the int1 and int2 columns element-wise.
    char const* csv_path = "../data/comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);
    auto array_b = tb_.GetColumnByName("int2");
    auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_b);
    auto add = arrow::compute::CallFunction("add", {array_a_res, array_b_res});
    std::shared_ptr<arrow::Array> incremented_array = add->array_as<arrow::Array>();
    (void)arrow::PrettyPrint(*incremented_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeStringEqual) {
    // Compare the s1 and s2 columns for equality.
    char const* csv_path = "../data/comp_s_eq.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("s1");
    auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a);
    auto array_b = tb_.GetColumnByName("s2");
    auto array_b_res = ArrowUtil::chunked_array_to_str_array(array_b);
    auto eq_ = arrow::compute::CallFunction("equal", {array_a_res, array_b_res});
    std::shared_ptr<arrow::Array> eq_array = eq_->array_as<arrow::Array>();
    (void)arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeCustom) {
    // Hand-rolled element-wise equality comparison.
    char const* csv_path = "../data/comp_s_eq.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto arr1 = tb_.GetColumnByName("s1");
    auto arr2 = tb_.GetColumnByName("s2");
    auto v1 = ArrowUtil::chunked_array_to_str_vector(arr1);
    auto v2 = ArrowUtil::chunked_array_to_str_vector(arr2);
    for (std::size_t i = 0; i < v1.size(); ++i) {
        if (v1[i] != v2[i]) {
            std::cerr << v1[i] << "!=" << v2[i] << "\n";
        }
    }
}

GTEST_TEST(RWTests, ComputeCustomDbl) {
    // Hand-rolled element-wise comparison of double values.
    char const* csv_path = "../data/custom_dbl.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto arr1 = tb_.GetColumnByName("dbl1");
    auto arr2 = tb_.GetColumnByName("dbl2");
    auto v1 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(arr1);
    auto v2 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(arr2);
    for (std::size_t i = 0; i < v1.size(); ++i) {
        if (v1[i] != v2[i]) {
            std::cerr << v1[i] << "!=" << v2[i] << "\n";
        }
    }
}

GTEST_TEST(RWTests, ComputeEqualDbl) {
    // Compare two double columns with the "equal" compute function.
    char const* csv_path = "../data/custom_dbl.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto arr1 = tb_.GetColumnByName("dbl1");
    auto arr2 = tb_.GetColumnByName("dbl2");
    auto dbl_arr1 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(arr1);
    auto dbl_arr2 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(arr2);
    auto eq_ = arrow::compute::CallFunction("equal", {dbl_arr1, dbl_arr2});
    std::shared_ptr<arrow::Array> eq_array = eq_->array_as<arrow::Array>();
    (void)arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, StrStartsWith) {
    // Check which values in the s1 column start with "Zha".
    char const* csv_path = "../data/comp_s_eq.csv";
    std::shared_ptr<arrow::Table> tb;
    assert(ArrowUtil::read_csv(csv_path, tb).ok());
    auto const& tb_ = *tb;
    (void)arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("s1");
    auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a);
    arrow::compute::MatchSubstringOptions options("Zha");
    auto eq_ = arrow::compute::CallFunction("starts_with", {array_a_res}, &options);
    std::shared_ptr<arrow::Array> eq_array = eq_->array_as<arrow::Array>();
    (void)arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

using arrow::Int32Builder;
using arrow::Int64Builder;
using arrow::DoubleBuilder;
using arrow::StringBuilder;

// A row structure for the row-oriented representation.
struct row_data {
    int32_t col1;
    int64_t col2;
    double col3;
    std::string col4;
};

#define EXIT_ON_FAILURE(expr)                            \
    do {                                                 \
        arrow::Status status_ = (expr);                  \
        if (!status_.ok()) {                             \
            std::cerr << status_.message() << std::endl; \
            return EXIT_FAILURE;                         \
        }                                                \
    } while (0)

arrow::Status CreateTable(const std::vector<struct row_data>& rows,
                          std::shared_ptr<arrow::Table>* table) {
    // Builders are more efficient when constructed with
    // arrow::jemalloc::MemoryPool::default_pool(), since it can grow the
    // underlying memory area more appropriately.
    arrow::MemoryPool* pool = arrow::default_memory_pool();
    Int32Builder col1_builder(pool);
    Int64Builder col2_builder(pool);
    DoubleBuilder col3_builder(pool);
    StringBuilder col4_builder(pool);
    // Loop over the existing rows and append each field to its builder. The
    // Append calls can fail (e.g. if additional memory cannot be allocated),
    // so their return values must be checked.
    for (const row_data& row : rows) {
        ARROW_RETURN_NOT_OK(col1_builder.Append(row.col1));
        ARROW_RETURN_NOT_OK(col2_builder.Append(row.col2));
        ARROW_RETURN_NOT_OK(col3_builder.Append(row.col3));
        ARROW_RETURN_NOT_OK(col4_builder.Append(row.col4));
    }
    // Append a null as the last element of each column.
    ARROW_RETURN_NOT_OK(col1_builder.AppendNull());
    ARROW_RETURN_NOT_OK(col2_builder.AppendNull());
    ARROW_RETURN_NOT_OK(col3_builder.AppendNull());
    ARROW_RETURN_NOT_OK(col4_builder.AppendNull());
    std::shared_ptr<arrow::Array> col1_array;
    ARROW_RETURN_NOT_OK(col1_builder.Finish(&col1_array));
    std::shared_ptr<arrow::Array> col2_array;
    ARROW_RETURN_NOT_OK(col2_builder.Finish(&col2_array));
    std::shared_ptr<arrow::Array> col3_array;
    ARROW_RETURN_NOT_OK(col3_builder.Finish(&col3_array));
    std::shared_ptr<arrow::Array> col4_array;
    ARROW_RETURN_NOT_OK(col4_builder.Finish(&col4_array));
    std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
        arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()),
        arrow::field("col3", arrow::float64()), arrow::field("col4", arrow::utf8())};
    auto schema = std::make_shared<arrow::Schema>(schema_vector);
    // The final 'table' is what we can pass to other functions that consume
    // Apache Arrow memory structures. It owns all the referenced data, so we
    // need not worry about dangling references once we leave the scope of the
    // function that built the table and its underlying arrays.
    *table = arrow::Table::Make(schema, {col1_array, col2_array, col3_array, col4_array});
    return arrow::Status::OK();
}

arrow::Status TableToVector(const std::shared_ptr<arrow::Table>& table,
                            std::vector<struct row_data>* rows) {
    // Check that the table schema matches the expected one.
    std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
        arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()),
        arrow::field("col3", arrow::float64()), arrow::field("col4", arrow::utf8())};
    auto expected_schema = std::make_shared<arrow::Schema>(schema_vector);
    if (!expected_schema->Equals(*table->schema())) {
        // The table doesn't have the expected schema, so we cannot directly
        // convert it to our target representation.
        return arrow::Status::Invalid("Schemas are not matching!");
    }
    // Get typed pointers to each column's data.
    auto col1s = std::static_pointer_cast<arrow::Int32Array>(table->column(0)->chunk(0));
    auto col2s = std::static_pointer_cast<arrow::Int64Array>(table->column(1)->chunk(0));
    auto col3s = std::static_pointer_cast<arrow::DoubleArray>(table->column(2)->chunk(0));
    auto col4s = std::static_pointer_cast<arrow::StringArray>(table->column(3)->chunk(0));
    for (int64_t i = 0; i < table->num_rows(); i++) {
        if (col1s->IsNull(i)) {
            assert(i == 3);  // the fourth row is null
        } else {
            int32_t col1 = col1s->Value(i);
            int64_t col2 = col2s->Value(i);
            double col3 = col3s->Value(i);
            std::string col4 = col4s->GetString(i);
            rows->push_back({col1, col2, col3, col4});
        }
    }
    return arrow::Status::OK();
}

// Convert between row-oriented and column-oriented representations.
int testTableConvertSTL() {
    // Row-oriented input data.
    std::vector<row_data> rows = {
        {1, 11, 1.0, "John"}, {2, 22, 2.0, "Tom"}, {3, 33, 3.0, "Susan"}};
    std::shared_ptr<arrow::Table> table;
    EXIT_ON_FAILURE(CreateTable(rows, &table));
    std::vector<row_data> expected_rows;
    EXIT_ON_FAILURE(TableToVector(table, &expected_rows));
    std::cout << expected_rows.size() << std::endl;
    assert(rows.size() == expected_rows.size());
    return 0;
}

void test() {
    // Build int8 arrays.
    arrow::Int8Builder builder;
    arrow::Int16Builder int16builder;
    int8_t days_raw[5] = {1, 12, 17, 23, 28};
    int8_t months_raw[5] = {1, 3, 5, 7, 1};
    int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
    (void)builder.AppendValues(days_raw, 5);
    std::shared_ptr<arrow::Array> days = builder.Finish().MoveValueUnsafe();
    (void)builder.AppendValues(months_raw, 5);
    std::shared_ptr<arrow::Array> months = builder.Finish().MoveValueUnsafe();
    (void)int16builder.AppendValues(years_raw, 5);
    std::shared_ptr<arrow::Array> years = int16builder.Finish().MoveValueUnsafe();

    // Build a RecordBatch with a custom schema.
    // A RecordBatch has columns and labels for those columns -- this gets us to
    // the 2-D data structures we want in Arrow. These are defined by a schema,
    // which holds fields; here we prepare both object types.
    std::shared_ptr<arrow::Field> field_day, field_month, field_year;
    std::shared_ptr<arrow::Schema> schema;
    // Every field needs its name and data type.
    field_day = arrow::field("Day", arrow::int8());
    field_month = arrow::field("Month", arrow::int8());
    field_year = arrow::field("Year", arrow::int16());
    // The schema can be built from a vector of fields, and we do so here.
    schema = arrow::schema({field_day, field_month, field_year});
    // With the schema and Arrays full of data, we can make our RecordBatch!
    // Here, each column is internally contiguous. This is in opposition to
    // Tables, which we'll see next.
    std::shared_ptr<arrow::RecordBatch> rbatch;
    // The RecordBatch needs the schema, a length for the columns (which must all
    // match), and the actual data itself.
    rbatch = arrow::RecordBatch::Make(schema, days->length(), {days, months, years});
    std::cout << rbatch->ToString();
    /*
    Day:   [1,12,17,23,28]
    Month: [1,3,5,7,1]
    Year:  [1990,2000,1995,2000,1995]
    */

    // A ChunkedArray wraps an STL vector of Arrays.
    arrow::ArrayVector day_vecs{days};
    std::shared_ptr<arrow::ChunkedArray> day_chunks =
        std::make_shared<arrow::ChunkedArray>(day_vecs);

    testTableConvertSTL();
    testReadCSV();
    /*
    col1: string
    col2: string
    col3: string
    ----
    col1: [["val1","val1"]]
    col2: [["val2","val2"]]
    col3: [["val3","val3"]]
    */
    testWriteIpc();
    testReadIPC();
    // testComputeGreater();
    // testComputeMinMax();
}
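The ArrowUtil chunked-array helpers above all follow one pattern: walk every chunk of a column and copy its values into a single contiguous buffer. Stripped of the Arrow types, the operation is just flattening a vector of chunks. A minimal stdlib-only sketch of that pattern (`ChunkVector` here is an illustrative stand-in for arrow::ChunkedArray, not an Arrow type):

```cpp
#include <cstddef>
#include <vector>

// Stand-in for arrow::ChunkedArray: one logical column stored as several
// contiguous chunks.
template <typename T>
using ChunkVector = std::vector<std::vector<T>>;

// Mirrors what ArrowUtil::chunked_array_to_vector does: concatenate all chunks
// into one contiguous std::vector, preserving order.
template <typename T>
std::vector<T> flatten_chunks(const ChunkVector<T>& chunks) {
    std::size_t total = 0;
    for (const auto& c : chunks) total += c.size();
    std::vector<T> out;
    out.reserve(total);  // single allocation, like the builder.Resize() call above
    for (const auto& c : chunks) out.insert(out.end(), c.begin(), c.end());
    return out;
}
```

The Arrow versions add two things on top of this: a static cast from the untyped `arrow::Array` chunk to its concrete array type, and (for the `_to_array` variants) a builder that turns the flat buffer back into a single-chunk Arrow array.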

Compute Functions — Apache Arrow v17.0.0

GitHub - apache/arrow: Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing

