Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jul 17th, 2012  |  syntax: Java  |  size: 17.72 KB  |  hits: 14  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
This paste has a previous version, view the difference. Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hama.bsp;
  19.  
  20. import java.io.IOException;
  21. import java.net.InetAddress;
  22. import java.net.InetSocketAddress;
  23. import java.util.ArrayList;
  24. import java.util.HashMap;
  25. import java.util.Iterator;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.Map.Entry;
  29.  
  30. import junit.framework.TestCase;
  31.  
  32. import org.apache.commons.logging.Log;
  33. import org.apache.commons.logging.LogFactory;
  34. import org.apache.hadoop.conf.Configuration;
  35. import org.apache.hadoop.fs.FSDataInputStream;
  36. import org.apache.hadoop.fs.FileSystem;
  37. import org.apache.hadoop.fs.Path;
  38. import org.apache.hadoop.io.BytesWritable;
  39. import org.apache.hadoop.io.IntWritable;
  40. import org.apache.hadoop.io.LongWritable;
  41. import org.apache.hadoop.io.NullWritable;
  42. import org.apache.hadoop.io.Text;
  43. import org.apache.hadoop.io.Writable;
  44. import org.apache.hadoop.ipc.RPC;
  45. import org.apache.hadoop.ipc.Server;
  46. import org.apache.hama.Constants;
  47. import org.apache.hama.HamaConfiguration;
  48. import org.apache.hama.bsp.Counters.Counter;
  49. import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
  50. import org.apache.hama.bsp.ft.CheckpointService;
  51. import org.apache.hama.bsp.ft.IFaultTolerantPeerService;
  52. import org.apache.hama.bsp.message.HadoopMessageManager;
  53. import org.apache.hama.bsp.message.MessageManager;
  54. import org.apache.hama.bsp.message.MessageManagerFactory;
  55. import org.apache.hama.bsp.message.MessageQueue;
  56. import org.apache.hama.bsp.message.type.ByteMessage;
  57. import org.apache.hama.bsp.sync.BSPPeerSyncClient;
  58. import org.apache.hama.bsp.sync.PeerSyncClient;
  59. import org.apache.hama.bsp.sync.SyncClient;
  60. import org.apache.hama.bsp.sync.SyncEvent;
  61. import org.apache.hama.bsp.sync.SyncEventListener;
  62. import org.apache.hama.bsp.sync.SyncException;
  63. import org.apache.hama.bsp.sync.SyncServiceFactory;
  64. import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
  65. import org.apache.hama.ipc.BSPPeerProtocol;
  66. import org.apache.hama.ipc.HamaRPCProtocolVersion;
  67. import org.apache.hama.util.BSPNetUtils;
  68. import org.apache.hama.util.KeyValuePair;
  69.  
  70. public class TestCheckpoint extends TestCase {
  71.  
  72.   public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
  73.  
  74.   static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
  75.  
  76.   public static class TestMessageManager implements MessageManager<Text> {
  77.  
  78.     List<BSPMessageBundle<Text>> messageQueue = new ArrayList<BSPMessageBundle<Text>>();
  79.  
  80.     @Override
  81.     public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
  82.         Configuration conf, InetSocketAddress peerAddress) {
  83.       // TODO Auto-generated method stub
  84.  
  85.     }
  86.  
  87.     @Override
  88.     public void close() {
  89.       // TODO Auto-generated method stub
  90.  
  91.     }
  92.  
  93.     @Override
  94.     public Text getCurrentMessage() throws IOException {
  95.       return this.messageQueue.get(0).getMessages().get(0);
  96.     }
  97.  
  98.     @Override
  99.     public void send(String peerName, Text msg) throws IOException {
  100.     }
  101.  
  102.     @Override
  103.     public void finishSendPhase() throws IOException {
  104.     }
  105.  
  106.     @Override
  107.     public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
  108.       return null;
  109.     }
  110.  
  111.     @Override
  112.     public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
  113.         throws IOException {
  114.       // TODO Auto-generated method stub
  115.  
  116.     }
  117.  
  118.     @Override
  119.     public void clearOutgoingQueues() {
  120.       // TODO Auto-generated method stub
  121.  
  122.     }
  123.  
  124.     @Override
  125.     public int getNumCurrentMessages() {
  126.       // TODO Auto-generated method stub
  127.       return this.messageQueue.size();
  128.     }
  129.  
  130.     @Override
  131.     public void loopBackMessage(BSPMessageBundle<? extends Writable> bundle) {
  132.       // TODO Auto-generated method stub
  133.  
  134.     }
  135.  
  136.     @Override
  137.     public void replayMessages() {
  138.       // TODO Auto-generated method stub
  139.  
  140.     }
  141.  
  142.     public void addMessage(BSPMessageBundle<Text> message) {
  143.       this.messageQueue.add(message);
  144.     }
  145.  
  146.   }
  147.  
  148.   public static class TestBSPPeer implements
  149.       BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
  150.  
  151.     Configuration conf;
  152.     long superstepCount;
  153.     IFaultTolerantPeerService<Text> fService;
  154.  
  155.     public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
  156.         Counters counters, long superstep, BSPPeerSyncClient syncClient,
  157.         MessageManager<Text> messenger) {
  158.       this.conf = conf;
  159.       superstepCount = 0;
  160.  
  161.       try {
  162.         fService = (new CheckpointService<Text>()).constructPeerFaultTolerance(
  163.             job, (BSPPeer) this, (BSPPeerSyncClient) syncClient, null, taskId,
  164.             superstep, conf, messenger);
  165.       } catch (Exception e) {
  166.         // TODO Auto-generated catch block
  167.         e.printStackTrace();
  168.       }
  169.     }
  170.  
  171.     @Override
  172.     public void send(String peerName, Text msg) throws IOException {
  173.     }
  174.  
  175.     @Override
  176.     public Text getCurrentMessage() throws IOException {
  177.       return new Text("data");
  178.     }
  179.  
  180.     @Override
  181.     public int getNumCurrentMessages() {
  182.       return 1;
  183.     }
  184.  
  185.     @Override
  186.     public void sync() throws IOException, SyncException, InterruptedException {
  187.       ++superstepCount;
  188.       try {
  189.         this.fService.afterBarrier();
  190.       } catch (Exception e) {
  191.         e.printStackTrace();
  192.       }
  193.       LOG.info("After barrier " + superstepCount);
  194.     }
  195.  
  196.     @Override
  197.     public long getSuperstepCount() {
  198.       return superstepCount;
  199.     }
  200.  
  201.     @Override
  202.     public String getPeerName() {
  203.       return null;
  204.     }
  205.  
  206.     @Override
  207.     public String getPeerName(int index) {
  208.       return null;
  209.     }
  210.  
  211.     @Override
  212.     public int getPeerIndex() {
  213.       return 1;
  214.     }
  215.  
  216.     @Override
  217.     public String[] getAllPeerNames() {
  218.       return null;
  219.     }
  220.  
  221.     @Override
  222.     public int getNumPeers() {
  223.       return 0;
  224.     }
  225.  
  226.     @Override
  227.     public void clear() {
  228.  
  229.     }
  230.  
  231.     @Override
  232.     public void write(NullWritable key, NullWritable value) throws IOException {
  233.  
  234.     }
  235.  
  236.     @Override
  237.     public boolean readNext(NullWritable key, NullWritable value)
  238.         throws IOException {
  239.       return false;
  240.     }
  241.  
  242.     @Override
  243.     public KeyValuePair<NullWritable, NullWritable> readNext()
  244.         throws IOException {
  245.       return null;
  246.     }
  247.  
  248.     @Override
  249.     public void reopenInput() throws IOException {
  250.  
  251.     }
  252.  
  253.     @Override
  254.     public Configuration getConfiguration() {
  255.       return null;
  256.     }
  257.  
  258.     @Override
  259.     public Counter getCounter(Enum<?> name) {
  260.       return null;
  261.     }
  262.  
  263.     @Override
  264.     public Counter getCounter(String group, String name) {
  265.       return null;
  266.     }
  267.  
  268.     @Override
  269.     public void incrementCounter(Enum<?> key, long amount) {
  270.  
  271.     }
  272.  
  273.     @Override
  274.     public void incrementCounter(String group, String counter, long amount) {
  275.  
  276.     }
  277.  
  278.   }
  279.  
  280.   public static class TempSyncClient extends BSPPeerSyncClient {
  281.  
  282.     Map<String, Writable> valueMap = new HashMap<String, Writable>();
  283.  
  284.     @Override
  285.     public String constructKey(BSPJobID jobId, String... args) {
  286.       StringBuffer buffer = new StringBuffer(100);
  287.       buffer.append(jobId.toString()).append("/");
  288.       for (String arg : args) {
  289.         buffer.append(arg).append("/");
  290.       }
  291.       return buffer.toString();
  292.     }
  293.  
  294.     @Override
  295.     public boolean storeInformation(String key, Writable value,
  296.         boolean permanent, SyncEventListener listener) {
  297.       LOG.info("Storing value = " + value.toString() + " for key " + key);
  298.       valueMap.put(key, value);
  299.       return true;
  300.     }
  301.  
  302.     @Override
  303.     public Writable getInformation(String key,
  304.         Class<? extends Writable> classType) {
  305.       LOG.info("Getting value for key " + key);
  306.       return valueMap.get(key);
  307.     }
  308.  
  309.     @Override
  310.     public boolean addKey(String key, boolean permanent,
  311.         SyncEventListener listener) {
  312.       valueMap.put(key, NullWritable.get());
  313.       return true;
  314.     }
  315.  
  316.     @Override
  317.     public boolean hasKey(String key) {
  318.       return valueMap.containsKey(key);
  319.     }
  320.  
  321.     @Override
  322.     public String[] getChildKeySet(String key, SyncEventListener listener) {
  323.       List<String> list = new ArrayList<String>();
  324.       Iterator<String> keyIter = valueMap.keySet().iterator();
  325.       while (keyIter.hasNext()) {
  326.         String keyVal = keyIter.next();
  327.         if (keyVal.startsWith(key + "/")) {
  328.           list.add(keyVal);
  329.         }
  330.       }
  331.       String[] arr = new String[list.size()];
  332.       list.toArray(arr);
  333.       return arr;
  334.     }
  335.  
  336.     @Override
  337.     public boolean registerListener(String key, SyncEvent event,
  338.         SyncEventListener listener) {
  339.       return false;
  340.     }
  341.  
  342.     @Override
  343.     public boolean remove(String key, SyncEventListener listener) {
  344.       valueMap.remove(key);
  345.       return false;
  346.     }
  347.  
  348.     @Override
  349.     public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
  350.         throws Exception {
  351.     }
  352.  
  353.     @Override
  354.     public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
  355.         long superstep) throws SyncException {
  356.       LOG.info("Enter barrier called - " + superstep);
  357.     }
  358.  
  359.     @Override
  360.     public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
  361.         long superstep) throws SyncException {
  362.       LOG.info("Exit barrier called - " + superstep);
  363.     }
  364.  
  365.     @Override
  366.     public void register(BSPJobID jobId, TaskAttemptID taskId,
  367.         String hostAddress, long port) {
  368.     }
  369.  
  370.     @Override
  371.     public String[] getAllPeerNames(TaskAttemptID taskId) {
  372.       return null;
  373.     }
  374.  
  375.     @Override
  376.     public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
  377.         String hostAddress, long port) {
  378.     }
  379.  
  380.     @Override
  381.     public void stopServer() {
  382.     }
  383.  
  384.     @Override
  385.     public void close() throws IOException {
  386.     }
  387.  
  388.   }
  389.  
  390.   public void testCheckpoint() throws Exception {
  391.     Configuration config = new Configuration();
  392.     config.set(SyncServiceFactory.SYNC_PEER_CLASS,
  393.         TempSyncClient.class.getName());
  394.     config.set(Constants.FAULT_TOLERANCE_CLASS,
  395.         CheckpointService.class.getName());
  396.     config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
  397.     int port = BSPNetUtils.getFreePort(12502);
  398.     LOG.info("Got port = " + port);
  399.  
  400.     config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
  401.     config.setInt(Constants.PEER_PORT, port);
  402.  
  403.     config.set("bsp.output.dir", "/tmp/hama-test_out");
  404.     FileSystem dfs = FileSystem.get(config);
  405.     BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
  406.     TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
  407.     // BSPPeerImpl bspTask = new BSPPeerImpl(job, config, dfs, taskId);
  408.     TestMessageManager messenger = new TestMessageManager();
  409.     PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
  410.         .getPeerSyncClient(config);
  411.     BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
  412.         (BSPPeerSyncClient) syncClient, messenger);
  413.  
  414.     assertNotNull("BSPPeerImpl should not be null.", bspTask);
  415.     if (dfs.mkdirs(new Path("/checkpoint"))) {
  416.       if (dfs.mkdirs(new Path("/checkpoint/job_checkpttest_0001"))) {
  417.         if (dfs.mkdirs(new Path("/checkpoint/job_checkpttest_0001/1"))) {
  418.           if (dfs.mkdirs(new Path("/checkpoint/job_checkpttest_0001/1/1")))
  419.             ;
  420.         }
  421.       }
  422.     }
  423.  
  424.     LOG.info("Created bsp peer and other parameters");
  425.  
  426.     IFaultTolerantPeerService<Text> service = null;
  427.     boolean initialized = false;
  428.     try {
  429.       service = (new CheckpointService<Text>()).constructPeerFaultTolerance(
  430.           job, (BSPPeer) bspTask, syncClient, new InetSocketAddress(port),
  431.           taskId, -1L, config, messenger);
  432.       initialized = true;
  433.     } catch (Exception e) {
  434.       e.printStackTrace();
  435.     }
  436.     assertTrue(initialized);
  437.     LOG.info("Initialized correctly");
  438.  
  439.     assertTrue("Make sure directory is created.",
  440.         dfs.exists(new Path(checkpointedDir)));
  441.  
  442.     Text txtMessage = new Text("data");
  443.     BSPMessageBundle<Text> bundle = new BSPMessageBundle<Text>();
  444.     bundle.addMessage(txtMessage);
  445.     messenger.addMessage(bundle);
  446.  
  447.     assertNotNull("Message bundle can not be null.", bundle);
  448.     assertNotNull("Configuration should not be null.", config);
  449.  
  450.     bspTask.sync();
  451.  
  452.     LOG.info("out of sync");
  453.  
  454.     LongWritable superstepNo = (LongWritable) syncClient.getInformation(
  455.         syncClient.constructKey(job.getJobID(), "checkpoint",
  456.             "" + bspTask.getPeerIndex()), LongWritable.class);
  457.  
  458.     LOG.info(superstepNo);
  459.  
  460.     assertEquals(superstepNo.get(), 1L);
  461.  
  462.     String expectedPath = "/checkpoint/job_checkpttest_0001/1/1";
  463.     FSDataInputStream in = dfs.open(new Path(expectedPath));
  464.     BSPMessageBundle<Text> bundleRead = new BSPMessageBundle<Text>();
  465.     bundleRead.readFields(in);
  466.     in.close();
  467.     List<Text> readMessages = bundleRead.getMessages();
  468.     int size = in.readInt();
  469.     assertEquals(1, size);
  470.     assertEquals(1, readMessages.size());
  471.     assertEquals("data", readMessages.get(0));
  472.  
  473.     // Path path = new Path()
  474.  
  475.     // bspTask.checkpointSentMessages(checkpointedDir +
  476.     // "/attempt_201110302255_0001_000000_0",
  477.     // bundle);
  478.  
  479.     // FSDataInputStream in = dfs.open(new Path(checkpointedDir
  480.     // + "/attempt_201110302255_0001_000000_0"));
  481.     // BSPMessageBundle bundleRead = new BSPMessageBundle();
  482.     // bundleRead.readFields(in);
  483.     // in.close();
  484.     // ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
  485.     // String content = new String(byteMsg.getData());
  486.     // LOG.info("Saved checkpointed content is " + content);
  487.     // assertTrue("Message content should be the same.",
  488.     // "data".equals(content));
  489.     // dfs.delete(new Path("checkpoint"), true);
  490.   }
  491.  
  492.   // public void testCheckpointInterval() throws Exception {
  493.   //
  494.   // Configuration conf = new Configuration();
  495.   // conf.set("bsp.output.dir", "/tmp/hama-test_out");
  496.   // conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
  497.   // LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
  498.   //
  499.   // conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
  500.   //
  501.   // int port = BSPNetUtils.getFreePort(5000);
  502.   // InetSocketAddress inetAddress = new InetSocketAddress(port);
  503.   // MinimalGroomServer groom = new MinimalGroomServer(conf);
  504.   // Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
  505.   // inetAddress.getPort(), conf);
  506.   // workerServer.start();
  507.   //
  508.   // LOG.info("Started RPC server");
  509.   // conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
  510.   // conf.setInt("bsp.peers.num", 1);
  511.   //
  512.   // BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
  513.   // BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
  514.   // conf);
  515.   // LOG.info("Started the proxy connections");
  516.   //
  517.   // TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
  518.   // "job_201110102255", 1), 1), 1);
  519.   //
  520.   // try {
  521.   // BSPJob job = new BSPJob(new HamaConfiguration(conf));
  522.   // job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
  523.   // job.setOutputFormat(TextOutputFormat.class);
  524.   // final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
  525.   // BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
  526.   // new InetSocketAddress("127.0.0.1", port), conf);
  527.   //
  528.   // BSPTask task = new BSPTask();
  529.   // task.setConf(job);
  530.   //
  531.   // @SuppressWarnings("rawtypes")
  532.   // BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
  533.   // proto, 0, null, null, new Counters());
  534.   //
  535.   // bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
  536.   // TaskStatus.State.RUNNING, "running", "127.0.0.1",
  537.   // TaskStatus.Phase.STARTING, new Counters()));
  538.   //
  539.   // assertEquals(bspPeer.isReadyToCheckpoint(), false);
  540.   //
  541.   // conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
  542.   // conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
  543.   //
  544.   // bspPeer.sync();
  545.   //
  546.   // LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
  547.   // + bspPeer.getSuperstepCount());
  548.   // assertEquals(bspPeer.isReadyToCheckpoint(), false);
  549.   // bspPeer.sync();
  550.   // LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
  551.   // + bspPeer.getSuperstepCount());
  552.   // assertEquals(bspPeer.isReadyToCheckpoint(), false);
  553.   // bspPeer.sync();
  554.   // LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
  555.   // + bspPeer.getSuperstepCount());
  556.   // assertEquals(bspPeer.isReadyToCheckpoint(), true);
  557.   //
  558.   // job.setCheckPointInterval(5);
  559.   // bspPeer.sync();
  560.   // LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
  561.   // + bspPeer.getSuperstepCount());
  562.   // assertEquals(bspPeer.isReadyToCheckpoint(), false);
  563.   // bspPeer.sync();
  564.   // LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
  565.   // + bspPeer.getSuperstepCount());
  566.   // assertEquals(bspPeer.isReadyToCheckpoint(), false);
  567.   //
  568.   // } catch (Exception e) {
  569.   // LOG.error("Error testing BSPPeer.", e);
  570.   // } finally {
  571.   // umbilical.close();
  572.   // Thread.sleep(2000);
  573.   // workerServer.stop();
  574.   // Thread.sleep(2000);
  575.   // }
  576.   //
  577.   // }
  578. }