changeset 2:65c7909bfb95

* more initial development
author Peter Stamfest <peter@stamfest.at>
date Thu, 26 Aug 2010 09:47:56 +0200
parents d107d0fd0718
children 3b566ca4615b
files src/net/stamfest/rrd/CommandResult.java src/net/stamfest/rrd/RRD.java src/net/stamfest/rrd/RRDCachedClient.java src/net/stamfest/rrd/RRDCommand.java src/net/stamfest/rrd/RRDCommandFactory.java src/net/stamfest/rrd/RRDCommandPool.java src/net/stamfest/rrd/RRDToolService.java src/net/stamfest/rrd/RRDUpdate.java src/net/stamfest/rrd/RRDp.java src/net/stamfest/rrd/Test1.java src/net/stamfest/rrd/Test2.java src/net/stamfest/rrd/Test3.java src/net/stamfest/rrd/system/RRDSystem.java src/net/stamfest/rrd/system/RRDSystemFactory.java
diffstat 14 files changed, 458 insertions(+), 52 deletions(-) [+]
line wrap: on
line diff
--- a/src/net/stamfest/rrd/CommandResult.java	Tue Aug 24 17:01:08 2010 +0200
+++ b/src/net/stamfest/rrd/CommandResult.java	Thu Aug 26 09:47:56 2010 +0200
@@ -13,4 +13,16 @@
     // support for output from special commands
     public HashMap<String, String> info = null;  // info, graphv
     public byte image[] = null; // graphv
+    
+    public String toString() {
+	StringBuffer sb = new StringBuffer();
+	sb.append("ok:").append(ok).append('\n');
+	sb.append("u:").append(user);
+	sb.append(" s:").append(system);
+	sb.append(" t:").append(total).append('\n');
+	sb.append("message:").append(error).append('\n');
+	sb.append("output:").append(output).append('\n');
+	
+	return sb.toString();
+    }
 }
--- a/src/net/stamfest/rrd/RRD.java	Tue Aug 24 17:01:08 2010 +0200
+++ b/src/net/stamfest/rrd/RRD.java	Thu Aug 26 09:47:56 2010 +0200
@@ -1,7 +1,6 @@
 package net.stamfest.rrd;
 
