/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Counters.Counter;
import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
import org.apache.hama.bsp.ft.CheckpointService;
import org.apache.hama.bsp.ft.IFaultTolerantPeerService;
import org.apache.hama.bsp.message.HadoopMessageManager;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
import org.apache.hama.bsp.message.MessageQueue;
import org.apache.hama.bsp.message.type.ByteMessage;
import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncClient;
import org.apache.hama.bsp.sync.SyncEvent;
import org.apache.hama.bsp.sync.SyncEventListener;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.util.KeyValuePair;
public class TestCheckpoint extends TestCase {
public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
/**
 * Minimal in-memory {@link MessageManager} used by the checkpoint test.
 * <p>
 * The only state it keeps is the list of bundles registered through
 * {@link #addMessage(BSPMessageBundle)}. All transport-related operations
 * (init, send, transfer, ...) are deliberate no-ops.
 */
public static class TestMessageManager implements MessageManager<Text> {

  /** Bundles handed to {@link #addMessage(BSPMessageBundle)}, in insertion order. */
  List<BSPMessageBundle<Text>> messageQueue = new ArrayList<BSPMessageBundle<Text>>();

  /**
   * Registers a bundle so that it becomes visible through
   * {@link #getCurrentMessage()} and {@link #getNumCurrentMessages()}.
   *
   * @param message the bundle to append to the internal list
   */
  public void addMessage(BSPMessageBundle<Text> message) {
    messageQueue.add(message);
  }

  /** No-op: this stub needs no initialization. */
  @Override
  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
      Configuration conf, InetSocketAddress peerAddress) {
    // intentionally empty
  }

  /** No-op: there is nothing to release. */
  @Override
  public void close() {
    // intentionally empty
  }

  /**
   * Returns the first message of the first registered bundle. The bundle is
   * not removed, and no emptiness check is performed.
   *
   * @return the first message of the first bundle
   */
  @Override
  public Text getCurrentMessage() throws IOException {
    BSPMessageBundle<Text> firstBundle = messageQueue.get(0);
    List<Text> bundledMessages = firstBundle.getMessages();
    return bundledMessages.get(0);
  }

  /** No-op: outgoing messages are discarded. */
  @Override
  public void send(String peerName, Text msg) throws IOException {
    // intentionally empty
  }

  /** No-op: there is no send phase to finish. */
  @Override
  public void finishSendPhase() throws IOException {
    // intentionally empty
  }

