Hbase 的三个应用
- 分布式自旋锁。
- 分布式的唯一序列号
- 分布式系统黑名单
分布式自旋锁是一种在分布式系统中用于实现并发控制的锁机制。它通过自旋操作来等待锁的释放,并尝试获取锁,以保证共享资源的访问的一致性和正确性。
以下是一个简单的伪代码示例,用于演示分布式自旋锁的使用:
# 定义一个全局变量,用于表示锁的状态
lock_status = "unlocked"# 尝试获取锁的函数
def acquire_lock():global lock_statuswhile True:if lock_status == "unlocked":# 将锁的状态设置为"locked"lock_status = "locked"return# 释放锁的函数
def release_lock():global lock_statuslock_status = "unlocked"# 在多个节点中同时运行以下代码# 尝试获取锁
acquire_lock()# 执行共享资源的操作
# ...# 释放锁
release_lock()
在上述伪代码中,acquire_lock
函数通过自旋操作来等待锁的释放,并在锁可用时获取锁。release_lock
函数用于释放锁,将锁状态设置为可用。在多个节点同时运行这段代码时,只有一个节点能够成功获取锁,其他节点会在获取锁失败后进行自旋操作等待。这样就可以保证共享资源的并发访问的一致性。
分布式的唯一序列号是一个在分布式系统中生成全局唯一标识符(GUID)或序列号的机制。它用于确保在分布式环境下生成的标识符是唯一的,以避免冲突和重复。
以下是一个简单的Java伪代码示例,用于演示如何在分布式系统中生成唯一序列号:
// 生成唯一序列号的函数
public static String generateUniqueSequence() {// 使用UUID库生成唯一标识符String uniqueId = distributeUUID.sequence();return uniqueId;
}// 在多个节点中同时运行以下代码// 生成唯一序列号
String uniqueSequence = generateUniqueSequence();// 输出唯一序列号
System.out.println(uniqueSequence);
在上述Java伪代码中,generateUniqueSequence
函数使用UUID类来生成一个唯一的标识符,该标识符是基于时间和计算机的唯一性。在多个节点同时运行这段代码时,每个节点都可以生成一个唯一的序列号,以确保生成的序列号在整个分布式系统中是唯一的。
分布式系统黑名单是指在分布式系统中对某些IP地址、用户或其他实体进行限制或拒绝访问的机制。它用于阻止恶意用户或恶意行为对系统的攻击或滥用。
以下是一个简单的使用HBase的checkAndMutate实现分布式系统黑名单功能的伪代码示例:
public void checkAndupdate(String custId){
}
在 checkAndupdate 是一个原子性的,当 custId 存在于 黑名单中,则返回 false , 否则返回 true ,并将 custId 插入到黑名单中。
代码
分布式自旋锁。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.Scanner;
import java.util.function.Function;/*** @className: DistributeCAS* @Description:* @Author: wangyifei* @Date: 2023/12/31 12:48* get 'distribute_app:hbase_test', 'global_cas'**/
public class DistributeCAS {private static Logger logger = LoggerFactory.getLogger(DistributeCAS.class);private Configuration configuration = HBaseConfiguration.create();private Connection connection;{configuration.set("hbase.master", "server1:16000");try {connection = ConnectionFactory.createConnection(configuration);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {new DistributeCAS().acquire(new Function<Integer,Integer>(){@Overridepublic Integer apply(Integer integer) {System.out.println("get lock");return 1;}},1);}public void acquire(Function<Integer,Integer> action , Integer input){byte[] tableName = Bytes.toBytes("distribute_app:hbase_test");try {byte[] rowKey = Bytes.toBytes("global_cas");byte[] family = Bytes.toBytes("f1");byte[] qualifier = Bytes.toBytes("cas");byte[] wanted = Bytes.toBytes("0");Table table = connection.getTable(TableName.valueOf(tableName));boolean retry = true ;while(retry){retry = !table.checkAndMutate(rowKey, family).qualifier(qualifier).ifEquals(wanted).thenPut(new Put(rowKey).addColumn(family , qualifier , Bytes.toBytes("1")));logger.info("{} , do not get lock , try again" , Thread.currentThread().getName());}action.apply(input);Scanner scanner = new Scanner(String.valueOf(System.in.read()));table.put(new Put(rowKey).addColumn(family , qualifier, Bytes.toBytes("0")));table.close();} catch (IOException e) {e.printStackTrace();}}
}
上面的代码是一个分布式CAS(CompareAndSet)锁的实现示例。它使用HBase的checkAndMutate
方法来实现锁的获取和释放。
概括地描述代码逻辑如下:
- 创建HBase的连接和配置对象。
- 在
main
方法中,创建DistributeCAS
对象,并调用acquire
方法来获取锁。 acquire
方法接收一个函数和一个输入参数,表示锁获取后需要执行的动作和该动作所需的输入。- 在
acquire
方法中,设置HBase表的相关信息。 - 通过循环使用
checkAndMutate
方法来尝试获取锁,如果获取失败,则继续循环。 - 获取锁后,执行传入的函数。
- 锁释放后,用户输入任意字符,然后将锁的状态设置为可用,并关闭HBase的连接。
这段代码的总结如下:
这段代码实现了基于HBase的分布式CAS锁获取和释放的功能。它通过checkAndMutate
方法来实现原子操作,确保在并发情况下只有一个线程可以获取到锁。它的主要思路是通过循环尝试获取锁,直到成功获取为止。在获取到锁之后,执行用户指定的函数,然后通过用户输入来释放锁的状态。
注意:上述代码中的表名、列族、列限定符等信息需要根据实际情况进行调整和修改。此外,代码中没有处理连接关闭的异常情况,实际应用中应该加入适当的异常处理代码来确保资源的正确关闭。
分布式的唯一序列号
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;/*** @className: GlobalIDGenerator* @Description:* @Author: wangyifei* @Date: 2023/12/31 12:42*/
public class GlobalIDGenerator {private static Logger logger = LoggerFactory.getLogger(GlobalIDGenerator.class);public static void main(String[] args) {new GlobalIDGenerator().IDGenerator();}public void IDGenerator(){Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.master", "server1:16000");try {Connection connection = ConnectionFactory.createConnection(configuration);TableName blacklist = TableName.valueOf("distribute_app:hbase_test");byte[] rowKey = Bytes.toBytes("id_generator");byte[] family = Bytes.toBytes("f1");byte[] quailifier = Bytes.toBytes("id");Table table = connection.getTable(blacklist);Increment increment = new Increment(rowKey);increment.addColumn(family , quailifier , 1L);Result result = table.increment(increment);for (Cell cell : result.rawCells()) {logger.info("quailifier:{} , value:{}", Bytes.toString(CellUtil.cloneQualifier(cell))// 下面的就是这个自动生成的 ID, Bytes.toLong(CellUtil.cloneValue(cell)));}table.close();connection.close();} catch (IOException e) {e.printStackTrace();}}
}
上述代码是一个使用HBase实现全局唯一ID生成器的示例。
这段代码的总结如下:
这段代码实现了使用HBase作为后端存储的全局唯一ID生成器。它利用了 HBase 的 increment 功能,生成一个全局递增的原子操作。
分布式系统黑名单
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;/*** @className: Testmain* @Description:* @Author: wangyifei* @Date: 2023/12/29 21:13** Hbase table ddl* create 'distribute_app:hbase_test', {NAME => 'f1', VERSIONS => 5}****/
public class TesBlacklist {private static Logger logger = LoggerFactory.getLogger(DistributeCAS.class);public static void main(String[] args) {new TesBlacklist().testBlacklist(args);}public void testBlacklist(String[] args){Configuration configuration = HBaseConfiguration.create();
// configuration.set("hbase.zookeeper.quorum", "192.168.175.113");
// configuration.set("hbase.zookeeper.quorum", "server1");
// configuration.set("hbase.zookeeper.property.clientPort", "2181");configuration.set("hbase.master", "server1:16000");try {Connection connection = ConnectionFactory.createConnection(configuration);TableName blacklist = TableName.valueOf("distribute_app:hbase_test");byte[] rowKey = Bytes.toBytes(args[0]);byte[] family = Bytes.toBytes("f1");byte[] quailifier = Bytes.toBytes("exits");byte[] value = Bytes.toBytes(args[0]);Table table = connection.getTable(blacklist);boolean b = table.checkAndMutate(rowKey, family).qualifier(quailifier).ifNotExists().thenPut(new Put(rowKey).addColumn(family, quailifier, value));logger.info("b:{}",b);logger.info("cust_001 is {} in blacklist", b?" not " : "");table.close();connection.close();} catch (IOException e) {e.printStackTrace();}}
}
上述代码利用了 Hbase 的 checkAndMutate 功能,先检查 cust 是否存在,如果存在则返回 true ,用户需要被屏蔽,如果不存在,则返回 false ,并将用户插入到 Hbase table 中,整个过程是原子性的。