Advertisement
Guest User

rtmpflow.c

a guest
Apr 21st, 2010
432
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 33.12 KB | None | 0 0
  1. /*  RTMP Proxy Server
  2.  *  Copyright (C) 2009 Andrej Stepanchuk
  3.  *  Copyright (C) 2009 Howard Chu
  4.  *
  5.  *  This Program is free software; you can redistribute it and/or modify
  6.  *  it under the terms of the GNU General Public License as published by
  7.  *  the Free Software Foundation; either version 2, or (at your option)
  8.  *  any later version.
  9.  *
  10.  *  This Program is distributed in the hope that it will be useful,
  11.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13.  *  GNU General Public License for more details.
  14.  *
  15.  *  You should have received a copy of the GNU General Public License
  16.  *  along with RTMPDump; see the file COPYING.  If not, write to
  17.  *  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  18.  *  http://www.gnu.org/copyleft/gpl.html
  19.  *
  20.  */
  21.  
  22. /* This is a Proxy Server that displays the connection parameters from a
  23.  * client and then saves any data streamed to the client.
  24.  */
  25.  
  26. #include <stdlib.h>
  27. #include <string.h>
  28. #include <math.h>
  29. #include <limits.h>
  30.  
  31. #include <signal.h>
  32. #include <getopt.h>
  33.  
  34. #include <assert.h>
  35.  
  36. #include "librtmp/rtmp_sys.h"
  37. #include "librtmp/log.h"
  38.  
  39. #include "thread.h"
  40.  
  41. #ifdef linux
  42. #include <linux/netfilter_ipv4.h>
  43. #endif
  44.  
  45. #define RD_SUCCESS      0
  46. #define RD_FAILED       1
  47. #define RD_INCOMPLETE       2
  48.  
  49. #define PACKET_SIZE 1024*1024
  50.  
  51. #ifdef WIN32
  52. #define InitSockets()   {\
  53.         WORD version;           \
  54.         WSADATA wsaData;        \
  55.                     \
  56.         version = MAKEWORD(1,1);    \
  57.         WSAStartup(version, &wsaData);  }
  58.  
  59. #define CleanupSockets()    WSACleanup()
  60. #else
  61. #define InitSockets()
  62. #define CleanupSockets()
  63. #endif
  64.  
  65. enum
  66. {
  67.   STREAMING_ACCEPTING,
  68.   STREAMING_IN_PROGRESS,
  69.   STREAMING_STOPPING,
  70.   STREAMING_STOPPED
  71. };
  72.  
  73. typedef struct Flist
  74. {
  75.   struct Flist *f_next;
  76.   FILE *f_file;
  77.   AVal f_path;
  78. } Flist;
  79.  
  80. typedef struct Plist
  81. {
  82.   struct Plist *p_next;
  83.   RTMPPacket p_pkt;
  84. } Plist;
  85.  
  86. typedef struct
  87. {
  88.   int socket;
  89.   int state;
  90.   uint32_t stamp;
  91.   RTMP rs;
  92.   RTMP rc;
  93.   Plist *rs_pkt[2]; /* head, tail */
  94.   Plist *rc_pkt[2]; /* head, tail */
  95.   Flist *f_head, *f_tail;
  96.   Flist *f_cur;
  97.  
  98. } STREAMING_SERVER;
  99.  
  100. STREAMING_SERVER *rtmpServer = 0;   // server structure pointer
  101.  
  102. STREAMING_SERVER *startStreaming(const char *address, int port);
  103. void stopStreaming(STREAMING_SERVER * server);
  104.  
  105. #define STR2AVAL(av,str)    av.av_val = str; av.av_len = strlen(av.av_val)
  106.  
  107. #ifdef _DEBUG
  108. uint32_t debugTS = 0;
  109.  
  110. int pnum = 0;
  111.  
  112. FILE *netstackdump = NULL;
  113. FILE *netstackdump_read = NULL;
  114. #endif
  115.  
  116. #define BUFFERTIME  (4*60*60*1000)  /* 4 hours */
  117.  
  118. #define SAVC(x) static const AVal av_##x = AVC(#x)
  119.  
  120. SAVC(app);
  121. SAVC(connect);
  122. SAVC(flashVer);
  123. SAVC(swfUrl);
  124. SAVC(pageUrl);
  125. SAVC(tcUrl);
  126. SAVC(fpad);
  127. SAVC(capabilities);
  128. SAVC(audioCodecs);
  129. SAVC(videoCodecs);
  130. SAVC(videoFunction);
  131. SAVC(objectEncoding);
  132. SAVC(_result);
  133. SAVC(createStream);
  134. SAVC(play);
  135. SAVC(closeStream);
  136. SAVC(fmsVer);
  137. SAVC(mode);
  138. SAVC(level);
  139. SAVC(code);
  140. SAVC(secureToken);
  141. SAVC(onStatus);
  142. SAVC(close);
  143. static const AVal av_NetStream_Failed = AVC("NetStream.Failed");
  144. static const AVal av_NetStream_Play_Failed = AVC("NetStream.Play.Failed");
  145. static const AVal av_NetStream_Play_StreamNotFound =
  146. AVC("NetStream.Play.StreamNotFound");
  147. static const AVal av_NetConnection_Connect_InvalidApp =
  148. AVC("NetConnection.Connect.InvalidApp");
  149. static const AVal av_NetStream_Play_Start = AVC("NetStream.Play.Start");
  150. static const AVal av_NetStream_Play_Complete = AVC("NetStream.Play.Complete");
  151. static const AVal av_NetStream_Play_Stop = AVC("NetStream.Play.Stop");
  152.  
  153. static const char *cst[] = { "client", "server" };
  154.  
  155. // Returns 0 for OK/Failed/error, 1 for 'Stop or Complete'
  156. int
  157. ServeInvoke(STREAMING_SERVER *server, int which, RTMPPacket *pack, const char *body)
  158. {
  159.   int ret = 0, nRes;
  160.   int nBodySize = pack->m_nBodySize;
  161.  
  162.   if (body > pack->m_body)
  163.     nBodySize--;
  164.  
  165.   if (body[0] != 0x02)      // make sure it is a string method name we start with
  166.     {
  167.       RTMP_Log(RTMP_LOGWARNING, "%s, Sanity failed. no string method in invoke packet",
  168.       __FUNCTION__);
  169.       return 0;
  170.     }
  171.  
  172.   AMFObject obj;
  173.   nRes = AMF_Decode(&obj, body, nBodySize, false);
  174.   if (nRes < 0)
  175.     {
  176.       RTMP_Log(RTMP_LOGERROR, "%s, error decoding invoke packet", __FUNCTION__);
  177.       return 0;
  178.     }
  179.  
  180.   AMF_Dump(&obj);
  181.   AVal method;
  182.   AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);
  183.   RTMP_Log(RTMP_LOGDEBUG, "%s, %s invoking <%s>", __FUNCTION__, cst[which], method.av_val);
  184.  
  185.   if (AVMATCH(&method, &av_connect))
  186.     {
  187.       AMFObject cobj;
  188.       AVal pname, pval;
  189.       int i;
  190.       AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &cobj);
  191.       RTMP_LogPrintf("Processing connect\n");
  192.       for (i=0; i<cobj.o_num; i++)
  193.         {
  194.           pname = cobj.o_props[i].p_name;
  195.           pval.av_val = NULL;
  196.           pval.av_len = 0;
  197.           if (cobj.o_props[i].p_type == AMF_STRING)
  198.             {
  199.               pval = cobj.o_props[i].p_vu.p_aval;
  200.               RTMP_LogPrintf("%.*s: %.*s\n", pname.av_len, pname.av_val, pval.av_len, pval.av_val);
  201.             }
  202.           if (AVMATCH(&pname, &av_app))
  203.             {
  204.               server->rc.Link.app = pval;
  205.               pval.av_val = NULL;
  206.             }
  207.           else if (AVMATCH(&pname, &av_flashVer))
  208.             {
  209.               server->rc.Link.flashVer = pval;
  210.               pval.av_val = NULL;
  211.             }
  212.           else if (AVMATCH(&pname, &av_swfUrl))
  213.             {
  214. #ifdef CRYPTO
  215.               if (pval.av_val)
  216.             RTMP_HashSWF(pval.av_val, &server->rc.Link.SWFSize,
  217.           (unsigned char *)server->rc.Link.SWFHash, 30);
  218. #endif
  219.               server->rc.Link.swfUrl = pval;
  220.               pval.av_val = NULL;
  221.             }
  222.           else if (AVMATCH(&pname, &av_tcUrl))
  223.             {
  224.               char *r1 = NULL, *r2;
  225.               int len;
  226.  
  227.               server->rc.Link.tcUrl = pval;
  228.               if ((pval.av_val[0] | 0x40) == 'r' &&
  229.                   (pval.av_val[1] | 0x40) == 't' &&
  230.                   (pval.av_val[2] | 0x40) == 'm' &&
  231.                   (pval.av_val[3] | 0x40) == 'p')
  232.                 {
  233.                   if (pval.av_val[4] == ':')
  234.                     {
  235.                       server->rc.Link.protocol = RTMP_PROTOCOL_RTMP;
  236.                       r1 = pval.av_val+7;
  237.                     }
  238.                   else if ((pval.av_val[4] | 0x40) == 'e' && pval.av_val[5] == ':')
  239.                     {
  240.                       server->rc.Link.protocol = RTMP_PROTOCOL_RTMPE;
  241.                       r1 = pval.av_val+8;
  242.                     }
  243.                   r2 = strchr(r1, '/');
  244.                   len = r2 - r1;
  245.                   r2 = malloc(len+1);
  246.                   memcpy(r2, r1, len);
  247.                   r2[len] = '\0';
  248.                   server->rc.Link.hostname.av_val = r2;
  249.                   r1 = strrchr(r2, ':');
  250.                   if (r1)
  251.                     {
  252.               server->rc.Link.hostname.av_len = r1 - r2;
  253.                       *r1++ = '\0';
  254.                       server->rc.Link.port = atoi(r1);
  255.                     }
  256.                   else
  257.                     {
  258.               server->rc.Link.hostname.av_len = len;
  259.                       server->rc.Link.port = 1935;
  260.                     }
  261.                 }
  262.               pval.av_val = NULL;
  263.             }
  264.           else if (AVMATCH(&pname, &av_pageUrl))
  265.             {
  266.               server->rc.Link.pageUrl = pval;
  267.               pval.av_val = NULL;
  268.             }
  269.           else if (AVMATCH(&pname, &av_audioCodecs))
  270.             {
  271.               server->rc.m_fAudioCodecs = cobj.o_props[i].p_vu.p_number;
  272.             }
  273.           else if (AVMATCH(&pname, &av_videoCodecs))
  274.             {
  275.               server->rc.m_fVideoCodecs = cobj.o_props[i].p_vu.p_number;
  276.             }
  277.           else if (AVMATCH(&pname, &av_objectEncoding))
  278.             {
  279.               server->rc.m_fEncoding = cobj.o_props[i].p_vu.p_number;
  280.               server->rc.m_bSendEncoding = true;
  281.             }
  282.           /* Dup'd a string we didn't recognize? */
  283.           if (pval.av_val)
  284.             free(pval.av_val);
  285.         }
  286.       if (obj.o_num > 3)
  287.         {
  288.           server->rc.Link.authflag = AMFProp_GetBoolean(&obj.o_props[3]);
  289.           if (obj.o_num > 4)
  290.           {
  291.             AMFProp_GetString(&obj.o_props[4], &server->rc.Link.auth);
  292.           }
  293.         }
  294.  
  295.       if (!RTMP_Connect(&server->rc, pack))
  296.         {
  297.           /* failed */
  298.           return 1;
  299.         }
  300.       server->rc.m_bSendCounter = false;
  301.     }
  302.   else if (AVMATCH(&method, &av_play))
  303.     {
  304.       Flist *fl;
  305.       AVal av;
  306.       FILE *out;
  307.       char *file, *p, *q;
  308.       char flvHeader[] = { 'F', 'L', 'V', 0x01,
  309.          0x05,                       // video + audio, we finalize later if the value is different
  310.          0x00, 0x00, 0x00, 0x09,
  311.          0x00, 0x00, 0x00, 0x00      // first prevTagSize=0
  312.        };
  313.       int count = 0, flen;
  314.  
  315.       server->rc.m_stream_id = pack->m_nInfoField2;
  316.       AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &av);
  317.       server->rc.Link.playpath = av;
  318.       if (!av.av_val)
  319.         goto out;
  320.  
  321.       /* check for duplicates */
  322.       for (fl = server->f_head; fl; fl=fl->f_next)
  323.         {
  324.           if (AVMATCH(&av, &fl->f_path))
  325.             count++;
  326.         }
  327.       /* strip trailing URL parameters */
  328.       q = memchr(av.av_val, '?', av.av_len);
  329.       if (q)
  330.         av.av_len = q - av.av_val;
  331.       /* strip leading slash components */
  332.       for (p=av.av_val+av.av_len-1; p>=av.av_val; p--)
  333.         if (*p == '/')
  334.           {
  335.             p++;
  336.             av.av_len -= p - av.av_val;
  337.             av.av_val = p;
  338.             break;
  339.           }
  340.       /* skip leading dot */
  341.       if (av.av_val[0] == '.')
  342.         {
  343.           av.av_val++;
  344.           av.av_len--;
  345.         }
  346.     av.av_val=(char *)"/dev/null";
  347.     av.av_len=10;
  348.       flen = av.av_len;
  349.       /* hope there aren't more than 255 dups */
  350.       if (count)
  351.         flen += 2;
  352.       file = malloc(flen+1);
  353.  
  354.       memcpy(file, av.av_val, av.av_len);
  355.       if (count)
  356.         sprintf(file+av.av_len, "%02x", count);
  357.       else
  358.         file[av.av_len] = '\0';
  359.       for (p=file; *p; p++)
  360.         if (*p == ':')
  361.           *p = '_';
  362.       RTMP_LogPrintf("Playpath: %.*s\nSaving as: %s\n",
  363.         server->rc.Link.playpath.av_len, server->rc.Link.playpath.av_val,
  364.         file);
  365.       out = fopen(file, "wb");
  366.       free(file);
  367.       if (!out)
  368.         ret = 1;
  369.       else
  370.         {
  371.           fwrite(flvHeader, 1, sizeof(flvHeader), out);
  372.         server->rc.Link.playpath.av_val=(char *)"/dev/null";
  373.           av = server->rc.Link.playpath;
  374.           fl = malloc(sizeof(Flist)+av.av_len+1);
  375.           fl->f_file = out;
  376.           fl->f_path.av_len = av.av_len;
  377.           fl->f_path.av_val = (char *)(fl+1);
  378.           memcpy(fl->f_path.av_val, av.av_val, av.av_len);
  379.           fl->f_path.av_val[av.av_len] = '\0';
  380.           fl->f_next = NULL;
  381.           if (server->f_tail)
  382.             server->f_tail->f_next = fl;
  383.           else
  384.             server->f_head = fl;
  385.           server->f_tail = fl;
  386.         }
  387.     }
  388.   else if (AVMATCH(&method, &av_onStatus))
  389.     {
  390.       AMFObject obj2;
  391.       AVal code, level;
  392.       AMFProp_GetObject(AMF_GetProp(&obj, NULL, 3), &obj2);
  393.       AMFProp_GetString(AMF_GetProp(&obj2, &av_code, -1), &code);
  394.       AMFProp_GetString(AMF_GetProp(&obj2, &av_level, -1), &level);
  395.  
  396.       RTMP_Log(RTMP_LOGDEBUG, "%s, onStatus: %s", __FUNCTION__, code.av_val);
  397.       if (AVMATCH(&code, &av_NetStream_Failed)
  398.       || AVMATCH(&code, &av_NetStream_Play_Failed)
  399.       || AVMATCH(&code, &av_NetStream_Play_StreamNotFound)
  400.       || AVMATCH(&code, &av_NetConnection_Connect_InvalidApp))
  401.     {
  402.       ret = 1;
  403.     }
  404.  
  405.       if (AVMATCH(&code, &av_NetStream_Play_Start))
  406.     {
  407.           /* set up the next stream */
  408.           if (server->f_cur)
  409.             {
  410.               if (server->f_cur->f_next)
  411.                 server->f_cur = server->f_cur->f_next;
  412.             }
  413.           else
  414.             {
  415.               for (server->f_cur = server->f_head; server->f_cur &&
  416.                     !server->f_cur->f_file; server->f_cur = server->f_cur->f_next) ;
  417.             }
  418.       server->rc.m_bPlaying = true;
  419.     }
  420.  
  421.       // Return 1 if this is a Play.Complete or Play.Stop
  422.       if (AVMATCH(&code, &av_NetStream_Play_Complete)
  423.       || AVMATCH(&code, &av_NetStream_Play_Stop))
  424.     {
  425.       ret = 1;
  426.     }
  427.     }
  428.   else if (AVMATCH(&method, &av_closeStream))
  429.     {
  430.       ret = 1;
  431.     }
  432.   else if (AVMATCH(&method, &av_close))
  433.     {
  434.       RTMP_Close(&server->rc);
  435.       ret = 1;
  436.     }
  437. out:
  438.   AMF_Reset(&obj);
  439.   return ret;
  440. }
  441.  
  442. int
  443. ServePacket(STREAMING_SERVER *server, int which, RTMPPacket *packet)
  444. {
  445.   int ret = 0;
  446.  
  447.   RTMP_Log(RTMP_LOGDEBUG, "%s, %s sent packet type %02X, size %lu bytes", __FUNCTION__,
  448.     cst[which], packet->m_packetType, packet->m_nBodySize);
  449.  
  450.   switch (packet->m_packetType)
  451.     {
  452.     case 0x01:
  453.       // chunk size
  454. //      HandleChangeChunkSize(r, packet);
  455.       break;
  456.  
  457.     case 0x03:
  458.       // bytes read report
  459.       break;
  460.  
  461.     case 0x04:
  462.       // ctrl
  463. //      HandleCtrl(r, packet);
  464.       break;
  465.  
  466.     case 0x05:
  467.       // server bw
  468. //      HandleServerBW(r, packet);
  469.       break;
  470.  
  471.     case 0x06:
  472.       // client bw
  473.  //     HandleClientBW(r, packet);
  474.       break;
  475.  
  476.     case 0x08:
  477.       // audio data
  478.       //RTMP_Log(RTMP_LOGDEBUG, "%s, received: audio %lu bytes", __FUNCTION__, packet.m_nBodySize);
  479.       break;
  480.  
  481.     case 0x09:
  482.       // video data
  483.       //RTMP_Log(RTMP_LOGDEBUG, "%s, received: video %lu bytes", __FUNCTION__, packet.m_nBodySize);
  484.       break;
  485.  
  486.     case 0x0F:          // flex stream send
  487.       break;
  488.  
  489.     case 0x10:          // flex shared object
  490.       break;
  491.  
  492.     case 0x11:          // flex message
  493.       {
  494.     ret = ServeInvoke(server, which, packet, packet->m_body + 1);
  495.     break;
  496.       }
  497.     case 0x12:
  498.       // metadata (notify)
  499.       break;
  500.  
  501.     case 0x13:
  502.       /* shared object */
  503.       break;
  504.  
  505.     case 0x14:
  506.       // invoke
  507.       ret = ServeInvoke(server, which, packet, packet->m_body);
  508.       break;
  509.  
  510.     case 0x16:
  511.       /* flv */
  512.     break;
  513.     default:
  514.       RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,
  515.       packet->m_packetType);
  516. #ifdef _DEBUG
  517.       RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize);
  518. #endif
  519.     }
  520.   return ret;
  521. }
  522.  
  523. int
  524. WriteStream(char **buf, // target pointer, maybe preallocated
  525.         unsigned int *plen, // length of buffer if preallocated
  526.             uint32_t *nTimeStamp,
  527.             RTMPPacket *packet)
  528. {
  529.   uint32_t prevTagSize = 0;
  530.   int ret = -1, len = *plen;
  531.  
  532.   while (1)
  533.     {
  534.       char *packetBody = packet->m_body;
  535.       unsigned int nPacketLen = packet->m_nBodySize;
  536.  
  537.       // skip video info/command packets
  538.       if (packet->m_packetType == 0x09 &&
  539.       nPacketLen == 2 && ((*packetBody & 0xf0) == 0x50))
  540.     {
  541.       ret = 0;
  542.       break;
  543.     }
  544.  
  545.       if (packet->m_packetType == 0x09 && nPacketLen <= 5)
  546.     {
  547.       RTMP_Log(RTMP_LOGWARNING, "ignoring too small video packet: size: %d",
  548.           nPacketLen);
  549.       ret = 0;
  550.       break;
  551.     }
  552.       if (packet->m_packetType == 0x08 && nPacketLen <= 1)
  553.     {
  554.       RTMP_Log(RTMP_LOGWARNING, "ignoring too small audio packet: size: %d",
  555.           nPacketLen);
  556.       ret = 0;
  557.       break;
  558.     }
  559. #ifdef _DEBUG
  560.       RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, TS: %d ms", packet->m_packetType,
  561.       nPacketLen, packet->m_nTimeStamp);
  562.       if (packet->m_packetType == 0x09)
  563.     RTMP_Log(RTMP_LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0));
  564. #endif
  565.  
  566.       // calculate packet size and reallocate buffer if necessary
  567.       unsigned int size = nPacketLen
  568.     +
  569.     ((packet->m_packetType == 0x08 || packet->m_packetType == 0x09
  570.       || packet->m_packetType == 0x12) ? 11 : 0) + (packet->m_packetType !=
  571.                                0x16 ? 4 : 0);
  572.  
  573.       if (size + 4 > len)
  574.     {           // the extra 4 is for the case of an FLV stream without a last prevTagSize (we need extra 4 bytes to append it)
  575.       *buf = (char *) realloc(*buf, size + 4);
  576.       if (*buf == 0)
  577.         {
  578.           RTMP_Log(RTMP_LOGERROR, "Couldn't reallocate memory!");
  579.           ret = -1;     // fatal error
  580.           break;
  581.         }
  582.     }
  583.       char *ptr = *buf, *pend = ptr + size+4;
  584.  
  585.       // audio (0x08), video (0x09) or metadata (0x12) packets :
  586.       // construct 11 byte header then add rtmp packet's data
  587.       if (packet->m_packetType == 0x08 || packet->m_packetType == 0x09
  588.       || packet->m_packetType == 0x12)
  589.     {
  590.       // set data type
  591.       //*dataType |= (((packet->m_packetType == 0x08)<<2)|(packet->m_packetType == 0x09));
  592.  
  593.       (*nTimeStamp) = packet->m_nTimeStamp;
  594.       prevTagSize = 11 + nPacketLen;
  595.  
  596.       *ptr++ = packet->m_packetType;
  597.       ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);
  598.       ptr = AMF_EncodeInt24(ptr, pend, *nTimeStamp);
  599.       *ptr = (char) (((*nTimeStamp) & 0xFF000000) >> 24);
  600.       ptr++;
  601.  
  602.       // stream id
  603.       ptr = AMF_EncodeInt24(ptr, pend, 0);
  604.     }
  605.  
  606.       memcpy(ptr, packetBody, nPacketLen);
  607.       unsigned int len = nPacketLen;
  608.  
  609.       // correct tagSize and obtain timestamp if we have an FLV stream
  610.       if (packet->m_packetType == 0x16)
  611.     {
  612.       unsigned int pos = 0;
  613.  
  614.       while (pos + 11 < nPacketLen)
  615.         {
  616.           uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1);    // size without header (11) and without prevTagSize (4)
  617.           *nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);
  618.           *nTimeStamp |= (packetBody[pos + 7] << 24);
  619.  
  620.           // set data type
  621.           //*dataType |= (((*(packetBody+pos) == 0x08)<<2)|(*(packetBody+pos) == 0x09));
  622.  
  623.           if (pos + 11 + dataSize + 4 > nPacketLen)
  624.         {
  625.           if (pos + 11 + dataSize > nPacketLen)
  626.             {
  627.               RTMP_Log(RTMP_LOGERROR,
  628.               "Wrong data size (%lu), stream corrupted, aborting!",
  629.               dataSize);
  630.               ret = -2;
  631.               break;
  632.             }
  633.           RTMP_Log(RTMP_LOGWARNING, "No tagSize found, appending!");
  634.  
  635.           // we have to append a last tagSize!
  636.           prevTagSize = dataSize + 11;
  637.           AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend, prevTagSize);
  638.           size += 4;
  639.           len += 4;
  640.         }
  641.           else
  642.         {
  643.           prevTagSize =
  644.             AMF_DecodeInt32(packetBody + pos + 11 + dataSize);
  645.  
  646. #ifdef _DEBUG
  647.           RTMP_Log(RTMP_LOGDEBUG,
  648.               "FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",
  649.               (unsigned char) packetBody[pos], dataSize, prevTagSize,
  650.               *nTimeStamp);
  651. #endif
  652.  
  653.           if (prevTagSize != (dataSize + 11))
  654.             {
  655. #ifdef _DEBUG
  656.               RTMP_Log(RTMP_LOGWARNING,
  657.               "Tag and data size are not consitent, writing tag size according to dataSize+11: %d",
  658.               dataSize + 11);
  659. #endif
  660.  
  661.               prevTagSize = dataSize + 11;
  662.               AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend, prevTagSize);
  663.             }
  664.         }
  665.  
  666.           pos += prevTagSize + 4;   //(11+dataSize+4);
  667.         }
  668.     }
  669.       ptr += len;
  670.  
  671.       if (packet->m_packetType != 0x16)
  672.     {           // FLV tag packets contain their own prevTagSize
  673.       AMF_EncodeInt32(ptr, pend, prevTagSize);
  674.       //ptr += 4;
  675.     }
  676.  
  677.       ret = size;
  678.       break;
  679.     }
  680.  
  681.   if (len > *plen)
  682.     *plen = len;
  683.  
  684.   return ret;           // no more media packets
  685. }
  686.  
  687. TFTYPE
  688. controlServerThread(void *unused)
  689. {
  690.   char ich;
  691.   while (1)
  692.     {
  693.       ich = getchar();
  694.       switch (ich)
  695.     {
  696.     case 'q':
  697.       RTMP_LogPrintf("Exiting\n");
  698.       stopStreaming(rtmpServer);
  699.           free(rtmpServer);
  700.       exit(0);
  701.       break;
  702.     default:
  703.       RTMP_LogPrintf("Unknown command \'%c\', ignoring\n", ich);
  704.     }
  705.     }
  706.   TFRET();
  707. }
  708.  
  709. void doServe(STREAMING_SERVER * server, // server socket and state (our listening socket)
  710.   int sockfd    // client connection socket
  711.   )
  712. {
  713.   RTMPPacket pc = { 0 }, ps = { 0 };
  714.   RTMPChunk rk = { 0 };
  715.   char *buf = NULL;
  716.   unsigned int buflen = 131072;
  717.   bool paused = false;
  718.  
  719.   // timeout for http requests
  720.   fd_set rfds;
  721.   struct timeval tv;
  722.  
  723.   server->state = STREAMING_IN_PROGRESS;
  724.  
  725.   memset(&tv, 0, sizeof(struct timeval));
  726.   tv.tv_sec = 5;
  727.  
  728.   FD_ZERO(&rfds);
  729.   FD_SET(sockfd, &rfds);
  730.  
  731.   if (select(sockfd + 1, &rfds, NULL, NULL, &tv) <= 0)
  732.     {
  733.       RTMP_Log(RTMP_LOGERROR, "Request timeout/select failed, ignoring request");
  734.       goto quit;
  735.     }
  736.   else
  737.     {
  738.       RTMP_Init(&server->rs);
  739.       RTMP_Init(&server->rc);
  740.       server->rs.m_sb.sb_socket = sockfd;
  741.       if (!RTMP_Serve(&server->rs))
  742.         {
  743.           RTMP_Log(RTMP_LOGERROR, "Handshake failed");
  744.           goto cleanup;
  745.         }
  746.     }
  747.  
  748.   buf = malloc(buflen);
  749.  
  750.   /* Just process the Connect request */
  751.   while (RTMP_IsConnected(&server->rs) && RTMP_ReadPacket(&server->rs, &ps))
  752.     {
  753.       if (!RTMPPacket_IsReady(&ps))
  754.         continue;
  755.       ServePacket(server, 0, &ps);
  756.       RTMPPacket_Free(&ps);
  757.       if (RTMP_IsConnected(&server->rc))
  758.         break;
  759.     }
  760.  
  761.   pc.m_chunk = &rk;
  762.  
  763.   /* We have our own timeout in select() */
  764.   server->rc.Link.timeout = 10;
  765.   server->rs.Link.timeout = 10;
  766.   while (RTMP_IsConnected(&server->rs) || RTMP_IsConnected(&server->rc))
  767.     {
  768.       int n;
  769.       int sr, cr;
  770.  
  771.       cr = server->rc.m_sb.sb_size;
  772.       sr = server->rs.m_sb.sb_size;
  773.  
  774.       if (cr || sr)
  775.         {
  776.         }
  777.       else
  778.         {
  779.           n = server->rs.m_sb.sb_socket;
  780.       if (server->rc.m_sb.sb_socket > n)
  781.         n = server->rc.m_sb.sb_socket;
  782.       FD_ZERO(&rfds);
  783.       if (RTMP_IsConnected(&server->rs))
  784.         FD_SET(sockfd, &rfds);
  785.       if (RTMP_IsConnected(&server->rc))
  786.         FD_SET(server->rc.m_sb.sb_socket, &rfds);
  787.  
  788.           /* give more time to start up if we're not playing yet */
  789.       tv.tv_sec = server->f_cur ? 30 : 60;
  790.       tv.tv_usec = 0;
  791.  
  792.       if (select(n + 1, &rfds, NULL, NULL, &tv) <= 0)
  793.         {
  794.               if (server->f_cur && server->rc.m_mediaChannel && !paused)
  795.                 {
  796.                   server->rc.m_pauseStamp = server->rc.m_channelTimestamp[server->rc.m_mediaChannel];
  797.                   if (RTMP_ToggleStream(&server->rc))
  798.                     {
  799.                       paused = true;
  800.                       continue;
  801.                     }
  802.                 }
  803.           RTMP_Log(RTMP_LOGERROR, "Request timeout/select failed, ignoring request");
  804.           goto cleanup;
  805.         }
  806.           if (server->rs.m_sb.sb_socket > 0 &&
  807.         FD_ISSET(server->rs.m_sb.sb_socket, &rfds))
  808.             sr = 1;
  809.           if (server->rc.m_sb.sb_socket > 0 &&
  810.         FD_ISSET(server->rc.m_sb.sb_socket, &rfds))
  811.             cr = 1;
  812.         }
  813.       if (sr)
  814.         {
  815.           while (RTMP_ReadPacket(&server->rs, &ps))
  816.             if (RTMPPacket_IsReady(&ps))
  817.               {
  818.                 /* change chunk size */
  819.                 if (ps.m_packetType == 0x01)
  820.                   {
  821.                     if (ps.m_nBodySize >= 4)
  822.                       {
  823.                         server->rs.m_inChunkSize = AMF_DecodeInt32(ps.m_body);
  824.                         RTMP_Log(RTMP_LOGDEBUG, "%s, client: chunk size change to %d", __FUNCTION__,
  825.                             server->rs.m_inChunkSize);
  826.                         server->rc.m_outChunkSize = server->rs.m_inChunkSize;
  827.                       }
  828.                   }
  829.                 /* bytes received */
  830.                 else if (ps.m_packetType == 0x03)
  831.                   {
  832.                     if (ps.m_nBodySize >= 4)
  833.                       {
  834.                         int count = AMF_DecodeInt32(ps.m_body);
  835.                         RTMP_Log(RTMP_LOGDEBUG, "%s, client: bytes received = %d", __FUNCTION__,
  836.                             count);
  837.                       }
  838.                   }
  839.                 /* ctrl */
  840.                 else if (ps.m_packetType == 0x04)
  841.                   {
  842.                     short nType = AMF_DecodeInt16(ps.m_body);
  843.                     /* UpdateBufferMS */
  844.                     if (nType == 0x03)
  845.                       {
  846.                         char *ptr = ps.m_body+2;
  847.                         int id;
  848.                         int len;
  849.                         id = AMF_DecodeInt32(ptr);
  850.                         /* Assume the interesting media is on a non-zero stream */
  851.                         if (id)
  852.                           {
  853.                             len = AMF_DecodeInt32(ptr+4);
  854. #if 1
  855.                             /* request a big buffer */
  856.                             if (len < BUFFERTIME)
  857.                               {
  858.                                 AMF_EncodeInt32(ptr+4, ptr+8, BUFFERTIME);
  859.                               }
  860. #endif
  861.                             RTMP_Log(RTMP_LOGDEBUG, "%s, client: BufferTime change in stream %d to %d", __FUNCTION__,
  862.                                 id, len);
  863.                           }
  864.                       }
  865.                   }
  866.                 else if (ps.m_packetType == 0x11 || ps.m_packetType == 0x14)
  867.                   if (ServePacket(server, 0, &ps) && server->f_cur)
  868.                     {
  869.                       fclose(server->f_cur->f_file);
  870.                       server->f_cur->f_file = NULL;
  871.                       server->f_cur = NULL;
  872.                     }
  873.                 RTMP_SendPacket(&server->rc, &ps, false);
  874.                 RTMPPacket_Free(&ps);
  875.                 break;
  876.               }
  877.         }
  878.       if (cr)
  879.         {
  880.           while (RTMP_ReadPacket(&server->rc, &pc))
  881.             {
  882.               int sendit = 1;
  883.               if (RTMPPacket_IsReady(&pc))
  884.                 {
  885.                   if (paused)
  886.                     {
  887.                       if (pc.m_nTimeStamp <= server->rc.m_mediaStamp)
  888.                         continue;
  889.                       paused = 0;
  890.                       server->rc.m_pausing = 0;
  891.                     }
  892.                   /* change chunk size */
  893.                   if (pc.m_packetType == 0x01)
  894.                     {
  895.                       if (pc.m_nBodySize >= 4)
  896.                         {
  897.                           server->rc.m_inChunkSize = AMF_DecodeInt32(pc.m_body);
  898.                           RTMP_Log(RTMP_LOGDEBUG, "%s, server: chunk size change to %d", __FUNCTION__,
  899.                               server->rc.m_inChunkSize);
  900.                           server->rs.m_outChunkSize = server->rc.m_inChunkSize;
  901.                         }
  902.                     }
  903.                   else if (pc.m_packetType == 0x04)
  904.                     {
  905.                       short nType = AMF_DecodeInt16(pc.m_body);
  906.                       /* SWFverification */
  907.                       if (nType == 0x1a)
  908. #ifdef CRYPTO
  909.                         if (server->rc.Link.SWFSize)
  910.                         {
  911.                           RTMP_SendCtrl(&server->rc, 0x1b, 0, 0);
  912.                           sendit = 0;
  913.                         }
  914. #else
  915.                         /* The session will certainly fail right after this */
  916.                         RTMP_Log(RTMP_LOGERROR, "%s, server requested SWF verification, need CRYPTO support! ", __FUNCTION__);
  917. #endif
  918.                     }
  919.                   else if (server->f_cur && (
  920.                        pc.m_packetType == 0x08 ||
  921.                        pc.m_packetType == 0x09 ||
  922.                        pc.m_packetType == 0x12 ||
  923.                        pc.m_packetType == 0x16) &&
  924.                        RTMP_ClientPacket(&server->rc, &pc))
  925.                     {
  926.                       int len = WriteStream(&buf, &buflen, &server->stamp, &pc);
  927.                       if (len > 0 && fwrite(buf, 1, len, server->f_cur->f_file) != len)
  928.                         goto cleanup;
  929.                     }
  930.                   else if ( pc.m_packetType == 0x11 || pc.m_packetType == 0x14)
  931.                     {
  932.                       if (ServePacket(server, 1, &pc) && server->f_cur)
  933.                         {
  934.                           fclose(server->f_cur->f_file);
  935.                           server->f_cur->f_file = NULL;
  936.                           server->f_cur = NULL;
  937.                         }
  938.                     }
  939.                 }
  940.               if (sendit && RTMP_IsConnected(&server->rs))
  941.                 RTMP_SendChunk(&server->rs, &rk);
  942.               if (RTMPPacket_IsReady(&pc))
  943.                   RTMPPacket_Free(&pc);
  944.               break;
  945.             }
  946.         }
  947.       if (!RTMP_IsConnected(&server->rs) && RTMP_IsConnected(&server->rc)
  948.         && !server->f_cur)
  949.         RTMP_Close(&server->rc);
  950.     }
  951.  
  952. cleanup:
  953.   RTMP_LogPrintf("Closing connection... ");
  954.   RTMP_Close(&server->rs);
  955.   RTMP_Close(&server->rc);
  956.   while (server->f_head)
  957.     {
  958.       Flist *fl = server->f_head;
  959.       server->f_head = fl->f_next;
  960.       if (fl->f_file)
  961.         fclose(fl->f_file);
  962.       free(fl);
  963.     }
  964.   server->f_tail = NULL;
  965.   server->f_cur = NULL;
  966.   free(buf);
  967.   /* Should probably be done by RTMP_Close() ... */
  968.   server->rc.Link.hostname.av_val = NULL;
  969.   server->rc.Link.tcUrl.av_val = NULL;
  970.   server->rc.Link.swfUrl.av_val = NULL;
  971.   server->rc.Link.pageUrl.av_val = NULL;
  972.   server->rc.Link.app.av_val = NULL;
  973.   server->rc.Link.auth.av_val = NULL;
  974.   server->rc.Link.flashVer.av_val = NULL;
  975.   RTMP_LogPrintf("done!\n\n");
  976.  
  977. quit:
  978.   if (server->state == STREAMING_IN_PROGRESS)
  979.     server->state = STREAMING_ACCEPTING;
  980.  
  981.   return;
  982. }
  983.  
  984. TFTYPE
  985. serverThread(void *arg)
  986. {
  987.   STREAMING_SERVER *server = arg;
  988.   server->state = STREAMING_ACCEPTING;
  989.  
  990.   while (server->state == STREAMING_ACCEPTING)
  991.     {
  992.       struct sockaddr_in addr;
  993.       socklen_t addrlen = sizeof(struct sockaddr_in);
  994.       int sockfd =
  995.     accept(server->socket, (struct sockaddr *) &addr, &addrlen);
  996.  
  997.       if (sockfd > 0)
  998.     {
  999. #ifdef linux
  1000.           struct sockaddr_in dest;
  1001.       char destch[16];
  1002.           socklen_t destlen = sizeof(struct sockaddr_in);
  1003.       getsockopt(sockfd, SOL_IP, SO_ORIGINAL_DST, &dest, &destlen);
  1004.           strcpy(destch, inet_ntoa(dest.sin_addr));
  1005.       RTMP_Log(RTMP_LOGDEBUG, "%s: accepted connection from %s to %s\n", __FUNCTION__,
  1006.           inet_ntoa(addr.sin_addr), destch);
  1007. #else
  1008.       RTMP_Log(RTMP_LOGDEBUG, "%s: accepted connection from %s\n", __FUNCTION__,
  1009.           inet_ntoa(addr.sin_addr));
  1010. #endif
  1011.       /* Create a new thread and transfer the control to that */
  1012.       doServe(server, sockfd);
  1013.       RTMP_Log(RTMP_LOGDEBUG, "%s: processed request\n", __FUNCTION__);
  1014.     }
  1015.       else
  1016.     {
  1017.       RTMP_Log(RTMP_LOGERROR, "%s: accept failed", __FUNCTION__);
  1018.     }
  1019.     }
  1020.   server->state = STREAMING_STOPPED;
  1021.   TFRET();
  1022. }
  1023.  
  1024. STREAMING_SERVER *
  1025. startStreaming(const char *address, int port)
  1026. {
  1027.   struct sockaddr_in addr;
  1028.   int sockfd, tmp;
  1029.   STREAMING_SERVER *server;
  1030.  
  1031.   sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  1032.   if (sockfd == -1)
  1033.     {
  1034.       RTMP_Log(RTMP_LOGERROR, "%s, couldn't create socket", __FUNCTION__);
  1035.       return 0;
  1036.     }
  1037.  
  1038.   tmp = 1;
  1039.   setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
  1040.                 (char *) &tmp, sizeof(tmp) );
  1041.  
  1042.   addr.sin_family = AF_INET;
  1043.   addr.sin_addr.s_addr = inet_addr(address);    //htonl(INADDR_ANY);
  1044.   addr.sin_port = htons(port);
  1045.  
  1046.   if (bind(sockfd, (struct sockaddr *) &addr, sizeof(struct sockaddr_in)) ==
  1047.       -1)
  1048.     {
  1049.       RTMP_Log(RTMP_LOGERROR, "%s, TCP bind failed for port number: %d", __FUNCTION__,
  1050.       port);
  1051.       return 0;
  1052.     }
  1053.  
  1054.   if (listen(sockfd, 10) == -1)
  1055.     {
  1056.       RTMP_Log(RTMP_LOGERROR, "%s, listen failed", __FUNCTION__);
  1057.       closesocket(sockfd);
  1058.       return 0;
  1059.     }
  1060.  
  1061.   server = (STREAMING_SERVER *) calloc(1, sizeof(STREAMING_SERVER));
  1062.   server->socket = sockfd;
  1063.  
  1064.   ThreadCreate(serverThread, server);
  1065.  
  1066.   return server;
  1067. }
  1068.  
  1069. void
  1070. stopStreaming(STREAMING_SERVER * server)
  1071. {
  1072.   assert(server);
  1073.  
  1074.   if (server->state != STREAMING_STOPPED)
  1075.     {
  1076.       int fd = server->socket;
  1077.       server->socket = 0;
  1078.       if (server->state == STREAMING_IN_PROGRESS)
  1079.     {
  1080.       server->state = STREAMING_STOPPING;
  1081.  
  1082.       // wait for streaming threads to exit
  1083.       while (server->state != STREAMING_STOPPED)
  1084.         msleep(1);
  1085.     }
  1086.  
  1087.       if (fd && closesocket(fd))
  1088.     RTMP_Log(RTMP_LOGERROR, "%s: Failed to close listening socket, error %d",
  1089.         GetSockError());
  1090.  
  1091.       server->state = STREAMING_STOPPED;
  1092.     }
  1093. }
  1094.  
  1095.  
  1096. void
  1097. sigIntHandler(int sig)
  1098. {
  1099.   RTMP_ctrlC = true;
  1100.   RTMP_LogPrintf("Caught signal: %d, cleaning up, just a second...\n", sig);
  1101.   if (rtmpServer)
  1102.     stopStreaming(rtmpServer);
  1103.   signal(SIGINT, SIG_DFL);
  1104. }
  1105.  
  1106. int
  1107. main(int argc, char **argv)
  1108. {
  1109.   int nStatus = RD_SUCCESS;
  1110.  
  1111.   // rtmp streaming server
  1112.   char DEFAULT_RTMP_STREAMING_DEVICE[] = "0.0.0.0"; // 0.0.0.0 is any device
  1113.  
  1114.   char *rtmpStreamingDevice = DEFAULT_RTMP_STREAMING_DEVICE;    // streaming device, default 0.0.0.0
  1115.   int nRtmpStreamingPort = 1935;    // port
  1116.  
  1117.   RTMP_LogPrintf("RTMP Proxy Server %s\n", RTMPDUMP_VERSION);
  1118.   RTMP_LogPrintf("(c) 2010 Andrej Stepanchuk, Howard Chu; license: GPL\n\n");
  1119.  
  1120.   RTMP_debuglevel = RTMP_LOGINFO;
  1121.  
  1122.   if (argc > 1 && !strcmp(argv[1], "-z"))
  1123.     RTMP_debuglevel = RTMP_LOGALL;
  1124.  
  1125.   signal(SIGINT, sigIntHandler);
  1126. #ifndef WIN32
  1127.   signal(SIGPIPE, SIG_IGN);
  1128. #endif
  1129.  
  1130. #ifdef _DEBUG
  1131.   netstackdump = fopen("netstackdump", "wb");
  1132.   netstackdump_read = fopen("netstackdump_read", "wb");
  1133. #endif
  1134.  
  1135.   InitSockets();
  1136.  
  1137.   // start text UI
  1138.   ThreadCreate(controlServerThread, 0);
  1139.  
  1140.   // start http streaming
  1141.   if ((rtmpServer =
  1142.        startStreaming(rtmpStreamingDevice, nRtmpStreamingPort)) == 0)
  1143.     {
  1144.       RTMP_Log(RTMP_LOGERROR, "Failed to start RTMP server, exiting!");
  1145.       return RD_FAILED;
  1146.     }
  1147.   RTMP_LogPrintf("Streaming on rtmp://%s:%d\n", rtmpStreamingDevice,
  1148.         nRtmpStreamingPort);
  1149.  
  1150.   while (rtmpServer->state != STREAMING_STOPPED)
  1151.     {
  1152.       sleep(1);
  1153.     }
  1154.   RTMP_Log(RTMP_LOGDEBUG, "Done, exiting...");
  1155.  
  1156.   free(rtmpServer);
  1157.  
  1158.   CleanupSockets();
  1159.  
  1160. #ifdef _DEBUG
  1161.   if (netstackdump != 0)
  1162.     fclose(netstackdump);
  1163.   if (netstackdump_read != 0)
  1164.     fclose(netstackdump_read);
  1165. #endif
  1166.   return nStatus;
  1167. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement