Github: https://github.com/ares333/apache-flume
Flume Source插件要实现的功能是,监控某个目录下面的子目录中的日志,获取日志句柄定时读取(默认1秒),定时(默认1分钟)检查文件是否过期(通过文件修改时间判断),如果过期移动文件到_finished目录(这个目录可以自定义,并且收集的时候会自动忽略此目录),定时检测目录下是否有新文件(默认为1分钟),为了简化配置文件一台服务器只需要一个channel即可。Event会附加编码、类型(配置文件中指定)和当前目录名称,方便Sink端逻辑处理。插件的各项指标绝对最优。
直接上代码:
package com.joyport.flume.source; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Ares extends AbstractSource implements Configurable, PollableSource { private static final Logger logger = LoggerFactory.getLogger(Ares.class); private String finishDir = "_finished"; private int maxThread = 1024; private Map<String, HashMap<String, String>> param = Collections .synchronizedMap(new HashMap<String, HashMap<String, String>>()); private int keepAlive; private Timer scannerTimer = new Timer(); private LinkedBlockingQueue<HashMap<String, String>> queue = new LinkedBlockingQueue<HashMap<String, String>>( 10000); private Map<String, Thread> threadList = Collections .synchronizedMap(new HashMap<String, Thread>()); private boolean stop = false; public void configure(Context context) { // 主目录,主目录|类型,文件超时,编码|子目录 String[] lines = context.getString("param").split("s"); for (String v : lines) { String[] nodes = v.split("|", 3); String[] node2 = nodes[1].split(",", 3); if (!this.param.containsKey(nodes[0])) { this.param.put(nodes[0], new HashMap<String, String>()); } if (node2[2].isEmpty()) { throw new 
FlumeException("charset is empty"); } this.param.get(nodes[0]).put("type", node2[0]); this.param.get(nodes[0]).put("expire", node2[1]); this.param.get(nodes[0]).put("charset", node2[2]); this.param.get(nodes[0]).put("dir", nodes[2]); } this.keepAlive = context.getInteger("keep-alive"); } /** * 定时检查this.path如果有新文件就读取 */ public synchronized void start() { super.start(); scannerTimer.scheduleAtFixedRate(new TimerTask() { public void run() { if (Ares.this.threadList.size() < Ares.this.maxThread) { for (String k : Ares.this.param.keySet()) { String[] kArray = k.split(","); for (String k1 : kArray) { File dirTop = new File(k1); if (dirTop.exists()) { for (String v1 : Ares.this.param.get(k) .get("dir").split(",")) { File dirSub = new File(dirTop .getAbsolutePath() + "/" + v1); if (dirSub.isDirectory()) { HashMap<String, String> info = new HashMap<String, String>( Ares.this.param.get(k)); File[] nodeFiles = dirSub.listFiles(); for (File v2 : nodeFiles) { if (v2.isFile() && !Ares.this.threadList .containsKey(v2 .getPath())) { FileReader reader = new FileReader( v2.getAbsolutePath(), info); Thread thread = new Thread( reader); thread.start(); Ares.this.threadList.put( v2.getAbsolutePath(), thread); } } } } } } } } } }, 0L, 60 * 1000); } /** * 关闭文件句柄等善后工作 */ public synchronized void stop() { super.stop(); this.scannerTimer.cancel(); this.stop = true; try { while (this.threadList.size() > 0) { Thread.sleep(1000); } } catch (InterruptedException e) { Ares.logger.error(e.getMessage()); } } /** * 发送Event到Channel * * @throws EventDeliveryException */ @Override public synchronized Status process() throws EventDeliveryException { Status status = Status.READY; try { while (this.queue.size() > 0) { while (this.queue.size() > 0) { HashMap<String, String> node = this.queue.take(); HashMap<String, String> header = new HashMap<String, String>(); header.put("charset", node.get("charset")); header.put("type", node.get("type")); header.put("dir", node.get("dir")); 
this.getChannelProcessor().processEvent( EventBuilder.withBody(node.get(null), Charset.forName(node.get("charset")), header)); } } } catch (Throwable e) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); logger.error(sw.toString()); } return status; } /** * 读文件到this.queue,如果文件完成就移走。 * */ class FileReader implements Runnable { String file; HashMap<String, String> info = new HashMap<String, String>(); public FileReader(String file, HashMap<String, String> info) { this.file = file; this.info = info; } @Override public void run() { try { File file = new File(this.file); BufferedReader reader = new BufferedReader( new InputStreamReader(new FileInputStream(file), this.info.get("charset"))); int i = 0; while (true) { String line = reader.readLine(); if (null != line) { HashMap<String, String> node = new HashMap<String, String>(); node.put(null, line); node.put("type", this.info.get("type")); node.put("charset", this.info.get("charset")); node.put("dir", file.getParentFile().getName()); Ares.this.queue.offer(node, Ares.this.keepAlive, TimeUnit.SECONDS); } else { Thread.sleep(1000); if (++i == 60) { i = 0; Date date = new Date(); if (null != this.info.get("expire") && date.getTime() - file.lastModified() > new Long( this.info.get("expire")) * 1000) { File dir = new File(file.getParent() + "/" + Ares.this.finishDir); if (!dir.exists()) { dir.mkdir(); } file.renameTo(new File(dir.getPath() + "/" + file.getName())); break; } } } if (Ares.this.stop) { break; } } reader.close(); } catch (FileNotFoundException e) { Ares.logger.error(e.getMessage()); } catch (InterruptedException e) { Ares.logger.error(e.getMessage()); } catch (UnsupportedEncodingException e) { Ares.logger.error(e.getMessage()); } catch (IOException e) { Ares.logger.error(e.getMessage()); } Ares.this.threadList.remove(this.file); } } }
HBase Serializer实现功能是,根据event的type和dir和sink端配置确定表名,多种类型Event可以写入一张表,一个Event可以写入多张表,各项指标绝对最优。代码:
package com.joyport.flume.sink.hbase; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.PutRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; public class AresSerializer implements AsyncHbaseEventSerializer { private String tableStr; private HashMap<String, HashMap<String, String[]>> tables = new HashMap<String, HashMap<String, String[]>>(); private byte[] cf; private HashSet<byte[]> table = new HashSet<byte[]>(); private String charset; private HashMap<byte[], byte[]> rowkey = new HashMap<byte[], byte[]>(); private HashMap<byte[], byte[][]> qualifiers = new HashMap<byte[], byte[][]>(); private HashMap<byte[], byte[][]> values = new HashMap<byte[], byte[][]>(); private static final Logger logger = LoggerFactory .getLogger(AresSerializer.class); /** * type,dir,table,table...|qualifiers|rowkey */ @Override public synchronized void initialize(byte[] table, byte[] cf) { for (String v : this.tableStr.split("s")) { String nodes[] = v.split("|", 3); String node1[] = nodes[0].split(",", 3); String key = node1[0] + "/" + node1[1]; if (!this.tables.containsKey(key)) { this.tables.put(key, new HashMap<String, String[]>()); } // node1[2]:表名 this.tables.get(key).put(node1[2], new String[] { nodes[1], nodes[2] }); } this.cf = cf; } /** * 转换Event */ @Override public void setEvent(Event event) { try { this.table.clear(); this.rowkey.clear(); this.qualifiers.clear(); this.values.clear(); Map<String, String> header = event.getHeaders(); String cols[] = new String(event.getBody(), header.get("charset")) 
.split("t"); String key = header.get("type") + "/" + header.get("dir"); this.charset = header.get("charset"); if (!this.tables.containsKey(key)) { logger.error("table not found, type=" + header.get("type") + ", dir=" + header.get("dir")); } else { HashMap<String, String[]> node = this.tables.get(key); // k就是表名 for (String k : node.keySet()) { List<String> qualifiersList = Arrays.asList(node.get(k)[0] .split(",")); String[] rowkeyArray = node.get(k)[1].split(","); // 计算rowkey String rowkey = ""; for (String v : rowkeyArray) { int pos = qualifiersList.indexOf(v); if (-1 == pos) { throw new FlumeException("rowkey not found in " + this.getClass().getName() + ", rowkey=" + v); } else if (pos < cols.length) { rowkey += cols[pos] + "_"; } else { return; } } byte[] table = k.getBytes(this.charset); this.table.add(table); this.rowkey.put( table, rowkey.substring(0, rowkey.length() - 1).getBytes( this.charset)); // 计算qualifiers和values int length = cols.length; if (length > qualifiersList.size()) { length = qualifiersList.size(); } byte[][] qualifiers = new byte[length][]; byte[][] values = new byte[length][]; for (int i = 0; i < length; i++) { qualifiers[i] = qualifiersList.get(i).getBytes( this.charset); values[i] = cols[i].getBytes(this.charset); } this.qualifiers.put(table, qualifiers); this.values.put(table, values); } } } catch (UnsupportedEncodingException e) { logger.error(e.getMessage()); } } @Override public List<PutRequest> getActions() { ArrayList<PutRequest> actions = new ArrayList<PutRequest>(); for (byte[] v : this.table) { PutRequest req = new PutRequest(v, this.rowkey.get(v), this.cf, this.qualifiers.get(v), this.values.get(v)); try { String row = new String(v, this.charset) + "t" + new String(this.rowkey.get(v), this.charset) + "t"; for (byte[] v1 : this.qualifiers.get(v)) { row += new String(v1, this.charset) + ","; } row = row.substring(0, row.length() - 1) + "t"; for (byte[] v1 : this.values.get(v)) { row += new String(v1, this.charset) + ","; } row = 
row.substring(0, row.length() - 1); logger.debug(row); } catch (UnsupportedEncodingException e) { logger.error(e.getMessage()); } actions.add(req); } return actions; } @Override public List<AtomicIncrementRequest> getIncrements() { List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>(); for (byte[] v : this.table) { AtomicIncrementRequest inc = new AtomicIncrementRequest(v, "inc".getBytes(Charsets.UTF_8), this.cf, "inc".getBytes(Charsets.UTF_8)); actions.add(inc); } return actions; } @Override public void cleanUp() { } @Override public synchronized void configure(Context context) { this.tableStr = context.getString("tables"); } @Override public void configure(ComponentConfiguration conf) { } }
插件上传到flume安装目录的conf目录即可,也可打包成jar放到lib下。使用本逻辑可以大大简化flume的配置文件:一个典型的配置文件
# ---------------------------------------------------------------------------
# Agent "source": tails local log files and forwards them over Avro.
# ---------------------------------------------------------------------------
source.sources = r1
source.channels = c1
source.sinks = k1

