redis storm mysql_storm-redis 详解

多的不说,先来代码分析,再贴我自己写的代码。如果代码有错误,求更正。。

导入两个关键包,其他项目需要的包,大家自己导入了,我pom下的包太多,不好一下扔上来。

org.apache.storm

storm-redis

${storm.version}

redis.clients

jedis

2.9.0

我是连接的linux上的redis,所以要对redis进行配置,不然会出现拒绝连接的错误。redis部署在linux时,java远程连接需要修改配置:

修改redis.conf文件

1.将bind 127.0.0.1加上注释,(#bind 127.0.0.1),允许出本机外的IP访问redis

2.将protected-mode yes,修改为protected-mode no;不保护redis

3.将daemonize no,修改为daemonize yes;允许redis服务后台运行

修改防火墙端口号

1.将redis默认的6379注册到防火墙中

/sbin/iptables -I INPUT -p tcp –dport 6379 -j ACCEPT

2.保存防火墙端口号表

/etc/rc.d/init.d/iptables save

3.重启防火墙

/etc/rc.d/init.d/iptables restart

4.查看防火墙状态

/etc/rc.d/init.d/iptables status

使用测试类连接下看能不能连同:import java.util.Iterator;

import java.util.Set;

import redis.clients.jedis.Jedis;

/**

* @author cwc

* @date 2018年5月30日

* @description:

* @version 1.0.0

*/

public class RedisTest {

public static void main(String[]args){

//连接本地的 Redis 服务

Jedis jedis = new Jedis("xxx.xx.xxx.xx");

System.out.println("连接成功");

//查看服务是否运行

System.out.println("服务正在运行: "+jedis.ping());

// 获取数据并输出

Set keys = jedis.keys("*");

Iterator it=keys.iterator() ;

while(it.hasNext()){

String key = it.next();

System.out.println(key);

}

}

}

准备就绪,先说说storm向redis写入:

官方给的写入API:class WordCountStoreMapper implements RedisStoreMapper {

private RedisDataTypeDescription description;

private final String hashKey = "wordCount";

public WordCountStoreMapper() {

description = new RedisDataTypeDescription(

RedisDataTypeDescription.RedisDataType.HASH, hashKey);

}

@Override

public RedisDataTypeDescription getDataTypeDescription() {

return description;

}

@Override

public String getKeyFromTuple(ITuple tuple) {

return tuple.getStringByField("word");

}

@Override

public String getValueFromTuple(ITuple tuple) {

return tuple.getStringByField("count");

}

}//这里是用来new 一个新的bolt,在TopologyBuilder时调用操作

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()

.setHost(host).setPort(port).build();

RedisStoreMapper storeMapper = new WordCountStoreMapper();

RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

我反正刚刚看的时候一脸懵逼,之后研究了很久才明白,下面贴我自己的代码:import java.util.HashMap;

import java.util.Map;

import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichSpout;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

/**

* @author cwc

* @date 2018年5月29日

* @description:这是给的假的数据源

* @version 1.0.0

*/

public class RedisWriteSpout extends BaseRichSpout {

private static final long serialVersionUID = 1L;

private SpoutOutputCollector spoutOutputCollector;

/**

* 作为字段word输出

*/

private static final Map LASTNAME = new HashMap();

static {

LASTNAME.put(0, "anderson");

LASTNAME.put(1, "watson");

LASTNAME.put(2, "ponting");

LASTNAME.put(3, "dravid");

LASTNAME.put(4, "lara");

}

/**

* 作为字段myValues输出

*/

private static final Map COMPANYNAME = new HashMap();

static {

COMPANYNAME.put(0, "abc");

COMPANYNAME.put(1, "dfg");

COMPANYNAME.put(2, "pqr");

COMPANYNAME.put(3, "ecd");

COMPANYNAME.put(4, "awe");

}

public void open(Map conf, TopologyContext context,

SpoutOutputCollector spoutOutputCollector) {

this.spoutOutputCollector = spoutOutputCollector;

}

public void nextTuple() {

final Random rand = new Random();

int randomNumber = rand.nextInt(5);

try {

Thread.sleep(100);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

spoutOutputCollector.emit (new Values(LASTNAME.get(randomNumber),COMPANYNAME.get(randomNumber)));

System.out.println("数据来袭!!!!!!");

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// emit the field site.

declarer.declare(new Fields("word","myValues"));

}

}import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;

import org.apache.storm.redis.common.mapper.RedisStoreMapper;

import org.apache.storm.tuple.ITuple;

/**

* @author cwc

* @date 2018年5月30日

* @description:

* @version 1.0.0

*/

public class RedisWriteMapper implements RedisStoreMapper{

private static final long serialVersionUID = 1L;

private RedisDataTypeDescription description;

//这里的key是redis中的key

private final String hashKey = "mykey";

public RedisWriteMapper() {

description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);

}

@Override

public String getKeyFromTuple(ITuple ituple) {

//这个代表redis中,hash中的字段名

return ituple.getStringByField("word");

}

@Override

public String getValueFromTuple(ITuple ituple) {

//这个代表redis中,hash中的字段名对应的值

return ituple.getStringByField("myValues");

}

@Override

public RedisDataTypeDescription getDataTypeDescription() {

return description;

}

}

storm读取redis数据:

官方给的API:class WordCountRedisLookupMapper implements RedisLookupMapper {

private RedisDataTypeDescription description;

private final String hashKey = "wordCount";

public WordCountRedisLookupMapper() {

description = new RedisDataTypeDescription(

RedisDataTypeDescription.RedisDataType.HASH, hashKey);

}

@Override

public List toTuple(ITuple input, Object value) {

String member = getKeyFromTuple(input);

List values = Lists.newArrayList();

values.add(new Values(member, value));

return values;

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("wordName", "count"));

}

@Override

public RedisDataTypeDescription getDataTypeDescription() {

return description;

}

@Override

public String getKeyFromTuple(ITuple tuple) {

return tuple.getStringByField("word");

}

@Override

public String getValueFromTuple(ITuple tuple) {

return null;

}

}JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()

