Guest User

Untitled

a guest
Jul 16th, 2018
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.15 KB | None | 0 0
  1. $connection = new AMQPStreamConnection('localhost', 5672, '*', '*');
  2. $channel = $connection->channel();
  3. $channel->queue_declare('rpc_queue', false, false, false, false);
  4.  
  5. $callback = function($req) {
  6.  
  7. $request = json_decode($req->body);
  8.  
  9. $someclass = new SomeClass();
  10. $response = $someclass->someFunction($request);
  11.  
  12. $msg = new AMQPMessage(json_encode($response), array('correlation_id' => $req->get('correlation_id')));
  13. $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
  14. $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
  15.  
  16. };
  17.  
  18. $channel->basic_qos(null, 1, null);
  19. $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
  20.  
  21. while(count($channel->callbacks)) {
  22. $channel->wait();
  23. }
  24.  
  25. $channel->close();
  26. $connection->close();
  27.  
  28. class RPCClient {
  29.  
  30. private IConnection connection;
  31. private IModel channel;
  32. private string replyQueueName;
  33. private QueueingBasicConsumer consumer;
  34.  
  35. public RPCClient(){
  36. try{
  37. var factory = new ConnectionFactory() { HostName = Conf.hostName, UserName = Conf.userName, Password = Conf.password, RequestedHeartbeat = Conf.heartbeat, AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true };
  38. connection = factory.CreateConnection();
  39. channel = connection.CreateModel();
  40. replyQueueName = channel.QueueDeclare().QueueName;
  41. consumer = new QueueingBasicConsumer(channel);
  42. channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer);
  43. } catch (Exception e){}
  44. }
  45.  
  46. public string Call(string message) {
  47.  
  48. var corrId = Guid.NewGuid().ToString();
  49. var props = channel.CreateBasicProperties();
  50. props.ReplyTo = replyQueueName;
  51. props.CorrelationId = corrId;
  52.  
  53. var messageBytes = Encoding.UTF8.GetBytes(message);
  54. channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
  55.  
  56. while (true){
  57. var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
  58.  
  59. if (ea.BasicProperties.CorrelationId == corrId){
  60. return Encoding.UTF8.GetString(ea.Body);
  61. }
  62. }
  63. }
  64.  
  65. public void Close(){
  66. connection.Close();
  67. }
Add Comment
Please, Sign In to add comment