  /**
   * @return always {@code null}; this stub keeps no outgoing queues
   */
  @Override
  public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
    return null;
  }

  /** No-op: bundles are never transferred anywhere. */
  @Override
  public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
      throws IOException {
    // intentionally empty
  }

  /** No-op: there are no outgoing queues to clear. */
  @Override
  public void clearOutgoingQueues() {
    // intentionally empty
  }

  /**
   * @return the number of registered <em>bundles</em> (not the number of
   *         individual messages inside them)
   */
  @Override
  public int getNumCurrentMessages() {
    return messageQueue.size();
  }

  /** No-op: looped-back bundles are ignored. */
  @Override
  public void loopBackMessage(BSPMessageBundle<? extends Writable> bundle) {
    // intentionally empty
  }

  /** No-op: nothing is replayed. */
  @Override
  public void replayMessages() {
    // intentionally empty
  }
}
public static class TestBSPPeer implements
BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
Configuration conf;
long superstepCount;
IFaultTolerantPeerService<Text> fService;
public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
Counters counters, long superstep, BSPPeerSyncClient syncClient,
MessageManager<Text> messenger) {
this.conf = conf;
superstepCount = 0;
try {
fService = (new CheckpointService<Text>()).constructPeerFaultTolerance(
job, (BSPPeer) this, (BSPPeerSyncClient) syncClient, null, taskId,
superstep, conf, messenger);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void send(String peerName, Text msg) throws IOException {
}
@Override
public Text getCurrentMessage() throws IOException {
return new Text("data");
}
@Override
public int getNumCurrentMessages() {
return 1;
}
@Override
public void sync() throws IOException, SyncException, InterruptedException {
++superstepCount;
try {
this.fService.afterBarrier();
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("After barrier " + superstepCount);
}
@Override
public long getSuperstepCount() {
return superstepCount;
}
@Override
public String getPeerName() {
return null;
}
@Override
public String getPeerName(int index) {
return null;
}
@Override
public int getPeerIndex() {
return 1;
}
@Override
public String[] getAllPeerNames() {
return null;
}
@Override
public int getNumPeers() {
return 0;
}
@Override
public void clear() {
}
@Override
public void write(NullWritable key, NullWritable value) throws IOException {
}
@Override
public boolean readNext(NullWritable key, NullWritable value)
throws IOException {
return false;
}
@Override
public KeyValuePair<NullWritable, NullWritable> readNext()
throws IOException {
return null;
}
@Override
public void reopenInput() throws IOException {
}
@Override
public Configuration getConfiguration() {
return null;
}
@Override
public Counter getCounter(Enum<?> name) {
return null;
}
@Override
public Counter getCounter(String group, String name) {
return null;
}
@Override
public void incrementCounter(Enum<?> key, long amount) {
}
@Override
public void incrementCounter(String group, String counter, long amount) {
}
}
/**
 * In-memory {@link BSPPeerSyncClient} for the checkpoint test. Keys and
 * values live in a plain {@link HashMap}; barrier calls only log, and the
 * remaining lifecycle methods do nothing.
 */
public static class TempSyncClient extends BSPPeerSyncClient {

  /** Backing store for everything written through this client. */
  Map<String, Writable> valueMap = new HashMap<String, Writable>();

  /**
   * Builds a key of the form {@code <jobId>/<arg1>/<arg2>/.../}; note the
   * trailing slash after the last component.
   */
  @Override
  public String constructKey(BSPJobID jobId, String... args) {
    StringBuilder path = new StringBuilder(100);
    path.append(jobId.toString());
    path.append("/");
    for (int i = 0; i < args.length; i++) {
      path.append(args[i]);
      path.append("/");
    }
    return path.toString();
  }

  /**
   * Stores {@code value} under {@code key}, replacing any previous entry.
   * The {@code permanent} flag and the listener are ignored.
   *
   * @return always {@code true}
   */
  @Override
  public boolean storeInformation(String key, Writable value,
      boolean permanent, SyncEventListener listener) {
    String valueText = value.toString();
    LOG.info("Storing value = " + valueText + " for key " + key);
    valueMap.put(key, value);
    return true;
  }

  /**
   * Looks up the value stored under {@code key}; {@code classType} is
   * ignored.
   *
   * @return the stored value, or {@code null} if the key is unknown
   */
  @Override
  public Writable getInformation(String key,
      Class<? extends Writable> classType) {
    LOG.info("Getting value for key " + key);
    Writable stored = valueMap.get(key);
    return stored;
  }

  /**
   * Registers {@code key} with a {@link NullWritable} placeholder value.
   *
   * @return always {@code true}
   */
  @Override
  public boolean addKey(String key, boolean permanent,
      SyncEventListener listener) {
    valueMap.put(key, NullWritable.get());
    return true;
  }

  /** @return whether {@code key} is present in the backing map */
  @Override
  public boolean hasKey(String key) {
    return valueMap.containsKey(key);
  }

  /**
   * Collects every stored key that starts with {@code key + "/"}. The
   * returned entries are the full stored keys, not relative child names.
   */
  @Override
  public String[] getChildKeySet(String key, SyncEventListener listener) {
    String childPrefix = key + "/";
    List<String> matches = new ArrayList<String>();
    for (String storedKey : valueMap.keySet()) {
      if (storedKey.startsWith(childPrefix)) {
        matches.add(storedKey);
      }
    }
    return matches.toArray(new String[matches.size()]);
  }

  /** @return always {@code false}; listeners are not supported */
  @Override
  public boolean registerListener(String key, SyncEvent event,
      SyncEventListener listener) {
    return false;
  }

  /**
   * Removes {@code key} from the backing map.
   *
   * @return always {@code false}, even when an entry was removed
   */
  @Override
  public boolean remove(String key, SyncEventListener listener) {
    valueMap.remove(key);
    // NOTE(review): storeInformation/addKey report success with true, but this
    // reports false unconditionally - confirm whether callers rely on it.
    return false;
  }

  /** No-op: nothing to initialize. */
  @Override
  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
      throws Exception {
  }

  /** Only logs the superstep; no real barrier is entered. */
  @Override
  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
      long superstep) throws SyncException {
    LOG.info("Enter barrier called - " + superstep);
  }

  /** Only logs the superstep; no real barrier is left. */
  @Override
  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
      long superstep) throws SyncException {
    LOG.info("Exit barrier called - " + superstep);
  }

  /** No-op: peers are not registered anywhere. */
  @Override
  public void register(BSPJobID jobId, TaskAttemptID taskId,
      String hostAddress, long port) {
  }

  /** @return always {@code null} */
  @Override
  public String[] getAllPeerNames(TaskAttemptID taskId) {
    return null;
  }

  /** No-op. */
  @Override
  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
      String hostAddress, long port) {
  }

  /** No-op: there is no server to stop. */
  @Override
  public void stopServer() {
  }

  /** No-op: there is nothing to close. */
  @Override
  public void close() throws IOException {
  }
}
/**
 * Runs a single superstep on a {@link TestBSPPeer} with checkpointing enabled
 * and then verifies that
 * <ul>
 * <li>the sync client holds superstep 1 under the peer's "checkpoint" key,
 * and</li>
 * <li>the checkpoint file can be read back and contains the single message
 * {@code "data"}.</li>
 * </ul>
 *
 * @throws Exception if setting up the peer, syncing, or reading the
 *           checkpoint fails
 */
