changeset 7:90a4a69c54c6

* Reconnect to tcp-based services
author Peter Stamfest <peter@stamfest.at>
date Tue, 07 Sep 2010 12:00:50 +0200
parents 37fc9235b1cf
children b5c0f893128c
files src/net/stamfest/rrd/RRDCachedClient.java src/net/stamfest/rrd/RRDCommandPool.java src/net/stamfest/rrd/RRDToolService.java src/net/stamfest/rrd/RRDp.java src/net/stamfest/rrd/tests/Test1.java src/net/stamfest/rrd/tests/Test2.java
diffstat 6 files changed, 147 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/src/net/stamfest/rrd/RRDCachedClient.java	Thu Sep 02 06:17:59 2010 +0200
+++ b/src/net/stamfest/rrd/RRDCachedClient.java	Tue Sep 07 12:00:50 2010 +0200
@@ -19,18 +19,34 @@
 public class RRDCachedClient implements RRDUpdate {
     private Socket socket;
     private OutputStream writer;
-    private InputStream reader; 
+    private InputStream reader;
+    private String host;
+    private int port; 
     
     public RRDCachedClient(String host, int port) throws UnknownHostException, IOException {
 	init(host, port);
     }
     
     private void init(String host, int port) throws UnknownHostException, IOException {
+	this.host = host;
+	this.port = port;
 	socket = new Socket(host, port);
+	socket.setKeepAlive(true);
+	socket.setSoTimeout(60000);
 	writer = socket.getOutputStream();
 	reader = socket.getInputStream();
     }
 
+    private void reconnect() throws UnknownHostException, IOException {
+	try {
+	    writer = null;
+	    reader = null;
+	    socket.close();
+	} catch (Exception ignore) {
+	}
+	init(host, port);
+    }
+    
     private String readLine() throws IOException {
 	int c;
 	byte b[] = new byte[128];
@@ -51,6 +67,10 @@
     private static Pattern response = Pattern.compile("^(-?\\d+) (.*)$");
     private synchronized CommandResult sendCommand(String cmd1[], 
                                                    String cmd2[]) throws Exception {
+	if (writer == null) {
+	    reconnect();
+	}
+	
 	CommandResult r = new CommandResult();
 	
 	StringBuffer sb = new StringBuffer();
@@ -72,34 +92,57 @@
 	
 	sb.append('\n');
 	
-	writer.write(sb.toString().getBytes());
-	writer.flush();
-	
-	System.out.println(sb);
-	
-	String line = readLine();
-	
-	System.out.println("R:" + line);
-	Matcher m = response.matcher(line);
-	if (m.find()) {
-	    int rc = Integer.parseInt(m.group(1));
-	    r.error = line; /* not using group(2), because sometimes 
-	    			rrdcached says "0 errors, ...." which 
-	    			would put "error, ..." into this field 
-	    			regardless of the positive outcome of 
-	    			the command. This could be misleading */
-	    if (rc < 0) {
-		r.ok = false;
+	try {
+	    writer.write(sb.toString().getBytes());
+	    writer.flush();
+	} catch (Exception e) {
+	    reconnect();
+	    if (writer == null) throw e;
+	    writer.write(sb.toString().getBytes());
+	    writer.flush();
+	}
+
+	try {
+	    String line = readLine();
+
+	    // System.out.println("R:" + line);
+	    Matcher m = response.matcher(line);
+	    if (m.find()) {
+		int rc = Integer.parseInt(m.group(1));
+		r.error = line; /*
+				 * not using group(2), because sometimes
+				 * rrdcached says "0 errors, ...." which would
+				 * put "error, ..." into this field regardless
+				 * of the positive outcome of the command. This
+				 * could be misleading
+				 */
+		if (rc < 0) {
+		    r.ok = false;
+		} else {
+		    sb.setLength(0);
+		    r.ok = true;
+		    while (rc-- > 0) {
+			sb.append(readLine()).append('\n');
+		    }
+		    r.output = sb.toString();
+		}
 	    } else {
-		sb.setLength(0);
-		r.ok = true;
-		while (rc-- > 0) {
-		    sb.append(readLine()).append('\n');
+		try {
+		    writer = null;
+		    reader = null;
+		    socket.close();
+		} catch (Exception ignore) {
 		}
-		r.output = sb.toString();
+		throw new Exception("Protocol error");
 	    }
-	} else {
-	    throw new Exception("Protocol error");
+	} catch (Exception e) {
+	    try {
+		writer = null;
+		reader = null;
+		socket.close();
+		throw e;
+	    } catch (Exception ignore) {
+	    }
 	}
 	return r;
     }
--- a/src/net/stamfest/rrd/RRDCommandPool.java	Thu Sep 02 06:17:59 2010 +0200
+++ b/src/net/stamfest/rrd/RRDCommandPool.java	Tue Sep 07 12:00:50 2010 +0200
@@ -15,6 +15,8 @@
 
     @Override
     public void finish() {
+	member.rrdp.finish();
+	member.rrdp = null;
     }
 
     @Override
@@ -33,24 +35,37 @@
     
     RRDPoolMember pool[] = null;
     
-    public RRDCommandPool(String basedir) {
-	init(4, basedir);
+    public RRDCommandPool(String basedir, String cachedAddress) {
+	init(4, basedir, cachedAddress);
     }
     
-    public RRDCommandPool(int poolsize, String basedir) {
-	init(poolsize, basedir);
+    public RRDCommandPool(int poolsize, String basedir, String cachedAddress) {
+	init(poolsize, basedir, cachedAddress);
+    }
+    
+    public RRDCommandPool(int poolsize, RRDCommandFactory factory) {
+	init(poolsize, factory);
     }
     
-    private void init(int poolsize, final String basedir) {
+    private void init(int poolsize, 
+                      final String basedir,
+                      final String cachedAddress) {
+	init(poolsize, new RRDCommandFactory() {
+	    @Override
+	    public RRDp createRRDCommand() throws Exception {
+	        return new RRDp(basedir, cachedAddress);
+	    }
+	});
+    }
+
+    /**
+     * @param poolsize
+     */
+    private void init(int poolsize, RRDCommandFactory factory) {
 	this.poolsize = poolsize;
+	this.factory = factory;
 	
 	pool = new RRDPoolMember[poolsize];
-	factory = new RRDCommandFactory() {
-	    @Override
-	    public RRDp createRRDCommand() throws Exception {
-	        return new RRDp(basedir);
-	    }
-	};
     }
 
     
@@ -85,8 +100,10 @@
 	}
 	
 	member.requestcount++;
-	if (member.rrdp == null) member.rrdp = factory.createRRDCommand();
-	
+	if (member.rrdp == null) {
+	    member.requestcount = 0;
+	    member.rrdp = factory.createRRDCommand();
+	}
 	
 	return new RRDPoolMemberWrapper(member);
     }
@@ -96,7 +113,10 @@
 	    RRDPoolMember member = ((RRDPoolMemberWrapper) cmd).member;
 	    
 	    if (member.requestcount > maxRequestsPerMember) {
-		member.rrdp.finish();
+		if (member.rrdp != null) {
+		    // this may happen in case of a previous error
+		    member.rrdp.finish();
+		}
 		member.requestcount = 0;
 		member.rrdp = null;
 	    }
@@ -114,6 +134,9 @@
 	RRDCommand rrdcmd = getConnection();
 	try {
 	    return rrdcmd.command(cmd);
+	} catch (Exception e) {
+	    rrdcmd.finish();
+	    throw e;
 	} finally {
 	    done(rrdcmd);
 	}
--- a/src/net/stamfest/rrd/RRDToolService.java	Thu Sep 02 06:17:59 2010 +0200
+++ b/src/net/stamfest/rrd/RRDToolService.java	Tue Sep 07 12:00:50 2010 +0200
@@ -2,7 +2,6 @@
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
@@ -22,10 +21,7 @@
 
 public class RRDToolService implements RRD {
     RRDCommandPool pool;
-    
-    public RRDToolService(String basedir) throws IOException {
-	this.pool = new RRDCommandPool(basedir);
-    }
+    private RRDCachedClient cachedClient = null;
     
     public RRDToolService(RRDCommandPool pool) {
 	this.pool = pool;
@@ -39,8 +35,12 @@
     }
 
     public CommandResult update(String filename, String arg) throws Exception {
-	String cmd[] = new String[] { "update", filename, arg };
-	return pool.command(cmd);
+	if (cachedClient != null) {
+	    return cachedClient.update(filename, new String[] { arg });
+	} else {
+	    String cmd[] = new String[] { "update", filename, arg };
+	    return pool.command(cmd);
+	}
     }
     
     public CommandResult update(String filename, String args[]) throws Exception {
@@ -96,21 +96,29 @@
 	for (i = 0 ; i < len ; i++) {
 	    if (filename.charAt(i) == File.separatorChar) {
 		if (st != i) {
-		    String e = filename.substring(st, i);
-		    /* handle "." and ".." directory entries */
-		    if (! e.equals(".")) {
-			if (e.equals("..")) {
-			    if (l.size() > 0) l.remove(l.size() - 1);
-			} else {
-			    l.add(e);
-			}
-		    }
+		    addPathElement(l, filename.substring(st, i));
 		}
 		st = i + 1;
 	    }
 	}
+	if (st != i) addPathElement(l, filename.substring(st, i));
 	return l;
     }
+
+    /**
+     * @param l
+     * @param e
+     */
+    private void addPathElement(ArrayList<String> l, String e) {
+	/* handle "." and ".." directory entries */
+	if (! e.equals(".")) {
+	    if (e.equals("..")) {
+		if (l.size() > 0) l.remove(l.size() - 1);
+	    } else {
+		l.add(e);
+	    }
+	}
+    }
     
     @Override
     public CommandResult create(String filename, String[] args)
@@ -118,7 +126,6 @@
     {
 	// 	split filename 
 	List<String> pel = splitFilename(filename);
-	
 	RRDCommand rrdcmd = pool.getConnection();
 	String cwd = null;
 	try {
@@ -196,5 +203,13 @@
 	    
 	}
     }
+
+    public RRDCachedClient getCachedClient() {
+        return cachedClient;
+    }
+
+    public void setCachedClient(RRDCachedClient cachedClient) {
+        this.cachedClient = cachedClient;
+    }
     
 }
--- a/src/net/stamfest/rrd/RRDp.java	Thu Sep 02 06:17:59 2010 +0200
+++ b/src/net/stamfest/rrd/RRDp.java	Tue Sep 07 12:00:50 2010 +0200
@@ -19,10 +19,14 @@
     private OutputStream writer;
     private InputStream reader;
 
-    public RRDp(String basedir) throws IOException {
+    public RRDp(String basedir, String cachedAddress) throws IOException {
 	String cmd[] = new String[] { "rrdtool", "-", basedir };
 	
 	ProcessBuilder pb = new ProcessBuilder(cmd);
+	if (cachedAddress != null) {
+	    pb.environment().put("RRDCACHED_ADDRESS", cachedAddress);
+	}
+	
 	rrdtool = pb.start();
 	
 	InputStream r = rrdtool.getInputStream();
@@ -156,6 +160,8 @@
 	    }
 	    rrdtool = null;
 	    socket = null;
+	    writer = null;
+	    reader = null;
 	}
     }
     
--- a/src/net/stamfest/rrd/tests/Test1.java	Thu Sep 02 06:17:59 2010 +0200
+++ b/src/net/stamfest/rrd/tests/Test1.java	Tue Sep 07 12:00:50 2010 +0200
@@ -6,7 +6,7 @@
 
 public class Test1 {
     public static void main(String argv[]) throws Exception {
-	RRD rrd = new RRDToolService(".");
+	RRD rrd = null;
 	CommandResult r = rrd.info(argv[0]);
 	System.out.println("result="+ r);
 	
--- a/src/net/stamfest/rrd/tests/Test2.java	Thu Sep 02 06:17:59 2010 +0200
+++ b/src/net/stamfest/rrd/tests/Test2.java	Tue Sep 07 12:00:50 2010 +0200
@@ -6,7 +6,7 @@
 
 public class Test2 {
     public static void main(final String argv[]) throws Exception {
-	RRDCommandPool pool = new RRDCommandPool(20, ".");
+	RRDCommandPool pool = new RRDCommandPool(20, ".", null);
 	pool.setMaxRequestsPerMember(100);
 	final RRD rrd = new RRDToolService(pool);