Mercurial > java-rrd-hg
changeset 2:65c7909bfb95
* more initial development
author | Peter Stamfest <peter@stamfest.at> |
---|---|
date | Thu, 26 Aug 2010 09:47:56 +0200 |
parents | d107d0fd0718 |
children | 3b566ca4615b |
files | src/net/stamfest/rrd/CommandResult.java src/net/stamfest/rrd/RRD.java src/net/stamfest/rrd/RRDCachedClient.java src/net/stamfest/rrd/RRDCommand.java src/net/stamfest/rrd/RRDCommandFactory.java src/net/stamfest/rrd/RRDCommandPool.java src/net/stamfest/rrd/RRDToolService.java src/net/stamfest/rrd/RRDUpdate.java src/net/stamfest/rrd/RRDp.java src/net/stamfest/rrd/Test1.java src/net/stamfest/rrd/Test2.java src/net/stamfest/rrd/Test3.java src/net/stamfest/rrd/system/RRDSystem.java src/net/stamfest/rrd/system/RRDSystemFactory.java |
diffstat | 14 files changed, 458 insertions(+), 52 deletions(-) [+] |
line wrap: on
line diff
--- a/src/net/stamfest/rrd/CommandResult.java Tue Aug 24 17:01:08 2010 +0200 +++ b/src/net/stamfest/rrd/CommandResult.java Thu Aug 26 09:47:56 2010 +0200 @@ -13,4 +13,16 @@ // support for output from special commands public HashMap<String, String> info = null; // info, graphv public byte image[] = null; // graphv + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("ok:").append(ok).append('\n'); + sb.append("u:").append(user); + sb.append(" s:").append(system); + sb.append(" t:").append(total).append('\n'); + sb.append("message:").append(error).append('\n'); + sb.append("output:").append(output).append('\n'); + + return sb.toString(); + } }
--- a/src/net/stamfest/rrd/RRD.java Tue Aug 24 17:01:08 2010 +0200 +++ b/src/net/stamfest/rrd/RRD.java Thu Aug 26 09:47:56 2010 +0200 @@ -1,7 +1,6 @@ package net.stamfest.rrd; -public interface RRD { +public interface RRD extends RRDUpdate { public CommandResult info(String filename) throws Exception; public CommandResult graphv(String[] cmdin) throws Exception; - public CommandResult update(String filename, String args[]) throws Exception; } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/RRDCachedClient.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,129 @@ +package net.stamfest.rrd; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Because Java does not support UNIX Sockets, this can only be + * used with rrdcached listening on a TCP socket. + * + * @author peter + * + */ +public class RRDCachedClient implements RRDUpdate { + private Socket socket; + private OutputStream writer; + private InputStream reader; + + public RRDCachedClient(String host, int port) throws UnknownHostException, IOException { + init(host, port); + } + + private void init(String host, int port) throws UnknownHostException, IOException { + socket = new Socket(host, port); + writer = socket.getOutputStream(); + reader = socket.getInputStream(); + } + + private String readLine() throws IOException { + int c; + byte b[] = new byte[128]; + int pos = 0; + + while (true) { + c = reader.read(); + if (c == -1 || c == '\n') break; + if (pos == b.length) { + b = Arrays.copyOf(b, b.length * 2); + } + b[pos++] = (byte) c; + } + + return new String(b, 0, pos); + } + + private static Pattern response = Pattern.compile("^(-?\\d+) (.*)$"); + private synchronized CommandResult sendCommand(String cmd1[], + String cmd2[]) throws Exception { + CommandResult r = new CommandResult(); + + StringBuffer sb = new StringBuffer(); + + if (cmd1 != null) { + for (int i = 0 ; i < cmd1.length ; i++) { + if (sb.length() > 0) sb.append(' '); + sb.append(cmd1[i]); + } + } + if (cmd2 != null) { + System.out.println("CMD2 " + cmd2.length); + for (int i = 0 ; i < cmd2.length ; i++) { + if (sb.length() > 0) sb.append(' '); + sb.append(cmd2[i]); + } + } + if (sb.length() == 0) return null; + + sb.append('\n'); + + writer.write(sb.toString().getBytes()); + writer.flush(); + + System.out.println(sb); + + String line = readLine(); + + System.out.println("R:" + line); + Matcher m = response.matcher(line); + if (m.find()) { + int rc = Integer.parseInt(m.group(1)); + r.error = line; /* not using group(2), because sometimes + rrdcached says "0 errors, ...." which + would put "error, ..." into this field + regardless of the positive outcome of + the command. This could be misleading */ + if (rc < 0) { + r.ok = false; + } else { + sb.setLength(0); + r.ok = true; + while (rc-- > 0) { + sb.append(readLine()).append('\n'); + } + r.output = sb.toString(); + } + } else { + throw new Exception("Protocol error"); + } + return r; + } + + @Override + public CommandResult update(String filename, String[] args) + throws Exception + { + return sendCommand(new String[] { "UPDATE", filename }, args); + } + + public CommandResult flush(String filename) throws Exception { + return sendCommand(new String[] { "FLUSH", filename }, null); + } + + public CommandResult flushall(String filename) throws Exception { + return sendCommand(new String[] { "FLUSHALL" }, null); + } + + public void close() throws IOException { + if (socket != null) socket.close(); + socket = null; + writer = null; + reader = null; + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/RRDCommand.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,6 @@ +package net.stamfest.rrd; + +public interface RRDCommand { + public CommandResult command(String cmd[]) throws Exception; + public void finish(); +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/RRDCommandFactory.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,18 @@ +package net.stamfest.rrd; + +/** + * + * @author peter + * + * Used by RRDpPool to have control about which kind of RRDp gets + * used by the pool. + * + */ +public interface RRDCommandFactory { + /** + * + * @return a new instance of RRDp + * @throws Exception + */ + public RRDCommand createRRDp() throws Exception; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/RRDCommandPool.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,142 @@ +package net.stamfest.rrd; + +public class RRDCommandPool implements RRDCommand { + class RRDPoolMember { + int requestcount = 0; + boolean inuse = false; + RRDCommand rrdp; + } + + private int poolsize = 4; + private int maxRequestsPerMember = 100; + private boolean finished = false; + + private RRDCommandFactory factory = null; + + RRDPoolMember pool[] = null; + + public RRDCommandPool() { + init(4); + } + + public RRDCommandPool(int poolsize) { + init(poolsize); + } + + private void init(int poolsize) { + this.poolsize = poolsize; + + pool = new RRDPoolMember[poolsize]; + factory = new RRDCommandFactory() { + @Override + public RRDp createRRDp() throws Exception { + return new RRDp(); + } + }; + } + + @Override + public CommandResult command(String[] cmd) throws Exception { + RRDPoolMember member = null; + + if (finished) throw new Exception("Already finished"); + + synchronized(this) { + while (true) { + for (int i = 0 ; i < poolsize ; i++) { + if (pool[i] == null) { + pool[i] = new RRDPoolMember(); + member = pool[i]; + break; + } else if (! pool[i].inuse) { + member = pool[i]; + break; + } + } + if (member != null) { + member.inuse = true; + break; + } else { + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + // next loop iteration + } + } + } + + CommandResult r = null; + if (member != null) { + member.requestcount++; + if (member.rrdp == null) member.rrdp = factory.createRRDp(); + r = member.rrdp.command(cmd); + + if (member.requestcount > maxRequestsPerMember) { + member.rrdp.finish(); + member.requestcount = 0; + member.rrdp = null; + } + + synchronized (this) { + member.inuse = false; + notify(); + } + } + + return r; + } + + public int getMaxRequestsPerMember() { + return maxRequestsPerMember; + } + + public void setMaxRequestsPerMember(int maxRequestsPerMember) { + this.maxRequestsPerMember = maxRequestsPerMember; + } + + public int getPoolsize() { + return poolsize; + } + + @Override + public void finish() { + finished = true; + synchronized (this) { + boolean alldone = false; + + while (!alldone) { + alldone = true; + for (int i = 0 ; i < poolsize ; i++) { + if (pool[i] == null) continue; + if (! pool[i].inuse) { + if (pool[i].rrdp != null) { + pool[i].rrdp.finish(); + pool[i].rrdp = null; + pool[i] = null; + } + continue; + } else { + alldone = false; + // in use - have to wait + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + break; + } + } + } + } + } + + public RRDCommandFactory getFactory() { + return factory; + } + + public void setFactory(RRDCommandFactory factory) { + this.factory = factory; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/RRDToolService.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,61 @@ +package net.stamfest.rrd; + +import java.io.IOException; + +/** + * + * @author peter + * + * A java interface to Tobi Oetikers rrdtool. This provides rrd + * operations to java programs. It puts a layer on top of the actual job + * to invoke rrdtool functionality. By default, it uses the rrdtool pipe + * mode of operation with a single instance of rrdtool. It can use other + * backends as well, eg. a RRDCommandPool. The entire machinery is quite + * flexible. + * + */ + +public class RRDToolService implements RRD { + RRDCommand backend; + + public RRDToolService() throws IOException { + this.backend = new RRDp(); + } + + public RRDToolService(RRDCommand backend) { + this.backend = backend; + } + + /* (non-Javadoc) + * @see net.stamfest.rrd.RRD#info(java.lang.String) + */ + public CommandResult info(String filename) throws Exception { + return backend.command(new String[] { "info", filename }); + } + + public CommandResult update(String filename, String args[]) throws Exception { + String cmd[] = new String[args.length + 1]; + cmd[0] = "update"; + for (int i = 0, j = 1 ; i < args.length ; i++, j++) { + cmd[j] = args[i]; + } + return backend.command(cmd); + } + + /* (non-Javadoc) + * @see net.stamfest.rrd.RRD#graphv(java.lang.String[]) + */ + public CommandResult graphv(String[] cmdin) throws Exception { + String cmd[] = new String[cmdin.length + 2]; + cmd[0] = "graphv"; + cmd[1] = "-"; + + for (int i = 0, j = 2 ; i < cmdin.length ; i++, j++) { + cmd[j] = cmdin[i]; + } + + return backend.command(cmd); + } + + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/RRDUpdate.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,8 @@ +package net.stamfest.rrd; + +public interface RRDUpdate { + + public CommandResult update(String filename, String args[]) + throws Exception; + +} \ No newline at end of file
--- a/src/net/stamfest/rrd/RRDp.java Tue Aug 24 17:01:08 2010 +0200 +++ b/src/net/stamfest/rrd/RRDp.java Thu Aug 26 09:47:56 2010 +0200 @@ -2,16 +2,18 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStreamWriter; +import java.io.OutputStream; +import java.net.Socket; import java.util.Arrays; import java.util.HashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class RRDp implements RRD { +public class RRDp implements RRDCommand { Process rrdtool = null; + Socket socket = null; - private OutputStreamWriter writer; + private OutputStream writer; private InputStream reader; public RRDp() throws IOException { @@ -22,7 +24,14 @@ InputStream r = rrdtool.getInputStream(); reader = r; - writer = new OutputStreamWriter(rrdtool.getOutputStream()); + writer = rrdtool.getOutputStream(); + } + + public RRDp(String host, int port) throws IOException { + socket = new Socket(host, port); + + reader = socket.getInputStream(); + writer = socket.getOutputStream(); } // OK u:0.00 s:0.00 r:17.20 @@ -37,7 +46,7 @@ throw new Exception("Use graphv instead"); } - if (rrdtool == null) { + if (rrdtool == null && socket == null) { throw new Exception("No subprocess available (already closed?)"); } StringBuffer sb = new StringBuffer(); @@ -47,7 +56,7 @@ } sb.append('\n'); - writer.write(sb.toString()); + writer.write(sb.toString().getBytes()); writer.flush(); String line; @@ -124,55 +133,28 @@ return new String(b, 0, pos); } - public void close() { - if (rrdtool != null) { + public void finish() { + if (rrdtool != null || socket != null) { try { + if (socket != null) socket.close(); writer.close(); reader.close(); } catch (IOException e) { e.printStackTrace(); } rrdtool = null; + socket = null; } } @Override protected void finalize() throws Throwable { super.finalize(); - close(); + finish(); } + public CommandResult command(String cmd[]) throws Exception { return sendCommand(cmd); } - /* (non-Javadoc) - * @see net.stamfest.rrd.RRD#info(java.lang.String) - */ - public CommandResult info(String filename) throws Exception { - return command(new String[] { "info", filename }); - } - - public CommandResult update(String filename, String args[]) throws Exception { - String cmd[] = new String[args.length + 1]; - cmd[0] = "update"; - for (int i = 0, j = 1 ; i < args.length ; i++, j++) { - cmd[j] = args[i]; - } - return command(cmd); - } - - /* (non-Javadoc) - * @see net.stamfest.rrd.RRD#graphv(java.lang.String[]) - */ - public CommandResult graphv(String[] cmdin) throws Exception { - String cmd[] = new String[cmdin.length + 2]; - cmd[0] = "graphv"; - cmd[1] = "-"; - - for (int i = 0, j = 2 ; i < cmdin.length ; i++, j++) { - cmd[j] = cmdin[i]; - } - - return command(cmd); - } }
--- a/src/net/stamfest/rrd/Test1.java Tue Aug 24 17:01:08 2010 +0200 +++ b/src/net/stamfest/rrd/Test1.java Thu Aug 26 09:47:56 2010 +0200 @@ -2,8 +2,13 @@ public class Test1 { public static void main(String argv[]) throws Exception { - RRD rrd = new RRDp(); + RRD rrd = new RRDToolService(); CommandResult r = rrd.info(argv[0]); + System.out.println("result="+ r); + + + + for (String k : r.info.keySet()) { System.out.printf("%s=%s\n", k, r.info.get(k));
--- a/src/net/stamfest/rrd/Test2.java Tue Aug 24 17:01:08 2010 +0200 +++ b/src/net/stamfest/rrd/Test2.java Thu Aug 26 09:47:56 2010 +0200 @@ -1,18 +1,37 @@ package net.stamfest.rrd; public class Test2 { - public static void main(String argv[]) throws Exception { - RRD rrd = new RRDp(); + public static void main(final String argv[]) throws Exception { + RRDCommandPool pool = new RRDCommandPool(20); + pool.setMaxRequestsPerMember(100); + final RRD rrd = new RRDToolService(pool); - System.err.println(System.currentTimeMillis()); - for (int i = 0 ; i < 100 ; i++) { - CommandResult r = rrd.graphv(argv); + + int total = 1000; + int threads = 6; + final int perthread = total / threads; - for (String k : r.info.keySet()) { - // System.out.printf("%s=%s\n", k, r.info.get(k)); - } - System.out.println(i); + for (int t = 0 ; t < threads ; t++) { + Thread th = new Thread() { + public void run() { + System.err.println(System.currentTimeMillis()); + for (int i = 0 ; i < perthread ; i++) { + try { + rrd.graphv(argv); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + //for (String k : r.info.keySet()) { + // System.out.printf("%s=%s\n", k, r.info.get(k)); + //} + System.out.println(i); + } + System.err.println(System.currentTimeMillis()); + }; + }; + th.start(); } - System.err.println(System.currentTimeMillis()); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/net/stamfest/rrd/Test3.java Thu Aug 26 09:47:56 2010 +0200 @@ -0,0 +1,15 @@ +package net.stamfest.rrd; + +import java.util.Arrays; + +public class Test3 { + public static void main(String argv[]) throws Exception { + RRDCachedClient rrdc = new RRDCachedClient("localhost", 7622); + CommandResult r = rrdc.update(argv[0], + Arrays.copyOfRange(argv, 1, + argv.length)); + + System.out.println(r); + + } +}