文章目录
- 前言
- 一、框架搭建
- 二、代码编写
- 1.MongoDB连接
- 2.通用CRUD
- 3.HTTP服务
- 总结
前言
最近在学习MongoDB的基本功能,只看文档过于无趣了,于是用Vert.x写了个demo加深印象,也在此记录一下。
一、框架搭建
Vert.x使用的版本是4.4.4,主要用到了vertx-web和vertx-mongo-client。
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version><maven-shade-plugin.version>3.2.4</maven-shade-plugin.version><maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version><exec-maven-plugin.version>3.0.0</exec-maven-plugin.version><vertx.version>4.4.4</vertx.version><junit-jupiter.version>5.9.1</junit-jupiter.version><logback.version>1.2.3</logback.version><jackson.version>2.11.3</jackson.version><main.verticle>com.whty.mongodemo.MainVerticle</main.verticle><launcher.class>io.vertx.core.Launcher</launcher.class></properties><dependencyManagement><dependencies><dependency><groupId>io.vertx</groupId><artifactId>vertx-stack-depchain</artifactId><version>${vertx.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.vertx</groupId><artifactId>vertx-config</artifactId></dependency><dependency><groupId>io.vertx</groupId><artifactId>vertx-web</artifactId></dependency><dependency><groupId>io.vertx</groupId><artifactId>vertx-mongo-client</artifactId></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version></dependency><dependency><groupId>io.vertx</groupId><artifactId>vertx-junit5</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-api</artifactId><version>${junit-jupiter.version}</version><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-engine</artifactId><version>${junit-jupiter.version}</version><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency></dependencies>
二、代码编写
熟悉Vert.x的同学应该都知道,其基本单元是Verticle,本demo中包含三个Verticle,分别是MainVerticle、MongoVerticle、HttpServerVerticle,MainVerticle是程序的入口,在其中依次发布另外两个Verticle。
1.MongoDB连接
在MongoVerticle中,我们主要是建立MongoDB连接和添加EventBus监听。
import io.vertx.core.AbstractVerticle;public class MongoVerticle extends AbstractVerticle {@Overridepublic void start() {new MongoListener(vertx.eventBus(), new MongoClientFactory(vertx).getInstance());}
}
MongoDB的连接建立非常简单,我们通过一个工厂类去创建一个连接。
public class MongoClientFactory {private final MongoClient client;public MongoClientFactory(Vertx vertx) {JsonObject config = new JsonObject().put("db_name", "demo").put("host", "127.0.0.1").put("port", 27017).put("username", "demo").put("password", "demo123").put("authSource", "demo").put("useObjectId", true);this.client = MongoClient.createShared(vertx, config);}public MongoClient getInstance() {return this.client;}
}
需要注意的是数据库的配置,useObjectId表示插入数据时如果无_id则自动生成ObjectId,ObjectId在JSON显示如下
{"_id":{"$oid":"xxxxxxxxxxxxxxxxxxxxx"}}
如果将useObjectId设置为true则在后续API中会自动转换为
{"_id":"xxxxxxxxxxxxxxxxxx"}
useObjectId默认设置为false将会使用普通字符串id。
2.通用CRUD
vertx-mongo-client中的API非常丰富,详细可以参考官方文档,本demo中仅列举基本CRUD操作。
import com.whty.mongodemo.constant.EventBusAddress;
import com.whty.mongodemo.constant.ParamKey;
import com.whty.mongodemo.util.DateUtil;
import io.vertx.core.Future;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mongo.FindOptions;
import io.vertx.ext.mongo.MongoClient;
import io.vertx.ext.mongo.MongoClientDeleteResult;
import io.vertx.ext.mongo.UpdateOptions;
import io.vertx.ext.mongo.impl.codec.json.JsonObjectCodec;
import lombok.extern.slf4j.Slf4j;
import java.util.List;@Slf4j
public class MongoListener {private final EventBus eventBus;private final MongoClient client;public MongoListener(EventBus eventBus, MongoClient client) {this.eventBus = eventBus;this.client = client;this.addConsumer();}private void addConsumer() {this.queryById();this.query();this.insert();this.remove();this.update();this.queryByPage();this.queryWithLookup();}private void queryById() {eventBus.localConsumer(EventBusAddress.QUERY_BY_ID, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);client.find(collection, query, ar -> {if (ar.succeeded()) {List<JsonObject> list = ar.result();if (list != null && list.size() > 0) {JsonObject result = list.get(0);message.reply(this.handleDate(result));} else {message.reply(null);}} else {log.error(ar.cause().getMessage(), ar.cause());message.reply(ar.cause());}});});}private void query() {eventBus.localConsumer(EventBusAddress.QUERY, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);client.find(collection, query, ar -> {if (ar.succeeded()) {List<JsonObject> list = ar.result();if (list != null && list.size() > 0) {list.forEach(this::handleDate);message.reply(new JsonArray(list));} else {message.reply(new JsonArray());}} else {log.error(ar.cause().getMessage(), ar.cause());message.reply(ar.cause());}});});}private void queryByPage() {eventBus.localConsumer(EventBusAddress.QUERY_BY_PAGE, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);JsonObject pageParams = body.getJsonObject(ParamKey.PAGE);Integer pageSIze = pageParams.getInteger(ParamKey.PAGE_SIZE);Integer pageNum = pageParams.getInteger(ParamKey.PAGE_NUM);JsonObject sort = pageParams.getJsonObject(ParamKey.SORT);FindOptions findOptions = new FindOptions().setSort(sort).setLimit(pageSIze).setSkip((pageNum - 1) * pageSIze);Future<Long> countFuture = client.count(collection, query);Future<List<JsonObject>> dataFuture = client.findWithOptions(collection, query, findOptions);Future.all(countFuture, dataFuture).onSuccess(ar -> {Long count = ar.resultAt(0);List<JsonObject> list = ar.resultAt(1);if (list != null && list.size() > 0) {list.forEach(this::handleDate);}JsonObject result = new JsonObject().put(ParamKey.PAGE_SIZE, pageSIze).put(ParamKey.PAGE_NUM, pageNum).put("total", count).put("list", list != null && list.size() > 0 ? list : new JsonArray());message.reply(result);}).onFailure(e -> {log.error(e.getMessage(), e);message.reply(e);});});}private void insert() {eventBus.localConsumer(EventBusAddress.INSERT, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);query.put("createTime", new JsonObject().put("$date", DateUtil.getUTCTime()));try {client.insert(collection, query, ar -> {if (ar.succeeded()) {String id = ar.result();message.reply(new JsonObject().put("_id", id));} else {log.error("存储数据异常:{}", ar.cause().getMessage(), ar.cause());message.reply(ar.cause());}});} catch (Exception e) {log.error("存储数据异常:{}", e.getMessage(), e);message.reply(e);}});}private void update() {eventBus.localConsumer(EventBusAddress.UPDATE, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);JsonObject data = body.getJsonObject("data");data.put("updateTime", new JsonObject().put("$date", DateUtil.getUTCTime()));JsonObject update = new JsonObject().put("$set", data);//multi set to true to update multiple documentsclient.updateCollectionWithOptions(collection, query, update,new UpdateOptions().setMulti(body.containsKey("multi")),ar -> {if (ar.succeeded()) {message.reply(ar.result().toJson());} else {log.error(ar.cause().getMessage(), ar.cause());message.reply(ar.cause());}});});}private void remove() {eventBus.localConsumer(EventBusAddress.REMOVE, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);Future<MongoClientDeleteResult> future;if (body.containsKey(ParamKey.SINGLE)) { //仅删除匹配的第一条数据future = client.removeDocument(collection, query);} else {future = client.removeDocuments(collection, query);}future.andThen(ar -> {if (ar.succeeded()) {message.reply(ar.result().toJson());} else {log.error(ar.cause().getMessage(), ar.cause());message.reply(ar.cause());}});});}private void queryWithLookup() {eventBus.localConsumer(EventBusAddress.QUERY_WITH_LOOKUP, message -> {JsonObject body = (JsonObject) message.body();String collection = body.getString(ParamKey.COLLECTION);JsonObject query = body.getJsonObject(ParamKey.QUERY);JsonArray pipeline = new JsonArray();JsonObject lookup = new JsonObject().put("$lookup", body.getJsonObject("lookup"));pipeline.add(lookup);if (null != query) {pipeline.add(new JsonObject().put("$match", query));}JsonArray resultArr = new JsonArray();client.aggregate(collection, pipeline).handler(resultArr::add).endHandler(v ->message.reply(resultArr)).exceptionHandler(e -> {log.error(e.getMessage(), e);message.reply(e);});});}private JsonObject handleDate(JsonObject json) {return this.decodeDateKey(this.decodeDateKey(json, "createTime"), "updateTime");}private JsonObject decodeDateKey(JsonObject json, String key) {Object timeField = json.getValue(key, null);if (!(timeField instanceof JsonObject)) return json;Object timeString = ((JsonObject) timeField).getValue(JsonObjectCodec.DATE_FIELD, null);if (!(timeString instanceof String)) return json;json.put(key, DateUtil.formatOffsetDateTime((String) timeString));return json;}
}
Vert.x的Verticle之间主要通过EventBus进行通信,在这里我们创建了MongoDB的通用CRUD监听,而发送消息的一段则在后面的HttpServerVerticle中创建。
可以看到每一个API都会用到collection参数,collection集合就相当于SQL数据库的表,这里collection参数表示表名称,通过传递不同的表名称来对不同的表进行CRUD。
$lookup是MongoDB中的高阶查询方法,可以实现类似SQL数据库中的联表功能,具体用法可以参考这篇文章。
3.HTTP服务
import com.whty.mongodemo.route.CustomForm;
import com.whty.mongodemo.route.FormData;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class HttpServerVerticle extends AbstractVerticle {private final int port = 8080;@Overridepublic void start() {HttpServer server = vertx.createHttpServer();Router router = Router.router(vertx);router.route().handler(BodyHandler.create());//注册路由this.registerRoute(router);server.requestHandler(router).listen(port, ar -> {if (ar.succeeded()) {log.info("web服务启动成功,监听端口{}", port);} else {log.error("web服务启动失败");}});}private void registerRoute(Router router) {new CustomForm().init(router);new FormData().init(router);}
}
在这个Verticle中,我们创建了HttpServer并注册了路由,我们演示的集合有两个分别是custom_form和form_data,一个用于存储自定义表单结构,另一个存储表单填写的数据,这两个集合的业务路由分别由两个类管理。
import com.whty.mongodemo.handler.CustomFormHandler;
import io.vertx.ext.web.Router;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class CustomForm {private final CustomFormHandler handler = new CustomFormHandler();private final static String prefix = "/customForm";public void init(Router router) {router.get(prefix + "/queryById/:id").handler(handler.queryById());router.post(prefix + "/query").handler(handler.query());router.post(prefix + "/queryByPage").handler(handler.queryByPage());router.post(prefix + "/insert").handler(handler.insert());router.put(prefix + "/update").handler(handler.update());router.delete(prefix + "/remove/:id").handler(handler.remove());router.post(prefix + "/queryWithChildren").handler(handler.queryWithChildren());}
}
为了传参方便,查询列表和分页查询也使用的是post方法,不太符合REST规范建议好孩子不要学习。下面继续看handler中的代码
import com.whty.mongodemo.constant.ParamKey;
import com.whty.mongodemo.service.CustomFormService;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class CustomFormHandler extends BaseRoutingHandler {private final CustomFormService service = new CustomFormService("form_schema");public Handler<RoutingContext> queryById() {return ctx -> service.queryById(ctx.pathParam("id"), this.commonReplyHandler(ctx.response()));}public Handler<RoutingContext> query() {return ctx -> service.query(ctx.body() != null ? ctx.body().asJsonObject() : null, this.commonReplyHandler(ctx.response()));}public Handler<RoutingContext> queryByPage() {return ctx -> {JsonObject queryParams = ctx.body().asJsonObject();JsonObject page = new JsonObject().put(ParamKey.PAGE_NUM, 1).put(ParamKey.PAGE_SIZE, 10);JsonObject query = new JsonObject();if (null != queryParams) {queryParams.forEach(entry -> {String k = entry.getKey();Object v = entry.getValue();if (k.equals(ParamKey.PAGE_NUM)) {page.put(ParamKey.PAGE_NUM, v);} else if (k.equals(ParamKey.PAGE_SIZE)) {page.put(ParamKey.PAGE_SIZE, v);} else {if ("formRef".equals(k)) { //模糊查询query.put(k, new JsonObject().put("$regex", v));} else {query.put(k, v);}}});}service.queryByPage(page, query, this.commonReplyHandler(ctx.response()));};}public Handler<RoutingContext> queryWithChildren() {return ctx -> {JsonObject lookup = new JsonObject().put("from", "form_component").put("localField", "component_id").put("foreignField", "_id").put("as", "component_info");service.queryWithLookup(lookup, ctx.body().asJsonObject(), this.commonReplyHandler(ctx.response()));};}public Handler<RoutingContext> insert() {return ctx -> {JsonObject query = ctx.body().asJsonObject();service.insert(query, this.commonReplyHandler(ctx.response()));};}public Handler<RoutingContext> update() {return ctx -> {JsonObject bodyData = ctx.body().asJsonObject();String id = bodyData.getString("_id");bodyData.remove("_id");service.updateById(id, bodyData, this.commonReplyHandler(ctx.response()));};}public Handler<RoutingContext> remove() {return ctx -> service.removeById(ctx.pathParam("id"), this.commonReplyHandler(ctx.response()));}
}
每个业务handler都会继承BaseRoutingHandler,其中包含一些通用方法
import com.whty.mongodemo.model.JsonResult;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class BaseRoutingHandler {public void endJson(HttpServerResponse response, JsonResult jsonResult) {response.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");response.end(jsonResult.encodePrettily());}public void endJson(HttpServerResponse response) {response.putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8");response.end();}public Handler<AsyncResult<Message<Object>>> commonReplyHandler(HttpServerResponse response) {return ar -> {if (ar.succeeded()) {if (null != ar.result().body() && ar.result().body() instanceof Throwable) {this.endJson(response, JsonResult.failed());} else {this.endJson(response, JsonResult.success(ar.result().body()));}} else {log.error(ar.cause().getMessage(), ar.cause());this.endJson(response, JsonResult.failed());}};}
}
form_data表相关的代码与以上代码非常相似就不贴了。可以看到整个项目都是JSON直通到底没有使用实体类,这也是为了演示MongoDB库数据结构灵活的特点,在实际生产中,还是建议规范定义schema,创建实体类传参,避免数据混乱。
总结
以上就是Vert.x操作MongoDB的示例代码,对Vert.x感兴趣的同学可以看看本人其他相关文章,下面是本项目源码,可以顺手点个STAR。
源码下载