-public interface RRD {
+public interface RRD extends RRDUpdate {
     public CommandResult info(String filename) throws Exception;
     public CommandResult graphv(String[] cmdin) throws Exception;
-    public CommandResult update(String filename, String args[]) throws Exception;
 }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/RRDCachedClient.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,129 @@
+package net.stamfest.rrd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Because Java does not support UNIX Sockets, this can only be 
+ * used with rrdcached listening on a TCP socket.
+ * 
+ * @author peter
+ *
+ */
+public class RRDCachedClient implements RRDUpdate {
+    private Socket socket;
+    private OutputStream writer;
+    private InputStream reader; 
+    
+    public RRDCachedClient(String host, int port) throws UnknownHostException, IOException {
+	init(host, port);
+    }
+    
+    private void init(String host, int port) throws UnknownHostException, IOException {
+	socket = new Socket(host, port);
+	writer = socket.getOutputStream();
+	reader = socket.getInputStream();
+    }
+
+    private String readLine() throws IOException {
+	int c;
+	byte b[] = new byte[128];
+	int pos = 0;
+	
+	while (true) {
+	    c = reader.read();
+	    if (c == -1 || c == '\n') break;
+	    if (pos == b.length) {
+		b = Arrays.copyOf(b, b.length * 2);
+	    }
+	    b[pos++] = (byte) c;
+	}
+	
+	return new String(b, 0, pos);
+    }
+
+    private static Pattern response = Pattern.compile("^(-?\\d+) (.*)$");
+    private synchronized CommandResult sendCommand(String cmd1[], 
+                                                   String cmd2[]) throws Exception {
+	CommandResult r = new CommandResult();
+	
+	StringBuffer sb = new StringBuffer();
+	
+	if (cmd1 != null) {
+	    for (int i = 0 ; i < cmd1.length ; i++) {
+		if (sb.length() > 0) sb.append(' ');
+		sb.append(cmd1[i]);
+	    }
+	}
+	if (cmd2 != null) {
+	    System.out.println("CMD2 " + cmd2.length);
+	    for (int i = 0 ; i < cmd2.length ; i++) {
+		if (sb.length() > 0) sb.append(' ');
+		sb.append(cmd2[i]);
+	    }
+	}
+	if (sb.length() == 0) return null;
+	
+	sb.append('\n');
+	
+	writer.write(sb.toString().getBytes());
+	writer.flush();
+	
+	System.out.println(sb);
+	
+	String line = readLine();
+	
+	System.out.println("R:" + line);
+	Matcher m = response.matcher(line);
+	if (m.find()) {
+	    int rc = Integer.parseInt(m.group(1));
+	    r.error = line; /* not using group(2), because sometimes 
+	    			rrdcached says "0 errors, ...." which 
+	    			would put "error, ..." into this field 
+	    			regardless of the positive outcome of 
+	    			the command. This could be misleading */
+	    if (rc < 0) {
+		r.ok = false;
+	    } else {
+		sb.setLength(0);
+		r.ok = true;
+		while (rc-- > 0) {
+		    sb.append(readLine()).append('\n');
+		}
+		r.output = sb.toString();
+	    }
+	} else {
+	    throw new Exception("Protocol error");
+	}
+	return r;
+    }
+    
+    @Override
+    public CommandResult update(String filename, String[] args)
+	throws Exception
+    {
+	return sendCommand(new String[] { "UPDATE", filename }, args);
+    }
+    
+    public CommandResult flush(String filename) throws Exception {
+	return sendCommand(new String[] { "FLUSH", filename }, null);
+    }
+
+    public CommandResult flushall(String filename) throws Exception {
+	return sendCommand(new String[] { "FLUSHALL" }, null);
+    }
+
+    public void close() throws IOException {
+	if (socket != null) socket.close();
+	socket = null;
+	writer = null;
+	reader = null;
+    }
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/RRDCommand.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,6 @@
+package net.stamfest.rrd;
+
+public interface RRDCommand {
+    public CommandResult command(String cmd[]) throws Exception;
+    public void finish();
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/RRDCommandFactory.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,18 @@
+package net.stamfest.rrd;
+
+/**
+ * 
+ * @author peter
+ *
+ * Used by RRDpPool to have control about which kind of RRDp gets 
+ * used by the pool.
+ * 
+ */
+public interface RRDCommandFactory {
+    /**
+     * 
+     * @return a new instance of RRDp
+     * @throws Exception 
+     */
+    public RRDCommand createRRDp() throws Exception;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/RRDCommandPool.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,142 @@
+package net.stamfest.rrd;
+
+public class RRDCommandPool implements RRDCommand {
+    class RRDPoolMember {
+	int requestcount = 0;
+	boolean inuse = false;
+	RRDCommand rrdp;
+    }
+    
+    private int poolsize = 4;
+    private int maxRequestsPerMember = 100;
+    private boolean finished = false;
+    
+    private RRDCommandFactory factory = null;
+    
+    RRDPoolMember pool[] = null;
+    
+    public RRDCommandPool() {
+	init(4);
+    }
+    
+    public RRDCommandPool(int poolsize) {
+	init(poolsize);
+    }
+    
+    private void init(int poolsize) {
+	this.poolsize = poolsize;
+	
+	pool = new RRDPoolMember[poolsize];
+	factory = new RRDCommandFactory() {
+	    @Override
+	    public RRDp createRRDp() throws Exception {
+	        return new RRDp();
+	    }
+	};
+    }
+
+    @Override
+    public CommandResult command(String[] cmd) throws Exception {
+	RRDPoolMember member = null;
+
+	if (finished) throw new Exception("Already finished");
+
+	synchronized(this) {
+	    while (true) {
+		for (int i = 0 ; i < poolsize ; i++) {
+		    if (pool[i] == null) {
+			pool[i] = new RRDPoolMember();
+			member = pool[i];
+			break;
+		    } else if (! pool[i].inuse) {
+			member = pool[i];
+			break;
+		    }
+		}
+		if (member != null) {
+		    member.inuse = true;
+		    break;
+		} else {
+		    try {
+			wait();
+		    } catch (InterruptedException e) {
+			// 		ignore
+		    }
+		    // next loop iteration 
+		}
+	    }
+	}
+	
+	CommandResult r = null;
+	if (member != null) {
+	    member.requestcount++;
+	    if (member.rrdp == null) member.rrdp = factory.createRRDp();
+	    r = member.rrdp.command(cmd);
+	    
+	    if (member.requestcount > maxRequestsPerMember) {
+		member.rrdp.finish();
+		member.requestcount = 0;
+		member.rrdp = null;
+	    }
+	    
+	    synchronized (this) {
+		member.inuse = false;
+		notify();
+	    }
+	}
+	
+	return r;
+    }
+
+    public int getMaxRequestsPerMember() {
+        return maxRequestsPerMember;
+    }
+
+    public void setMaxRequestsPerMember(int maxRequestsPerMember) {
+        this.maxRequestsPerMember = maxRequestsPerMember;
+    }
+
+    public int getPoolsize() {
+        return poolsize;
+    }
+
+    @Override
+    public void finish() {
+	finished = true;
+	synchronized (this) {
+	    boolean alldone = false;
+	    
+	    while (!alldone) {
+		alldone = true;
+		for (int i = 0 ; i < poolsize ; i++) {
+		    if (pool[i] == null) continue;
+		    if (! pool[i].inuse) {
+			if (pool[i].rrdp != null) {
+			    pool[i].rrdp.finish();
+			    pool[i].rrdp = null;
+			    pool[i] = null;
+			}
+			continue;
+		    } else {
+			alldone = false;
+			// in use - have to wait
+			try {
+			    wait();
+			} catch (InterruptedException e) {
+			    // 	ignore
+			}
+			break;
+		    }
+		}
+	    }
+	}
+    }
+
+    public RRDCommandFactory getFactory() {
+        return factory;
+    }
+
+    public void setFactory(RRDCommandFactory factory) {
+        this.factory = factory;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/RRDToolService.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,61 @@
+package net.stamfest.rrd;
+
+import java.io.IOException;
+
+/**
+ * 
+ * @author peter
+ * 
+ *         A java interface to Tobi Oetikers rrdtool. This provides rrd
+ *         operations to java programs. It puts a layer on top of the actual job
+ *         to invoke rrdtool functionality. By default, it uses the rrdtool pipe
+ *         mode of operation with a single instance of rrdtool. It can use other
+ *         backends as well, eg. a RRDCommandPool. The entire machinery is quite
+ *         flexible.
+ * 
+ */
+
+public class RRDToolService implements RRD {
+    RRDCommand backend;
+    
+    public RRDToolService() throws IOException {
+	this.backend = new RRDp();
+    }
+    
+    public RRDToolService(RRDCommand backend) {
+	this.backend = backend;
+    }
+    
+    /* (non-Javadoc)
+     * @see net.stamfest.rrd.RRD#info(java.lang.String)
+     */
+    public CommandResult info(String filename) throws Exception {
+	return backend.command(new String[] { "info", filename });
+    }
+
+    public CommandResult update(String filename, String args[]) throws Exception {
+	String cmd[] = new String[args.length + 1];
+	cmd[0] = "update";
+	for (int i = 0, j = 1 ; i < args.length ; i++, j++) {
+	    cmd[j] = args[i];
+	}
+	return backend.command(cmd);
+    }
+    
+    /* (non-Javadoc)
+     * @see net.stamfest.rrd.RRD#graphv(java.lang.String[])
+     */
+    public CommandResult graphv(String[] cmdin) throws Exception {
+	String cmd[] = new String[cmdin.length + 2];
+	cmd[0] = "graphv";
+	cmd[1] = "-";
+	
+	for (int i = 0, j = 2 ; i < cmdin.length ; i++, j++) {
+	    cmd[j] = cmdin[i];
+	}
+
+	return backend.command(cmd);
+    }
+
+    
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/RRDUpdate.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,8 @@
+package net.stamfest.rrd;
+
+public interface RRDUpdate {
+
+    public CommandResult update(String filename, String args[])
+	throws Exception;
+
+}
\ No newline at end of file
--- a/src/net/stamfest/rrd/RRDp.java	Tue Aug 24 17:01:08 2010 +0200
+++ b/src/net/stamfest/rrd/RRDp.java	Thu Aug 26 09:47:56 2010 +0200
@@ -2,16 +2,18 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStreamWriter;
+import java.io.OutputStream;
+import java.net.Socket;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class RRDp implements RRD {
+public class RRDp implements RRDCommand {
     Process rrdtool = null;
+    Socket socket = null;
    
-    private OutputStreamWriter writer;
+    private OutputStream writer;
     private InputStream reader;
 
     public RRDp() throws IOException {
@@ -22,7 +24,14 @@
 	
 	InputStream r = rrdtool.getInputStream();
 	reader = r;
-	writer = new OutputStreamWriter(rrdtool.getOutputStream());
+	writer = rrdtool.getOutputStream();
+    }
+    
+    public RRDp(String host, int port) throws IOException {
+	socket = new Socket(host, port);
+	
+	reader = socket.getInputStream();
+	writer = socket.getOutputStream();
     }
     
     // OK u:0.00 s:0.00 r:17.20
@@ -37,7 +46,7 @@
 	    throw new Exception("Use graphv instead"); 
 	}
 	
-	if (rrdtool == null) {
+	if (rrdtool == null && socket == null) {
 	    throw new Exception("No subprocess available (already closed?)");
 	}
 	StringBuffer sb = new StringBuffer();
@@ -47,7 +56,7 @@
 	}
 	sb.append('\n');
 	
-	writer.write(sb.toString());
+	writer.write(sb.toString().getBytes());
 	writer.flush();
 	
 	String line;
@@ -124,55 +133,28 @@
 	return new String(b, 0, pos);
     }
 
-    public void close() {
-	if (rrdtool != null) {
+    public void finish() {
+	if (rrdtool != null || socket != null) {
 	    try {
+		if (socket != null) socket.close();
 		writer.close();
 		reader.close();
 	    } catch (IOException e) {
 		e.printStackTrace();
 	    }
 	    rrdtool = null;
+	    socket = null;
 	}
     }
     
     @Override
     protected void finalize() throws Throwable {
         super.finalize();
-        close();
+        finish();
     }
+    
     public CommandResult command(String cmd[]) throws Exception {
 	return sendCommand(cmd);
     }
     
-    /* (non-Javadoc)
-     * @see net.stamfest.rrd.RRD#info(java.lang.String)
-     */
-    public CommandResult info(String filename) throws Exception {
-	return command(new String[] { "info", filename });
-    }
-
-    public CommandResult update(String filename, String args[]) throws Exception {
-	String cmd[] = new String[args.length + 1];
-	cmd[0] = "update";
-	for (int i = 0, j = 1 ; i < args.length ; i++, j++) {
-	    cmd[j] = args[i];
-	}
-	return command(cmd);
-    }
-    
-    /* (non-Javadoc)
-     * @see net.stamfest.rrd.RRD#graphv(java.lang.String[])
-     */
-    public CommandResult graphv(String[] cmdin) throws Exception {
-	String cmd[] = new String[cmdin.length + 2];
-	cmd[0] = "graphv";
-	cmd[1] = "-";
-	
-	for (int i = 0, j = 2 ; i < cmdin.length ; i++, j++) {
-	    cmd[j] = cmdin[i];
-	}
-
-	return command(cmd);
-    }
 }
--- a/src/net/stamfest/rrd/Test1.java	Tue Aug 24 17:01:08 2010 +0200
+++ b/src/net/stamfest/rrd/Test1.java	Thu Aug 26 09:47:56 2010 +0200
@@ -2,8 +2,13 @@
 
 public class Test1 {
     public static void main(String argv[]) throws Exception {
-	RRD rrd = new RRDp();
+	RRD rrd = new RRDToolService();
 	CommandResult r = rrd.info(argv[0]);
+	System.out.println("result="+ r);
+	
+	
+	
+	
 	
 	for (String k : r.info.keySet()) {
 	    System.out.printf("%s=%s\n", k, r.info.get(k));
--- a/src/net/stamfest/rrd/Test2.java	Tue Aug 24 17:01:08 2010 +0200
+++ b/src/net/stamfest/rrd/Test2.java	Thu Aug 26 09:47:56 2010 +0200
@@ -1,18 +1,37 @@
 package net.stamfest.rrd;
 
 public class Test2 {
-    public static void main(String argv[]) throws Exception {
-	RRD rrd = new RRDp();
+    public static void main(final String argv[]) throws Exception {
+	RRDCommandPool pool = new RRDCommandPool(20);
+	pool.setMaxRequestsPerMember(100);
+	final RRD rrd = new RRDToolService(pool);
 	
-	System.err.println(System.currentTimeMillis());
-	for (int i = 0 ; i < 100 ; i++) {
-	    CommandResult r = rrd.graphv(argv);
+	
+	int total = 1000;
+	int threads = 6;
+	final int perthread = total  / threads;
 	
-	    for (String k : r.info.keySet()) {
-		// System.out.printf("%s=%s\n", k, r.info.get(k));
-	    }
-	    System.out.println(i);
+	for (int t = 0 ; t < threads ; t++) {
+	    Thread th = new Thread() {
+		public void run() {
+		    System.err.println(System.currentTimeMillis());
+		    for (int i = 0 ; i < perthread ; i++) {
+			try {
+			    rrd.graphv(argv);
+			} catch (Exception e) {
+			    // TODO Auto-generated catch block
+			    e.printStackTrace();
+			}
+			
+			//for (String k : r.info.keySet()) {
+			// System.out.printf("%s=%s\n", k, r.info.get(k));
+			//}
+			System.out.println(i);
+		    }
+		    System.err.println(System.currentTimeMillis());
+		};
+	    };
+	    th.start();
 	}
-	System.err.println(System.currentTimeMillis());
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/Test3.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,15 @@
+package net.stamfest.rrd;
+
+import java.util.Arrays;
+
+public class Test3 {
+    public static void main(String argv[]) throws Exception {
+	RRDCachedClient rrdc = new RRDCachedClient("localhost", 7622);
+	CommandResult r = rrdc.update(argv[0], 
+	                              Arrays.copyOfRange(argv, 1, 
+	                                                 argv.length));
+
+	System.out.println(r);
+	
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/system/RRDSystem.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,5 @@
+package net.stamfest.rrd.system;
+
+public interface RRDSystem {
+
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/net/stamfest/rrd/system/RRDSystemFactory.java	Thu Aug 26 09:47:56 2010 +0200
@@ -0,0 +1,5 @@
+package net.stamfest.rrd.system;
+
+public class RRDSystemFactory {
+
+}