A Generic Flume Source Plugin and Flume HBase Serializer Plugin

Github: https://github.com/ares333/apache-flume

The Flume Source plugin watches the log files in the sub-directories of one or more configured directories. It keeps a handle on every file and reads it periodically (every second by default), periodically checks whether each file has expired (every minute by default, judged by the file's modification time) and, if it has, moves it into the _finished directory (the directory name is configurable, and it is automatically skipped during collection), and periodically scans for new files (every minute by default). To keep the configuration file simple, a single channel per server is enough. Each event carries the charset, the type (specified in the configuration file), and the name of the directory it was read from as headers, which makes routing on the sink side easy. The plugin is written to perform well across the board.
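
Each line of the source's param value follows the pattern main-dir[,main-dir...]|type,expire-seconds,charset|sub-dir[,sub-dir...]. For example, one line taken from the configuration shown later in this post:

/data/apache/www/_flume|ledu,86400,UTF-8|ld_user,ld_pay,ld_active,ld_login,ld_mail,ld_mobile,ld_idcard

This watches the listed sub-directories under /data/apache/www/_flume, tags every event with type ledu and charset UTF-8, and moves a file into the finish directory once it has gone unmodified for 86400 seconds.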

Straight to the code:

package com.joyport.flume.source;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Ares extends AbstractSource implements Configurable,
		PollableSource {
	private static final Logger logger = LoggerFactory.getLogger(Ares.class);
	private String finishDir = "_finished";
	private int maxThread = 1024; 
	private Map<String, HashMap<String, String>> param = Collections
			.synchronizedMap(new HashMap<String, HashMap<String, String>>());
	private int keepAlive;

	private Timer scannerTimer = new Timer();
	private LinkedBlockingQueue<HashMap<String, String>> queue = new LinkedBlockingQueue<HashMap<String, String>>(
			10000);
	private Map<String, Thread> threadList = Collections
			.synchronizedMap(new HashMap<String, Thread>());
	private boolean stop = false;

	public void configure(Context context) {
		// Each line: main dirs | type,expire-seconds,charset | sub dirs
		String[] lines = context.getString("param").split("\\s");
		for (String v : lines) {
			String[] nodes = v.split("\\|", 3);
			String[] node2 = nodes[1].split(",", 3);
			if (!this.param.containsKey(nodes[0])) {
				this.param.put(nodes[0], new HashMap<String, String>());
			}
			if (node2[2].isEmpty()) {
				throw new FlumeException("charset is empty");
			}
			this.param.get(nodes[0]).put("type", node2[0]);
			this.param.get(nodes[0]).put("expire", node2[1]);
			this.param.get(nodes[0]).put("charset", node2[2]);
			this.param.get(nodes[0]).put("dir", nodes[2]);
		}
		this.keepAlive = context.getInteger("keep-alive");
	}

	/**
	 * Periodically scan the configured directories and start a reader thread
	 * for every file that is not being read yet.
	 */
	public synchronized void start() {
		super.start();
		scannerTimer.scheduleAtFixedRate(new TimerTask() {
			public void run() {
				if (Ares.this.threadList.size() < Ares.this.maxThread) {
					for (String k : Ares.this.param.keySet()) {
						String[] kArray = k.split(",");
						for (String k1 : kArray) {
							File dirTop = new File(k1);
							if (dirTop.exists()) {
								for (String v1 : Ares.this.param.get(k)
										.get("dir").split(",")) {
									File dirSub = new File(dirTop
											.getAbsolutePath() + "/" + v1);
									if (dirSub.isDirectory()) {
										HashMap<String, String> info = new HashMap<String, String>(
												Ares.this.param.get(k));
										File[] nodeFiles = dirSub.listFiles();
										for (File v2 : nodeFiles) {
											if (v2.isFile()
													&& !Ares.this.threadList
															.containsKey(v2
																	.getPath())) {
												FileReader reader = new FileReader(
														v2.getAbsolutePath(),
														info);
												Thread thread = new Thread(
														reader);
												thread.start();
												Ares.this.threadList.put(
														v2.getAbsolutePath(),
														thread);
											}
										}
									}
								}
							}
						}
					}
				}
			}
		}, 0L, 60 * 1000);
	}

	/**
	 * Shut down: stop the scanner, wait for the reader threads to finish and
	 * release their file handles.
	 */
	public synchronized void stop() {
		super.stop();
		this.scannerTimer.cancel();
		this.stop = true;
		try {
			while (this.threadList.size() > 0) {
				Thread.sleep(1000);
			}
		} catch (InterruptedException e) {
			Ares.logger.error(e.getMessage());
		}
	}

	/**
	 * Deliver queued events to the channel.
	 * 
	 * @throws EventDeliveryException
	 */
	@Override
	public synchronized Status process() throws EventDeliveryException {
		Status status = Status.READY;
		try {
			// drain everything the reader threads have queued so far
			while (this.queue.size() > 0) {
				HashMap<String, String> node = this.queue.take();
				HashMap<String, String> header = new HashMap<String, String>();
				header.put("charset", node.get("charset"));
				header.put("type", node.get("type"));
				header.put("dir", node.get("dir"));
				this.getChannelProcessor().processEvent(
						EventBuilder.withBody(node.get(null),
								Charset.forName(node.get("charset")),
								header));
			}
		} catch (Throwable e) {
			StringWriter sw = new StringWriter();
			e.printStackTrace(new PrintWriter(sw));
			logger.error(sw.toString());
		}
		return status;
	}

	/**
	 * Read a file into this.queue; once the file has expired, move it to the
	 * finish directory.
	 */
	class FileReader implements Runnable {
		String file;
		HashMap<String, String> info = new HashMap<String, String>();

		public FileReader(String file, HashMap<String, String> info) {
			this.file = file;
			this.info = info;
		}

		@Override
		public void run() {
			try {
				File file = new File(this.file);
				BufferedReader reader = new BufferedReader(
						new InputStreamReader(new FileInputStream(file),
								this.info.get("charset")));
				int i = 0;
				while (true) {
					String line = reader.readLine();
					if (null != line) {
						HashMap<String, String> node = new HashMap<String, String>();
						node.put(null, line);
						node.put("type", this.info.get("type"));
						node.put("charset", this.info.get("charset"));
						node.put("dir", file.getParentFile().getName());
						Ares.this.queue.offer(node, Ares.this.keepAlive,
								TimeUnit.SECONDS);
					} else {
						Thread.sleep(1000);
						if (++i == 60) {
							i = 0;
							Date date = new Date();
							if (null != this.info.get("expire")
									&& date.getTime() - file.lastModified() > new Long(
											this.info.get("expire")) * 1000) {
								File dir = new File(file.getParent() + "/"
										+ Ares.this.finishDir);
								if (!dir.exists()) {
									dir.mkdir();
								}
								file.renameTo(new File(dir.getPath() + "/"
										+ file.getName()));
								break;
							}
						}
					}
					if (Ares.this.stop) {
						break;
					}
				}
				reader.close();
			} catch (FileNotFoundException e) {
				Ares.logger.error(e.getMessage());
			} catch (InterruptedException e) {
				Ares.logger.error(e.getMessage());
			} catch (UnsupportedEncodingException e) {
				Ares.logger.error(e.getMessage());
			} catch (IOException e) {
				Ares.logger.error(e.getMessage());
			}
			Ares.this.threadList.remove(this.file);
		}
	}
}

The HBase serializer determines the target table(s) from the event's type and dir headers together with the sink-side configuration. Events of several types can be written into one table, and a single event can be written into several tables (a worked example follows the configuration file below). It, too, is written to perform well. The code:

package com.joyport.flume.sink.hbase;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;

public class AresSerializer implements AsyncHbaseEventSerializer {
	private String tableStr;
	private HashMap<String, HashMap<String, String[]>> tables = new HashMap<String, HashMap<String, String[]>>();
	private byte[] cf;

	private HashSet<byte[]> table = new HashSet<byte[]>();
	private String charset;
	private HashMap<byte[], byte[]> rowkey = new HashMap<byte[], byte[]>();
	private HashMap<byte[], byte[][]> qualifiers = new HashMap<byte[], byte[][]>();
	private HashMap<byte[], byte[][]> values = new HashMap<byte[], byte[][]>();
	private static final Logger logger = LoggerFactory
			.getLogger(AresSerializer.class);

	/**
	 * Each line: type,dir,table|qualifiers|rowkey. To write one event to
	 * several tables, repeat a line with the same type and dir but a
	 * different table.
	 */
	@Override
	public synchronized void initialize(byte[] table, byte[] cf) {
		for (String v : this.tableStr.split("\\s")) {
			String nodes[] = v.split("\\|", 3);
			String node1[] = nodes[0].split(",", 3);
			String key = node1[0] + "/" + node1[1];
			if (!this.tables.containsKey(key)) {
				this.tables.put(key, new HashMap<String, String[]>());
			}
			// node1[2] is the table name
			this.tables.get(key).put(node1[2],
					new String[] { nodes[1], nodes[2] });
		}
		this.cf = cf;
	}

	/**
	 * 转换Event
	 */
	@Override
	public void setEvent(Event event) {
		try {
			this.table.clear();
			this.rowkey.clear();
			this.qualifiers.clear();
			this.values.clear();
			Map<String, String> header = event.getHeaders();
			String cols[] = new String(event.getBody(), header.get("charset"))
					.split("\t");
			String key = header.get("type") + "/" + header.get("dir");
			this.charset = header.get("charset");
			if (!this.tables.containsKey(key)) {
				logger.error("table not found, type=" + header.get("type")
						+ ", dir=" + header.get("dir"));
			} else {
				HashMap<String, String[]> node = this.tables.get(key);
				// k is the table name
				for (String k : node.keySet()) {
					List<String> qualifiersList = Arrays.asList(node.get(k)[0]
							.split(","));
					String[] rowkeyArray = node.get(k)[1].split(",");
					// build the rowkey
					String rowkey = "";
					for (String v : rowkeyArray) {
						int pos = qualifiersList.indexOf(v);
						if (-1 == pos) {
							throw new FlumeException("rowkey not found in "
									+ this.getClass().getName() + ", rowkey="
									+ v);
						} else if (pos < cols.length) {
							rowkey += cols[pos] + "_";
						} else {
							return;
						}
					}
					byte[] table = k.getBytes(this.charset);
					this.table.add(table);
					this.rowkey.put(
							table,
							rowkey.substring(0, rowkey.length() - 1).getBytes(
									this.charset));
					// build the qualifiers and values
					int length = cols.length;
					if (length > qualifiersList.size()) {
						length = qualifiersList.size();
					}
					byte[][] qualifiers = new byte[length][];
					byte[][] values = new byte[length][];
					for (int i = 0; i < length; i++) {
						qualifiers[i] = qualifiersList.get(i).getBytes(
								this.charset);
						values[i] = cols[i].getBytes(this.charset);
					}
					this.qualifiers.put(table, qualifiers);
					this.values.put(table, values);
				}
			}
		} catch (UnsupportedEncodingException e) {
			logger.error(e.getMessage());
		}
	}

	@Override
	public List<PutRequest> getActions() {
		ArrayList<PutRequest> actions = new ArrayList<PutRequest>();
		for (byte[] v : this.table) {
			PutRequest req = new PutRequest(v, this.rowkey.get(v), this.cf,
					this.qualifiers.get(v), this.values.get(v));
			try {
				String row = new String(v, this.charset) + "\t"
						+ new String(this.rowkey.get(v), this.charset) + "\t";
				for (byte[] v1 : this.qualifiers.get(v)) {
					row += new String(v1, this.charset) + ",";
				}
				row = row.substring(0, row.length() - 1) + "\t";
				for (byte[] v1 : this.values.get(v)) {
					row += new String(v1, this.charset) + ",";
				}
				row = row.substring(0, row.length() - 1);
				logger.debug(row);
			} catch (UnsupportedEncodingException e) {
				logger.error(e.getMessage());
			}
			actions.add(req);
		}
		return actions;
	}

	@Override
	public List<AtomicIncrementRequest> getIncrements() {
		List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
		for (byte[] v : this.table) {
			AtomicIncrementRequest inc = new AtomicIncrementRequest(v,
					"inc".getBytes(Charsets.UTF_8), this.cf,
					"inc".getBytes(Charsets.UTF_8));
			actions.add(inc);
		}
		return actions;
	}

	@Override
	public void cleanUp() {
	}

	@Override
	public synchronized void configure(Context context) {
		this.tableStr = context.getString("tables");
	}

	@Override
	public void configure(ComponentConfiguration conf) {
	}
}

To deploy the plugins, copy the compiled classes into the conf directory of the Flume installation, or package them as a jar and drop it into lib (the release.sh script below builds lib/com.joyport.flume.jar from the classes under conf). With these plugins the Flume configuration file becomes much simpler. A typical configuration file:

source.sources = r1
source.channels = c1
source.sinks = k1

source.sources.r1.channels = c1
source.sources.r1.type = com.joyport.flume.source.Ares
source.sources.r1.keep-alive = 600
source.sources.r1.param = \
/logcenter/_flume/rxsg,/rxsg2/_flume,/mjhx/logcenter/_flume,/shipgogogo/_flume,/zlsg/_flume,/mjsj/_flume,/longquan/_flume,/mjhx3/_flume,/data/home/app1101052309/logcenter/_flume/hwsg,/mingjiang/_flume,/mssg/_flume,/data/flume/wsj,/rxjs/_flume,/mjsg/_flume,/ttgj/_flume|game,86400,UTF-8|player_active,player_login,player_pay,player_cost,player_role,player_gameinfo,server_player,player_spay \
/data/apache/www/_flume|ledu,86400,UTF-8|ld_user,ld_pay,ld_active,ld_login,ld_mail,ld_mobile,ld_idcard \
/data/web/sy/_flume|sy,86400,UTF-8|sy_download \
/data/web/xdw/_flume|xdw,86400,UTF-8|xdw_user,xdw_login,xdw_active,xdw_mail,xdw_mobile,xdw_idcard,xdw_pay \
/data|ledu_crossbar,,UTF-8|tracepng_logs

source.channels.c1.type = memory
source.channels.c1.capacity = 100000
source.channels.c1.transactionCapacity = 10000
source.channels.c1.byteCapacityBufferPercentage = 20
source.channels.c1.byteCapacity = 104857600

source.sinks.k1.channel = c1
source.sinks.k1.type = avro
source.sinks.k1.hostname = 192.168.2.201
source.sinks.k1.port = 30001
source.sinks.k1.request-timeout = 600000
source.sinks.k1.connect-timeout = 300000
source.sinks.k1.reset-connection-interval = 
source.sinks.k1.batch-size = 10000
source.sinks.k1.compression-type = deflate
source.sinks.k1.maxIoWorkers = 4

sink.sources = r1
sink.channels = c1 c2
sink.sinks = k1 k2

sink.sources.r1.channels = c1 c2
sink.sources.r1.type = avro
sink.sources.r1.bind = 0.0.0.0
sink.sources.r1.port = 30001
sink.sources.r1.threads = 20000
sink.sources.r1.compression-type = deflate
sink.sources.r1.selector.type = multiplexing
sink.sources.r1.selector.header = dir
sink.sources.r1.selector.mapping.ld_pay = c1
sink.sources.r1.selector.mapping.xdw_pay = c1
sink.sources.r1.selector.mapping.player_pay = c1
sink.sources.r1.selector.default = c2

sink.channels.c1.type = file
sink.channels.c1.keep-alive = 600
sink.channels.c1.checkpointInterval = 30000
sink.channels.c1.transactionCapacity = 10000
sink.channels.c1.capacity = 100000
sink.channels.c1.checkpointDir = /tmp/flume-1.5.0.1/sink/file-channel/c1/checkpoint
sink.channels.c1.dataDirs = /tmp/flume-1.5.0.1/sink/file-channel/c1/data

sink.channels.c2.type = memory
sink.channels.c2.capacity = 100000
sink.channels.c2.keep-alive = 600
sink.channels.c2.transactionCapacity = 10000
sink.channels.c2.byteCapacityBufferPercentage = 20
sink.channels.c2.byteCapacity = 104857600

sink.sinks.k1.channel = c1
sink.sinks.k1.type = asynchbase
sink.sinks.k1.table = _flume
sink.sinks.k1.columnFamily = cf
sink.sinks.k1.batchSize = 1000
sink.sinks.k1.serializer = com.joyport.flume.sink.hbase.AresSerializer
sink.sinks.k1.serializer.tables = \
game,player_pay,gm_pay_2|uid,passport,passtype,game_id,server_id,order_id,money,point,point_free,type,time,server_no|time,game_id,server_id,order_id \
ledu,ld_pay,ld_pay_2|order_id,uid,username,pay_money,pay_time,pay_ip,game_id,server_id,pay_channel|pay_time,order_id \
xdw,xdw_pay,xdw_pay_2|order_id,uid,username,pay_money,pay_time,pay_ip,game_id,server_id,pay_channel|pay_time,order_id

sink.sinks.k2.channel = c2
sink.sinks.k2.type = asynchbase
sink.sinks.k2.table = _flume
sink.sinks.k2.columnFamily = cf
sink.sinks.k2.batchSize = 10000
sink.sinks.k2.serializer = com.joyport.flume.sink.hbase.AresSerializer
sink.sinks.k2.serializer.tables = \
game,player_active,gm_active_2|passport,passtype,game_id,server_id,active_time,ucode,server_no|active_time,game_id,server_id,passtype,passport \
game,player_active,gm_user_info|passport,passtype,game_id,server_id,active_time,ucode,server_no|game_id,server_id,passtype,passport \
game,player_login,gm_login_2|uid,passport,passtype,game_id,server_id,login_time,ip,server_no|login_time,game_id,server_id,passtype,passport \
game,player_cost,gm_cost_2|uid,passport,passtype,game_id,server_id,role_name,ext_data,cost_point,time,type,role_id,server_no,cost_id|time,game_id,server_id,cost_id \
game,player_role,gm_user_info|uid,passport,passtype,game_id,server_id,role_ctime,role_name,role_sex,role_prof,server_no|game_id,server_id,passtype,passport \
game,player_gameinfo,gm_user_info|uid,passport,passtype,game_id,server_id,role_name,role_info1,role_info2,role_info3,role_info4,time,server_no|game_id,server_id,passtype,passport \
game,server_player,gm_online_2|game_id,server_id,time,player_count,server_no|time,game_id,server_id \
game,player_spay,gm_player_spay|order_id,uid,passport,passtype,game_id,server_id,level,vip,money,point,point_free,type,is_1stpay,time|time,game_id,passtype,server_id,order_id \
ledu,ld_user,ld_user|uid,username,reg_time,reg_ip,site_id,page_id,sub_page_id,type|uid \
ledu,ld_mail,ld_user|uid,username,bind_email,bind_email_time|uid \
ledu,ld_mobile,ld_user|uid,username,bind_mobile,bind_mobile_time|uid \
ledu,ld_idcard,ld_user|uid,username,realname,idcard,verify_time|uid \
ledu,ld_login,ld_login_2|uid,username,login_time,login_ip,game_id,server_id|login_time,uid,game_id,server_id \
ledu,ld_login,ld_login|uid,username,login_time,login_ip,game_id,server_id|uid,game_id,server_id,login_time \
ledu,ld_active,ld_active_2|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|uid,game_id,server_id \
ledu,ld_active,ld_active|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|active_time,uid,game_id,server_id \
xdw,xdw_user,xdw_user|uid,username,reg_time,reg_ip,site_id,page_id,sub_page_id,type|uid \
xdw,xdw_mail,xdw_user|uid,username,bind_email,bind_email_time|uid \
xdw,xdw_mobile,xdw_user|uid,username,bind_mobile,bind_mobile_time|uid \
xdw,xdw_idcard,xdw_user|uid,username,realname,idcard,verify_time|uid \
xdw,xdw_login,xdw_login|uid,username,login_time,login_ip,game_id,server_id|uid,game_id,server_id,login_time \
xdw,xdw_login,xdw_login_2|uid,username,login_time,login_ip,game_id,server_id|login_time,uid,game_id,server_id \
xdw,xdw_active,xdw_active|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|active_time,uid,game_id,server_id \
xdw,xdw_active,xdw_active_2|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|uid,game_id,server_id \
sy,sy_download,sy_download|uuid,platform,channel_id,game_id,time,type,status,ip|time,platform,game_id,uuid \
ledu_crossbar,tracepng_logs,ld_crossbar_click|time,ip,type,uid,href,cate,host|time,ip
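
To make the serializer mapping concrete, take the ld_pay line from the k1 serializer configuration above:

ledu,ld_pay,ld_pay_2|order_id,uid,username,pay_money,pay_time,pay_ip,game_id,server_id,pay_channel|pay_time,order_id

An event whose headers carry type=ledu and dir=ld_pay is written to table ld_pay_2. The tab-separated columns of the event body are stored under the listed qualifiers (order_id, uid, username, and so on) in the configured column family cf, and the rowkey is the event's pay_time value and order_id value joined with an underscore. Several type,dir pairs (player_active, ld_login, ld_active, xdw_login, xdw_active) appear on two lines in the k2 list, so events of those types are written to two tables each.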

Start the source agent:

/usr/lib/jvm/jre/bin/java -Xmx2g -Dflume.root.logger=WARN,source -cp /usr/local/apache-flume-1.5.0.1-bin/conf:/usr/local/apache-flume-1.5.0.1-bin/lib/* -Djava.library.path= org.apache.flume.node.Application -n source -f /usr/local/apache-flume-1.5.0.1-bin/conf/flume.conf

Start the sink agent:

/usr/lib/jvm/jre/bin/java -Xmx2g -Dflume.root.logger=WARN,sink -cp /usr/local/apache-flume-1.5.0.1-bin/conf:/usr/local/apache-flume-1.5.0.1-bin/lib/* -Djava.library.path= org.apache.flume.node.Application -n sink -f /usr/local/apache-flume-1.5.0.1-bin/conf/flume.conf

Configure log4j.properties yourself.
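
The -Dflume.root.logger=WARN,source and -Dflume.root.logger=WARN,sink settings in the startup commands above expect appenders named source and sink to be defined there. A minimal log4j.properties sketch; the appender types and file paths are illustrative assumptions, not the configuration actually used:

# default in case -Dflume.root.logger is not passed
flume.root.logger=WARN,source
log4j.rootLogger=${flume.root.logger}

# "source" and "sink" appenders referenced via -Dflume.root.logger (paths are examples)
log4j.appender.source=org.apache.log4j.RollingFileAppender
log4j.appender.source.File=/usr/local/apache-flume-1.5.0.1-bin/logs/source.log
log4j.appender.source.MaxFileSize=100MB
log4j.appender.source.MaxBackupIndex=10
log4j.appender.source.layout=org.apache.log4j.PatternLayout
log4j.appender.source.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c: %m%n

log4j.appender.sink=org.apache.log4j.RollingFileAppender
log4j.appender.sink.File=/usr/local/apache-flume-1.5.0.1-bin/logs/sink.log
log4j.appender.sink.MaxFileSize=100MB
log4j.appender.sink.MaxBackupIndex=10
log4j.appender.sink.layout=org.apache.log4j.PatternLayout
log4j.appender.sink.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c: %m%n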

The flumectl control script:

#!/bin/bash
. /etc/rc.d/init.d/functions
debug=INFO
#This script must be located in Flume's bin directory. It can be run from any working directory, and Flume itself can be installed anywhere. Do not run the script through a symlink; use an alias instead.
if [ "/" != ${0:0:1} ];then
        dirCurrent=$(pwd)/$0
else
        dirCurrent=$0
fi
dirCurrent=${dirCurrent//.\///}
dirCurrent=${dirCurrent%/*}
dirBase=${dirCurrent%/*}
pid=
RETVAL=0

running(){
        cmd='ps -o pid,command -C java|grep "[o]rg.apache.flume.node.Application"|grep "-[n] "'$1
        cmd=$cmd"|awk '{print $1}'"
        pid=(`eval $cmd`)
        if [ ${#pid[*]} -gt 0 ];then
                echo -n $pid
                return 0
        else
                return 1
        fi
}

stop(){
        running $1 > /dev/null
        echo -n "stoping $1... "
        for v in ${pid[@]}
        do
                kill -9 $v
        done
        if [ ${#pid[@]} -gt 0 ];then
                echo_success
        else
                echo_failure
        fi
        echo
}

start(){
        running $1 > /dev/null
        r=$?
        if [ $r -eq 0 ];then
                echo -n "$1 is running, pid=$pid"
                echo_failure
                echo
        else
                echo -n "starting $1... "
                cd $dirBase;bin/flume-ng agent -n $1 -c conf -f $dirBase/conf/flume.conf -Dflume.root.logger=$debug,$1 > $dirBase/logs/$1.out 2>&1 &
                echo_success
                echo
        fi
}

restart(){
        stop $1
        start $1
}

case $1 in
"start"|"stop"|"restart"|"running")
        case $2 in
        "source"|"sink")
                $1 $2
                RETVAL=$?
                ;;
        *)
                echo "invalid flume type, type="$2
                ;;
        esac
        ;;
*)
        echo -e "Usage:nstart,stop,restart,running [source|sink]"
        ;;
esac
exit $RETVAL

The release script, release.sh:

#!/bin/bash
dirBase='/usr/local/apache-flume-1.5.0.1-bin'
name=${dirBase##*/}
#xx.xx.xx.xx
ipSink=xx.xx.xx.xx
RETVAL=0

#An optional argument may be passed; the package is only scp'd to node3 when it is 'tested'.
release(){
        rm -f $dirBase/logs/*
        sed "s/source.sinks.k1.hostname = 192.168.2.201/source.sinks.k1.hostname = $ipSink/" $dirBase/conf/flume.conf > $dirBase/conf/flume.conf.online
        cat $dirBase/tools/installer.sh > $dirBase/$name
        cd $dirBase/conf && jar -c com > $dirBase/lib/com.joyport.flume.jar
        tar -cz -C $dirBase/.. $name --exclude $name/$name --exclude $name/conf/com --exclude $name/tools/installer.sh --exclude $name/tools/release.sh --exclude $name/tools/backup.sh --exclude $name/tools/batch.sh --exclude $name/tools/test.sh >> $dirBase/$name
        rm -f $dirBase/conf/flume.conf.online
        chmod u+x $dirBase/$name
        if [ "$1" = 'tested' ]; then
                local webFile='/data/apache/www/1miao/apache-flume-1.5.0.1-bin'
                scp /usr/local/apache-flume-1.5.0.1-bin/apache-flume-1.5.0.1-bin node3:/data/apache/www/1miao/
                ssh node3 'chmod +x '$webFile';cp -f '$webFile' ~'
        fi
}

case $1 in
*)
        release $1
        ;;
esac

installer.sh

#!/bin/bash
dirInstall='/usr/local/apache-flume-1.5.0.1-bin'
fileName='apache-flume-1.5.0.1-bin'
file=$0
lastLine=`tail -n 1 $0|cut -c-13`
fileBinStart=`awk '/^__BIN_START__/{print NR+1;exit 0;}' $0`

if test "$lastLine" = '__BIN_START__'
then
        isInstaller=false
else
        isInstaller=true
fi

tarExtract(){
        if [ ! -d $dirInstall ]; then
                mkdir $dirInstall
        fi
        tail -n +$fileBinStart $file | tar -xz -C $dirInstall/..
}

case $1 in
#$1: 'local' or an IP address  $2: local  $3: noInit
*)
        if [ "false" = $isInstaller ]; then
                echo 'not an installer'
                exit 1
        fi
        if [ "$1" = 'local' ]; then
                tarExtract
                if [ "$2" != 'local' ]; then
                        cd $dirInstall && mv -f conf/hbase-site.xml.online conf/hbase-site.xml && mv -f conf/flume.conf.online conf/flume.conf
                fi
                if [ "$3" != 'noInit' ]; then
                        cd $dirInstall && pid=`bin/flumectl running source`;if [ $? -eq 0 ];then echo -n 'source is running, pid='$pid', ';bin/flumectl stop source;fi;bin/flumectl start source
                fi
                rm -f $dirInstall/$fileName
        else
                if [ $1 ]; then
                        ssh $1 ':'
                        if [ $? -ne 0 ];then
                                exit 1
                        fi
                        ssh $1 "if [ ! -d $dirInstall ]; then mkdir $dirInstall; fi"
                        scp $file $1:$dirInstall && ssh $1 "cd $dirInstall && $file local $2 $3"
                else
                        echo 'wrong param'
                        exit 1
                fi
        fi
        ;;
esac
exit $RETVAL
__BIN_START__

backup.sh

#!/bin/bash
tar -C /usr/local/ -czf /usr/local/apache-flume-1.5.0.1-bin.tgz apache-flume-1.5.0.1-bin/ --exclude apache-flume-1.5.0.1-bin/apache-flume-1.5.0.1-bin


A generic plugin like this brings one problem: every server uses the same configuration, so anything under a configured directory gets collected, even on servers where that directory holds logs that are not meant to be collected. For example, /data/logs on server 1 contains the logs to collect, while server 2 also has a /data/logs directory that holds a different kind of log; that can cause trouble.