source.sources.r1.channels = c1
source.sources.r1.type = com.joyport.flume.source.Ares
source.sources.r1.keep-alive = 600
# Whitespace-separated entries: topDir[,topDir...]|type,expireSeconds,charset|subDir[,subDir...]
# A trailing backslash continues the value on the next line.
source.sources.r1.param = \
    /logcenter/_flume/rxsg,/rxsg2/_flume,/mjhx/logcenter/_flume,/shipgogogo/_flume,/zlsg/_flume,/mjsj/_flume,/longquan/_flume,/mjhx3/_flume,/data/home/app1101052309/logcenter/_flume/hwsg,/mingjiang/_flume,/mssg/_flume,/data/flume/wsj,/rxjs/_flume,/mjsg/_flume,/ttgj/_flume|game,86400,UTF-8|player_active,player_login,player_pay,player_cost,player_role,player_gameinfo,server_player,player_spay \
    /data/apache/www/_flume|ledu,86400,UTF-8|ld_user,ld_pay,ld_active,ld_login,ld_mail,ld_mobile,ld_idcard \
    /data/web/sy/_flume|sy,86400,UTF-8|sy_download \
    /data/web/xdw/_flume|xdw,86400,UTF-8|xdw_user,xdw_login,xdw_active,xdw_mail,xdw_mobile,xdw_idcard,xdw_pay \
    /data|ledu_crossbar,,UTF-8|tracepng_logs

