本文实例为大家分享了读取用户登入出日志并上传服务端的具体实现代码,供大家参考,具体内容如下
该客户端运行在给用户提供unix服务的服务器上。用来读取并收集该服务器上用户的上下线信息,并进行配对整理后发送给服务端汇总。
具体实现代码:
1. DMSServer.java
package com.dms; import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.io.SAXReader; /** * DMS服务端,用来接收每个客户端发送过来的 * 配对日志并保存在本地文件中 * @author Administrator * */ public class DMSServer { //属性定义 //用来接收客户端连接的服务端的ServerSocket private ServerSocket server; //用来管理处理客户端请求的线程的线程池 private ExecutorService threadPool; //保存所有客户端发送过来配对日志的文件 private File serverLogFile; //消息队列 private BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>(); public DMSServer() throws Exception{ try { System.out.println("服务端正在初始化..."); //1 解析配置文件server-config.xml Map<String,String> config = loadConfig(); //2 根据配置文件内容初始化属性 init(config); System.out.println("服务端初始化完毕..."); } catch (Exception e) { System.out.println("初始化服务端失败!"); throw e; } } /** * 构造方法初始化第一步,解析配置文件 * @return 返回的Map中保存的是配置文件中的 * 每一条内容,其中key:标签的名字, * value为标签中间的文本 * @throws Exception */ private Map<String,String> loadConfig() throws Exception{ try { SAXReader reader = new SAXReader(); Document doc = reader.read(new File("server-config.xml")); Element root = doc.getRootElement(); Map<String,String> config = new HashMap<String,String>(); /* * 获取<config>标签中的所有子标签 * 并将每一个子标签的名字作为key,中间的 * 文本作为value存入Map集合 */ List<Element> list = root.elements(); for(Element e : list){ String key = e.getName(); String value = e.getTextTrim(); config.put(key, value); } return config; } catch (Exception e) { System.out.println("解析配置文件异常!"); e.printStackTrace(); throw e; } } /** * 构造方法初始化第二步,根据配置项初始化属性 * @param config * @throws Exception */ private void init(Map<String,String> config) throws Exception{ /* * 用配置文件中的<logrecfile>初始化属性:serverLogFile * 用配置文件中的<threadsum>初始化属性:threadPool,这里创建固定大小线程池。该值作为线程池线程数量 * 用配置文件中的<serverport>初始化属性:server,这里这个值为ServerSocket的服务端口 */ this.server = new ServerSocket( Integer.parseInt(config.get("serverport")) ); this.serverLogFile = new File( config.get("logrecfile") ); this.threadPool = Executors.newFixedThreadPool( Integer.parseInt(config.get("threadsum")) ); } /** * 服务端开始工作的方法 * @throws Exception */ public void start() throws Exception{ /* * 实现要求: * 首先单独启动一个线程,用来运行SaveLogHandler * 这个任务,目的是保存所有配对日志 * 然后开始循环监听服务端端口,一旦一个客户端连接了, * 就实例化一个ClientHander,然后将该任务交给线程池 * 使其分配线程来处理与该客户端的交互。 * */ try { System.out.println("服务端开始工作..."); SaveLogHandler slh=new SaveLogHandler(); new Thread(slh).start(); while(true){ Socket socket=server.accept(); threadPool.execute(new ClientHandler(socket)); } } catch (Exception e) { e.printStackTrace(); throw e; } } public static void main(String[] args) { try { DMSServer server = new DMSServer(); server.start(); } catch (Exception e) { System.out.println("启动服务端失败!"); } } /** * 该线程负责从消息队列中取出每一条配对日志, * 并存入到serverLogFile文件 * @author Administrator * */ private class SaveLogHandler implements Runnable{ public void run(){ PrintWriter pw = null; try { pw = new PrintWriter( new FileOutputStream( serverLogFile,true ) ); while(true){ if(messageQueue.size()>0){ pw.println(messageQueue.poll()); }else{ pw.flush(); Thread.sleep(500); } } } catch (Exception e) { e.printStackTrace(); } finally{ if(pw != null){ pw.close(); } } } } /** * 处理一个指定客户端请求 * @author Administrator * */ private class ClientHandler implements Runnable{ private Socket socket; public ClientHandler(Socket socket){ this.socket = socket; } public void run(){ /* * 思路: * 首先接收客户端发送过来的所有配对日志, * 直到读取到"OVER"为止,然后将这些配对 * 日志保存到本地的文件中,并回复客户端 * "OK" * 执行步骤: * 1:通过Socket创建输出流,用来给客户端 * 发送响应 * 2:通过Socket创建输入流,读取客户端发送 * 过来的日志 * 3:循环读取客户端发送过来的每一行字符串,并 * 先判断是否为字符串"OVER",若不是,则是 * 一条配对日志,那么保存到本地文件,若是, * 则停止读取。 * 4:成功读取所有日志后回复客户端"OK" */ PrintWriter pw = null; try { //1 pw = new PrintWriter( new OutputStreamWriter( socket.getOutputStream(),"UTF-8" ) ); //2 BufferedReader br = new BufferedReader( new InputStreamReader( socket.getInputStream(),"UTF-8" ) ); //3 String message = null; while((message = br.readLine())!=null){ if("OVER".equals(message)){ break; } //将该日志写入文件保存 messageQueue.offer(message); } //4 pw.println("OK"); pw.flush(); } catch (Exception e) { e.printStackTrace(); pw.println("ERROR"); pw.flush(); } finally{ try { //与客户端断开连接释放资源 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2. DMSClient.java
package com.dms; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.RandomAccessFile; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.io.SAXReader; import com.dms.bo.LogData; import com.dms.bo.LogRec; /** * 该客户端运行在给用户提供unix服务的服务器上。 * 用来读取并收集该服务器上用户的上下线信息,并 * 进行配对整理后发送给服务端汇总。 * @author Administrator * */ public class DMSClient { //属性定义 //第一步:解析日志所需属性 //unix系统日志文件 private File logFile; //保存解析后日志的文件 private File textLogFile; //书签文件 private File lastPositionFile; //每次解析日志的条目数 private int batch; //第二步:配对日志所需要属性 //保存配对日志的文件 private File logRecFile; //保存未配对日志的文件 private File loginLogFile; //第三步:发送日志所需要属性 //服务端地址 private String serverHost; //服务端端口 private int serverPort; /** * 构造方法,用来初始化客户端 * @throws Exception */ public DMSClient() throws Exception{ try { //1 解析配置文件config.xml Map<String,String> config = loadConfig(); //打桩 System.out.println(config); //2 根据配置文件内容初始化属性 init(config); } catch (Exception e) { System.out.println("初始化失败!"); throw e; } } /** * 构造方法初始化第二步,根据配置项初始化属性 * @param config * @throws Exception */ private void init(Map<String,String> config) throws Exception{ try { logFile = new File( config.get("logfile") ); textLogFile = new File( config.get("textlogfile") ); lastPositionFile = new File( config.get("lastpositionfile") ); batch = Integer.parseInt( config.get("batch") ); logRecFile = new File( config.get("logrecfile") ); loginLogFile = new File( config.get("loginlogfile") ); serverHost = config.get("serverhost"); serverPort = Integer.parseInt( config.get("serverport") ); } catch (Exception e) { System.out.println("初始化属性失败!"); e.printStackTrace(); throw e; } } /** * 构造方法初始化第一步,解析配置文件 * @return 返回的Map中保存的是配置文件中的 * 每一条内容,其中key:标签的名字, * value为标签中间的文本 * @throws Exception */ private Map<String,String> loadConfig() throws Exception{ try { SAXReader reader = new SAXReader(); Document doc = reader.read(new File("config.xml")); Element root = doc.getRootElement(); Map<String,String> config = new HashMap<String,String>(); /* * 获取<config>标签中的所有子标签 * 并将每一个子标签的名字作为key,中间的 * 文本作为value存入Map集合 */ List<Element> list = root.elements(); for(Element e : list){ String key = e.getName(); String value = e.getTextTrim(); config.put(key, value); } return config; } catch (Exception e) { System.out.println("解析配置文件异常!"); e.printStackTrace(); throw e; } } /** * 客户端开始工作的方法 * 循环执行三步: * 1:解析日志 * 2:配对日志 * 3:发送日志 */ public void start(){ parseLogs(); matchLogs(); sendLogs(); // while(true){ // //解析日志 // if(!parseLogs()){ // continue; // } // //配对日志 // if(!matchLogs()){ // continue; // } // //发送日志 // sendLogs(); // } } /** * 第三步:发送日志 * @return true:发送成功 * false:发送失败 */ private boolean sendLogs(){ /* * 实现思路: * 将logRecFile文件中的所有配对日志读取 * 出来然后连接上服务端并发送过去,若服务端 * 全部接收,就可以将该文件删除,表示发送 * 完毕了。 * 实现步骤: * 1:logRecFile文件必须存在 * 2:将所有配对日志读取出来并存入一个集合 * 等待发送 * 3:通过Socket连接服务端 * 4:创建输出流 * 5:顺序将所有配对日志按行发送给服务端 * 6:单独发送一个字符串"OVER"表示所有日志 * 均已发送完毕 * 7:创建输入流 * 8:读取服务端发送回来的响应字符串 * 9:若响应的字符串为"OK",表示服务端正常 * 接收了所有日志,这时就可以将logRecFile * 文件删除并返回true表示发送完毕。 * */ Socket socket = null; try { //1 if(!logRecFile.exists()){ System.out.println(logRecFile+"不存在!"); return false; } //2 List<String> matches = IOUtil.loadLogRec(logRecFile); //3 socket = new Socket(serverHost,serverPort); //4 PrintWriter pw = new PrintWriter( new OutputStreamWriter( socket.getOutputStream(),"UTF-8" ) ); //5 for(String log : matches){ pw.println(log); } //6 pw.println("OVER"); pw.flush(); //7 BufferedReader br = new BufferedReader( new InputStreamReader( socket.getInputStream(),"UTF-8" ) ); //8 String response = br.readLine(); //9 if("OK".equals(response)){ logRecFile.delete(); return true; }else{ System.out.println("发送日志失败!"); return false; } } catch (Exception e) { System.out.println("发送日志失败!"); e.printStackTrace(); } finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } return false; } /** * 第二步:配对日志 * @return true:配对成功 * false:配对失败 */ private boolean matchLogs(){ /* * 实现思路: * 将第一步解析的新日志,与上次为配对成功 * 的登入日志全部读取出来,然后再按照user, * pid相同,type一个是7,一个是8进行配对。 * 只要能找到类型为8的,一定可以找到一个 * 能与之配对的登入日志。 * * 实现步骤: * 1:必要的判断 * 1.1:logRecFile是否存在,存在则不再 * 进行新的配对工作,避免覆盖。 * 1.2:textLogFile文件必须存在。 * 2:读取textLogFile将日志读取出来,并 * 存入到集合中。(若干LogData实例) * 3:若loginLogFile文件若存在,则说明 * 有上次未配对成功的日志,也将其读取 * 出来存入集合等待一起配对 * 4:配对工作 * 4.1:创建一个集合,用于保存所有配对日志 * 4.2:创建两个Map分别保存登入日志与登出日志 * 4.3:遍历所有待配对的日志,按照登入与登出 * 分别存入两个Map中, * 其中key:user,pid * value:LogData实例 * 4.4:遍历登出Map,并根据每条登出日志的key * 去登入Map中找到对应的登入日志,并 * 以一个LogRec实例保存该配对日志,然后 * 存入配对日志的集合中。并将该配对日志 * 中的登入日志从登入Map中删除。这样一来 * 登入Map中应当只剩下没有配对的了。 * 5:将配对日志写入到logRecFile中 * 6:将所有未配对日志写入到loginLogFile中 * 7:将textLogFile文件删除 * 8:返回true,表示配对完毕 * */ try { //1 //1.1 if(logRecFile.exists()){ return true; } //1.2 if(!textLogFile.exists()){ System.out.println(textLogFile+"不存在!"); return false; } //2 List<LogData> list = IOUtil.loadLogData(textLogFile); //3 if(loginLogFile.exists()){ list.addAll( IOUtil.loadLogData(loginLogFile) ); } //4 //4.1 List<LogRec> matches = new ArrayList<LogRec>(); //4.2 Map<String,LogData> loginMap = new HashMap<String,LogData>(); Map<String,LogData> logoutMap = new HashMap<String,LogData>(); //4.3 for(LogData logData : list){ String key = logData.getUser()+","+ logData.getPid(); if(logData.getType()==LogData.TYPE_LOGIN){ loginMap.put(key, logData); }else if(logData.getType()==LogData.TYPE_LOGOUT){ logoutMap.put(key, logData); } } //4.4 Set<Entry<String,LogData>> entrySet = logoutMap.entrySet(); for(Entry<String,LogData> e : entrySet){ LogData logout = e.getValue(); LogData login = loginMap.remove(e.getKey()); LogRec logRec = new LogRec(login,logout); matches.add(logRec); } //5 IOUtil.saveCollection(matches, logRecFile); //6 IOUtil.saveCollection( loginMap.values(),loginLogFile ); //7 textLogFile.delete(); //8 return true; } catch (Exception e) { System.out.println("配对日志失败!"); e.printStackTrace(); } return false; } /** * 第一步:解析日志 * @return true:解析成功 * false:解析失败 */ private boolean parseLogs(){ /* * 实现思路: * 循环读取batch条日志,然后将每条日志中的 * 5个信息解析出来,最终组成一个字符串,以 * 行为单位,写入到textLogFile文件中 * * 实现步骤: * 1:必要的判断工作 * 1.1:为了避免解析的日志还没有被使用,而 * 第一步又重复执行导致之前日志被覆盖 * 的问题,这里需要判断,若保存解析后 * 的日志文件存在,则第一步不再执行。 * 该日志文件会在第二步配对完毕后删除。 * 1.2:logFile文件必须存在(wtmpx文件) * 1.3:是否还有日志可以解析 * 2:创建RandomAccessFile来读取logFile * 3:将指针移动到上次最后读取的位置,准备 * 开始新的解析工作 * 4:解析工作 * 4.1:创建一个List集合,用于保存解析后 * 的每一条日志(LogData实例) * 4.2:循环batch次,解析每条日志中的 * 5项内容(user,pid,type,time,host) * 并用一个LogData实例保存,然后将 * 该LogData实例存入集合 * 5:将集合中的所有的日志以行为单位保存到 * textLogFile中 * 6:保存书签信息 * 7:返回true,表示工作完毕 * */ RandomAccessFile raf = null; try { //1 //1.1 if(textLogFile.exists()){ return true; } //1.2 if(!logFile.exists()){ System.out.println(logFile+"不存在!"); return false; } //1.3 long lastPosition = hasLogs(); //打桩 // System.out.println( // "lastPosition:"+lastPosition // ); if(lastPosition<0){ System.out.println("没有日志可以解析了!"); return false; } //2 raf = new RandomAccessFile(logFile,"r"); //3 raf.seek(lastPosition); //4 List<LogData> list = new ArrayList<LogData>(); for(int i=0;i<batch;i++){ //每次解析前都判断是否还有日志可以解析 if(logFile.length()-lastPosition <LogData.LOG_LENGTH ){ break; } //解析user raf.seek(lastPosition+LogData.USER_OFFSET); String user = IOUtil.readString( raf, LogData.USER_LENGTH ).trim(); //解析PID raf.seek(lastPosition+LogData.PID_OFFSET); int pid = raf.readInt(); //解析TYPE raf.seek(lastPosition+LogData.TYPE_OFFSET); short type = raf.readShort(); //解析TIME raf.seek(lastPosition+LogData.TIME_OFFSET); int time = raf.readInt(); //解析HOST raf.seek(lastPosition+LogData.HOST_OFFSET); String host = IOUtil.readString( raf, LogData.HOST_LENGTH ).trim(); LogData log = new LogData(user, pid, type, time, host); list.add(log); //打桩 // System.out.println(log); //当解析完一条日志后,更新lastPosition lastPosition = raf.getFilePointer(); } //5 IOUtil.saveCollection(list, textLogFile); //6 保存书签文件 IOUtil.saveLong( lastPosition, lastPositionFile); //7 return true; } catch (Exception e) { System.out.println("解析日志失败!"); e.printStackTrace(); } finally{ if(raf != null){ try { raf.close(); } catch (IOException e) { e.printStackTrace(); } } } return false; } /** * 第一步解析日志中的一个环节, * 根据书签文件记录的位置判断是否还有 * 日志可以解析,若有,则将上次最后的位置 * 返回,若没有则返回-1。 * @return */ private long hasLogs(){ try { /* * 若lastPositionFile不存在,则说明 * 从来没有解析过,那么从头开始解析即可 */ if(!lastPositionFile.exists()){ return 0; } long lastPosition = IOUtil.readLong(lastPositionFile); if(logFile.length()-lastPosition >=LogData.LOG_LENGTH){ return lastPosition; } } catch (Exception e) { e.printStackTrace(); } return -1; } public static void main(String[] args) { try { DMSClient client = new DMSClient(); client.start(); } catch (Exception e) { System.out.println("客户端运行失败!"); } } }
3. IOUtil.java
package com.dms; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Collection; import java.util.List; import com.dms.bo.LogData; /** * 该类是一个工具类,负责客户端的IO操作 * @author Administrator * */ public class IOUtil { /** * 从给定的文件中读取每一行字符串(配对日志) * 并存入一个集合后返回 * @param file * @return * @throws Exception */ public static List<String> loadLogRec(File file) throws Exception{ BufferedReader br = null; try { br = new BufferedReader( new InputStreamReader( new FileInputStream( file ) ) ); List<String> list = new ArrayList<String>(); String line = null; while((line = br.readLine())!=null){ list.add(line); } return list; } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(br != null){ br.close(); } } } /** * 从给定的文件中读取每一条配对日志,并存入 * 一个集合中然后返回。 * @param file * @return * @throws Exception */ public static List<LogData> loadLogData(File file) throws Exception{ BufferedReader br = null; try { br = new BufferedReader( new InputStreamReader( new FileInputStream( file ) ) ); List<LogData> list = new ArrayList<LogData>(); String line = null; while((line = br.readLine())!=null){ LogData logData = new LogData(line); list.add(logData); } return list; } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(br!=null){ br.close(); } } } /** * 将指定的long值以字符串的形式写入到 * 给定文件的第一行 * @param l * @param file * @throws Exception */ public static void saveLong( long lon,File file) throws Exception{ PrintWriter pw = null; try { pw = new PrintWriter(file); pw.println(lon); } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(pw != null){ pw.close(); } } } /** * 将集合中每个元素的toString方法返回的字符串 * 以行为单位写入到指定文件中。 * @param c * @param file * @throws Exception */ public static void saveCollection( Collection c,File file) throws Exception{ PrintWriter pw = null; try { pw = new PrintWriter(file); for(Object o : c){ pw.println(o); } } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(pw != null){ pw.close(); } } } /** * 从给定的RandomAccessFile当前位置开始连续 * 读取length个字节,并转换为字符串后返回 * @param raf * @param length * @return * @throws Exception */ public static String readString( RandomAccessFile raf,int length) throws Exception{ try { byte[] data = new byte[length]; raf.read(data); return new String(data,"ISO8859-1"); } catch (Exception e) { e.printStackTrace(); throw e; } } /** * 从给定文件中读取第一行字符串,然后将其 * 转换为一个long值后返回 * @param file * @return * @throws Exception */ public static long readLong(File file) throws Exception{ BufferedReader br = null; try { br = new BufferedReader( new InputStreamReader( new FileInputStream( file ) ) ); String line = br.readLine(); return Long.parseLong(line); } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(br != null){ br.close(); } } } }
4. config.xml
<?xml version="1.0" encoding="UTF-8"?> <config> <!-- unix系统日志文件名 --> <logfile>wtmpx</logfile> <!-- 保存解析后日志的文件名 --> <textlogfile>log.txt</textlogfile> <!-- 书签文件名 --> <lastpositionfile>last-position.txt</lastpositionfile> <!-- 每次解析日志的条目数 --> <batch>10</batch> <!-- 配对日志文件名 --> <logrecfile>logrec.txt</logrecfile> <!-- 未配对日志文件名 --> <loginlogfile>login.txt</loginlogfile> <!-- 服务端地址 --> <serverhost>localhost</serverhost> <!-- 服务端端口 --> <serverport>8088</serverport> </config>
5. server-config.xml
<?xml version="1.0" encoding="UTF-8"?> <config> <!-- 服务端保存配对日志文件的文件名 --> <logrecfile>server-logs.txt</logrecfile> <!-- 线程池线程数量 --> <threadsum>30</threadsum> <!-- 服务端端口 --> <serverport>8088</serverport> </config>
以上就是本文的全部内容,希望对大家的学习有所帮助。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:notice#nhooo.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。