文章目录
- 1. 安装
- 1.1 官方文档
- 1.2 安装前提
- 1.3 安装cassandra
- 2. 集群配置
- 3. Java客户端
- 3.1 Maven依赖
- 3.2 客户端代码
- 4. 性能测试
- 4.1 压测结论
- 4.2 压测代码
1. 安装
1.1 官方文档
http://cassandra.apache.org/doc/latest/getting_started/index.html
1.2 安装前提
- 安装Java 8
$java -version
java version "1.8.0_102"
Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
- 安装Python 2.7
$python -V
Python 2.7.17
1.3 安装cassandra
- 下载
wget http://mirrors.tuna.tsinghua.edu.cn/apache/cassandra/3.11.5/apache-cassandra-3.11.5-bin.tar.gz
- 解压即安装
tar -xzvf apache-cassandra-3.11.5-bin.tar.gz
- 启动脚本在cassandra的根目录的bin下
./bin/cassandra -f
2. 集群配置
-
案例: 配置了3节点集群
- 位于tdatanode1、tdatanode2、tdatanode3
- tdatanode1、tdatanode2做为种子节点
- 3个安装过程同上
-
配置文件修改, $CASSANDRA_HOME/conf/cassandra.yaml
- 所有机器使用相同的集群名称
cluster_name: 'utrackRealTime'
- 所有机器设置seed,均为tdatanode1、tdatanode2
seed_provider:- class_name: org.apache.cassandra.locator.SimpleSeedProviderparameters:- seeds: "tdatanode1,tdatanode2"
- 监听地址设置为本机IP或者host名称
listen_address: tdatanode1rpc_address: tdatanode1
-
启动集群
# -f的意思的前台运行,测试时为了更方便的查看问题
./bin/cassandra -f
- 交互式终端cqlsh连接
# 如果不是连本机,通过环境变量$CQLSH_HOST、$CQLSH_PORT指定主机和端口
cqlsh --request-timeout=600
# 指定机器
cqlsh tdatanode1 --request-timeout=60000
3. Java客户端
3.1 Maven依赖
<dependency><groupId>com.datastax.cassandra</groupId><artifactId>cassandra-driver-core</artifactId><version>3.1.0</version>
</dependency>
3.2 客户端代码
- 创建Session,应该是线程安全的,可以重用
PoolingOptions options = new PoolingOptions();
options.setNewConnectionThreshold(HostDistance.LOCAL,1800);
options.setCoreConnectionsPerHost(HostDistance.LOCAL,5);
options.setMaxConnectionsPerHost(HostDistance.LOCAL,5);Cluster.Builder bulder = Cluster.builder().withPoolingOptions(options).addContactPoint("192.168.36.174").withPort(9042);
Cluster cluster = builder.build();
Session session = cluster.connect();
- 执行操作
ResultSet rs = session.execute("select * FROM utrack.odl_user_hj");
for (Row r : rs.all()) {int size = r.getColumnDefinitions().size();for (int i = 0; i < size; i++) {r.getObject(i).toString();}
}
- 进程退出时的清理
session.close();
cluster.close();
4. 性能测试
4.1 压测结论
连接 | 并发 | QPS |
---|---|---|
1 | 1 | 1452 |
1 | 5 | 8182 |
1 | 10 | 14806 |
1 | 20 | 23584 |
1 | 30 | 32341 |
1 | 40 | 35945 |
1 | 50 | 33647 |
单连接、3节点、普通HDD、单JVM分配8G内存的情况下,并发30~40基本到最高性能, 35K QPS
连接数 | 并发数 | QPS | 备注 |
---|---|---|---|
1 | 40 | 35945 | |
L -5 | 40 | 36179 | - 设置LOCAL最大连接数没有用,估计是 |
L -5 | 40 | ||
NewConnectionThreshold=0 | 25994 | ||
NewConnectionThreshold=1 | 29620 | ||
NewConnectionThreshold=10 | 32840 | ||
NewConnectionThreshold=800 | 37481 | ||
NotSet | 37407 | ||
NewConnectionThreshold=1800 | 37000 | - 阈值到800后再增长,对qps没有明显影响 | |
R 5-10 | 40 | 35842 | |
L 5-10 | 40 | 24113 | - LOCAL coreConnect设置为5的情况下,性能反而差了。 |
不确定系统瓶颈的情况下,不要随意调优,Connection和新建Connection的Threshold调整可能不但不提升性能,反而降低
4.2 压测代码
- 主类
public class ReadPerformance {private static ExecutorService es = Executors.newFixedThreadPool(40);public static void main(String[] args) throws InterruptedException {Session s = CassandraConnectionManager.getSession();PreparedStatement preparedStatement = s.prepare("select * from utrack.simple_test where domain = ?");long start = System.currentTimeMillis();for (int i = 0; i < 10_0000; i++) {final int index = i;es.submit(() -> {ResultSet rs = s.execute(preparedStatement.bind((int) (Math.random() * 200_0000)));rs.one().getObject(0);if (index % 1000 == 999) System.out.println("index: " + index + ", completed");});}System.out.println("await termination....");es.shutdown();if (!es.awaitTermination(1, TimeUnit.HOURS)) {System.out.println("await timeout....");es.shutdownNow();}long last = System.currentTimeMillis() - start;System.out.println("time used: " + last + ", qps:" + (10_0000_000 / last));CassandraConnectionManager.destory();}
}
- 工具类
public class CassandraConnectionManager {public static Session getSession() {return Holder.session;}public static Cluster getCluster() {return Holder.cluster;}public static void destory() {Holder.destroy();}private static class Holder {private static Cluster cluster;private static Session session;static {PoolingOptions options = new PoolingOptions();options.setNewConnectionThreshold(HostDistance.LOCAL,1800);
// options.setCoreConnectionsPerHost(HostDistance.LOCAL,5);options.setMaxConnectionsPerHost(HostDistance.LOCAL,5);Cluster.Builder bulder = Cluster.builder().withPoolingOptions(options).addContactPoint("192.168.36.174").withPort(9042);cluster = bulder.build();session = cluster.connect();}private static void destroy() {if (session != null) session.close();if (cluster != null) cluster.close();}}}