Building a Big Data Real-Time Search Project with HBase and ElasticSearch
- 1. Project Overview
- 2. Environment Setup
- 3. Writing the Program
- 4. Testing Procedure
1. Project Overview
- Use HBase to store massive amounts of data, solving the problem of storing huge datasets and updating/querying them in real time;
- Use ElasticSearch as the index for HBase to speed up real-time queries over large datasets;
- Big data components used: Hadoop-2.7.3, HBase-1.3.1, zookeeper-3.4.5, ElasticSearch-7.8.0
- Experiment environment: virtual machines (CentOS 7.6) + a personal PC (Windows) + Eclipse or IDEA; big data environment: a fully distributed cluster of 3 nodes
- The project's system architecture diagram is shown below:
- The project uses the HBase and ElasticSearch APIs to write and retrieve the data; a minimal sketch of the read path follows this list.
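The core idea: ElasticSearch stores only the row key plus the searchable fields, while HBase holds the complete record. Below is a minimal, illustrative sketch of that read path, assuming the ES 7.8.0 TransportClient and HBase 1.3.1 client introduced in Section 3; `esClient`, `hbaseConn` and the index/table names simply mirror the project's configuration, and the project's full utility classes appear later.

```java
// Illustrative sketch of the read path (not the project's final code; see Section 3).
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class ReadPathSketch {
    // 1) Ask ES "which row keys match this keyword?"  2) Fetch the full records from HBase.
    public static void lookup(TransportClient esClient, Connection hbaseConn, String keyword) throws Exception {
        SearchResponse resp = esClient.prepareSearch("publicsecurity")
                .setQuery(QueryBuilders.queryStringQuery(keyword))
                .get();
        try (Table table = hbaseConn.getTable(TableName.valueOf("PublicSecurity"))) {
            for (SearchHit hit : resp.getHits().getHits()) {
                String rowKey = hit.getSourceAsMap().get("id").toString(); // row key stored in ES
                Result row = table.get(new Get(Bytes.toBytes(rowKey)));    // full record from HBase
                System.out.println(row);
            }
        }
    }
}
```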
2. Environment Setup
- Create 3 virtual machines, i.e. 3 nodes, with 4 GB of RAM for the master node and 3 GB for each slave node; adjust this to your own PC's configuration;
- Install and deploy fully distributed Hadoop; see: Hadoop 2.7.3 fully distributed environment setup
- Install and deploy a fully distributed ZooKeeper; see: ZooKeeper cluster installation
- Install and deploy fully distributed HBase; see: the various ways to install HBase. Note: ZooKeeper must be installed and started first; only then install and start HBase.
- Install and deploy the ElasticSearch cluster; see: installing an ElasticSearch cluster on Linux. Note: the cluster must be started as the ordinary es user; once installation succeeds, start it on every node. A small connectivity check is sketched after this list.
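Before moving on to the code, it can be worth confirming that both clusters are reachable from your development PC. The following is only a sketch, assuming the same client libraries that the project's pom.xml pulls in later; the host names, ports and cluster name are placeholders that mirror the conf.properties values shown in Section 3 and must be replaced with your own.

```java
// Optional sanity check: verify ES and HBase/ZooKeeper connectivity (hosts/ports are placeholders).
import java.net.InetAddress;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        // ElasticSearch: connect via the transport port and print the cluster health (GREEN/YELLOW/RED)
        Settings settings = Settings.builder().put("cluster.name", "Es-cluster").build();
        TransportClient es = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.1.109"), 9300));
        System.out.println("ES cluster status: " + es.admin().cluster().prepareHealth().get().getStatus());
        es.close();

        // HBase: connect through the ZooKeeper quorum and list the existing tables
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hostname01:2181,hostname02:2181,hostname03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            System.out.println("HBase tables: " + Arrays.toString(conn.getAdmin().listTableNames()));
        }
    }
}
```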
3. Writing the Program
The project is written in Eclipse.
- Create a Maven project and configure settings.xml (you can point it at the Aliyun or Huawei Maven repositories), as shown below:

```xml
<?xml version="1.0" encoding="utf-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
        <mirror>
            <id>nexus-aliyun</id>
            <mirrorOf>central</mirrorOf>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </mirror>
        <mirror>
            <id>net-cn</id>
            <mirrorOf>central</mirrorOf>
            <name>Nexus net</name>
            <url>http://maven.net.cn/content/groups/public/</url>
        </mirror>
    </mirrors>
    <profiles>
        <profile>
            <id>nexus</id>
            <repositories>
                <repository>
                    <id>nexus</id>
                    <name>local private nexus</name>
                    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                    <releases><enabled>true</enabled></releases>
                    <snapshots><enabled>false</enabled></snapshots>
                </repository>
            </repositories>
            <pluginRepositories>
                <pluginRepository>
                    <id>nexus</id>
                    <name>local private nexus</name>
                    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                    <releases><enabled>true</enabled></releases>
                    <snapshots><enabled>false</enabled></snapshots>
                </pluginRepository>
            </pluginRepositories>
        </profile>
    </profiles>
    <activeProfiles>
        <activeProfile>nexus</activeProfile>
    </activeProfiles>
</settings>
```
- Add the dependencies to pom.xml, as shown below:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bigdata</groupId>
    <artifactId>realtimesearch</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <!-- Repository source -->
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Spring Boot core web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Relax the Thymeleaf template engine's overly strict HTML5 validation -->
        <dependency>
            <groupId>net.sourceforge.nekohtml</groupId>
            <artifactId>nekohtml</artifactId>
            <version>1.9.22</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>hadoop-mapreduce-client-core</artifactId>
                    <groupId>org.apache.hadoop</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-protocol</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!-- ElasticSearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.8.0</version>
        </dependency>
        <!-- Fixes the missing-method error raised when ES runs -->
        <dependency>
            <groupId>org.locationtech.spatial4j</groupId>
            <artifactId>spatial4j</artifactId>
            <version>0.6</version>
        </dependency>
        <!-- zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.9</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Resolve the io.netty package conflict between ES and HBase -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>
        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.13</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.8</version>
                <scope>system</scope>
                <systemPath>C:\Program Files\Java\jdk1.8.0_301\lib\tools.jar</systemPath>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals><goal>compile</goal></goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
    </build>
</project>
```
- Create a data directory and place the test data in it, as shown in the figure below: test data download
- Add the various configuration files, such as conf.properties, application.properties, log4j.properties and log4j2.properties, as shown in the figure below:
- Configure conf.properties with the following content:

```properties
# Raw data path
inputPath=data/

# HBase configuration
# ZK connection address (as obtained from the CloudTable service list); the concrete internal address is printed in the logs at runtime
ZKServer=hostname01:2181,hostname02:2181,hostname03:2181
# HBase table name
tableName=PublicSecurity
# HBase column families
columnFamily1=Basic
columnFamily2=OtherInfo

# ElasticSearch configuration: cluster name, VM IP, default port
clusterName=Es-cluster
hostName=192.168.1.109
tcpPort=9300
indexName=publicsecurity
typeName=info
```
- Configure application.properties with the following content (server.contextPath=/bigdata means every URL is served under the /bigdata prefix, e.g. /bigdata/index):

```properties
server.port=8084
server.contextPath=/bigdata
# Hot redeploy of web pages
spring.thymeleaf.cache=false
# Relax the overly strict HTML5 validation
spring.thymeleaf.mode=LEGACYHTML5
```
- Configure log4j.properties with the following content:

```properties
log4j.rootLogger=INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
```
- Configure log4j2.properties with the following content:

```properties
name = PropertiesConfig
property.filename = target/logs
#appenders = console, file
# The value configured here is the appender's id, not the name of the concrete appender instance
appenders = rolling

appender.rolling.type = RollingFile
appender.rolling.name = RollingLogFile
appender.rolling.fileName = ${filename}/automationlogs.log
appender.rolling.filePattern = ${filename}/automationlogs-%d{MM-dd-yy-HH-mm-ss}-%i.log
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5

rootLogger.level = INFO
rootLogger.appenderRef.rolling.ref = RollingLogFile
```
- Write the ConstantUtil utility class that reads the configuration files; the code is as follows:

```java
package com.bigdata.utils;

import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Reads the configuration files.
 * @author suben
 */
public class ConstantUtil {
    public static final Properties PROPS = new Properties();
    public static final Logger LOG = LoggerFactory.getLogger(ConstantUtil.class);
    public static final String INPUT_PATH;
    public static final String ZK_SERVER;
    public static final String TABLE_NAME;
    public static final String COLUMN_FAMILY_1;
    public static final String COLUMN_FAMILY_2;
    public static final String INDEX_NAME;
    public static final String TYPE_NAME;
    // ES cluster name; the default value is elasticsearch
    public static final String CLUSTER_NAME;
    // One node of the ES cluster
    public static final String HOSTNAME;
    // ES connection port
    public static final int TCP_PORT;

    static {
        try {
            // Load the logging configuration
            PropertyConfigurator.configure(ConstantUtil.class.getClassLoader().getResource("log4j.properties").getPath());
            // Load the connection configuration
            PROPS.load(new FileInputStream(ConstantUtil.class.getClassLoader().getResource("conf.properties").getPath()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        INPUT_PATH = PROPS.getProperty("inputPath");
        ZK_SERVER = PROPS.getProperty("ZKServer");
        TABLE_NAME = PROPS.getProperty("tableName");
        INDEX_NAME = PROPS.getProperty("indexName").toLowerCase();
        TYPE_NAME = PROPS.getProperty("typeName");
        COLUMN_FAMILY_1 = PROPS.getProperty("columnFamily1");
        COLUMN_FAMILY_2 = PROPS.getProperty("columnFamily2");
        CLUSTER_NAME = PROPS.getProperty("clusterName");
        HOSTNAME = PROPS.getProperty("hostName");
        TCP_PORT = Integer.valueOf(PROPS.getProperty("tcpPort"));
    }
}
```
- Write the HBase utility class; the code is as follows:

```java
package com.bigdata.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HBaseUtil {
    public static Admin admin = null;
    public static Configuration conf = null;
    public static Connection conn = null;
    private HashMap<String, Table> tables = null;
    private static final Logger LOG = ConstantUtil.LOG;

    public HBaseUtil() {
        this(ConstantUtil.ZK_SERVER);
    }

    public HBaseUtil(String zkServer) {
        init(zkServer);
    }

    private void ifNotConnTableJustConn(String tableName) {
        if (!tables.containsKey(tableName)) {
            this.addTable(tableName);
        }
    }

    public Table getTable(String tableName) {
        ifNotConnTableJustConn(tableName);
        return tables.get(tableName);
    }

    public void addTable(String tableName) {
        try {
            tables.put(tableName, conn.getTable(TableName.valueOf(tableName)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public boolean put(String tableName, List<Put> putList) throws Exception {
        boolean res = false;
        ifNotConnTableJustConn(tableName);
        try {
            getTable(tableName).put(putList);
            res = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return res;
    }

    public Result get(String tableName, String row) throws IOException {
        Result result = null;
        ifNotConnTableJustConn(tableName);
        Table newTable = getTable(tableName);
        Get get = new Get(Bytes.toBytes(row));
        try {
            result = newTable.get(get);
            KeyValue[] raw = result.raw();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    public boolean createTable(String tableName, String... columnFamilys) {
        boolean result = false;
        try {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                LOG.info(tableName + " table already exists!");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                for (String columnFamily : columnFamilys) {
                    tableDesc.addFamily(new HColumnDescriptor(columnFamily.getBytes()));
                }
                admin.createTable(tableDesc);
                result = true;
                LOG.info(tableName + " table created successfully!");
            }
        } catch (IOException e) {
            e.printStackTrace();
            LOG.info(tableName + " table creation failed!");
        }
        return result;
    }

    public boolean tableExists(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    public void disableTable(String tableName) throws IOException {
        if (tableExists(tableName)) {
            admin.disableTable(TableName.valueOf(tableName));
        }
    }

    /**
     * Delete a table
     * @param tableName
     */
    public void deleteTable(String tableName) throws IOException {
        disableTable(tableName);
        admin.deleteTable(TableName.valueOf(tableName));
    }

    /**
     * List all table names
     * @return
     * @throws Exception
     */
    public List<String> getALLTableName() throws Exception {
        ArrayList<String> tableNames = new ArrayList<String>();
        if (admin != null) {
            HTableDescriptor[] listTables = admin.listTables();
            if (listTables.length > 0) {
                for (HTableDescriptor tableDesc : listTables) {
                    tableNames.add(tableDesc.getNameAsString());
                }
            }
        }
        return tableNames;
    }

    /**
     * Delete all tables. Use with caution! Test environments only.
     */
    public void deleteAllTable() throws Exception {
        List<String> allTbName = getALLTableName();
        for (String s : allTbName) {
            LOG.info("Start delete table : " + s + "......");
            deleteTable(s);
            LOG.info("done delete table : " + s);
        }
    }

    /**
     * Initialize the connection configuration
     * @param zkServer
     */
    public void init(String zkServer) {
        tables = new HashMap<String, Table>();
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkServer);
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Close all connections
     * @throws IOException
     */
    public void clear() throws IOException {
        for (Map.Entry<String, Table> m : tables.entrySet()) {
            m.getValue().close();
        }
        admin.close();
        conn.close();
        conf.clear();
    }

    /**
     * Checkpoint records (bayonet): name, ID number, age, gender, checkpoint number, date/time, crossing type
     * Hotel records (hotel): name, ID number, age, gender, start date, end date, companion
     * Internet-cafe records (internet): name, ID number, age, gender, cafe name, date, duration
     */
    // Creates the target table and column families ahead of time
    public static void preDeal() throws Exception {
        HBaseUtil hBaseUtils = new HBaseUtil();
        hBaseUtils.createTable(ConstantUtil.TABLE_NAME, ConstantUtil.COLUMN_FAMILY_1, ConstantUtil.COLUMN_FAMILY_2);
    }

    // Connectivity test
    public static void test() throws Exception {
        HBaseUtil hBaseUtils = new HBaseUtil();
        long startTime = System.currentTimeMillis();
        String tb = "testTb";
        String colFamily = "info";
        String col = "name";
        String row = "100000";
        String value = "张三";
        hBaseUtils.createTable(tb, colFamily);
        List<Put> listPut = new ArrayList<>();
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(value));
        listPut.add(put);
        hBaseUtils.put(tb, listPut);
        Result res = hBaseUtils.get("testTb", "100000");
        List<Cell> list = res.getColumnCells(Bytes.toBytes("info"), Bytes.toBytes("name"));
        for (Cell c : list) {
            LOG.info(Bytes.toString(CellUtil.cloneFamily(c)));
            LOG.info(Bytes.toString(CellUtil.cloneQualifier(c)));
            LOG.info(Bytes.toString(CellUtil.cloneValue(c)));
        }
        long endTime = System.currentTimeMillis();
        float seconds = (endTime - startTime) / 1000F;
        LOG.info("Elapsed " + Float.toString(seconds) + " seconds.");
    }

    public static void main(String[] args) throws Exception {
        test();
        preDeal();
    }
}
```
- Write the ElasticSearch utility class; the code is as follows:

```java
package com.bigdata.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
//import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class ElasticSearchUtil {
    // Build the Settings object
    private static Settings settings = Settings.builder()
            .put("cluster.name", ConstantUtil.CLUSTER_NAME)
            .put("client.transport.sniff", false)
            .build();
    // TransportClient used to connect to the ES cluster
    private volatile TransportClient client;
    private final static Logger LOG = ConstantUtil.LOG;

    public ElasticSearchUtil() {
        init();
    }

    /**
     * A synchronized (*.class) block has the same effect as a synchronized static method:
     * both lock the class object itself, so they share the same monitor.
     */
    public TransportClient getClient() {
        if (client == null) {
            synchronized (TransportClient.class) {
                try {
                    client = new PreBuiltTransportClient(settings)
                            .addTransportAddress(new TransportAddress(
                                    InetAddress.getByName(ConstantUtil.HOSTNAME), ConstantUtil.TCP_PORT));
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
            }
        }
        return client;
    }

    /**
     * Get the IndicesAdminClient used for index management
     */
    public IndicesAdminClient getAdminClient() {
        return getClient().admin().indices();
    }

    /**
     * Check whether an index exists
     * @param indexName
     * @return
     */
    public boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response = getAdminClient().prepareExists(indexName).get();
        return response.isExists() ? true : false;
    }

    /**
     * Create an index
     * @param indexName
     * @return
     */
    public boolean createIndex(String indexName) {
        CreateIndexResponse createIndexResponse = getAdminClient().prepareCreate(indexName.toLowerCase()).get();
        return createIndexResponse.isAcknowledged() ? true : false;
    }

    /**
     * Delete an index
     * @param indexName
     * @return
     */
    public boolean deleteIndex(String indexName) {
        AcknowledgedResponse deleteResponse = getAdminClient().prepareDelete(indexName.toLowerCase()).execute().actionGet();
        return deleteResponse.isAcknowledged() ? true : false;
    }

    /**
     * Set the mapping for the index indexName
     * @param indexName
     * @param typeName
     * @param mapping
     */
    public void setMapping(String indexName, String typeName, String mapping) {
        getAdminClient().preparePutMapping(indexName).setType(typeName).setSource(mapping, XContentType.JSON).get();
    }

    /**
     * Create a document, equivalent to inserting one row into a table
     * @param indexName
     * @param typeName
     * @param id
     * @param document
     * @return
     * @throws IOException
     */
    public long addDocument(String indexName, String typeName, String id, Map<String, Object> document) throws IOException {
        Set<Map.Entry<String, Object>> documentSet = document.entrySet();
        IndexRequestBuilder builder = getClient().prepareIndex(indexName, typeName, id);
        XContentBuilder xContentBuilder = jsonBuilder().startObject();
        for (Map.Entry e : documentSet) {
            xContentBuilder = xContentBuilder.field(e.getKey().toString(), e.getValue());
        }
        IndexResponse response = builder.setSource(xContentBuilder.endObject()).get();
        return response.getVersion();
    }

    public List<Map<String, Object>> queryStringQuery(String text) {
        List<Map<String, Object>> resListMap = null;
        QueryBuilder match = QueryBuilders.queryStringQuery(text);
        SearchRequestBuilder search = getClient().prepareSearch().setQuery(match); // paging is optional
        // Execute the search and get the response
        SearchResponse response = search.get();
        // The documents that were hit
        SearchHits hits = response.getHits();
        // Total number of hits
        TotalHits total = hits.getTotalHits();
        SearchHit[] hitAarr = hits.getHits();
        // Iterate over the hits
        resListMap = new ArrayList<Map<String, Object>>();
        for (SearchHit hit : hitAarr) {
            // Document metadata
            String index = hit.getIndex();
            // The document's _source values
            Map<String, Object> resultMap = hit.getSourceAsMap();
            resListMap.add(resultMap);
        }
        return resListMap;
    }

    private void init() {
        try {
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(
                            InetAddress.getByName(ConstantUtil.HOSTNAME), ConstantUtil.TCP_PORT));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    // Creates the index ahead of time; roughly the counterpart of a database in an RDBMS
    public static void preDealCreatIndex() {
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        LOG.info("start create index..............");
        esUtils.createIndex(ConstantUtil.INDEX_NAME);
        LOG.info("finished create index !");
    }

    /**
     * Checkpoint records (bayonet): name, ID number, age, gender, checkpoint number, date/time, crossing type
     * Hotel records (hotel): name, ID number, age, gender, start date, end date, companion
     * Internet-cafe records (internet): name, ID number, age, gender, cafe name, date, duration
     * name, id, age, gender,
     * hotelAddr, hotelInTime, hotelOutTime, acquaintancer,
     * barAddr, internetDate, timeSpent,
     * bayonetAddr, crossDate, tripType
     */
    public static void preDealSetMapping() {
        JSONObject mappingTypeJson = new JSONObject();
        JSONObject propertiesJson = new JSONObject();
        JSONObject idJson = new JSONObject();
        idJson.put("type", "keyword");
        idJson.put("store", "true");
        propertiesJson.put("id", idJson);
        JSONObject nameJson = new JSONObject();
        nameJson.put("type", "keyword");
        propertiesJson.put("name", nameJson);
        JSONObject uidJson = new JSONObject();
        uidJson.put("type", "keyword");
        uidJson.put("store", "false");
        propertiesJson.put("uid", uidJson);
        JSONObject hotelAddr = new JSONObject();
        hotelAddr.put("type", "text");
        propertiesJson.put("address", hotelAddr);
        JSONObject happenedDate = new JSONObject();
        happenedDate.put("type", "date");
        happenedDate.put("format", "yyyy-MM-dd");
        propertiesJson.put("happenedDate", happenedDate);
        JSONObject endDate = new JSONObject();
        endDate.put("type", "date");
        endDate.put("format", "yyyy-MM-dd");
        propertiesJson.put("endDate", endDate);
        JSONObject acquaintancer = new JSONObject();
        acquaintancer.put("type", "keyword");
        propertiesJson.put("acquaintancer", acquaintancer);
        mappingTypeJson.put("properties", propertiesJson);
        LOG.info("start set mapping to " + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " .....");
        LOG.info(mappingTypeJson.toString());
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        esUtils.setMapping(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, mappingTypeJson.toString());
        LOG.info("set mapping done!!!");
    }

    // For testing
    public static void test() {
        String index = "esindex";
        System.out.println("createIndex..............");
        ElasticSearchUtil esUtils = new ElasticSearchUtil();
        esUtils.createIndex(index);
        System.out.println("createIndex done!!!!!!!!!!!");
        System.out.println("isExists = " + esUtils.isExistsIndex(index));
        System.out.println("deleteIndex...............");
        esUtils.deleteIndex(index);
        System.out.println("deleteIndex done!!!!");
    }

    public static void main(String[] args) throws IOException {
        preDealCreatIndex();
        preDealSetMapping();
        test();
    }
}
```
- Write the class that writes the data into HBase and ES; the code is as follows:

```java
package com.bigdata.insert;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import com.bigdata.utils.ConstantUtil;
import com.bigdata.utils.ElasticSearchUtil;
import com.bigdata.utils.HBaseUtil;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.*;

/**
 * Reads the local files, parses the data and inserts it into HBase and ElasticSearch.
 */
public class LoadDataToHBaseAndES {
    private HBaseUtil hBaseUtil;
    private ElasticSearchUtil elasticSearchUtil;

    public LoadDataToHBaseAndES() {
    }

    /**
     * Checkpoint records (bayonet): name, ID number, age, gender, checkpoint number, date/time, crossing type
     * Hotel records (hotel): name, ID number, age, gender, start date, end date, companion
     * Internet-cafe records (internet): name, ID number, age, gender, cafe name, date, duration
     * name, uid, age, gender,
     * hotelAddr, happenedDate, endDate, acquaintancer,
     * barAddr, happenedDate, duration,
     * bayonetAddr, happenedDate, tripType
     */
    public void insert() throws Exception {
        hBaseUtil = new HBaseUtil();
        elasticSearchUtil = new ElasticSearchUtil();
        String filePath = ConstantUtil.INPUT_PATH;
        File dir = new File(filePath);
        File[] files = dir.listFiles();
        if (files != null) {
            for (File file : files) {
                if (file.isDirectory()) {
                    System.out.println(file.getName() + "This is a directory!");
                } else {
                    // Hotel records
                    if (file.getName().contains("hotel")) {
                        BufferedReader reader = null;
                        reader = new BufferedReader(new FileReader(filePath + file.getName()));
                        String tempString = null;
                        while ((tempString = reader.readLine()) != null) {
                            // Blank line judgment
                            if (!tempString.isEmpty()) {
                                List<Put> putList = new ArrayList<Put>();
                                String[] elements = tempString.split(",");
                                // Generate a unique record ID
                                String id = UUID.randomUUID().toString();
                                Put put = new Put(Bytes.toBytes(id));
                                // Write the data to HBase
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("event"), Bytes.toBytes("hotel"));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("endDate"), Bytes.toBytes(elements[6]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("acquaintancer"), Bytes.toBytes(elements[7]));
                                putList.add(put);
                                ConstantUtil.LOG.info("hotel_info start putting to HBase ....:" + id + " " + tempString);
                                hBaseUtil.put(ConstantUtil.TABLE_NAME, putList);
                                // Write the data to ES
                                Map<String, Object> esMap = new HashMap<String, Object>();
                                esMap.put("id", id);
                                esMap.put("name", elements[0]);
                                esMap.put("uid", elements[1]);
                                esMap.put("address", elements[4]);
                                esMap.put("happenedDate", elements[5]);
                                esMap.put("endDate", elements[6]);
                                esMap.put("acquaintancer", elements[7]);
                                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                                ConstantUtil.LOG.info("start add document to ES..." + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
                            }
                        }
                        reader.close();
                    }
                    // Internet-cafe records
                    else if (file.getName().contains("internet")) {
                        BufferedReader reader = null;
                        reader = new BufferedReader(new FileReader(filePath + file.getName()));
                        String tempString = null;
                        while ((tempString = reader.readLine()) != null) {
                            // Blank line judgment
                            if (!tempString.isEmpty()) {
                                List<Put> putList = new ArrayList<Put>();
                                String[] elements = tempString.split(",");
                                // Generate a unique record ID
                                String id = UUID.randomUUID().toString();
                                Put put = new Put(Bytes.toBytes(id));
                                // Write the data to HBase
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("event"), Bytes.toBytes("internetBar"));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("duration"), Bytes.toBytes(elements[6]));
                                putList.add(put);
                                ConstantUtil.LOG.info("internet_info start putting to HBase ... :" + id + " " + tempString);
                                hBaseUtil.put(ConstantUtil.TABLE_NAME, putList);
                                // Write the data to ES
                                Map<String, Object> esMap = new HashMap<String, Object>();
                                esMap.put("id", id);
                                esMap.put("name", elements[0]);
                                esMap.put("uid", elements[1]);
                                esMap.put("address", elements[4]);
                                esMap.put("happenedDate", elements[5]);
                                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                                ConstantUtil.LOG.info("start add document to ES..." + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
                            }
                        }
                        reader.close();
                    }
                    // Checkpoint records
                    else if (file.getName().contains("bayonet")) {
                        BufferedReader reader = null;
                        reader = new BufferedReader(new FileReader(filePath + file.getName()));
                        String tempString = null;
                        while ((tempString = reader.readLine()) != null) {
                            // Blank line judgment
                            if (!tempString.isEmpty()) {
                                List<Put> putList = new ArrayList<Put>();
                                String[] elements = tempString.split(",");
                                // Generate a unique record ID
                                String id = UUID.randomUUID().toString();
                                Put put = new Put(Bytes.toBytes(id));
                                // Write the data to HBase
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(elements[0]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("uid"), Bytes.toBytes(elements[1]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(elements[2]));
                                put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("gender"), Bytes.toBytes(elements[3]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("event"), Bytes.toBytes("bayonet"));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("address"), Bytes.toBytes(elements[4]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("happenedDate"), Bytes.toBytes(elements[5]));
                                put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("tripType"), Bytes.toBytes(elements[6]));
                                putList.add(put);
                                hBaseUtil.put(ConstantUtil.TABLE_NAME, putList);
                                ConstantUtil.LOG.info("bayonet_info start putting to HBase....:" + id + " " + tempString);
                                // Write the data to ES
                                Map<String, Object> esMap = new HashMap<String, Object>();
                                esMap.put("id", id);
                                esMap.put("name", elements[0]);
                                esMap.put("uid", elements[1]);
                                esMap.put("address", elements[4]);
                                esMap.put("happenedDate", elements[5]);
                                elasticSearchUtil.addDocument(ConstantUtil.INDEX_NAME, ConstantUtil.TYPE_NAME, id, esMap);
                                ConstantUtil.LOG.info("start add document to ES..." + ConstantUtil.INDEX_NAME + " " + ConstantUtil.TYPE_NAME + " " + id + " " + esMap);
                            }
                        }
                        reader.close();
                    }
                    // Skip data-description files
                    else {
                        continue;
                    }
                }
            }
            ConstantUtil.LOG.info("load and insert done !!!!!!!!!!!!!!!!!!");
        }
    }

    public static void start() throws Exception {
        LoadDataToHBaseAndES load2DB = new LoadDataToHBaseAndES();
        load2DB.insert();
    }

    public static void main(String[] args) throws Exception {
        start();
    }
}
```
- Write the Query class; the code is as follows:

```java
package com.bigdata.query;

import com.alibaba.fastjson.JSONObject;
import com.bigdata.utils.ConstantUtil;
import com.bigdata.utils.ElasticSearchUtil;
import com.bigdata.utils.HBaseUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Search logic: query ElasticSearch first, then look up the full record in HBase.
 */
public class Query {
    private HBaseUtil hBaseUtil = new HBaseUtil();
    private ElasticSearchUtil elasticSearchUtil = new ElasticSearchUtil();
    private JSONObject result = new JSONObject();
    private JSONObject tmpJS = new JSONObject();

    public String query(String target) {
        result.clear();
        tmpJS.clear();
        long startTime = System.currentTimeMillis();
        List<Map<String, Object>> listMap = elasticSearchUtil.queryStringQuery(target);
        long endTime = System.currentTimeMillis();
        float seconds = (endTime - startTime) / 1000F;
        ConstantUtil.LOG.info("ElasticSearch query took " + Float.toString(seconds) + " seconds.");
        for (Map<String, Object> m : listMap) {
            String id = m.get("id").toString();
            JSONObject tmpJS = new JSONObject();
            tmpJS.put("id", id);
            Result res = null;
            try {
                long s1 = System.currentTimeMillis();
                res = hBaseUtil.get(ConstantUtil.TABLE_NAME, id);
                long e1 = System.currentTimeMillis();
                float se1 = (e1 - s1) / 1000F;
                ConstantUtil.LOG.info("HBase query took " + Float.toString(se1) + " seconds.");
                Cell[] cells = res.rawCells();
                for (Cell cell : cells) {
                    String col = Bytes.toString(CellUtil.cloneQualifier(cell));
                    System.out.println(col);
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    System.out.println(value);
                    tmpJS.put(col, value);
                }
                result.put(id, tmpJS);
            } catch (IOException e) {
                e.printStackTrace();
                result.put(id, "Query failed!");
            }
        }
        return result.toString();
    }

    public static void main(String[] args) throws Exception {
        Query query = new Query();
        long startTime = System.currentTimeMillis();
        System.out.println(query.query("100004"));
        long endTime = System.currentTimeMillis();
        float seconds = (endTime - startTime) / 1000F;
        ConstantUtil.LOG.info("Elapsed " + Float.toString(seconds) + " seconds.");
    }
}
```
- Write the ManagerQuery class; the code is as follows:

```java
package com.bigdata.manager;

import org.springframework.stereotype.Component;
import com.bigdata.query.Query;

@Component
public class ManagerQuery {
    private static Query query = new Query();

    public static String getQueryResult(String target) {
        try {
            String result = query.query(target);
            System.out.println(result);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return "The query raised an exception; please notify the developers!";
        }
    }

    public static void main(String[] args) {
        String target = "牧之桃";
        String result = ManagerQuery.getQueryResult(target);
        System.out.println(result);
    }
}
```
- Write the SearchService service class (standard Spring MVC style); the code is as follows:

```java
package com.bigdata.service;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.bigdata.manager.ManagerQuery;

@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.bigdata"})
public class SearchService {

    @RequestMapping("/search")
    public String search(String target) {
        try {
            return ManagerQuery.getQueryResult(target);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "Something went wrong!";
    }

    // Main method; executed when this class is right-clicked and run as a Java application
    public static void main(String[] args) throws Exception {
        SpringApplication.run(SearchService.class, args);
    }
}
```
- Write the SearchController controller class (standard Spring MVC style); the code is as follows:

```java
package com.bigdata.controller;

import org.springframework.boot.SpringApplication;
import org.springframework.stereotype.Controller;
import org.springframework.ui.ModelMap;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * Declares this class as a Controller and lets Spring load the other classes it needs.
 */
@Controller
public class SearchController {

    @RequestMapping("/index")
    String testdo(ModelMap map) {
        // Returns the HTML page
        return "index_search";
    }

    // Main method; executed when this class is right-clicked and run as a Java application
    public static void main(String[] args) {
        SpringApplication.run(SearchController.class, args);
    }
}
```
- Write the ApplicationBootSystem startup class; the code is as follows:

```java
package com.bigdata.boot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Root bootstrap class.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.bigdata")
public class ApplicationBootSystem {
    public static void main(String[] args) {
        SpringApplication.run(ApplicationBootSystem.class, args);
    }
}
```
- Create a static directory, create a plugins directory under it, and copy the bootstrap-3.3.7 and bootstrap-table packages into that directory.
- Create a template directory and create an index_search.html file under it. The full code is as follows:

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
    <title>Realtime Search</title>
    <!-- Bootstrap -->
    <link href="plugins/bootstrap-3.3.7/css/bootstrap.min.css" rel="stylesheet">
    <link href="plugins/bootstrap-table/bootstrap-table.min.css" rel="stylesheet">
    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
    <!--[if lt IE 9]>
    <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
    <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    <![endif]-->
</head>
<body>
<div class="container">
    <div class="row">
        <!-- onsubmit returns false so the form is not actually submitted -->
        <div class="col-md-8 col-md-offset-2 text-center" onsubmit="return false">
            <form class="form-inline">
                <div class="form-group">
                    <label for="target">请输入条件</label>
                    <input type="text" class="form-control" id="target" name="target" placeholder="请输入条件">
                </div>
                <button type="submit" id="submit" class="btn btn-primary">搜一下</button>
            </form>
        </div>
    </div>
    <!-- The next row holds a built-in Bootstrap table -->
    <div class="row">
        <table id="table"></table>
    </div>
</div>
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<script src="http://code.jquery.com/jquery-1.12.1.min.js"></script>
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="plugins/bootstrap-3.3.7/js/bootstrap.min.js"></script>
<!-- bootstrap-table dependencies -->
<script src="plugins/bootstrap-table/bootstrap-table.min.js"></script>
<script src="plugins/bootstrap-table/bootstrap-table-locale-all.min.js"></script>
<script type="text/javascript">
    $(function () {
        // Highlight a cell when its value matches the search box (same formatter for every column)
        function highlight(value, row, index) {
            var a = "";
            if (value == $("#target").val()) {
                a = '<span style="color:#5858FA">' + value + '</span>';
            } else {
                a = '<span style="color:#190707">' + value + '</span>';
            }
            return a;
        }
        // Initialize the table columns
        $('#table').bootstrapTable({
            columns: [
                {field: 'id', title: '记录id', formatter: highlight},
                {field: 'name', title: '姓名', formatter: highlight},
                {field: 'uid', title: '用户id', formatter: highlight},
                {field: 'age', title: '年龄', formatter: highlight},
                {field: 'gender', title: '性别', formatter: highlight},
                {field: 'event', title: '事件', formatter: highlight},
                {field: 'address', title: '地址', formatter: highlight},
                {field: 'happenedDate', title: '发生时间', formatter: highlight},
                {field: 'acquaintancer', title: '同行人', formatter: highlight},
                {field: 'endDate', title: '结束时间', formatter: highlight}
            ]
        });
        // Bind the submit button's click event: run the query and fill the table with the result
        $("#submit").click(function () {
            $.ajax({
                url: '/bigdata/search',
                data: "target=" + $("#target").val(),
                cache: false,   // false: do not cache; true: cache
                async: true,    // true: asynchronous; false: synchronous
                beforeSend: function () {
                    // before the request is sent
                },
                success: function (result) {
                    try {
                        var resultArray = new Array();
                        var js = JSON.parse(result);
                        for (var p in js) {
                            resultArray.push(js[p]);
                            console.log(js[p]);
                        }
                        console.log(resultArray);
                        $("#table").bootstrapTable('load', resultArray);
                    } catch (e) {
                        window.alert(result);
                        $("#table").bootstrapTable('load', [{"result": "什么也没有找到"}]);
                    }
                },
                complete: function () {
                    // when the request completes
                },
                error: function () {
                    // when the request fails
                }
            })
        });
    });
</script>
</body>
</html>
```
- Once everything is written, the project structure looks like this:
4. Testing Procedure
- First run the main method of the HBaseUtil utility class to create the HBase test table and the target table, verifying connectivity between the program and HBase;
- Then run the main method of the ElasticSearchUtil utility class to create the ElasticSearch test index and the target index, verifying connectivity between the program and ElasticSearch;
- Next run the LoadDataToHBaseAndES class to write the data into HBase and ElasticSearch;
- Then run the ApplicationBootSystem startup class to launch the Spring Boot entry point;
- Finally, open a browser and go to http://localhost:8084/bigdata/index, type a query keyword into the search box (for example 3) and click the 搜一下 (Search) button; normally you will see results like the following:
- Try different search conditions and look at the different results. Note: pay attention to how fast (near real-time) the retrieval is. You can also enlarge the test dataset and then check the retrieval speed again; readers are encouraged to try this themselves. To exercise the backend independently of the web page, you can also call the /bigdata/search endpoint directly, as sketched below.
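The following is a small sketch (not part of the project) that calls the search endpoint over HTTP. The URL, port and the target parameter come from application.properties and SearchService above; the class name is purely illustrative.

```java
// Hypothetical helper: query /bigdata/search directly and print the JSON built by Query.query().
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class SearchEndpointCheck {
    public static void main(String[] args) throws Exception {
        String keyword = args.length > 0 ? args[0] : "3"; // same keyword as the browser example
        URL url = new URL("http://localhost:8084/bigdata/search?target=" + URLEncoder.encode(keyword, "UTF-8"));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // one JSON object keyed by HBase row key
            }
        }
        conn.disconnect();
    }
}
```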