Mercurial > java-rrd-hg
changeset 7:90a4a69c54c6
* Reconnect to tcp-based services
author | Peter Stamfest <peter@stamfest.at> |
---|---|
date | Tue, 07 Sep 2010 12:00:50 +0200 |
parents | 37fc9235b1cf |
children | b5c0f893128c |
files | src/net/stamfest/rrd/RRDCachedClient.java src/net/stamfest/rrd/RRDCommandPool.java src/net/stamfest/rrd/RRDToolService.java src/net/stamfest/rrd/RRDp.java src/net/stamfest/rrd/tests/Test1.java src/net/stamfest/rrd/tests/Test2.java |
diffstat | 6 files changed, 147 insertions(+), 60 deletions(-) [+] |
line wrap: on
line diff
--- a/src/net/stamfest/rrd/RRDCachedClient.java Thu Sep 02 06:17:59 2010 +0200 +++ b/src/net/stamfest/rrd/RRDCachedClient.java Tue Sep 07 12:00:50 2010 +0200 @@ -19,18 +19,34 @@ public class RRDCachedClient implements RRDUpdate { private Socket socket; private OutputStream writer; - private InputStream reader; + private InputStream reader; + private String host; + private int port; public RRDCachedClient(String host, int port) throws UnknownHostException, IOException { init(host, port); } private void init(String host, int port) throws UnknownHostException, IOException { + this.host = host; + this.port = port; socket = new Socket(host, port); + socket.setKeepAlive(true); + socket.setSoTimeout(60000); writer = socket.getOutputStream(); reader = socket.getInputStream(); } + private void reconnect() throws UnknownHostException, IOException { + try { + writer = null; + reader = null; + socket.close(); + } catch (Exception ignore) { + } + init(host, port); + } + private String readLine() throws IOException { int c; byte b[] = new byte[128]; @@ -51,6 +67,10 @@ private static Pattern response = Pattern.compile("^(-?\\d+) (.*)$"); private synchronized CommandResult sendCommand(String cmd1[], String cmd2[]) throws Exception { + if (writer == null) { + reconnect(); + } + CommandResult r = new CommandResult(); StringBuffer sb = new StringBuffer(); @@ -72,34 +92,57 @@ sb.append('\n'); - writer.write(sb.toString().getBytes()); - writer.flush(); - - System.out.println(sb); - - String line = readLine(); - - System.out.println("R:" + line); - Matcher m = response.matcher(line); - if (m.find()) { - int rc = Integer.parseInt(m.group(1)); - r.error = line; /* not using group(2), because sometimes - rrdcached says "0 errors, ...." which - would put "error, ..." into this field - regardless of the positive outcome of - the command. This could be misleading */ - if (rc < 0) { - r.ok = false; + try { + writer.write(sb.toString().getBytes()); + writer.flush(); + } catch (Exception e) { + reconnect(); + if (writer == null) throw e; + writer.write(sb.toString().getBytes()); + writer.flush(); + } + + try { + String line = readLine(); + + // System.out.println("R:" + line); + Matcher m = response.matcher(line); + if (m.find()) { + int rc = Integer.parseInt(m.group(1)); + r.error = line; /* + * not using group(2), because sometimes + * rrdcached says "0 errors, ...." which would + * put "error, ..." into this field regardless + * of the positive outcome of the command. This + * could be misleading + */ + if (rc < 0) { + r.ok = false; + } else { + sb.setLength(0); + r.ok = true; + while (rc-- > 0) { + sb.append(readLine()).append('\n'); + } + r.output = sb.toString(); + } } else { - sb.setLength(0); - r.ok = true; - while (rc-- > 0) { - sb.append(readLine()).append('\n'); + try { + writer = null; + reader = null; + socket.close(); + } catch (Exception ignore) { } - r.output = sb.toString(); + throw new Exception("Protocol error"); } - } else { - throw new Exception("Protocol error"); + } catch (Exception e) { + try { + writer = null; + reader = null; + socket.close(); + throw e; + } catch (Exception ignore) { + } } return r; }
--- a/src/net/stamfest/rrd/RRDCommandPool.java Thu Sep 02 06:17:59 2010 +0200 +++ b/src/net/stamfest/rrd/RRDCommandPool.java Tue Sep 07 12:00:50 2010 +0200 @@ -15,6 +15,8 @@ @Override public void finish() { + member.rrdp.finish(); + member.rrdp = null; } @Override @@ -33,24 +35,37 @@ RRDPoolMember pool[] = null; - public RRDCommandPool(String basedir) { - init(4, basedir); + public RRDCommandPool(String basedir, String cachedAddress) { + init(4, basedir, cachedAddress); } - public RRDCommandPool(int poolsize, String basedir) { - init(poolsize, basedir); + public RRDCommandPool(int poolsize, String basedir, String cachedAddress) { + init(poolsize, basedir, cachedAddress); + } + + public RRDCommandPool(int poolsize, RRDCommandFactory factory) { + init(poolsize, factory); } - private void init(int poolsize, final String basedir) { + private void init(int poolsize, + final String basedir, + final String cachedAddress) { + init(poolsize, new RRDCommandFactory() { + @Override + public RRDp createRRDCommand() throws Exception { + return new RRDp(basedir, cachedAddress); + } + }); + } + + /** + * @param poolsize + */ + private void init(int poolsize, RRDCommandFactory factory) { this.poolsize = poolsize; + this.factory = factory; pool = new RRDPoolMember[poolsize]; - factory = new RRDCommandFactory() { - @Override - public RRDp createRRDCommand() throws Exception { - return new RRDp(basedir); - } - }; } @@ -85,8 +100,10 @@ } member.requestcount++; - if (member.rrdp == null) member.rrdp = factory.createRRDCommand(); - + if (member.rrdp == null) { + member.requestcount = 0; + member.rrdp = factory.createRRDCommand(); + } return new RRDPoolMemberWrapper(member); } @@ -96,7 +113,10 @@ RRDPoolMember member = ((RRDPoolMemberWrapper) cmd).member; if (member.requestcount > maxRequestsPerMember) { - member.rrdp.finish(); + if (member.rrdp != null) { + // this may happen in case of a previous error + member.rrdp.finish(); + } member.requestcount = 0; member.rrdp = null; } @@ -114,6 +134,9 @@ RRDCommand rrdcmd = getConnection(); try { return rrdcmd.command(cmd); + } catch (Exception e) { + rrdcmd.finish(); + throw e; } finally { done(rrdcmd); }
--- a/src/net/stamfest/rrd/RRDToolService.java Thu Sep 02 06:17:59 2010 +0200 +++ b/src/net/stamfest/rrd/RRDToolService.java Tue Sep 07 12:00:50 2010 +0200 @@ -2,7 +2,6 @@ import java.io.BufferedReader; import java.io.File; -import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; import java.util.List; @@ -22,10 +21,7 @@ public class RRDToolService implements RRD { RRDCommandPool pool; - - public RRDToolService(String basedir) throws IOException { - this.pool = new RRDCommandPool(basedir); - } + private RRDCachedClient cachedClient = null; public RRDToolService(RRDCommandPool pool) { this.pool = pool; @@ -39,8 +35,12 @@ } public CommandResult update(String filename, String arg) throws Exception { - String cmd[] = new String[] { "update", filename, arg }; - return pool.command(cmd); + if (cachedClient != null) { + return cachedClient.update(filename, new String[] { arg }); + } else { + String cmd[] = new String[] { "update", filename, arg }; + return pool.command(cmd); + } } public CommandResult update(String filename, String args[]) throws Exception { @@ -96,21 +96,29 @@ for (i = 0 ; i < len ; i++) { if (filename.charAt(i) == File.separatorChar) { if (st != i) { - String e = filename.substring(st, i); - /* handle "." and ".." directory entries */ - if (! e.equals(".")) { - if (e.equals("..")) { - if (l.size() > 0) l.remove(l.size() - 1); - } else { - l.add(e); - } - } + addPathElement(l, filename.substring(st, i)); } st = i + 1; } } + if (st != i) addPathElement(l, filename.substring(st, i)); return l; } + + /** + * @param l + * @param e + */ + private void addPathElement(ArrayList<String> l, String e) { + /* handle "." and ".." directory entries */ + if (! e.equals(".")) { + if (e.equals("..")) { + if (l.size() > 0) l.remove(l.size() - 1); + } else { + l.add(e); + } + } + } @Override public CommandResult create(String filename, String[] args) @@ -118,7 +126,6 @@ { // split filename List<String> pel = splitFilename(filename); - RRDCommand rrdcmd = pool.getConnection(); String cwd = null; try { @@ -196,5 +203,13 @@ } } + + public RRDCachedClient getCachedClient() { + return cachedClient; + } + + public void setCachedClient(RRDCachedClient cachedClient) { + this.cachedClient = cachedClient; + } }
--- a/src/net/stamfest/rrd/RRDp.java Thu Sep 02 06:17:59 2010 +0200 +++ b/src/net/stamfest/rrd/RRDp.java Tue Sep 07 12:00:50 2010 +0200 @@ -19,10 +19,14 @@ private OutputStream writer; private InputStream reader; - public RRDp(String basedir) throws IOException { + public RRDp(String basedir, String cachedAddress) throws IOException { String cmd[] = new String[] { "rrdtool", "-", basedir }; ProcessBuilder pb = new ProcessBuilder(cmd); + if (cachedAddress != null) { + pb.environment().put("RRDCACHED_ADDRESS", cachedAddress); + } + rrdtool = pb.start(); InputStream r = rrdtool.getInputStream(); @@ -156,6 +160,8 @@ } rrdtool = null; socket = null; + writer = null; + reader = null; } }
--- a/src/net/stamfest/rrd/tests/Test1.java Thu Sep 02 06:17:59 2010 +0200 +++ b/src/net/stamfest/rrd/tests/Test1.java Tue Sep 07 12:00:50 2010 +0200 @@ -6,7 +6,7 @@ public class Test1 { public static void main(String argv[]) throws Exception { - RRD rrd = new RRDToolService("."); + RRD rrd = null; CommandResult r = rrd.info(argv[0]); System.out.println("result="+ r);
--- a/src/net/stamfest/rrd/tests/Test2.java Thu Sep 02 06:17:59 2010 +0200 +++ b/src/net/stamfest/rrd/tests/Test2.java Tue Sep 07 12:00:50 2010 +0200 @@ -6,7 +6,7 @@ public class Test2 { public static void main(final String argv[]) throws Exception { - RRDCommandPool pool = new RRDCommandPool(20, "."); + RRDCommandPool pool = new RRDCommandPool(20, ".", null); pool.setMaxRequestsPerMember(100); final RRD rrd = new RRDToolService(pool);