source.channels.c1.type = memory
source.channels.c1.capacity = 100000
source.channels.c1.transactionCapacity = 10000
source.channels.c1.byteCapacityBufferPercentage = 20
source.channels.c1.byteCapacity = 104857600

source.sinks.k1.channel = c1
source.sinks.k1.type = avro
source.sinks.k1.hostname = 192.168.2.201
source.sinks.k1.port = 30001
source.sinks.k1.request-timeout = 600000
source.sinks.k1.connect-timeout = 300000
# NOTE(review): value is empty in the original; presumably the sink default applies - confirm.
source.sinks.k1.reset-connection-interval =
source.sinks.k1.batch-size = 10000
source.sinks.k1.compression-type = deflate
source.sinks.k1.maxIoWorkers = 4

# ---------------------------------------------------------------------------
# Agent "sink": receives Avro events and writes them to HBase.
# ---------------------------------------------------------------------------
sink.sources = r1
sink.channels = c1 c2
sink.sinks = k1 k2

sink.sources.r1.channels = c1 c2
sink.sources.r1.type = avro
sink.sources.r1.bind = 0.0.0.0
sink.sources.r1.port = 30001
sink.sources.r1.threads = 20000
sink.sources.r1.compression-type = deflate
# Events whose "dir" header names a pay log go to c1, everything else to c2.
sink.sources.r1.selector.type = multiplexing
sink.sources.r1.selector.header = dir
sink.sources.r1.selector.mapping.ld_pay = c1
sink.sources.r1.selector.mapping.xdw_pay = c1
sink.sources.r1.selector.mapping.player_pay = c1
sink.sources.r1.selector.default = c2

