Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // RPC server: consumes JSON requests from 'rpc_queue', runs each one through
- // SomeClass::someFunction and publishes the JSON-encoded result to the queue
- // named in the request's reply_to property.
- $connection = new AMQPStreamConnection('localhost', 5672, '*', '*');
- $channel = $connection->channel();
- $channel->queue_declare('rpc_queue', false, false, false, false);
- // Handles a single delivery: decode, process, reply, then acknowledge.
- $handleRequest = function($delivery) {
-     $payload = json_decode($delivery->body);
-     $worker = new SomeClass();
-     $result = $worker->someFunction($payload);
-     $encodedResult = json_encode($result);
-     // Echo the caller's correlation_id so it can match this reply to its request.
-     $replyProperties = array('correlation_id' => $delivery->get('correlation_id'));
-     $reply = new AMQPMessage($encodedResult, $replyProperties);
-     // Reply and acknowledge on the channel the request was delivered on.
-     $delivery->delivery_info['channel']->basic_publish($reply, '', $delivery->get('reply_to'));
-     // Acknowledge only after the reply has been published.
-     $delivery->delivery_info['channel']->basic_ack($delivery->delivery_info['delivery_tag']);
- };
- // Prefetch count of 1: at most one unacknowledged request is handed to this worker at a time.
- $channel->basic_qos(null, 1, null);
- $channel->basic_consume('rpc_queue', '', false, false, false, false, $handleRequest);
- // Keep dispatching deliveries for as long as a consumer callback is registered.
- while(count($channel->callbacks) > 0) {
-     $channel->wait();
- }
- $channel->close();
- $connection->close();
- class RPCClient {
- private IConnection connection; // Broker connection; assigned in the constructor and closed by Close().
- private IModel channel; // Channel created from the connection; used to declare the reply queue, consume replies and publish requests.
- private string replyQueueName; // Name of the queue declared in the constructor; sent as ReplyTo on every request.
- private QueueingBasicConsumer consumer; // Consumer attached to the reply queue; Call() dequeues replies from it.
- /// <summary>
- /// Connects to the broker described by <c>Conf</c>, opens a channel, declares a
- /// reply queue with a broker-generated name and starts consuming from it.
- /// </summary>
- /// <remarks>
- /// Setup failures are logged but not rethrown, so construction never throws.
- /// After a failure the instance is left partially initialised (some or all of
- /// the connection, channel and consumer fields remain null).
- /// </remarks>
- public RPCClient(){
-     try{
-         var factory = new ConnectionFactory() {
-             HostName = Conf.hostName,
-             UserName = Conf.userName,
-             Password = Conf.password,
-             RequestedHeartbeat = Conf.heartbeat,
-             AutomaticRecoveryEnabled = true,
-             TopologyRecoveryEnabled = true
-         };
-         connection = factory.CreateConnection();
-         channel = connection.CreateModel();
-         replyQueueName = channel.QueueDeclare().QueueName;
-         consumer = new QueueingBasicConsumer(channel);
-         // noAck: replies are not acknowledged back to the broker.
-         channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer);
-     } catch (Exception e){
-         // This used to be an empty catch, which hid the real cause (broker down,
-         // bad credentials, ...) until a later call failed on a null field.
-         // Keep the non-throwing contract, but record what went wrong.
-         System.Diagnostics.Trace.TraceError("RPCClient: failed to set up the RabbitMQ connection: {0}", e);
-     }
- }
- /// <summary>
- /// Publishes <paramref name="message"/> (UTF-8 encoded) to "rpc_queue" on the
- /// default exchange and blocks until a reply carrying the same correlation id
- /// arrives on this client's reply queue.
- /// </summary>
- /// <param name="message">Request payload to send.</param>
- /// <returns>The reply body decoded as UTF-8.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
- /// <exception cref="InvalidOperationException">
- /// The constructor did not manage to create the channel and reply consumer.
- /// </exception>
- /// <remarks>
- /// There is no timeout: the call waits for as long as no matching reply is
- /// delivered. Replies whose correlation id does not match are discarded.
- /// NOTE(review): concurrent calls on one instance would consume each other's
- /// replies through the shared consumer - confirm callers serialise access.
- /// </remarks>
- public string Call(string message) {
-     if (message == null){
-         throw new ArgumentNullException("message");
-     }
-     // The constructor does not propagate setup failures, so these fields may
-     // still be null; fail with a descriptive error instead of a NullReferenceException.
-     if (channel == null || consumer == null){
-         throw new InvalidOperationException("RPCClient is not initialised: the RabbitMQ channel could not be set up.");
-     }
-     var corrId = Guid.NewGuid().ToString();
-     var props = channel.CreateBasicProperties();
-     props.ReplyTo = replyQueueName;
-     props.CorrelationId = corrId;
-     var messageBytes = Encoding.UTF8.GetBytes(message);
-     channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
-     while (true){
-         // Blocks until the next delivery on the reply queue.
-         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
-         // Return only the reply that answers this request.
-         if (ea.BasicProperties.CorrelationId == corrId){
-             return Encoding.UTF8.GetString(ea.Body);
-         }
-     }
- }
- /// <summary>
- /// Closes the broker connection if one was established; otherwise does nothing.
- /// </summary>
- public void Close(){
-     // The constructor does not propagate connection failures, so the field may
-     // still be null; closing must not fail with a NullReferenceException.
-     if (connection != null){
-         connection.Close();
-     }
- }
Add Comment
Please, Sign In to add comment