1.引入POM文件
如果想调用Flume,需要引入flume相关的jar包依赖,jar包依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>cn.com.toto.stormlogPro</artifactId><groupId>stormlogPro</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>cn.com.toto.flume</artifactId><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.6.0</version><!-- 设置打包的时候,剔除依赖--><scope>provided</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>cn.com.toto.stromlogpro.log4j.LogInfoBuilder</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin></plugins></build>
</project>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
2.自定义的拦截器的代码
package cn.com.toto.stromlogpro.flume;import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;/*** 自定义一个点击流收集的拦截器* * 1、实现一个Interceptor.Builder接口。* 2、Interceptor.Builder中有个configuref方法,通过configure获取配置文件中的相应key。* 3、Interceptor.Builder中有个builder方法,通过builder创建一个自定义的AppInterceptor* 4、AppInterceptor中有两个方法,一个是批处理,一个单条处理,将批处理的逻辑转换为单条处理* 5、需要在单条数据中添加 appid,由于appid是变量。需要在AppInterceptor的构造器中传入一些参数。* 6、为自定义的AppInterceptor创建有参构造器,将需要的参数传入进来。** @author tuzq* @create 2017-06-25 12:48*/
public class AppInterceptor implements Interceptor{//4.定义成员变量appId,用来接收从配置文件中读取的信息private String appId;public AppInterceptor(String appId) {this.appId = appId;}/*** 单条数据进行处理,通过这个方式为日志添加上系统id* @param event* @return*/@Overridepublic Event intercept(Event event) {String message = null;try {message = new String(event.getBody(), "utf-8");} catch (UnsupportedEncodingException e) {message = new String(event.getBody());}//处理逻辑if (StringUtils.isNotBlank(message)) {message = "aid:"+appId+"||msg:" +message;event.setBody(message.getBytes());//正常逻辑应该执行到这里return event;}return event;}/*** 批量数据进行处理* @param list* @return*/@Overridepublic List<Event> intercept(List<Event> list) {List<Event> resultList = new ArrayList<Event>();for (Event event : list) {Event r = intercept(event);if (r != null) {resultList.add(r);}}return resultList;}@Overridepublic void initialize() {}@Overridepublic void close() {}public static class AppInterceptorBuilder implements Interceptor.Builder{//1、获取配置文件的appIdprivate String appId;@Overridepublic Interceptor build() {//3、构造拦截器return new AppInterceptor(appId);}@Overridepublic void configure(Context context) {//2、当出现default之后,就是点击流告警系统this.appId = context.getString("appId","default");System.out.println("appId:"+appId);}}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
LogInfoBuilder的代码如下:
package cn.com.toto.stromlogpro.log4j;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.logging.Logger;/*** 通过这个工程模拟创建日志内容** @author tuzq* @create 2017-06-25 13:51*/
public class LogInfoBuilder {private final static Logger logger = Logger.getLogger("msg");public static void main(String[] args) {Random random = new Random();List<String> list = logInfoList();while(true) {logger.info(list.get(random.nextInt(list.size())));}}private static List<String> logInfoList() {List list = new ArrayList<String>();list.add("aid:1||msg:error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("java.sql.SQLException: You have an error in your SQL syntax;");list.add("error Unable to connect to any of the specified MySQL hosts.");list.add("error:Servlet.service() for servlet action threw exception java.lang.NullPointerException");list.add("error:Exception in thread main java.lang.ArrayIndexOutOfBoundsException: 2");list.add("error:NoSuchMethodError: com/starit/.");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:java.lang.NoClassDefFoundError: org/coffeesweet/test01/Test01");list.add("error:Java.lang.IllegalStateException");list.add("error:Java.lang.IllegalMonitorStateException");list.add("error:Java.lang.NegativeArraySizeException");list.add("error:java.sql.SQLException: You have an error in your SQL syntax;");list.add("error:Java.lang.TypeNotPresentException ");list.add("error:Java.lang.UnsupprotedOperationException ");list.add("error Java.lang.IndexOutOfBoundsException");list.add("error Java.lang.ClassNotFoundException");list.add("error java.lang.ExceptionInInitializerError ");list.add("error:java.lang.IncompatibleClassChangeError ");list.add("error:java.lang.LinkageError ");list.add("error:java.lang.OutOfMemoryError ");list.add("error java.lang.StackOverflowError");list.add("error: java.lang.UnsupportedClassVersionError");list.add("error java.lang.ClassCastException");list.add("error: java.lang.CloneNotSupportedException");list.add("error: java.lang.EnumConstantNotPresentException ");list.add("error java.lang.IllegalMonitorStateException ");list.add("error java.lang.IllegalStateException ");list.add("error java.lang.IndexOutOfBoundsException ");list.add("error java.lang.NumberFormatException ");list.add("error java.lang.RuntimeException ");list.add("error java.lang.TypeNotPresentException ");list.add("error MetaSpout.java:9: variable i might not have been initialized");list.add("error MyEvaluator.java:1: class Test1 is public, should be declared in a file named Test1.java ");list.add("error Main.java:5: cannot find symbol ");list.add("error NoClassDefFoundError: asa wrong name: ASA ");list.add("error Test1.java:54: 'void' type not allowed here");list.add("error Test5.java:8: missing return statement");list.add("error:Next.java:66: cannot find symbol ");list.add("error symbol : method createTempFile(java.lang.String,java.lang.String,java.lang.String) ");list.add("error invalid method declaration; return type required");list.add("error array required, but java.lang.String found");list.add("error Exception in thread main java.lang.NumberFormatException: null 20. .");list.add("error non-static method cannot be referenced from a static context");list.add("error Main.java:5: non-static method fun1() cannot be referenced from a static context");list.add("error continue outside of loop");list.add("error MyAbstract.java:6: missing method body, or declare abstract");list.add("error Main.java:6: Myabstract is abstract; cannot be instantiated");list.add("error MyInterface.java:2: interface methods cannot have body ");list.add("error Myabstract is abstract; cannot be instantiated");list.add("error asa.java:3: modifier static not allowed here");list.add("error possible loss of precision found: long required:byte var=varlong");list.add("error java.lang.NegativeArraySizeException ");list.add("error java.lang.ArithmeticException: by zero");list.add("error java.lang.ArithmeticException");list.add("error java.lang.ArrayIndexOutOfBoundsException");list.add("error java.lang.ClassNotFoundException");list.add("error java.lang.IllegalArgumentException");list.add("error fatal error C1010: unexpected end of file while looking for precompiled header directive");list.add("error fatal error C1083: Cannot open include file: R…….h: No such file or directory");list.add("error C2011:C……clas type redefinition");list.add("error C2018: unknown character 0xa3");list.add("error C2057: expected constant expression");list.add("error C2065: IDD_MYDIALOG : undeclared identifier IDD_MYDIALOG");list.add("error C2082: redefinition of formal parameter bReset");list.add("error C2143: syntax error: missing : before ");list.add("error C2146: syntax error : missing ';' before identifier dc");list.add("error C2196: case value '69' already used");list.add("error C2509: 'OnTimer' : member function not declared in 'CHelloView'");list.add("error C2555: 'B::f1': overriding virtual function differs from 'A::f1' only by return type or calling convention");list.add("error C2511: 'reset': overloaded member function 'void (int)' not found in 'B'");list.add("error C2660: 'SetTimer' : function does not take 2 parameters");list.add("error warning C4035: 'f……': no return value");list.add("error warning C4553: '= =' : operator has no effect; did you intend '='");list.add("error C4716: 'CMyApp::InitInstance' : must return a value");list.add("error LINK : fatal error LNK1168: cannot open Debug/P1.exe for writing");list.add("error LNK2001: unresolved external symbol public: virtual _ _thiscall C (void)");list.add("error java.lang.IllegalArgumentException: Path index.jsp does not start with");list.add("error org.apache.struts.action.ActionServlet.process(ActionServlet.java:148");list.add("error org.apache.jasper.JasperException: Exception in JSP");list.add("error The server encountered an internal error () that prevented it from fulfilling this request");list.add("error org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467");list.add("error javax.servlet.http.HttpServlet.service(HttpServlet.java:803)");list.add("error javax.servlet.jsp.JspException: Cannot find message resources under key org.apache.struts.action.MESSAGE");list.add("error Stacktrace: org.apache.jasper.servlet.JspServletWrapper.handleJspException(JspServletWrapper.java:467)");list.add("error javax.servlet.ServletException: Cannot find bean org.apache.struts.taglib.html.BEAN in any scope");list.add("error no data found");list.add("error exception in thread main org.hibernate.MappingException: Unknown entity:.");list.add("error using namespace std;");list.add("error C2065: 'cout' : undeclared identifier");list.add("error main already defined in aaa.obj");list.add("error syntax error : missing ';' before '}'");list.add("error cout : undeclared identifier");list.add("error weblogic.servlet.internal.WebAppServletContext$ServletInvocationAction.run(WebAp ");list.add("error Caused by: java.lang.reflect.InvocationTargetException");list.add("error Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao");list.add("error at com.starit.gejie.Util.Trans.BL_getSysNamesByType(Trans.java:220)");return list;}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
MyDailyRollingFileAppender的代码如下:
package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.Priority;/*** @author tuzq* @create 2017-06-25 13:58*/
public class MyDailyRollingFileAppender extends DailyRollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
MyRollingFileAppender的代码如下:
package cn.com.toto.stromlogpro.log4j;/*** Created by toto on 2017/6/25.*/import org.apache.log4j.Priority;
import org.apache.log4j.RollingFileAppender;/*** @author tuzq* @create 2017-06-25 14:01*/
public class MyRollingFileAppender extends RollingFileAppender {@Overridepublic boolean isAsSevereAsThreshold(Priority priority) {return getThreshold().equals(priority);}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
3.在Flume中的conf配置文件,并将收集的日志下层到kafka中
a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/info.log
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.com.toto.stromlogpro.flume.AppInterceptor$AppInterceptorBuilder
#通过这个参数向自定义的Flume拦截器中传递参数(即系统编号)
a1.sources.r1.interceptors.i1.appId = 1a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = log_monitor
a1.sinks.k1.brokerList = hadoop1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
版权声明:本文为博主原创文章,未经博主允许不得转载。