sink.channels.c1.type = file
sink.channels.c1.keep-alive = 600
sink.channels.c1.checkpointInterval = 30000
sink.channels.c1.transactionCapacity = 10000
sink.channels.c1.capacity = 100000
sink.channels.c1.checkpointDir = /tmp/flume-1.5.0.1/sink/file-channel/c1/checkpoint
sink.channels.c1.dataDirs = /tmp/flume-1.5.0.1/sink/file-channel/c1/data

sink.channels.c2.type = memory
sink.channels.c2.capacity = 100000
sink.channels.c2.keep-alive = 600
sink.channels.c2.transactionCapacity = 10000
sink.channels.c2.byteCapacityBufferPercentage = 20
sink.channels.c2.byteCapacity = 104857600

sink.sinks.k1.channel = c1
sink.sinks.k1.type = asynchbase
sink.sinks.k1.table = _flume
sink.sinks.k1.columnFamily = cf
sink.sinks.k1.batchSize = 1000
sink.sinks.k1.serializer = com.joyport.flume.sink.hbase.AresSerializer
# Whitespace-separated entries: type,dir,table|qualifiers|rowkeyColumns
sink.sinks.k1.serializer.tables = \
    game,player_pay,gm_pay_2|uid,passport,passtype,game_id,server_id,order_id,money,point,point_free,type,time,server_no|time,game_id,server_id,order_id \
    ledu,ld_pay,ld_pay_2|order_id,uid,username,pay_money,pay_time,pay_ip,game_id,server_id,pay_channel|pay_time,order_id \
    xdw,xdw_pay,xdw_pay_2|order_id,uid,username,pay_money,pay_time,pay_ip,game_id,server_id,pay_channel|pay_time,order_id