.setHost(host).setPort(port).build();

RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();

RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

自己代码:import java.util.HashMap;

import java.util.Map;

import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichSpout;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

/**

* @author cwc

* @date 2018年5月30日

* @description:

* @version 1.0.0

*/

public class RedisReadSpout extends BaseRichSpout {

private static final long serialVersionUID = 1L;

private SpoutOutputCollector spoutOutputCollector;

/**

* 这是刚刚作为word写入的数据,要通过他获取我们存的值

*/

private static final Map LASTNAME = new HashMap();

static {

LASTNAME.put(0, "anderson");

LASTNAME.put(1, "watson");

LASTNAME.put(2, "ponting");

LASTNAME.put(3, "dravid");

LASTNAME.put(4, "lara");

}

public void open(Map conf, TopologyContext context,

SpoutOutputCollector spoutOutputCollector) {

this.spoutOutputCollector = spoutOutputCollector;

}

public void nextTuple() {

final Random rand = new Random();

int randomNumber = rand.nextInt(5);

try {

Thread.sleep(100);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

spoutOutputCollector.emit (new Values(LASTNAME.get(randomNumber)));

System.out.println("读数据来袭!!!!!!");

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// emit the field site.

declarer.declare(new Fields("word"));

}

}import java.util.List;

import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;

import org.apache.storm.redis.common.mapper.RedisLookupMapper;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.ITuple;

import org.apache.storm.tuple.Values;

import com.google.common.collect.Lists;

/**

* @author cwc

* @date 2018年5月30日

* @description:

* @version 1.0.0

*/

public class RedisReadMapper implements RedisLookupMapper {

private static final long serialVersionUID = 1L;

//对redis的所支持的种类进行了初始化

private RedisDataTypeDescription description;

//你想要读取的hash表中的key,这里使用的是刚刚存储的key字段名

private final String hashKey="mykey";

/**

* redis中储存结构为hash hashKey为根key 然后在通过getKeyFromTuple 获得的key找到相对于的value

* key1-key2[]-value key2中的每一个key对应一个value

* lookupValue = jedisCommand.hget(additionalKey, key);

*/

public RedisReadMapper() {

description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);

}

@Override

public String getKeyFromTuple(ITuple tuple) {

//获取传过来的字段名

return tuple.getStringByField("word");

}

@Override

public String getValueFromTuple(ITuple tuple) {

return null;

}

@Override

public RedisDataTypeDescription getDataTypeDescription() {

return description;

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

//从redis中hash通过上面的key下面找到制定的word中的字段名下的值,有点想hbase中row:cf:val一样

declarer.declare(new Fields("word","values"));

}

@Override

/**

* 将拿到的数据装进集合并且返回

*/

public List toTuple(ITuple input, Object value) {

String member =getKeyFromTuple(input);

List values =Lists.newArrayList();

//将拿到的数据存进集合,下面时将两个值返回的,所以向下游传值时需要定义两个名字。

values.add(new Values(member,value));

return values;

}

}import java.util.Map;

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Tuple;

/**

* @author cwc

* @date 2018年5月30日

* @description:打印获取的数据

* @version 1.0.0

*/

public class RedisOutBolt extends BaseRichBolt{

private OutputCollector collector;

@Override

public void execute(Tuple tuple) {

//String str =tuple.getString(0);

String strs =tuple.getString(1);

System.out.println(strs);

}

@Override

public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {

// TODO Auto-generated method stub

this.collector=collector;

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("RedisOutBolt"));

}

}

