1.什么是kudu
在Kudu出现前,由于传统存储系统的局限性,对于数据的快速输入和分析还没有一个完美的解决方案,要么以缓慢的数据输入为代价实现快速分析,要么以缓慢的分析为代价实现数据快速输入。随着快速输入和分析场景越来越多,传统存储层的局限性越来越明显,Kudu应运而生,它的定位介于HDFS和HBase之间,将低延迟随机访问,逐行插入、更新和快速分析扫描融合到一个存储层中,是一个既支持随机读写又支持OLAP分析的存储引擎
kudu应用场景
- 适用于那些既有随机访问,也有批量数据扫描的复合场景
- 高计算量的场景
- 使用了高性能的存储设备,包括使用更多的内存
- 支持数据更新,避免数据反复迁移
- 支持跨地域的实时数据备份和查询
架构
1、Table
表(Table)是数据库中用来存储数据的对象,是有结构的数据集合。kudu中的表具有schema和全局有序的primary key(主键)。kudu中一个table会被水平分成多个被称之为tablet的片段。
2、Tablet
一个 tablet 是一张 table连续的片段,tablet是kudu表的水平分区,类似于HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间 tablet 会冗余存储。放置到多个 tablet server上,并且在任何给定的时间点,其中一个副本被认为是leader tablet,其余的被认之为follower tablet。每个tablet都可以进行数据的读请求,但只有Leader tablet负责写数据请求
3、Tablet Server
tablet server负责数据存储,并提供数据读写服务 一个 tablet server 存储了table表的tablet,向kudu client 提供读取数据服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本 只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着
4、Master Server
集群中负责集群管理、元数据管理等功能
2.环境安装
采用docker-composer安装
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
version: "3"
services:kudu-master-1:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}ports:- "7051:7051"- "8051:8051"command: ["master"]volumes:- kudu-master-1:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251# TODO: Use `host.docker.internal` instead of KUDU_QUICKSTART_IP when it# works on Linux (https://github.com/docker/for-linux/issues/264)- >MASTER_ARGS=--fs_wal_dir=/var/lib/kudu/master--rpc_bind_addresses=0.0.0.0:7051--rpc_advertised_addresses=${KUDU_QUICKSTART_IP:?Please set KUDU_QUICKSTART_IP environment variable}:7051--webserver_port=8051--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8051--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-master-2:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}ports:- "7151:7151"- "8151:8151"command: ["master"]volumes:- kudu-master-2:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >MASTER_ARGS=--fs_wal_dir=/var/lib/kudu/master--rpc_bind_addresses=0.0.0.0:7151--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7151--webserver_port=8151--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8151--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-master-3:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}ports:- "7251:7251"- "8251:8251"command: ["master"]volumes:- kudu-master-3:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >MASTER_ARGS=--fs_wal_dir=/var/lib/kudu/master--rpc_bind_addresses=0.0.0.0:7251--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7251--webserver_port=8251--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8251--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-tserver-1:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}depends_on:- kudu-master-1- kudu-master-2- kudu-master-3ports:- "7050:7050"- "8050:8050"command: ["tserver"]volumes:- kudu-tserver-1:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >TSERVER_ARGS=--fs_wal_dir=/var/lib/kudu/tserver--rpc_bind_addresses=0.0.0.0:7050--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7050--webserver_port=8050--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8050--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-tserver-2:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}depends_on:- kudu-master-1- kudu-master-2- kudu-master-3ports:- "7150:7150"- "8150:8150"command: ["tserver"]volumes:- kudu-tserver-2:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >TSERVER_ARGS=--fs_wal_dir=/var/lib/kudu/tserver--rpc_bind_addresses=0.0.0.0:7150--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7150--webserver_port=8150--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8150--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-tserver-3:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}depends_on:- kudu-master-1- kudu-master-2- kudu-master-3ports:- "7250:7250"- "8250:8250"command: ["tserver"]volumes:- kudu-tserver-3:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >TSERVER_ARGS=--fs_wal_dir=/var/lib/kudu/tserver--rpc_bind_addresses=0.0.0.0:7250--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7250--webserver_port=8250--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8250--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-tserver-4:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}depends_on:- kudu-master-1- kudu-master-2- kudu-master-3ports:- "7350:7350"- "8350:8350"command: ["tserver"]volumes:- kudu-tserver-4:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >TSERVER_ARGS=--fs_wal_dir=/var/lib/kudu/tserver--rpc_bind_addresses=0.0.0.0:7350--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7350--webserver_port=8350--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8350--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=truekudu-tserver-5:image: apache/kudu:${KUDU_QUICKSTART_VERSION:-latest}depends_on:- kudu-master-1- kudu-master-2- kudu-master-3ports:- "7450:7450"- "8450:8450"command: ["tserver"]volumes:- kudu-tserver-5:/var/lib/kuduenvironment:- KUDU_MASTERS=kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251- >TSERVER_ARGS=--fs_wal_dir=/var/lib/kudu/tserver--rpc_bind_addresses=0.0.0.0:7450--rpc_advertised_addresses=${KUDU_QUICKSTART_IP}:7450--webserver_port=8450--webserver_advertised_addresses=${KUDU_QUICKSTART_IP}:8450--webserver_doc_root=/opt/kudu/www--stderrthreshold=0--use_hybrid_clock=false--unlock_unsafe_flags=true
volumes:kudu-master-1:kudu-master-2:kudu-master-3:kudu-tserver-1:kudu-tserver-2:kudu-tserver-3:kudu-tserver-4:kudu-tserver-5:
windows set env
$env:KUDU_QUICKSTART_VERSION = "1.12.0"
$env:KUDU_QUICKSTART_IP= "10.11.68.77"
Get-ChildItem Env:
linux set env
export KUDU_QUICKSTART_VERSION="1.12.0"
export KUDU_QUICKSTART_IP=$(ifconfig | grep "inet " | grep -Fv 127.0.0.1 | awk '{print $2}' | tail -1)
run
docker-compose -f docker/quickstart.yml up -d
stop
docker-compose -f docker/quickstart.yml down
访问http://localhost:8051/
3.代码工程
实验目的
实现java对kudu的创建表 ,插入,查询,修改操作
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>kudu</artifactId><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><kudu-version>1.12.0</kudu-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-client</artifactId><version>${kudu-version}</version></dependency><!-- For logging messages. --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.apache.kudu</groupId><artifactId>kudu-test-utils</artifactId><version>${kudu-version}</version><scope>test</scope></dependency></dependencies></project>
example.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.package com.et.kudu;import java.util.ArrayList;
import java.util.List;import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduPredicate.ComparisonOp;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration.FlushMode;/** A simple example of using the synchronous Kudu Java client to* - Create a table.* - Insert rows.* - Alter a table.* - Scan rows.* - Delete a table.*/
public class Example {private static final Double DEFAULT_DOUBLE = 12.345;private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "localhost:7051");static void createExampleTable(KuduClient client, String tableName) throws KuduException {// Set up a simple schema.List<ColumnSchema> columns = new ArrayList<>(2);columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).nullable(true).build());Schema schema = new Schema(columns);// Set up the partition schema, which distributes rows to different tablets by hash.// Kudu also supports partitioning by key range. Hash and range partitioning can be combined.// For more information, see http://kudu.apache.org/docs/schema_design.html.CreateTableOptions cto = new CreateTableOptions();List<String> hashKeys = new ArrayList<>(1);hashKeys.add("key");int numBuckets = 8;cto.addHashPartitions(hashKeys, numBuckets);// Create the table.client.createTable(tableName, schema, cto);System.out.println("Created table " + tableName);}static void insertRows(KuduClient client, String tableName, int numRows) throws KuduException {// Open the newly-created table and create a KuduSession.KuduTable table = client.openTable(tableName);KuduSession session = client.newSession();session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND);for (int i = 0; i < numRows; i++) {Insert insert = table.newInsert();PartialRow row = insert.getRow();row.addInt("key", i);// Make even-keyed row have a null 'value'.if (i % 2 == 0) {row.setNull("value");} else {row.addString("value", "value " + i);}session.apply(insert);}// Call session.close() to end the session and ensure the rows are// flushed and errors are returned.// You can also call session.flush() to do the same without ending the session.// When flushing in AUTO_FLUSH_BACKGROUND mode (the mode recommended// for most workloads, you must check the pending errors as shown below, since// write operations are flushed to Kudu in background threads.session.close();if (session.countPendingErrors() != 0) {System.out.println("errors inserting rows");org.apache.kudu.client.RowErrorsAndOverflowStatus roStatus = session.getPendingErrors();org.apache.kudu.client.RowError[] errs = roStatus.getRowErrors();int numErrs = Math.min(errs.length, 5);System.out.println("there were errors inserting rows to Kudu");System.out.println("the first few errors follow:");for (int i = 0; i < numErrs; i++) {System.out.println(errs[i]);}if (roStatus.isOverflowed()) {System.out.println("error buffer overflowed: some errors were discarded");}throw new RuntimeException("error inserting rows to Kudu");}System.out.println("Inserted " + numRows + " rows");}static void scanTableAndCheckResults(KuduClient client, String tableName, int numRows) throws KuduException {KuduTable table = client.openTable(tableName);Schema schema = table.getSchema();// Scan with a predicate on the 'key' column, returning the 'value' and "added" columns.List<String> projectColumns = new ArrayList<>(2);projectColumns.add("key");projectColumns.add("value");projectColumns.add("added");int lowerBound = 0;KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(schema.getColumn("key"),ComparisonOp.GREATER_EQUAL,lowerBound);int upperBound = numRows / 2;KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(schema.getColumn("key"),ComparisonOp.LESS,upperBound);KuduScanner scanner = client.newScannerBuilder(table).setProjectedColumnNames(projectColumns).addPredicate(lowerPred).addPredicate(upperPred).build();// Check the correct number of values and null values are returned, and// that the default value was set for the new column on each row.// Note: scanning a hash-partitioned table will not return results in primary key order.int resultCount = 0;int nullCount = 0;while (scanner.hasMoreRows()) {RowResultIterator results = scanner.nextRows();while (results.hasNext()) {RowResult result = results.next();if (result.isNull("value")) {nullCount++;}double added = result.getDouble("added");if (added != DEFAULT_DOUBLE) {throw new RuntimeException("expected added=" + DEFAULT_DOUBLE +" but got added= " + added);}resultCount++;}}int expectedResultCount = upperBound - lowerBound;if (resultCount != expectedResultCount) {throw new RuntimeException("scan error: expected " + expectedResultCount +" results but got " + resultCount + " results");}int expectedNullCount = expectedResultCount / 2 + (numRows % 2 == 0 ? 1 : 0);if (nullCount != expectedNullCount) {throw new RuntimeException("scan error: expected " + expectedNullCount +" rows with value=null but found " + nullCount);}System.out.println("Scanned some rows and checked the results");}public static void main(String[] args) {System.out.println("-----------------------------------------------");System.out.println("Will try to connect to Kudu master(s) at " + KUDU_MASTERS);System.out.println("Run with -DkuduMasters=master-0:port,master-1:port,... to override.");System.out.println("-----------------------------------------------");String tableName = "java_example-" + System.currentTimeMillis();KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();try {createExampleTable(client, tableName);int numRows = 150;insertRows(client, tableName, numRows);// Alter the table, adding a column with a default value.// Note: after altering the table, the table needs to be re-opened.AlterTableOptions ato = new AlterTableOptions();ato.addColumn("added", org.apache.kudu.Type.DOUBLE, DEFAULT_DOUBLE);client.alterTable(tableName, ato);System.out.println("Altered the table");scanTableAndCheckResults(client, tableName, numRows);} catch (Exception e) {e.printStackTrace();} finally {try {client.deleteTable(tableName);System.out.println("Deleted the table");} catch (Exception e) {e.printStackTrace();} finally {try {client.shutdown();} catch (Exception e) {e.printStackTrace();}}}}
}
以上只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
- GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.
4.测试
测试创建表
@Test
public void testCreateExampleTable() throws KuduException {String tableName = "test_create_example";Example.createExampleTable(client, tableName);assertTrue(client.tableExists(tableName));
}
插入数据
@Test
public void testInsertRows() throws KuduException {String tableName = "test_create_example";// Example.insertRows(client,tableName,100);System.out.println(client.getTableStatistics(tableName).getLiveRowCount());
}
dashboard可以看到我们创建的表和记录条数
5.引用
- Apache Kudu - Fast Analytics on Fast Data
- Spring Boot集成kudu快速入门Demo | Harries Blog™