sink.sinks.k2.channel = c2
sink.sinks.k2.type = asynchbase
sink.sinks.k2.table = _flume
sink.sinks.k2.columnFamily = cf
sink.sinks.k2.batchSize = 10000
sink.sinks.k2.serializer = com.joyport.flume.sink.hbase.AresSerializer
sink.sinks.k2.serializer.tables = \
    game,player_active,gm_active_2|passport,passtype,game_id,server_id,active_time,ucode,server_no|active_time,game_id,server_id,passtype,passport \
    game,player_active,gm_user_info|passport,passtype,game_id,server_id,active_time,ucode,server_no|game_id,server_id,passtype,passport \
    game,player_login,gm_login_2|uid,passport,passtype,game_id,server_id,login_time,ip,server_no|login_time,game_id,server_id,passtype,passport \
    game,player_cost,gm_cost_2|uid,passport,passtype,game_id,server_id,role_name,ext_data,cost_point,time,type,role_id,server_no,cost_id|time,game_id,server_id,cost_id \
    game,player_role,gm_user_info|uid,passport,passtype,game_id,server_id,role_ctime,role_name,role_sex,role_prof,server_no|game_id,server_id,passtype,passport \
    game,player_gameinfo,gm_user_info|uid,passport,passtype,game_id,server_id,role_name,role_info1,role_info2,role_info3,role_info4,time,server_no|game_id,server_id,passtype,passport \
    game,server_player,gm_online_2|game_id,server_id,time,player_count,server_no|time,game_id,server_id \
    game,player_spay,gm_player_spay|order_id,uid,passport,passtype,game_id,server_id,level,vip,money,point,point_free,type,is_1stpay,time|time,game_id,passtype,server_id,order_id \
    ledu,ld_user,ld_user|uid,username,reg_time,reg_ip,site_id,page_id,sub_page_id,type|uid \
    ledu,ld_mail,ld_user|uid,username,bind_email,bind_email_time|uid \
    ledu,ld_mobile,ld_user|uid,username,bind_mobile,bind_mobile_time|uid \
    ledu,ld_idcard,ld_user|uid,username,realname,idcard,verify_time|uid \
    ledu,ld_login,ld_login_2|uid,username,login_time,login_ip,game_id,server_id|login_time,uid,game_id,server_id \
    ledu,ld_login,ld_login|uid,username,login_time,login_ip,game_id,server_id|uid,game_id,server_id,login_time \
    ledu,ld_active,ld_active_2|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|uid,game_id,server_id \
    ledu,ld_active,ld_active|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|active_time,uid,game_id,server_id \
    xdw,xdw_user,xdw_user|uid,username,reg_time,reg_ip,site_id,page_id,sub_page_id,type|uid \
    xdw,xdw_mail,xdw_user|uid,username,bind_email,bind_email_time|uid \
    xdw,xdw_mobile,xdw_user|uid,username,bind_mobile,bind_mobile_time|uid \
    xdw,xdw_idcard,xdw_user|uid,username,realname,idcard,verify_time|uid \
    xdw,xdw_login,xdw_login|uid,username,login_time,login_ip,game_id,server_id|uid,game_id,server_id,login_time \
    xdw,xdw_login,xdw_login_2|uid,username,login_time,login_ip,game_id,server_id|login_time,uid,game_id,server_id \
    xdw,xdw_active,xdw_active|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|active_time,uid,game_id,server_id \
    xdw,xdw_active,xdw_active_2|uid,username,active_time,game_id,server_id,status,reg_time,site_id,page_id,sub_page_id|uid,game_id,server_id \
    sy,sy_download,sy_download|uuid,platform,channel_id,game_id,time,type,status,ip|time,platform,game_id,uuid \
    ledu_crossbar,tracepng_logs,ld_crossbar_click|time,ip,type,uid,href,cate,host|time,ip