接下来是  RedisMain,测试读写方法:import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.redis.bolt.RedisLookupBolt;

import org.apache.storm.redis.bolt.RedisStoreBolt;

import org.apache.storm.redis.common.config.JedisPoolConfig;

import org.apache.storm.redis.common.mapper.RedisLookupMapper;

import org.apache.storm.redis.common.mapper.RedisStoreMapper;

import org.apache.storm.topology.TopologyBuilder;

public class RedisMain {

public static void main(String[] args) throws Exception {

//writeRedis();

readRedis();

}

/**

* 写redis

*/

public static void writeRedis(){

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()

.setHost("xxx.xx.xx.xx").setPort(6379).build();

System.out.println("连接成功!!!");

RedisStoreMapper storeMapper = new RedisWriteMapper();

RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("RedisWriteSpout", new RedisWriteSpout(), 2);

builder.setBolt("to-save", storeBolt, 1).shuffleGrouping("RedisWriteSpout");

Config conf = new Config();

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

System.err.println("写入完成!!!!!");

try {

Thread.sleep(10000);

//等待6s之后关闭集群

cluster.killTopology("test");

//关闭集群

cluster.shutdown();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

/**

* 读redis

*/

public static void readRedis(){

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()

.setHost("xxx.xx.xxx.xx").setPort(6379).build();

RedisLookupMapper lookupMapper = new RedisReadMapper();

RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("RedisReadSpout-reader", new RedisReadSpout(), 2);

builder.setBolt("to-lookupBolt", lookupBolt, 1).shuffleGrouping("RedisReadSpout-reader");

builder.setBolt("to-out",new RedisOutBolt(), 1).shuffleGrouping("to-lookupBolt");

Config conf = new Config();

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

try {

Thread.sleep(100000);

//等待6s之后关闭集群

cluster.killTopology("test");

//关闭集群

cluster.shutdown();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}    很多解释都写在了代码注解中,其中也有很多问题,在代码注释的地方放生的,认真看下代码,祝大家零BUG哦~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/434219.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

22套精美的网页按钮图标设计推荐(包括PSD和AI文件)

图标是一个简单的单色符号,表示对象的基本形状。字形被广泛地使用在我们周围的公共场所,如机场和商场或购物中心。在网页设计当中,他经常会被使用,用来展示每个元素代表的意思,他简洁大方,一目了然&#xf…

轻gc和重gc分别在什么时候发生_GC发展与现状

GC发展Java不像C或C那样,需要程序员在编程的过程中,时刻注意申请内存保存对象,在对象使用完成后,要在合适的时机将对象占用的内存释放掉(析构函数);Java得意与内部的三大机制,保证了程序开发方便&#xff1…

解决VC不包含stdint.h头文件问题

stdint.h是C99的标准,主要用于统一跨平台数据定义。 MSVC中不带有这个头文件,直到VS2010(新闻)。 在之前的版本里面,我们可以: (1)下载这个头文件 download a MS version of this …

hibernate mysql 性能_MyBatis和Hibernate相比,优势在哪里?

MyBatis和Hibernate相比,优势在哪里?发布时间:2020-06-07 22:57:08来源:51CTO阅读:435作者:qq5923dd411b8fa栏目:数据库1、开发对比开发速度hibernate的真正掌握要比Mybatis来得难些。Mybatis框…

nginx离线安装_web高可用-基于keepalived和nginx

一.体系架构在Keepalived Nginx高可用负载均衡架构中,keepalived负责实现High-availability (HA) 功能控制前端机VIP(虚拟网络地址),当有设备发生故障时,热备服务器可以瞬间将VIP自动切换过来,实际运行中体验只有2秒钟…

C#数组的合并拆分

1.合并拆分数组 ///<summary>///合并数组 ///</summary>///<param name"First">第一个数组</param>///<param name"Second">第二个数组</param>///<returns>合并后的数组(第一个数组第二个数组&#xff…

VC嵌入python时debug版lib下载

前些天发的一个随笔中得知python做界面很强&#xff0c;加之以前也听说过&#xff0c;所以学习了一下&#xff0c;主要想看看python和C的互操作以取长补短。 化了大约4天时间草草看了《Python编程金典》&#xff0c;觉得思想和Java的差不多&#xff0c;面向对象的思想和机制都…

php mysql 实现原理_php+mysql分页原理实现

完整代码如下&#xff1a;!htmlhead-"-type"";GBK"styletype"text/css"body{font-size:12px;font-family:verdana;width:100%;}div.page{text-align:center;}div.content{height:300px;}div.pagea{border:#aaaadd1pxsolid;text-decoration:none;…

Tiny6410上安装debian基本系统的过程

Tiny6410上安装debian基本系统的过程&#xff1a;注&#xff1a;debootstrap工具可以在已有的任何linux中使用&#xff0c;不过建议用debian或其衍生系统(如ubuntu等)&#xff0c;以下步骤可以在电脑上做&#xff0c;也可以直接在Tiny6410上做&#xff0c;不过很慢&#xff0c;…

python任意代码都可以缩进去_我发现了个 Python 黑魔法,执行任意代码都会自动念上一段 『平安经』...

最近的"平安经"可谓是引起了不小的风波啊。作为一个正儿八经的程序员&#xff0c;最害怕的就是自己的代码上线出现各种各样的 BUG。为此&#xff0c;明哥今天分享一个 Python 的黑魔法&#xff0c;教你如何在你执行任意 Python 代码前&#xff0c;让 Python 解释器自…

域用户更改密码提示拒绝访问_AD域中的ACL攻防探索

前言关于域内ACL的攻防近两年经常被人所提起&#xff0c;同时也产生了很多关于域内ACL相关的工具和攻击方式&#xff0c;本文将会从ACL的概念谈起&#xff0c;然后介绍几种不同的域内攻击方式以及如何监测和防御对于ACL的攻击。ACL的概念和作用ACM&#xff1a;首先想要了解ACL首…

Codeforces Round #143 (Div. 2) C

http://codeforces.com/contest/231/problem/C 昨天的cf。比赛的时候其实有点思路的&#xff0c;但是因为B搞错方向了没敢写C。刚刚写了下&#xff0c;AC了。用的思路还真是昨天晚上的。囧。昨天晚上看的时候有一个地方不知道怎么转换&#xff0c;就是怎么快速求出i-->ians需…

go MySQL 多语句_八、MySQL经典查询语句-Go语言中文社区

student表course表score表teacher表1、 查询Student表中的所有记录的Sname、Ssex和Class列。select Sname,Ssex,Class from Student;2、 查询教师所有的单位即不重复的Depart列。select distinct Depart from Teacher3、 查询Student表的所有记录。select * from Student4、 查…

ffmpeg编译 MingW + MSYS

环境要求&#xff1a; MinGW:5.1.4 make: 3.81 gcc: 4.3.2 w32api: 3.13 SDL: 1.2.1 MinGW-Runtime:3.15 1.Minimal SYStem (MSYS) 安装 下载MSYS-1.0.10.exe并安装&#xff1a;下载地址&#xff1a;http://downl…

如何干净的删除vm_如何在macOS 10.15 Catalina绕过XProtect?

在macOS 10.15 Catalina中&#xff0c;Apple进行了许多安全性能地改进&#xff0c;包括通过使所有可执行文件都受XProtect扫描来加固系统&#xff0c;而不管文件是否带有com.apple.quarantine位标记。对于安全研究人员而言&#xff0c;这意味着不再像以前的macOS一样&#xff0…

vtun中setsockopt fcntl等有关套接字设置

client.c文件中在建立socket后有一句 setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); SO_REUSEADDR允许重用本地地址。 client.c文件在bind之后的connec_t函数中&#xff0c; 先将socket设为非阻塞&#xff0c;然后connect 然后select然后再将socket状态改为…

mysql行级锁作用_Mysql事务及行级锁的理解

在最近的开发中&#xff0c;碰到一个需求签到&#xff0c;每个用户每天只能签到一次&#xff0c;那么怎么去判断某个用户当天是否签到呢&#xff1f;因为当属表设计的时候&#xff0c;每个用户签到一次&#xff0c;即向表中插入一条记录&#xff0c;根据记录的数量和时间来判断…

activexobject对象不能创建_Oracle数据库用户管理之系统权限和对象权限

【关键术语】Privileges 权限System privileges 系统权限Object privileges 对象权限Grant 授予Revoke 撤消2.1 Oracle 权限概述2.1.1 权限的作用权限(privilege)是指执行特定类型 SQL 命令或访问其他模式对象的权利。Oracle 使用 权限来控制用户对数据的访问以及用户所能执行的…

视频文件大小计算

1.每小时录像文件大小计算公式: 码率大小*3600/8/1024 MB/小时。2.硬盘录像机硬盘容量计算公式: 每小时录像文件大小*每天录像时间&#xff08;时&#xff09;*硬盘录像机路数*需要保存的天数。例如&#xff1a;8路硬盘录像机&#xff0c;音视频录像&#xff0c;采用512Kbps定…

【啃不完的算法导论】- 动态规划 - 最长公共子序列(概念篇)

以下内容纯是为了熟悉《算法导论》中的内容&#xff0c;高手可略过&#xff0c;其中涉及的书本内容的版权归原作者、译者、出版社所有 求最长公共子序列&#xff0c;一个典型的 动态规划题 和 字符串处理算法&#xff0c;写在这里是希望自己以后能多来看看和改改&#xff0c;温…