@SuppressWarnings("rawtypes")
public void testCheckpoint() throws Exception {
  Configuration config = new Configuration();
  config.set(SyncServiceFactory.SYNC_PEER_CLASS,
      TempSyncClient.class.getName());
  config.set(Constants.FAULT_TOLERANCE_CLASS,
      CheckpointService.class.getName());
  config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
  int port = BSPNetUtils.getFreePort(12502);
  LOG.info("Got port = " + port);
  config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
  config.setInt(Constants.PEER_PORT, port);
  config.set("bsp.output.dir", "/tmp/hama-test_out");

  FileSystem dfs = FileSystem.get(config);
  BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
  TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);

  TestMessageManager messenger = new TestMessageManager();
  PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
      .getPeerSyncClient(config);
  BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
      (BSPPeerSyncClient) syncClient, messenger);
  assertNotNull("BSPPeerImpl should not be null.", bspTask);

  String expectedPath = "/checkpoint/job_checkpttest_0001/1/1";
  // mkdirs also creates missing parents, so one call replaces the former
  // chain of nested calls. The result is ignored, as before.
  // NOTE(review): this creates expectedPath as a directory although it is
  // opened as a file further down - confirm this is what the checkpoint
  // service expects.
  dfs.mkdirs(new Path(expectedPath));
  LOG.info("Created bsp peer and other parameters");

  // Any failure here propagates and fails the test with its real cause.
  (new CheckpointService<Text>()).constructPeerFaultTolerance(job, bspTask,
      syncClient, new InetSocketAddress(port), taskId, -1L, config, messenger);
  LOG.info("Initialized correctly");

  assertTrue("Make sure directory is created.",
      dfs.exists(new Path(checkpointedDir)));

  Text txtMessage = new Text("data");
  BSPMessageBundle<Text> bundle = new BSPMessageBundle<Text>();
  bundle.addMessage(txtMessage);
  messenger.addMessage(bundle);
  assertNotNull("Message bundle can not be null.", bundle);
  assertNotNull("Configuration should not be null.", config);

  bspTask.sync();
  LOG.info("out of sync");

  LongWritable superstepNo = (LongWritable) syncClient.getInformation(
      syncClient.constructKey(job.getJobID(), "checkpoint",
          "" + bspTask.getPeerIndex()), LongWritable.class);
  LOG.info(superstepNo);
  assertNotNull("Checkpointed superstep should have been stored.",
      superstepNo);
  assertEquals(1L, superstepNo.get());

  BSPMessageBundle<Text> bundleRead = new BSPMessageBundle<Text>();
  int size;
  FSDataInputStream in = dfs.open(new Path(expectedPath));
  try {
    bundleRead.readFields(in);
    // Must be read while the stream is still open.
    // NOTE(review): the int is read after the bundle, in the order the
    // original statements used - confirm against the checkpoint file layout.
    size = in.readInt();
  } finally {
    in.close();
  }

  List<Text> readMessages = bundleRead.getMessages();
  assertEquals(1, size);
  assertEquals(1, readMessages.size());
  // Compare string contents: a String never equals a Text instance.
  assertEquals("data", readMessages.get(0).toString());
}
// public void testCheckpointInterval() throws Exception {
//
// Configuration conf = new Configuration();
// conf.set("bsp.output.dir", "/tmp/hama-test_out");
// conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
// LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
//
// conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
//
// int port = BSPNetUtils.getFreePort(5000);
// InetSocketAddress inetAddress = new InetSocketAddress(port);
// MinimalGroomServer groom = new MinimalGroomServer(conf);
// Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
// inetAddress.getPort(), conf);
// workerServer.start();
//
// LOG.info("Started RPC server");
// conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
// conf.setInt("bsp.peers.num", 1);
//
// BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
// BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
// conf);
// LOG.info("Started the proxy connections");
//
// TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
// "job_201110102255", 1), 1), 1);
//
// try {
// BSPJob job = new BSPJob(new HamaConfiguration(conf));
// job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
// job.setOutputFormat(TextOutputFormat.class);
// final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
// BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
// new InetSocketAddress("127.0.0.1", port), conf);
//
// BSPTask task = new BSPTask();
// task.setConf(job);
//
// @SuppressWarnings("rawtypes")
// BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
// proto, 0, null, null, new Counters());
//
// bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
// TaskStatus.State.RUNNING, "running", "127.0.0.1",
// TaskStatus.Phase.STARTING, new Counters()));
//
// assertEquals(bspPeer.isReadyToCheckpoint(), false);
//
// conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
// conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
//
// bspPeer.sync();
//
// LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
// + bspPeer.getSuperstepCount());
// assertEquals(bspPeer.isReadyToCheckpoint(), false);
// bspPeer.sync();
// LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
// + bspPeer.getSuperstepCount());
// assertEquals(bspPeer.isReadyToCheckpoint(), false);
// bspPeer.sync();
// LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
// + bspPeer.getSuperstepCount());
// assertEquals(bspPeer.isReadyToCheckpoint(), true);
//
// job.setCheckPointInterval(5);
// bspPeer.sync();
// LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
// + bspPeer.getSuperstepCount());
// assertEquals(bspPeer.isReadyToCheckpoint(), false);
// bspPeer.sync();
// LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
// + bspPeer.getSuperstepCount());
// assertEquals(bspPeer.isReadyToCheckpoint(), false);
//
// } catch (Exception e) {
// LOG.error("Error testing BSPPeer.", e);
// } finally {
// umbilical.close();
// Thread.sleep(2000);
// workerServer.stop();
// Thread.sleep(2000);
// }
//
// }
}