启动Source:
/usr/lib/jvm/jre/bin/java -Xmx2g -Dflume.root.logger=WARN,source -cp /usr/local/apache-flume-1.5.0.1-bin/conf:/usr/local/apache-flume-1.5.0.1-bin/lib/* -Djava.library.path= org.apache.flume.node.Application -n source -f /usr/local/apache-flume-1.5.0.1-bin/conf/flume.conf
启动Sink:
/usr/lib/jvm/jre/bin/java -Xmx2g -Dflume.root.logger=WARN,sink -cp /usr/local/apache-flume-1.5.0.1-bin/conf:/usr/local/apache-flume-1.5.0.1-bin/lib/* -Djava.library.path= org.apache.flume.node.Application -n sink -f /usr/local/apache-flume-1.5.0.1-bin/conf/flume.conf
log4j.properties自行配置。
flumectl脚本
#!/bin/bash
#
# flumectl - start, stop, restart or query the local Flume agent.
#
# Usage: flumectl start|stop|restart|running source|sink
#
# echo_success / echo_failure come from the init functions library.
. /etc/rc.d/init.d/functions

debug=INFO

# This script must live in Flume's bin directory. It may be called from any
# working directory, but not through a symlink (use an alias instead), because
# the Flume base directory is derived from the script's own location.
dirCurrent=$(cd "$(dirname "$0")" && pwd)
dirBase=${dirCurrent%/*}
pid=
RETVAL=0

# running <agent>
# Fills the global array "pid" with the pids of the given agent, prints the
# first one and returns 0; returns 1 if the agent is not running.
running(){
    # "--" keeps grep from parsing the pattern, which begins with "-", as options.
    pid=($(ps -o pid,command -C java \
        | grep "[o]rg.apache.flume.node.Application" \
        | grep -E -- "-n $1( |\$)" \
        | awk '{print $1}'))
    if [ ${#pid[*]} -gt 0 ];then
        echo -n "$pid"
        return 0
    else
        return 1
    fi
}

# stop <agent>
# Sends SIGTERM so the agent can shut down cleanly, and falls back to SIGKILL
# if it is still alive after 30 seconds.
stop(){
    running "$1" > /dev/null
    echo -n "stoping $1... "
    if [ ${#pid[@]} -gt 0 ];then
        kill "${pid[@]}" 2> /dev/null
        local waited=0
        while running "$1" > /dev/null && [ $waited -lt 30 ]
        do
            sleep 1
            waited=$((waited + 1))
        done
        if running "$1" > /dev/null;then
            kill -9 "${pid[@]}" 2> /dev/null
        fi
        echo_success
    else
        echo_failure
    fi
    echo
}

# start <agent>
# Launches the agent in the background unless it is already running.
start(){
    running "$1" > /dev/null
    r=$?
    if [ $r -eq 0 ];then
        echo -n "$1 is running, pid=$pid"
        echo_failure
        echo
    else
        echo -n "starting $1... "
        cd "$dirBase" || exit 1
        bin/flume-ng agent -n "$1" -c conf -f "$dirBase/conf/flume.conf" -Dflume.root.logger=$debug,$1 > "$dirBase/logs/$1.out" 2>&1 &
        echo_success
        echo
    fi
}

# restart <agent>
restart(){
    stop "$1"
    start "$1"
}

case $1 in
    "start"|"stop"|"restart"|"running")
        case $2 in
            "source"|"sink")
                # $1 is the name of one of the functions above.
                $1 "$2"
                RETVAL=$?
                ;;
            *)
                echo "invalid flume type, type="$2
                ;;
        esac
        ;;
    *)
        echo -e "Usage:\nstart,stop,restart,running [source|sink]"
        ;;
esac
exit $RETVAL
发布版本的脚本release.sh:
#!/bin/bash
#
# release.sh - build the self-extracting installer $dirBase/$name:
# installer.sh followed by a gzip-compressed tar of the Flume directory.
dirBase='/usr/local/apache-flume-1.5.0.1-bin'
name=${dirBase##*/}
#xx.xx.xx.xx
ipSink=xx.xx.xx.xx
RETVAL=0

