我很高兴宣布Pygmalios开发的ReactiveInflux的第一个发行版。 InfluxDB错过了Scala和Java的非阻塞驱动程序。 不变性,可测试性和可扩展性是ReactiveInflux的关键功能。 加上对Apache Spark的支持,它是首选武器。
- https://github.com/pygmalios/reactiveinflux
它在内部使用Play Framework WS API ,它是基于Async Http Client构建的丰富的异步HTTP客户端 。
特征
- Scala的异步(非阻塞)接口
- Scala和Java的同步(阻塞)接口
- 同时支持Spark和Spark流
- 不变性
- 可测性
- 可扩展性
兼容性
- InfluxDB 0.11、0.10和0.9(甚至可能更旧)
- Scala 2.11和2.10
- Java 7及以上
- Apache Spark 1.4及更高版本
Scala异步(非阻塞)示例
val result = withInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>db.create().flatMap { _ =>val point = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("t1" -> "A", "t2" -> "B"),fields = Map("f1" -> 10.3,"f2" -> "x","f3" -> -1,"f4" -> true))db.write(point).flatMap { _ =>db.query("SELECT * FROM measurement1").flatMap { queryResult =>println(queryResult.row.mkString)db.drop()}}}
}
Scala同步(阻塞)示例
implicit val awaitAtMost = 10.seconds
syncInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>db.create()val point = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("t1" -> "A", "t2" -> "B"),fields = Map("f1" -> 10.3,"f2" -> "x","f3" -> -1,"f4" -> true))db.write(point)val queryResult = db.query("SELECT * FROM measurement1")println(queryResult.row.mkString)db.drop()
}
Java同步(阻塞)示例
// Use Influx at the provided URL
ReactiveInfluxConfig config = new JavaReactiveInfluxConfig(new URI("http://localhost:8086/"));
long awaitAtMostMillis = 30000;
try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux(config, awaitAtMostMillis)) {SyncReactiveInfluxDb db = reactiveInflux.database("example1");db.create();Map tags = new HashMap<>();tags.put("t1", "A");tags.put("t2", "B");Map fields = new HashMap<>();fields.put("f1", 10.3);fields.put("f2", "x");fields.put("f3", -1);fields.put("f4", true);Point point = new JavaPoint(DateTime.now(),"measurement1",tags,fields);db.write(point);QueryResult queryResult = db.query("SELECT * FROM measurement1");System.out.println(queryResult.getRow().mkString());db.drop();
}
Apache Spark Scala示例
val point1 = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("tagKey1" -> "tagValue1","tagKey2" -> "tagValue2"),fields = Map("fieldKey1" -> "fieldValue1","fieldKey2" -> 10.7)
)
sc.parallelize(Seq(point1)).saveToInflux()
Apache Spark流Scala示例
val point1 = Point(time = DateTime.now(),measurement = "measurement1",tags = Map("tagKey1" -> "tagValue1","tagKey2" -> "tagValue2"),fields = Map("fieldKey1" -> "fieldValue1","fieldKey2" -> 10.7)
)
val queue = new mutable.Queue[RDD[Point]]
queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))
ssc.queueStream(queue).saveToInflux()
Apache Spark Java示例
...
SparkInflux sparkInflux = new SparkInflux("example", 1000);
sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point)));
Apache Spark流Java示例
...
SparkInflux sparkInflux = new SparkInflux("example", 1000);
Queue> queue = new LinkedList<>();
queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point)));
sparkInflux.saveToInflux(ssc.queueStream(queue));
斯洛伐克布拉迪斯拉发的高科技初创公司投资于尖端技术,以确保实时预测零售分析领域的快速增长。
翻译自: https://www.javacodegeeks.com/2016/04/introducing-reactiveinflux-non-blocking-influxdb-driver-scala-java-supporting-apache-spark.html