package com.chu.cassandratest;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.transport.TTransportException;
/**
 * A fixed-size pool of Cassandra connections.
 *
 * <p>Every connection is opened eagerly by the constructor against
 * {@link CassandraConnection#HOST}. A connection that fails to open has its
 * stack trace printed and its slot left empty; empty slots are never handed
 * out. Slot bookkeeping is guarded by this object's monitor, and a semaphore
 * bounds the number of connections that can be checked out at the same time.
 *
 * @author chuer
 * @date 2014-12-31 10:05:26
 */
public class CassandraConnectionPool {
    /** How long {@link #getConn()} waits for a free connection, in seconds. */
    private static final long ACQUIRE_TIMEOUT_SECONDS = 3;

    /** One permit per usable (successfully opened) connection. */
    Semaphore access = null;
    /** Connection slots; an entry is null when that connection could not be opened. */
    CassandraConnection[] pool = null;
    /** used[i] is true while pool[i] is checked out. */
    boolean[] used = null;
    /** Not referenced anywhere in this class. */
    int round = 0;
    /** Number of slots requested at construction time. */
    int conn_num = 0;

    /**
     * Creates the pool and tries to open {@code conn_num} connections.
     *
     * @param conn_num number of connection slots to create
     * @throws IllegalArgumentException if {@code conn_num} is negative
     */
    public CassandraConnectionPool(int conn_num) {
        if (conn_num < 0) {
            throw new IllegalArgumentException("conn_num must not be negative: " + conn_num);
        }
        this.conn_num = conn_num;
        init();
    }

    /** Allocates the slot arrays, opens the connections and sizes the semaphore. */
    private void init() {
        pool = new CassandraConnection[conn_num];
        used = new boolean[conn_num];
        int opened = 0;
        for (int i = 0; i < pool.length; i++) {
            try {
                pool[i] = new CassandraConnection(CassandraConnection.HOST);
                opened++;
            } catch (TTransportException e) {
                // Best effort: keep building the pool, the slot simply stays empty.
                e.printStackTrace();
            }
        }
        // Allow as many concurrent borrowers as there are usable connections, so a
        // borrower never obtains a permit that has no connection behind it.
        access = new Semaphore(opened);
    }

    /**
     * Checks a connection out of the pool, waiting up to three seconds for one
     * to become available.
     *
     * @return a connection that is not currently checked out; never null
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException if no connection became available in time
     */
    public CassandraConnection getConn() throws InterruptedException {
        if (access.tryAcquire(ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            synchronized (this) {
                for (int i = 0; i < pool.length; i++) {
                    // Skip slots whose connection failed to open.
                    if (pool[i] != null && !used[i]) {
                        used[i] = true;
                        return pool[i];
                    }
                }
            }
            // No free slot was found despite holding a permit: hand the permit
            // back so it is not lost for later callers.
            access.release();
        }
        throw new RuntimeException("all client is too busy");
    }

    /**
     * Returns a connection to the pool. Connections that do not belong to this
     * pool, are not currently checked out, or are null are ignored.
     *
     * @param client the connection previously obtained from {@link #getConn()}
     */
    public void releaseConn(CassandraConnection client) {
        if (client == null) {
            return;
        }
        boolean released = false;
        synchronized (this) {
            for (int i = 0; i < pool.length; i++) {
                if (client == pool[i] && used[i]) {
                    used[i] = false;
                    released = true;
                    break;
                }
            }
        }
        // Only give a permit back when a slot was actually freed, so a double
        // release cannot inflate the permit count.
        if (released) {
            access.release();
        }
    }

    /** Closes every connection that was successfully opened. */
    public void shutdownPool() {
        if (pool != null) {
            for (int i = 0; i < pool.length; i++) {
                if (pool[i] != null) {
                    pool[i].close();
                }
            }
        }
    }
}
本文代码摘抄并修改自网上的另一篇文章,原文地址已经找不到了,恕不能注明出处。
package com.chu.cassandratest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
/**
 * A single Thrift connection to a Cassandra node.
 *
 * <p>The transport is opened in the constructor on port 9160 using the binary
 * protocol, and stays open until {@link #close()} is called.
 *
 * @author chuer
 * @date 2014-12-31 10:09:47
 */
public class CassandraConnection {
    /** Host address used by callers that do not supply their own. */
    public final static String HOST = "127.0.0.1";

    private String host = null;
    private TTransport tr = null;
    private Cassandra.Client client = null;

    /**
     * Opens a connection to the given host.
     *
     * @param host host name or address of the Cassandra node
     * @throws TTransportException if the transport cannot be opened
     */
    public CassandraConnection(String host) throws TTransportException {
        this.host = host;
        connect(host);
    }

    /** Builds the socket transport and Thrift client, then opens the transport. */
    private synchronized void connect(String host) throws TTransportException {
        TTransport socket = new TSocket(host, 9160);
        this.tr = socket;
        TProtocol protocol = new TBinaryProtocol(socket);
        this.client = new Cassandra.Client(protocol);
        socket.open();
    }

    /** Closes the underlying transport if it exists and is currently open. */
    public synchronized void close() {
        if (tr == null || !tr.isOpen()) {
            return;
        }
        tr.close();
    }

    /** @return the Thrift client bound to this connection's transport */
    public Cassandra.Client getClient() {
        return this.client;
    }

    /** @return the host passed to the constructor */
    public String getHost() {
        return this.host;
    }
}
package com.chu.cassandratest;
import org.apache.cassandra.thrift.Cassandra.Client;
/**
 * Small manual demo of {@code CassandraConnectionPool}: borrows a few
 * connections, prints their Thrift clients, then returns the connections and
 * shuts the pool down.
 */
public class TestCassandraPool {
    /** Number of connections the pool is created with. */
    private static final int POOL_SIZE = 10;
    /** Number of connections borrowed by the demo. */
    private static final int BORROW_COUNT = 5;

    /**
     * Runs the demo.
     *
     * @param args ignored
     * @throws Exception if borrowing a connection is interrupted or fails
     */
    public static void main(String[] args) throws Exception {
        CassandraConnectionPool pool = new CassandraConnectionPool(POOL_SIZE);
        CassandraConnection[] borrowed = new CassandraConnection[BORROW_COUNT];
        try {
            // Borrow everything first, then print, matching the original order.
            for (int i = 0; i < borrowed.length; i++) {
                borrowed[i] = pool.getConn();
            }
            for (CassandraConnection conn : borrowed) {
                Client client = conn.getClient();
                System.out.println(client);
            }
        } finally {
            // Return whatever was borrowed even if something above failed, and
            // close the pool's sockets instead of leaving them open at exit.
            for (CassandraConnection conn : borrowed) {
                if (conn != null) {
                    pool.releaseConn(conn);
                }
            }
            pool.shutdownPool();
        }
    }
}
输出如下:
org.apache.cassandra.thrift.Cassandra$Client@1c160cb
org.apache.cassandra.thrift.Cassandra$Client@b9b67b
org.apache.cassandra.thrift.Cassandra$Client@7b4ed7
org.apache.cassandra.thrift.Cassandra$Client@17535b6
org.apache.cassandra.thrift.Cassandra$Client@1979eb