# release [tested]
# Builds the installer. Only when the first argument is "tested" is it also
# copied to node3; with any other argument (e.g. "local") nothing is uploaded.
release(){
    rm -f "$dirBase"/logs/*
    # Production copy of the config with the real sink address.
    sed "s/source.sinks.k1.hostname = 192\.168\.2\.201/source.sinks.k1.hostname = $ipSink/" \
        "$dirBase/conf/flume.conf" > "$dirBase/conf/flume.conf.online" || return 1
    cat "$dirBase/tools/installer.sh" > "$dirBase/$name" || return 1
    (cd "$dirBase/conf" && jar -c com > "$dirBase/lib/com.joyport.flume.jar") || return 1
    # --exclude must precede the member name: GNU tar applies such options only
    # to the arguments that follow them. $name/$name is the installer itself.
    tar -cz -C "$dirBase/.." \
        --exclude "$name/$name" \
        --exclude "$name/conf/com" \
        --exclude "$name/tools/installer.sh" \
        --exclude "$name/tools/release.sh" \
        --exclude "$name/tools/backup.sh" \
        --exclude "$name/tools/batch.sh" \
        --exclude "$name/tools/test.sh" \
        "$name" >> "$dirBase/$name" || return 1
    rm -f "$dirBase/conf/flume.conf.online"
    chmod u+x "$dirBase/$name"
    if [ "$1" = 'tested' ]; then
        local webFile='/data/apache/www/1miao/apache-flume-1.5.0.1-bin'
        scp "$dirBase/$name" node3:/data/apache/www/1miao/ || return 1
        ssh node3 'chmod +x '$webFile';cp -f '$webFile' ~'
    fi
}

case $1 in
    *)
        release "$1"
        RETVAL=$?
        ;;
esac
exit $RETVAL
installer.sh
#!/bin/bash
#
# installer.sh - header of the self-extracting Flume installer. release.sh
# appends a gzip-compressed tar archive after the marker line at the very end
# of this file, so that marker must stay the last line.
#
# Usage:
#   <installer> local [local] [noInit]   install on this host
#   <installer> <host> [local] [noInit]  copy to <host> over ssh and install there
# Second argument "local": keep the packaged config instead of the *.online one.
# Third argument "noInit": do not (re)start the source agent after installing.
dirInstall='/usr/local/apache-flume-1.5.0.1-bin'
fileName='apache-flume-1.5.0.1-bin'
file=$0
RETVAL=0
lastLine=`tail -n 1 "$0"|cut -c-13`
fileBinStart=`awk '/^__BIN_START__/{print NR+1;exit 0;}' "$0"`
# If the marker is still the last line, no archive has been appended yet.
if test "$lastLine" = '__BIN_START__'
then
    isInstaller=false
else
    isInstaller=true
fi

# Unpacks the appended archive into the parent of the install directory.
tarExtract(){
    mkdir -p "$dirInstall"
    tail -n +$fileBinStart "$file" | tar -xz -C "$dirInstall/.."
}

case $1 in
    #$1:local|ip address $2:local $3:noInit
    *)
        if [ "false" = $isInstaller ]; then
            echo 'not a installer'
            exit 1
        fi
        if [ "$1" = 'local' ]; then
            tarExtract
            if [ "$2" != 'local' ]; then
                # Activate the production configs. Each file is handled on its
                # own so that one missing file does not block the other.
                cd "$dirInstall" || exit 1
                for conf in hbase-site.xml flume.conf
                do
                    if [ -f "conf/$conf.online" ]; then
                        mv -f "conf/$conf.online" "conf/$conf"
                    fi
                done
            fi
            if [ "$3" != 'noInit' ]; then
                cd "$dirInstall" || exit 1
                pid=`bin/flumectl running source`
                if [ $? -eq 0 ];then
                    echo -n 'source is running, pid='$pid', '
                    bin/flumectl stop source
                fi
                bin/flumectl start source
            fi
            rm -f "$dirInstall/$fileName"
        else
            if [ -n "$1" ]; then
                # Verify that the host is reachable before copying anything.
                ssh "$1" ':'
                if [ $? -ne 0 ];then
                    exit 1
                fi
                ssh "$1" "mkdir -p $dirInstall"
                # Run the copy under its fixed remote name; $0 may be a path
                # that only exists on this host.
                scp "$file" "$1:$dirInstall/$fileName" && ssh "$1" "cd $dirInstall && ./$fileName local $2 $3"
                RETVAL=$?
            else
                echo 'wrong param'
                exit 1
            fi
        fi
        ;;
esac
exit $RETVAL
__BIN_START__
backup.sh
#!/bin/bash
#
# backup.sh - archive the Flume install directory to
# /usr/local/apache-flume-1.5.0.1-bin.tgz, leaving out the generated
# self-extracting installer inside it.
#
# --exclude must precede the member name: GNU tar applies such options only
# to the arguments that follow them.
tar -C /usr/local/ -czf /usr/local/apache-flume-1.5.0.1-bin.tgz \
    --exclude apache-flume-1.5.0.1-bin/apache-flume-1.5.0.1-bin \
    apache-flume-1.5.0.1-bin/
通用型插件会带来一个问题,所有服务器都用这个配置只要目录下有日志就会收集,可能有些服务器收集目录下不是要收集的日志,比如服务器1 /data/logs是要收集的日志,服务器2也有 /data/logs目录但是里面是其他类型的日志,这样就可能出问题了。
NB