Advertisement
Guest User

Untitled

a guest
Feb 25th, 2010
1,697
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 17.75 KB | None | 0 0
  1. """
  2. Example to create a videoconferencing application using Farsight through
  3. Telepathy
  4.  
  5. @author: Fabien LOUIS, flouis@viotech.net
  6. @date: December 2009
  7.  
  8. Copyright (C) 2009 Viotech Communications
  9.  
  10. This library is free software; you can redistribute it and/or
  11. modify it under the terms of the GNU Lesser General Public
  12. License as published by the Free Software Foundation; either
  13. version 2 of the License, or (at your option) any later version.
  14.  
  15. This library is distributed in the hope that it will be useful,
  16. but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  18. Lesser General Public License for more details.
  19.  
  20. You should have received a copy of the GNU Lesser General Public
  21. License along with this library; if not, write to the
  22. Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  23. Boston, MA 02111-1307, USA.
  24.  
  25. @Usage:
  26. Just run two clients, one with the "slave" parameter and the second with the
  27. "master" parameter (the master account initiates the call and the slave accepts it).
  28.  
  29. @Scenario:
  30. We start by connecting each account (first the slave and then the master).
  31.    (see function on_status_changed_user)
  32. When the accounts are connected, we enable CHANNEL_TYPE_STREAMED_MEDIA capabilities
  33.    (see function enable_capabilities)
  34. on them and MASTER creates a telepathy channel.
  35.    (see function create_telepathy_channel)
  36.  
  37. When the CHANNEL_TYPE_STREAMED_MEDIA channel is created and ready
  38. (received by each contact), we create a TfListener. Only the MASTER
  39. offers a stream on it for SLAVE. SLAVE checks whether streams already exist
  40. on the telepathy channel and, in that case, accepts the stream.
  41.  
  42. TfListener is a component which encapsulates farsight management (session
  43. creation, stream creation, src pad adding, etc.).
  44.  
  45. We create a tfChannel linked with the telepathy channel path and connect it
  46. with some farsight signals.
  47.  
  48. Once the stream is created and accepted, the farsight part starts.
  49. First, we get the 'session-created' signal, which calls __on_session_created.
  50.    On signal "session-created", we create the pipeline and add the conference
  51.    to it. We set the pipeline to PLAYING and forward all bus messages to the tfChannel.
  52.    
  53. Second, we get the 'stream-get-codec-config' signal, which calls __on_stream_get_codec_config.
  54.    On signal "stream-get-codec-config", we return our codec configuration.
  55.  
  56. Third, we get 'stream-created', which calls __on_stream_created.
  57.    On signal "stream-created", we connect signals on the stream and add a source.
  58.    Then we link the source's src-pad to the stream's sink-pad.
  59.  
  60. If all works (codec negotiation and other things), we get 'src-pad-added', which
  61. calls __on_src_pad_added.
  62.    On signal "src-pad-added", we display the stream's view.
  63. """
  64.  
  65. ##################################################
  66. MASTER_ACCOUNT = "fabien.louis@hobbes"
  67. MASTER_PASSWORD = "homeb"
  68.  
  69. SLAVE_ACCOUNT = "julien.pauty@hobbes"
  70. SLAVE_PASSWORD = "homeb"
  71. ##################################################
  72.  
  73. import pygst
  74. pygst.require('0.10')
  75.  
  76. import sys, os
  77. import gobject, dbus.glib
  78. import tpfarsight
  79. import farsight, gst
  80.  
  81. os.environ["GST_PLUGIN_PATH"] = "/usr/local/lib/gstreamer-0.10"
  82.  
  83. ##
  84. ## Debugging
  85. ##
  86. #os.environ["GST_DEBUG"] = "fsrtp*:5"
  87. #os.environ["GST_DEBUG"] = "1,fs*:2,rtp*:1"
  88. #os.environ["GST_DEBUG"] = "*PAD*:5,*bin*:5"
  89.  
  90. import telepathy
  91. from telepathy.interfaces import (
  92.     CONNECTION,
  93.     CONNECTION_MANAGER,
  94.     CONNECTION_INTERFACE_CAPABILITIES,
  95.     CHANNEL_TYPE_STREAMED_MEDIA,
  96.     CONNECTION_INTERFACE_REQUESTS,
  97.     CHANNEL_INTERFACE_GROUP,
  98. )
  99. from telepathy.constants import (
  100.     CONNECTION_STATUS_CONNECTED,
  101.     HANDLE_TYPE_CONTACT,
  102.     MEDIA_STREAM_TYPE_AUDIO,
  103.     MEDIA_STREAM_TYPE_VIDEO,
  104. )
  105.  
  106. ##################################################
  107. def debug_callback(self, *args, **kwargs):
  108.     """
  109.    Debug function for unused signal    
  110.    """
  111.    
  112.     print "debug_callback"
  113.     # Print all kwarg
  114.     for arg in args:
  115.         print "\t[arg] %s" % str(arg)      
  116.     # Print all kwarg
  117.     for kwarg in kwargs:
  118.         print "\t[kwarg]%s: %s" % (kwarg, kwargs[kwarg])
  119.        
  120. def on_status_changed_user(state, reason):
  121.     """
  122.    When a user status changed
  123.    """
  124.  
  125.     if state == CONNECTION_STATUS_CONNECTED:
  126.         print "Connected"
  127.         gobject.timeout_add(2000, enable_capabilities)
  128.         if mode == "master":
  129.             print "Creating channel"
  130.             gobject.timeout_add(5000, create_telepathy_channel)
  131.         else:
  132.             print "Waiting streams..."
  133.  
  134. def enable_capabilities():
  135.     """
  136.    Enable CHANNEL_TYPE_STREAMED_MEDIA capabilities
  137.    """
  138.    
  139.     conn[CONNECTION_INTERFACE_CAPABILITIES].AdvertiseCapabilities(
  140.                             [(CHANNEL_TYPE_STREAMED_MEDIA, 15)], [])
  141.  
  142. def create_telepathy_channel():
  143.     """
  144.    Request the handle for SLAVE and create a telepathy channel between contacts
  145.    """
  146.    
  147.     # Request contact handle
  148.     contact_handle = conn[CONNECTION].RequestHandles(HANDLE_TYPE_CONTACT,
  149.                                                      [SLAVE_ACCOUNT])[0]    
  150.     # Create CHANNEL_TYPE_STREAMED_MEDIA with handle
  151.     conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
  152.                 "org.freedesktop.Telepathy.Channel.ChannelType":
  153.                     CHANNEL_TYPE_STREAMED_MEDIA,
  154.                 "org.freedesktop.Telepathy.Channel.TargetHandleType":
  155.                     HANDLE_TYPE_CONTACT,
  156.                 "org.freedesktop.Telepathy.Channel.TargetHandle":
  157.                     contact_handle})
  158.            
  159. def on_new_channel_user(object_path, channel_type, handle_type,
  160.                          handle, suppress_handler):
  161.     """
  162.    When the channel CHANNEL_TYPE_STREAMED_MEDIA is created and ready
  163.    (received by each contacts), we create an TfListener. But only the MASTER
  164.    offer a stream on it for SLAVE. SLAVE look if some streams already exists
  165.    on the telepathy channel and in this case, we accept the stream.
  166.    """
  167.    
  168.     if channel_type == CHANNEL_TYPE_STREAMED_MEDIA:
  169.         print "New %s channel" % (channel_type)
  170.        
  171.         # Get telepathy channel
  172.         channel = telepathy.client.channel.Channel(conn.service_name,
  173.                                                    object_path,
  174.                                                    conn.bus)
  175.  
  176.         # Create a tf listener
  177.         TfListener(conn, object_path)
  178.        
  179.         if mode == "master":    # Request a stream
  180.             # Get contact handle
  181.             contact_handle = conn[CONNECTION].RequestHandles(
  182.                                                             HANDLE_TYPE_CONTACT,
  183.                                                             [SLAVE_ACCOUNT])[0]
  184.             # Request stream
  185.             channel[CHANNEL_TYPE_STREAMED_MEDIA].RequestStreams(
  186.                                                 contact_handle,
  187.                                                 [MEDIA_STREAM_TYPE_VIDEO
  188. #                                                ,MEDIA_STREAM_TYPE_AUDIO
  189.                                                 ])
  190.         else:
  191.             # Regarding if streams are already in, and accepted them
  192.             if channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams() != []:
  193.                 # Accept the stream by adding  owner
  194.                 channel[CHANNEL_INTERFACE_GROUP].AddMembers(
  195.                                         [conn[CONNECTION].GetSelfHandle()],
  196.                                         "")
  197.  
  198. ##################################################        
  199. class TfListener(object):
  200.     """
  201.    TfListener is a component which encapsulate farsight management (session
  202.    creation, stream creation, src pad adding, etc.).
  203.    
  204.    We create a tfChannel linked with the telepathy channel path and connect it
  205.    with some farsight signals.
  206.    """
  207.    
  208.     def __init__(self, connection, chan_object_path):
  209.         """
  210.        Init
  211.        
  212.        @param connection: current connection
  213.        @type connection: Telepathy Connection
  214.        
  215.        @param chan_object_path: Object path of the StreamedMedia channel
  216.        @type chan_object_path: String
  217.        """
  218.        
  219.         super(TfListener, self).__init__()        
  220.         self.pipeline = None
  221.         self.conn = connection
  222.        
  223.         # We create a tfChannel linked with the telepathy channel path.
  224.         self.tf_channel = tpfarsight.Channel(
  225.                                      connection_busname=self.conn.service_name,
  226.                                      connection_path=self.conn.object_path,
  227.                                      channel_path=chan_object_path)
  228.         # Connect to several signals
  229.         print "connecting to channel", self.tf_channel
  230.         self.tf_channel.connect('session-created', self.__on_session_created)
  231.         self.tf_channel.connect('stream-created', self.__on_stream_created)
  232.         self.tf_channel.connect('stream-get-codec-config',
  233.                                 self.__on_stream_get_codec_config)
  234.  
  235.     def __on_session_created(self, channel, conference, participant):
  236.         """
  237.        On signal "session-created", we create the pipeline and add conference
  238.        on it. We set pipeline to PLAYING and we transfer all bus message to it.
  239.        """
  240.        
  241.         print
  242.         print "=== %s __on_session_created ===" % self
  243.         print
  244.        
  245.         self.pipeline = gst.Pipeline()        
  246.         self.pipeline.add(conference)
  247.         self.pipeline.set_state(gst.STATE_PLAYING)
  248.        
  249.         # Transfer all bus message
  250.         self.pipeline.get_bus().add_watch(self.__async_handler)
  251.  
  252.     def __on_stream_get_codec_config(self, channel, stream_id, media_type,
  253.                                      direction):
  254.         """
  255.        On signal "stream-get-codec-config", we returns our codec configuration
  256.        """
  257.        
  258.         print
  259.         print "=== %s __on_stream_get_codec_config ===" % self
  260.         print
  261.        
  262.         if media_type == farsight.MEDIA_TYPE_VIDEO:
  263.             codecs = [farsight.Codec(farsight.CODEC_ID_ANY, "H264",
  264.                                      farsight.MEDIA_TYPE_VIDEO, 0)]
  265.            
  266.             if self.conn.GetProtocol() == "sip" :
  267.                 codecs += [ farsight.Codec(farsight.CODEC_ID_DISABLE, "THEORA",
  268.                                         farsight.MEDIA_TYPE_VIDEO, 0) ]
  269.             else:
  270.                 codecs += [ farsight.Codec(farsight.CODEC_ID_ANY, "THEORA",
  271.                                         farsight.MEDIA_TYPE_VIDEO, 0) ]
  272.            
  273.             codecs += [
  274.                 farsight.Codec(farsight.CODEC_ID_ANY, "H263",
  275.                                         farsight.MEDIA_TYPE_VIDEO, 0),
  276.                 farsight.Codec(farsight.CODEC_ID_ANY, "JPEG",
  277.                                         farsight.MEDIA_TYPE_VIDEO, 0),
  278.                 farsight.Codec(farsight.CODEC_ID_ANY, "MPV",
  279.                                         farsight.MEDIA_TYPE_VIDEO, 0),
  280.                 farsight.Codec(farsight.CODEC_ID_DISABLE, "DV",
  281.                                         farsight.MEDIA_TYPE_VIDEO, 0),
  282.             ]
  283.  
  284.             return codecs
  285.         else:
  286.             return None
  287.  
  288.     def __on_stream_created(self, channel, stream):
  289.         """
  290.        On signal "stream-created">, connect signals on stream and add source.
  291.        Then we link the stream's sink-pad  to source's src-pad.
  292.        """
  293.        
  294.         print
  295.         print "=== %s __on_stream_created ===" % self
  296.         print
  297.        
  298.         stream.connect('src-pad-added', self.__on_src_pad_added)
  299.         stream.connect('closed', debug_callback, "closed")          # Not used
  300.         stream.connect('error', debug_callback, "error")            # Not used
  301.         stream.connect('free-resource', debug_callback, "free")     # Not used
  302.        
  303.         # creating src pipes
  304.         type = stream.get_property ("media-type")
  305.         if type == farsight.MEDIA_TYPE_AUDIO:
  306.             source = gst.parse_bin_from_description (
  307.                             "audiotestsrc", True)
  308.        
  309.         elif type == farsight.MEDIA_TYPE_VIDEO:
  310.             if mode == "master":
  311.                 source = gst.parse_bin_from_description (
  312. #                            "v4l2src device='/dev/video0' ! ffmpegcolorspace ! videoscale ! video/x-raw-yuv,width=320,height=240  ! ffmpegcolorspace ! warptv ! ffmpegcolorspace ", True)
  313.                                                         "uridecodebin uri=file:///home/fabien/Bureau/Video/test.avi ! identity sync=True ! ffmpegcolorspace ! videoscale ! video/x-raw-yuv,width=320,height=240  ! ffmpegcolorspace ! timeoverlay", True)
  314. #                                                       "videotestsrc ! identity sync=True ! ffmpegcolorspace ! videoscale ! video/x-raw-yuv,width=320,height=240  ! ffmpegcolorspace ! timeoverlay", True)
  315. #                            "videotestsrc", True)
  316.             else:
  317.                 source = gst.parse_bin_from_description (
  318. #                            "v4l2src device='/dev/video1'", True)
  319.                             "videotestsrc", True)                
  320.    
  321.         self.pipeline.add(source)        
  322.         source.get_pad("src").link(stream.get_property("sink-pad"))        
  323.         source.set_state(gst.STATE_PLAYING)
  324.    
  325.     def __on_src_pad_added (self, stream, pad, codec):
  326.         """
  327.        On signal "src-pad-added", we display stream view
  328.        """
  329.        
  330.         print
  331.         print "=== %s __src_pad_added ===" % self
  332.         print
  333.        
  334.         type = stream.get_property ("media-type")
  335.         if type == farsight.MEDIA_TYPE_AUDIO:
  336.             queue_sink = gst.parse_bin_from_description("queue ! audioconvert ! audioresample ! audioconvert ! autoaudiosink", True)
  337.            
  338.             audioadder = gst.element_factory_make("liveadder")
  339.             tee = gst.element_factory_make("tee")
  340.            
  341.             # Add & Link
  342.             self.pipeline.add(audioadder, tee, queue_sink)
  343.             gst.element_link_many(audioadder, tee, queue_sink)            
  344.             pad.link(audioadder.get_pad("sink%d"))            
  345.            
  346.             queue_sink.set_state(gst.STATE_PLAYING)
  347.             tee.set_state(gst.STATE_PLAYING)
  348.             audioadder.set_state(gst.STATE_PLAYING)
  349.                
  350.         elif type == farsight.MEDIA_TYPE_VIDEO:
  351.             queue_ff = gst.parse_bin_from_description("queue ! ffmpegcolorspace", True)
  352.  
  353.             if USE_CLUTTER_TO_DISPLAY:
  354.                 sink = cluttergst.VideoSink(video_texture)
  355.             else:
  356.                 sink = gst.parse_bin_from_description("videoscale ! video/x-raw-yuv,width=320,height=240 ! autovideosink", True)
  357.            
  358.             videofunnel = gst.element_factory_make("fsfunnel")
  359.             tee = gst.element_factory_make("tee")
  360.            
  361.             # Add & Link
  362.             self.pipeline.add(videofunnel, tee, queue_ff,  sink)
  363.             gst.element_link_many(videofunnel, tee, queue_ff, sink)
  364.             pad.link(videofunnel.get_pad("sink%d"))    
  365.                        
  366.             sink.set_state(gst.STATE_PLAYING)
  367.             queue_ff.set_state(gst.STATE_PLAYING)
  368.             tee.set_state(gst.STATE_PLAYING)
  369.             videofunnel.set_state(gst.STATE_PLAYING)
  370.             self.pipeline.set_state(gst.STATE_PLAYING)
  371.            
  372.     def __async_handler (self, bus, message):
  373.         """
  374.        Check all bus message and redirect them to tf_channel (obligatory)
  375.        """
  376.        
  377.         if self.tf_channel != None:
  378.             self.tf_channel.bus_message(message)
  379.         return True
  380.    
  381. ##################################################
  382. ##################################################
  383. if __name__ == '__main__':
  384.     # Manage parameters
  385.     if len(sys.argv) < 2:
  386.         print "Usage: %s {master|slave} [--enable clutter]" % sys.argv[0]
  387.         exit(1)
  388.    
  389.     if len(sys.argv) == 4 and sys.argv[2] == "--enable" and sys.argv[3] == "clutter":              
  390.         USE_CLUTTER_TO_DISPLAY = True
  391.     else:
  392.         USE_CLUTTER_TO_DISPLAY = False
  393.    
  394.    
  395.     if USE_CLUTTER_TO_DISPLAY:
  396.         import clutter
  397.         import cluttergst
  398.         clutter.threads_init()
  399.         clutter.init()
  400.    
  401.     ##
  402.     ## Mode
  403.     ##
  404.     mode = sys.argv[1]    
  405.     print "=== %s ===" % sys.argv[0]
  406.     print "Mode: %s" % mode
  407.    
  408.     # Enable parameters due to mode
  409.     if mode == "master":
  410.         parameters = {"account": MASTER_ACCOUNT, "password": MASTER_PASSWORD,
  411.                       "port": dbus.UInt32(5222), "server": MASTER_ACCOUNT.split("@")[-1]}
  412.     else:
  413.         parameters = {"account": SLAVE_ACCOUNT, "password": SLAVE_PASSWORD,
  414.                       "port": dbus.UInt32(5222), "server": SLAVE_ACCOUNT.split("@")[-1]}
  415.    
  416.     ##
  417.     ## Clutter
  418.     ##
  419.     if USE_CLUTTER_TO_DISPLAY:
  420.         stage = clutter.Stage(default=True)
  421.         stage.set_size(320, 240)
  422.         stage.set_color(clutter.color_from_string('DarkSlateGrey'))
  423.        
  424.         video_texture = clutter.Texture()
  425.         video_texture.show()
  426.         stage.add(video_texture)
  427.    
  428.     ##
  429.     ## Telepathy
  430.     ##
  431.     # Get the requested manager
  432.     reg = telepathy.client.ManagerRegistry()
  433.     reg.LoadManagers()    
  434.     manager = reg.GetManager("gabble")
  435.    
  436.     # Request connection
  437.     conn_bus_name, conn_object_path = manager[CONNECTION_MANAGER].\
  438.                                             RequestConnection("jabber", parameters)            
  439.     conn = telepathy.client.Connection(conn_bus_name, conn_object_path)
  440.    
  441.     # Listen signals
  442.     conn[CONNECTION].connect_to_signal("NewChannel", on_new_channel_user)
  443.     conn[CONNECTION].connect_to_signal("StatusChanged", on_status_changed_user)
  444.    
  445.     # Connect
  446.     conn[CONNECTION].Connect()
  447.    
  448.     ##
  449.     ## Test
  450.     ##
  451.     if USE_CLUTTER_TO_DISPLAY:
  452.         stage.show_all()
  453.         clutter.main()
  454.     else:
  455.         loop = gobject.MainLoop()
  456.         try:
  457.             loop.run()
  458.         except KeyboardInterrupt:
  459.             print 'interrupted'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement