Advertisement
Guest User

Untitled

a guest
Jul 15th, 2016
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.85 KB | None | 0 0
  1. package syncWrites;
  2.  
  3. /**
  4. * Check Galera discrepancies in sync writes (parameter wsrep_sync_wait)
  5. *
  6. * Run:
  7. * scp Java/bin/syncWrite/* app@10.9.202.189:syncWrite/Java/syncWrite; ssh app@10.9.202.189 '(cd syncWrite/Java; java -cp ".:mysql-connector-java-5.1.39-bin.jar" syncWrite.simplified --pass *** --db test --nodes 10.9.202.189,100.64.6.114,10.9.197.46 --sw 7)'
  8. */
  9.  
  10.  
  11. import java.text.SimpleDateFormat;
  12. import java.sql.DriverManager;
  13. import java.sql.Connection;
  14. import java.sql.ResultSet;
  15. import java.sql.SQLException;
  16. import java.sql.Statement;
  17. import java.util.Date;
  18. import java.util.HashMap;
  19. import java.util.Map;
  20.  
  /**
   * Single-threaded checker for read-after-write discrepancies in a Galera cluster.
   *
   * <p>Each cycle increments column {@code val} of one row of table {@code snc} on the
   * first configured node (the "source"), then reads the same row from every other node
   * and compares it with a fresh read from the source. The first time a node returns a
   * smaller value than the source, a report line is printed and the process exits with
   * status 1.
   *
   * <p>All state is kept in static fields; the class is meant to be run through
   * {@link #main(String[])} only.
   */
  public class simplified {

    /** Command-line configuration; field defaults are the option defaults. */
    static class Cfg {
      String user = "root";
      String pass;
      String dbName;
      String[] nodes;
      /** Pause between cycles, in milliseconds; 0 means no pause. */
      int delay = 0;
      /** Value applied to the session variable wsrep_sync_wait, "0".."7". */
      String wsrep_sync_wait = "0";
      /** Primary key of the row of table snc that is updated and queried. */
      String pk = "1";
    }

    /** Per-node connection (opened lazily) and running statistics. */
    static class NodeState {
      String addr;
      Connection conn;
      int maxTsDiff = 0;
      int maxValDiff = -1; // negative, to print stats on the first cycle

      NodeState(String addr) {
        this.addr = addr;
      }

      @Override
      public String toString() {
        return "NodeState [addr=" + addr + ", conn=" + conn
            + ", maxTsDiff=" + maxTsDiff + ", maxValDiff=" + maxValDiff
            + "]";
      }
    }

    /** Returns the current local time formatted as a log-line prefix. */
    static String tsPrefix() {
      return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss: ").format(new Date());
    }

    /** Prints a line without a timestamp prefix. */
    static void printlnRaw(String s) {
      System.out.println(s);
    }

    /** Prints formatted text without a timestamp prefix. */
    static void printfRaw(String fmt, Object... args) {
      System.out.printf(fmt, args);
    }

    /** Prints a line preceded by the timestamp prefix. */
    static void println(String s) {
      System.out.print(tsPrefix());
      System.out.println(s);
    }

    /**
     * Prints formatted text preceded by the timestamp prefix, mirroring
     * {@link #println(String)} (use {@link #printfRaw} for unprefixed output).
     */
    static void printf(String fmt, Object... args) {
      System.out.print(tsPrefix());
      System.out.printf(fmt, args);
    }

    static Cfg cfg;
    static NodeState[] nodeStates;
    static volatile int latestVal = -1;
    static NodeState sourceNode = null;

    /**
     * Entry point: parses the arguments, then loops forever incrementing the counter on
     * the source node and checking every other node.
     *
     * @param args command-line options, see {@link #parseArgs(String[])}
     * @throws SQLException if the source node cannot be updated or read
     * @throws InterruptedException if the pause between cycles is interrupted
     */
    public static void main(String[] args) throws SQLException, InterruptedException {
      System.out.println("++ Single Threaded ++");
      simplified.cfg = parseArgs(args);
      if (simplified.cfg == null) {
        return;
      }

      simplified.nodeStates = new NodeState[cfg.nodes.length];
      for (int k = 0; k < cfg.nodes.length; k++) {
        nodeStates[k] = new NodeState(cfg.nodes[k]);
      }

      sourceNode = nodeStates[0];
      System.out.println("Source Node: " + sourceNode.toString());

      while (true) {
        increment(sourceNode, cfg.pk);

        for (int i = 1; i < nodeStates.length; i++) {
          queryAndReport(nodeStates[i], cfg.pk);
        }

        // Honour --sleep; a value of 0 keeps the original tight loop.
        if (cfg.delay > 0) {
          Thread.sleep(cfg.delay);
        }
      }
    }

    /**
     * Opens a connection to one node in auto-commit, READ COMMITTED mode and applies the
     * configured wsrep_sync_wait to the session.
     *
     * @param nodeAddr host name or address of the node
     * @return the configured connection
     * @throws SQLException if connecting or configuring the session fails; the
     *     half-configured connection is closed before the exception propagates
     */
    static Connection getConnection(String nodeAddr) throws SQLException {
      String url = "jdbc:mysql://" + nodeAddr + "/" + simplified.cfg.dbName;
      System.out.println("Getting connection from " + url);
      Connection conn = DriverManager.getConnection(url, simplified.cfg.user, simplified.cfg.pass);
      boolean ready = false;
      try {
        conn.setAutoCommit(true);
        conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        Statement stmt = conn.createStatement();
        try {
          stmt.execute("set session wsrep_sync_wait=" + String.valueOf(cfg.wsrep_sync_wait));
          // stmt.execute("set session wsrep_causal_reads="+String.valueOf(cfg.wsrep_sync_wait));
        } finally {
          closeQuietly(stmt);
        }
        ready = true;
        return conn;
      } finally {
        if (!ready) {
          closeQuietly(conn);
        }
      }
    }

    /** Returns the last counter value read back from the source node, or -1 initially. */
    static synchronized int getLatestValue() {
      return simplified.latestVal;
    }

    /**
     * Reads the counter of the configured row ({@code cfg.pk}) from the given node.
     *
     * @param nodeState node to read from; its connection is opened if needed
     * @return the current value of column val
     * @throws SQLException if the query fails or the row does not exist
     */
    static synchronized int readFromSource(NodeState nodeState) throws SQLException {
      return readFromSource(nodeState, simplified.cfg.pk);
    }

    /**
     * Reads the counter of the row with the given primary key from the given node.
     *
     * @param nodeState node to read from; its connection is opened if needed
     * @param pk primary key of the row, inserted verbatim into the SQL text
     * @return the current value of column val
     * @throws SQLException if the query fails or the row does not exist
     */
    static synchronized int readFromSource(NodeState nodeState, String pk) throws SQLException {
      if (nodeState.conn == null) {
        nodeState.conn = getConnection(nodeState.addr);
      }
      Statement stmt = nodeState.conn.createStatement();
      try {
        return selectVal(stmt, pk);
      } finally {
        closeQuietly(stmt);
      }
    }

    /** Records the latest counter value read back from the source node. */
    static synchronized void setLatestValue(int newvalue) {
      simplified.latestVal = newvalue;
    }

    /**
     * Writes {@code getLatestValue() + 1} into the row and reads the stored value back.
     *
     * @param nodeState node to write to; its connection is opened if needed
     * @param pk primary key of the row, inserted verbatim into the SQL text
     * @return the value read back after the update (also stored as the latest value)
     * @throws SQLException if the update or read fails, or the row does not exist
     */
    static int increment(NodeState nodeState, String pk) throws SQLException {
      if (nodeState.conn == null) {
        nodeState.conn = getConnection(nodeState.addr);
      }
      Statement stmt = nodeState.conn.createStatement();
      try {
        int updatedval = getLatestValue() + 1;
        stmt.execute("update snc set val=" + updatedval
            + ", ts=default where pk=" + pk);
        // No explicit commit(): getConnection() enables auto-commit, and JDBC specifies
        // that Connection.commit() raises SQLException in auto-commit mode.
        int newval = selectVal(stmt, pk);
        setLatestValue(newval);
        return newval;
      } finally {
        closeQuietly(stmt);
      }
    }

    /**
     * Compares one node against the source node and reports a discrepancy.
     *
     * <p>Warns on stderr when the session's wsrep_sync_wait differs from the configured
     * value or when wsrep_causal_reads reads as 0. If the node's counter is behind the
     * source's, prints a report line and terminates the JVM with status 1. Any
     * SQLException raised inside (including one from reading the source) is logged and
     * causes this node's connection to be dropped so that the next call reconnects.
     *
     * @param nodeState node to check; its connection is opened if needed
     * @param pk primary key of the row, inserted verbatim into the SQL text
     * @throws SQLException declared for interface compatibility; failures are handled here
     */
    static void queryAndReport(NodeState nodeState, String pk) throws SQLException {
      Statement stmt = null;
      try {
        if (nodeState.conn == null) {
          nodeState.conn = getConnection(nodeState.addr);
        }
        stmt = nodeState.conn.createStatement();

        // Checking the session variable wsrep_sync_wait before actually making a query.
        String syncWait = sessionVariable(stmt, "wsrep_sync_wait");
        if (!syncWait.equals(cfg.wsrep_sync_wait)) {
          System.err.println("Session variable 'wsrep_sync_wait' is " + syncWait
              + ", expected " + cfg.wsrep_sync_wait);
        }

        String causalReads = sessionVariable(stmt, "wsrep_causal_reads");
        if (causalReads.equals("0")) {
          System.err.println("Session variable 'wsrep_causal_reads' is NOT ON");
        }

        // Read from source first
        int valFromSource = readFromSource(sourceNode, pk);

        int tsDiff;
        int currentVal;
        ResultSet rs = stmt.executeQuery(
            "select TIMESTAMPDIFF(SECOND, ts, NOW()), val from snc where pk=" + pk);
        try {
          if (!rs.next()) {
            throw new SQLException("no row in snc with pk=" + pk);
          }
          tsDiff = rs.getInt(1);
          currentVal = rs.getInt(2);
        } finally {
          closeQuietly(rs);
        }
        int valDiff = valFromSource - currentVal;

        Map<String, Integer> sesStat = readSessionStatus(stmt);

        // Report
        nodeState.maxTsDiff = Math.max(nodeState.maxTsDiff, tsDiff);
        if (valDiff > 0) {
          nodeState.maxValDiff = Math.max(nodeState.maxValDiff, valDiff);
          printf("%15s: val diff: %3d, src: %d, curr: %d, max ts diff %7.3f sec, recvQ max: %3d, sendQ max: %2d, pause %1.6f\n",
              nodeState.addr,
              valDiff,
              valFromSource,
              currentVal,
              (float) nodeState.maxTsDiff, // already in seconds (TIMESTAMPDIFF(SECOND, ...))
              statOrZero(sesStat, "wsrep_local_recv_queue_max"),
              statOrZero(sesStat, "wsrep_local_send_queue_max"),
              statOrZero(sesStat, "wsrep_flow_control_paused") / 1000000f // stored as millionths
          );
          System.exit(1);
        }
      } catch (SQLException e) {
        printf("++ exception on %s: %s\n", nodeState.addr, e.getMessage());
        // Drop the connection so the next cycle reconnects.
        closeQuietly(nodeState.conn);
        nodeState.conn = null;
      } finally {
        closeQuietly(stmt);
      }
    }

    /**
     * Parses the command line.
     *
     * @param args raw arguments
     * @return the configuration, or {@code null} when help was requested or an argument is
     *     unknown, lacks its value, is malformed, or a mandatory option is missing
     *     (a message is printed in each case)
     */
    static Cfg parseArgs(String[] args) {
      Cfg parsed = new Cfg();
      boolean help = false, err = false;

      for (int k = 0; k < args.length; k++) {
        String opt = args[k];
        if (isOpt(opt, "-h", "--help")) {
          help = true;
          continue;
        }
        boolean known = isOpt(opt, "-s", "--sleep") || isOpt(opt, "-n", "--nodes")
            || isOpt(opt, "-b", "--db") || opt.equals("-d")
            || isOpt(opt, "-u", "--user") || isOpt(opt, "-p", "--pass")
            || isOpt(opt, "-w", "--sw") || isOpt(opt, "-k", "--pk");
        if (!known) {
          printfRaw("Unknown parameter %s\n", opt);
          err = true;
          continue;
        }
        if (k + 1 >= args.length) {
          printfRaw("Missing value for parameter %s\n", opt);
          err = true;
          break;
        }
        String value = args[++k];

        if (isOpt(opt, "-s", "--sleep")) {
          int delay = -1;
          try {
            delay = Integer.parseInt(value.trim());
          } catch (NumberFormatException ignored) {
            // delay stays -1 and is rejected below
          }
          if (delay < 0) {
            printfRaw("Bad --sleep value %s, expecting a non-negative integer\n", value);
            err = true;
          } else {
            parsed.delay = delay;
          }
        } else if (isOpt(opt, "-n", "--nodes")) {
          parsed.nodes = value.split(",");
        } else if (isOpt(opt, "-b", "--db") || opt.equals("-d")) {
          // -d is what the help text advertises; -b is kept for backward compatibility.
          parsed.dbName = value;
        } else if (isOpt(opt, "-u", "--user")) {
          parsed.user = value;
        } else if (isOpt(opt, "-p", "--pass")) {
          parsed.pass = value;
        } else if (isOpt(opt, "-w", "--sw")) {
          parsed.wsrep_sync_wait = value;
        } else {
          parsed.pk = value;
        }
      }

      if (help) {
        printfRaw("Sync write tester (simplified)\n");
        printfRaw("Params:\n");
        printfRaw(" -h, --help: this help\n");
        printfRaw(" -n, --nodes: comma-separated list of cluster node hostnames or ips, for ex.: node1,node2,node3\n");
        printfRaw(" -d, --db: database name, mandatory\n");
        printfRaw(" -u, --user: user name, default is %s\n", parsed.user);
        printfRaw(" -p, --pass: password, mandatory\n");
        printfRaw(" -s, --sleep: sleep delay between cycles in milliseconds, default is %d\n", parsed.delay);
        printfRaw(" -w, --sw: wsrep_sync_wait parameter, 0 to 7, default is %s\n", parsed.wsrep_sync_wait);
        printfRaw(" -k, --pk: PK of the record to update/query, default is %s\n", parsed.pk);
        return null;
      }

      if (!err) {
        if (parsed.dbName == null || parsed.dbName.length() == 0) {
          printfRaw("Mandatory parameter --db is missing\n");
          err = true;
        }
        if (parsed.pass == null || parsed.pass.length() == 0) {
          printfRaw("Mandatory parameter --pass is missing\n");
          err = true;
        }
        if (parsed.nodes == null || parsed.nodes.length == 0) {
          printfRaw("Mandatory parameter --nodes is missing\n");
          err = true;
        }

        // Validate numerically: a string comparison would accept e.g. "10" or "1; ...",
        // and the value is concatenated into a SET statement.
        int sw = -1;
        try {
          sw = Integer.parseInt(parsed.wsrep_sync_wait.trim());
        } catch (NumberFormatException ignored) {
          // sw stays -1 and is rejected below
        }
        if (sw < 0 || sw > 7) {
          printfRaw("Bad --sw value, expecting 0 to 7\n");
          err = true;
        } else {
          // Canonical form, so it compares equal to what the server reports back.
          parsed.wsrep_sync_wait = String.valueOf(sw);
        }
      }

      return err ? null : parsed;
    }

    /** Returns true when the argument equals either spelling of an option. */
    private static boolean isOpt(String arg, String shortName, String longName) {
      return arg.equals(shortName) || arg.equals(longName);
    }

    /** Reads column val of the row with the given key, failing clearly if it is absent. */
    private static int selectVal(Statement stmt, String pk) throws SQLException {
      ResultSet rs = stmt.executeQuery("select val from snc where pk=" + pk);
      try {
        if (!rs.next()) {
          throw new SQLException("no row in snc with pk=" + pk);
        }
        return rs.getInt(1);
      } finally {
        closeQuietly(rs);
      }
    }

    /** Returns the trimmed value of a session variable, or "" if none was returned. */
    private static String sessionVariable(Statement stmt, String name) throws SQLException {
      ResultSet rs = stmt.executeQuery("select @@session." + name + ";");
      try {
        String value = "";
        while (rs.next()) {
          value = rs.getString(1);
        }
        return value == null ? "" : value.trim();
      } finally {
        closeQuietly(rs);
      }
    }

    /**
     * Reads the wsrep queue and flow-control counters from session status, keyed by
     * lower-cased name; wsrep_flow_control_paused is stored multiplied by 1,000,000.
     */
    private static Map<String, Integer> readSessionStatus(Statement stmt) throws SQLException {
      ResultSet rs = stmt.executeQuery(
          "select lower(variable_name) as name, variable_value as val "
          + "from information_schema.session_status "
          + "where variable_name in('WSREP_LOCAL_RECV_QUEUE', 'WSREP_LOCAL_SEND_QUEUE', 'WSREP_LOCAL_REPLAYS', "
          + "'WSREP_LOCAL_RECV_QUEUE_MAX', 'WSREP_LOCAL_SEND_QUEUE_MAX', 'WSREP_FLOW_CONTROL_PAUSED') "
          + "order by variable_name");
      try {
        Map<String, Integer> sesStat = new HashMap<String, Integer>(10);
        while (rs.next()) {
          String name = rs.getString(1);
          int val = name.equals("wsrep_flow_control_paused")
              ? Math.round(rs.getFloat(2) * 1000000)
              : rs.getInt(2);
          sesStat.put(name, val);
        }
        return sesStat;
      } finally {
        closeQuietly(rs);
      }
    }

    /** Returns the named counter, or 0 when the server did not report it. */
    private static int statOrZero(Map<String, Integer> stats, String name) {
      Integer value = stats.get(name);
      return value == null ? 0 : value.intValue();
    }

    /** Closes a result set, ignoring failures so they cannot mask the primary error. */
    private static void closeQuietly(ResultSet rs) {
      if (rs != null) {
        try {
          rs.close();
        } catch (SQLException ignored) {
          // best effort
        }
      }
    }

    /** Closes a statement, ignoring failures so they cannot mask the primary error. */
    private static void closeQuietly(Statement stmt) {
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException ignored) {
          // best effort
        }
      }
    }

    /** Closes a connection, ignoring failures so they cannot mask the primary error. */
    private static void closeQuietly(Connection conn) {
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException ignored) {
          // best effort
        }
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement