Guest User

Untitled

a guest
Feb 18th, 2019
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.65 KB | None | 0 0
  1. // Copyright 2018 Parity Technologies (UK) Ltd.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a
  4. // copy of this software and associated documentation files (the "Software"),
  5. // to deal in the Software without restriction, including without limitation
  6. // the rights to use, copy, modify, merge, publish, distribute, sublicense,
  7. // and/or sell copies of the Software, and to permit persons to whom the
  8. // Software is furnished to do so, subject to the following conditions:
  9. //
  10. // The above copyright notice and this permission notice shall be included in
  11. // all copies or substantial portions of the Software.
  12. //
  13. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  14. // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  15. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  16. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  17. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  18. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  19. // DEALINGS IN THE SOFTWARE.
  20.  
  21. //! A basic chat application demonstrating libp2p and the mDNS and floodsub protocols.
  22. //!
  23. //! Using two terminal windows, start two instances. If you local network allows mDNS,
  24. //! they will automatically connect. Type a message in either terminal and hit return: the
  25. //! message is sent and printed in the other terminal. Close with Ctrl-c.
  26. //!
  27. //! You can of course open more terminal windows and add more participants.
  28. //! Dialing any of the other peers will propagate the new participant to all
  29. //! chat members and everyone will receive all messages.
  30. //!
  31. //! # If they don't automatically connect
  32. //!
  33. //! If the nodes don't automatically connect, take note of the listening address of the first
  34. //! instance and start the second with this address as the first argument. In the first terminal
  35. //! window, run:
  36. //!
  37. //! ```sh
  38. //! cargo run --example chat
  39. //! ```
  40. //!
  41. //! It will print the PeerId and the listening address, e.g. `Listening on
  42. //! "/ip4/0.0.0.0/tcp/24915"`
  43. //!
  44. //! In the second terminal window, start a new instance of the example with:
  45. //!
  46. //! ```sh
  47. //! cargo run --example chat -- /ip4/127.0.0.1/tcp/24915
  48. //! ```
  49. //!
  50. //! The two nodes then connect.
  51.  
  52. use futures::prelude::*;
  53. use libp2p::{
  54. NetworkBehaviour,
  55. secio,
  56. tokio_codec::{FramedRead, LinesCodec}
  57. };
  58.  
  59. extern crate tokio;
  60. use tokio::prelude::*;
  61.  
  62. extern crate env_logger;
  63.  
  64. extern crate tokio_stdin_stdout;
  65.  
  66. fn main() {
  67. env_logger::init();
  68.  
  69. // Create a random PeerId
  70. let local_key = secio::SecioKeyPair::ed25519_generated().unwrap();
  71. let local_peer_id = local_key.to_peer_id();
  72. println!("Local peer id: {:?}", local_peer_id);
  73.  
  74. // Set up a an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols
  75. let transport = libp2p::build_development_transport(local_key);
  76.  
  77. // Create a Floodsub topic
  78. let floodsub_topic = libp2p::floodsub::TopicBuilder::new("chat").build();
  79.  
  80. // We create a custom network behaviour that combines floodsub and mDNS.
  81. // In the future, we want to improve libp2p to make this easier to do.
  82. #[derive(NetworkBehaviour)]
  83. struct MyBehaviour<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> {
  84. floodsub: libp2p::floodsub::Floodsub<TSubstream>,
  85. mdns: libp2p::mdns::Mdns<TSubstream>,
  86. }
  87.  
  88. impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::mdns::MdnsEvent> for MyBehaviour<TSubstream> {
  89. fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) {
  90. match event {
  91. libp2p::mdns::MdnsEvent::Discovered(list) => {
  92. for (peer, _) in list {
  93. self.floodsub.add_node_to_partial_view(peer);
  94. }
  95. },
  96. libp2p::mdns::MdnsEvent::Expired(list) => {
  97. for (peer, _) in list {
  98. if !self.mdns.has_node(&peer) {
  99. self.floodsub.remove_node_from_partial_view(&peer);
  100. }
  101. }
  102. }
  103. }
  104. }
  105. }
  106.  
  107. impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::floodsub::FloodsubEvent> for MyBehaviour<TSubstream> {
  108. // Called when `floodsub` produces an event.
  109. fn inject_event(&mut self, message: libp2p::floodsub::FloodsubEvent) {
  110. if let libp2p::floodsub::FloodsubEvent::Message(message) = message {
  111. println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source);
  112. }
  113. }
  114. }
  115.  
  116. // Create a Swarm to manage peers and events
  117. let mut swarm = {
  118. let mut behaviour = MyBehaviour {
  119. floodsub: libp2p::floodsub::Floodsub::new(local_peer_id.clone()),
  120. mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"),
  121. };
  122.  
  123. behaviour.floodsub.subscribe(floodsub_topic.clone());
  124. libp2p::Swarm::new(transport, behaviour, local_peer_id)
  125. };
  126.  
  127. // Listen on all interfaces and whatever port the OS assigns
  128. let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
  129. println!("Listening on {:?}", addr);
  130.  
  131. // Reach out to another node if specified
  132. if let Some(to_dial) = std::env::args().nth(1) {
  133. let dialing = to_dial.clone();
  134. match to_dial.parse() {
  135. Ok(to_dial) => {
  136. match libp2p::Swarm::dial_addr(&mut swarm, to_dial) {
  137. Ok(_) => println!("Dialed {:?}", dialing),
  138. Err(e) => println!("Dial {:?} failed: {:?}", dialing, e)
  139. }
  140. },
  141. Err(err) => println!("Failed to parse address to dial: {:?}", err),
  142. }
  143. }
  144.  
  145. // Read full lines from stdin
  146. let stdin = tokio_stdin_stdout::stdin(0);
  147. let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());
  148.  
  149. // Kick it off
  150. tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
  151. loop {
  152. match framed_stdin.poll().expect("Error while polling stdin") {
  153. Async::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()),
  154. Async::Ready(None) => panic!("Stdin closed"),
  155. Async::NotReady => break,
  156. };
  157. }
  158.  
  159. loop {
  160. match swarm.poll().expect("Error while polling swarm") {
  161. Async::Ready(Some(_)) => {
  162.  
  163. },
  164. Async::Ready(None) | Async::NotReady => break,
  165. }
  166. }
  167.  
  168. Ok(Async::NotReady)
  169. }));
  170. }
Add Comment
Please, Sign In to add comment